Merge pull request #61 from djmitche/issue55

Implement the sync server
Dustin J. Mitchell 2020-11-26 20:00:22 -05:00 committed by GitHub
commit e32b9d1bf6
14 changed files with 2372 additions and 229 deletions

Cargo.lock generated

File diff suppressed because it is too large


@ -7,5 +7,7 @@
- [Replica Storage](./storage.md)
- [Task Database](./taskdb.md)
- [Tasks](./tasks.md)
- [Synchronization](./sync.md)
- [Synchronization and the Sync Server](./sync.md)
- [Synchronization Model](./sync-model.md)
* [Server-Replica Protocol](./sync-protocol.md)
- [Planned Functionality](./plans.md)

docs/src/sync-model.md

@ -0,0 +1,128 @@
# Synchronization Model
The [task database](./taskdb.md) also implements synchronization.
Synchronization occurs between disconnected replicas, mediated by a server.
The replicas never communicate directly with one another.
The server does not have access to the task data; it sees only opaque blobs of data with a small amount of metadata.
Synchronization is a critical part of the task database's functionality: the database cannot function efficiently without occasional synchronization operations.
## Operational Transforms
Synchronization is based on [operational transformation](https://en.wikipedia.org/wiki/Operational_transformation).
This section will assume some familiarity with the concept.
## State and Operations
At a given time, the set of tasks in a replica's storage is the essential "state" of that replica.
All modifications to that state occur via operations, as defined in [Replica Storage](./storage.md).
We can draw a network, or graph, with the nodes representing states and the edges representing operations.
For example:
```text
o -- State: {abc-d123: 'get groceries', priority L}
|
| -- Operation: set abc-d123 priority to H
|
o -- State: {abc-d123: 'get groceries', priority H}
```
For those familiar with distributed version control systems, a state is analogous to a revision, while an operation is analogous to a commit.
Fundamentally, synchronization involves all replicas agreeing on a single, linear sequence of operations and the state that those operations create.
Since the replicas are not connected, each may have additional operations that have been applied locally, but which have not yet been agreed on.
The synchronization process uses operational transformation to "linearize" those operations.
This process is analogous (vaguely) to rebasing a sequence of Git commits.
### Versions
Occasionally, a database state is given a name, in the form of a UUID; such a named state is a version.
The system as a whole (all replicas) constructs a branch-free sequence of versions and the operations that separate each version from the next.
The version with the nil UUID is implicitly the empty database.
The server stores the operations to change a state from a "parent" version to a "child" version, and provides that information as needed to replicas.
Replicas use this information to update their local task databases, and to generate new versions to send to the server.
Replicas generate a new version to transmit local changes to the server.
The changes are represented as a sequence of operations with the state resulting from the final operation corresponding to the version.
In order to keep the versions in a single sequence, the server will only accept a proposed version from a replica if its parent version matches the latest version on the server.
In the non-conflict case (such as with a single replica), then, a replica's synchronization process involves gathering up the operations it has accumulated since its last synchronization; bundling those operations into a version; and sending that version to the server.
### Replica Invariant
The replica's [storage](./storage.md) contains the current state in `tasks`, the as-yet un-synchronized operations in `operations`, and the last version at which synchronization occurred in `base_version`.
The replica's un-synchronized operations are already reflected in its local `tasks`, so the following invariant holds:
> Applying `operations` to the set of tasks at `base_version` gives a set of tasks identical
> to `tasks`.
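
The invariant can also be written as an executable check. The following is a minimal sketch, assuming a much-simplified task model: the `Tasks` type, the `Op` enum, and the `apply` and `invariant_holds` functions are hypothetical names invented for this illustration, not the types defined in Replica Storage.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified state: task id -> (property -> value).
type Tasks = BTreeMap<String, BTreeMap<String, String>>;

/// Hypothetical, simplified operations (the real ones live in Replica Storage).
#[derive(Clone, Debug)]
enum Op {
    Create { task: String },
    Update { task: String, prop: String, value: String },
    Delete { task: String },
}

/// Apply a single operation to a set of tasks.
fn apply(tasks: &mut Tasks, op: &Op) {
    match op {
        Op::Create { task } => {
            tasks.entry(task.clone()).or_default();
        }
        Op::Update { task, prop, value } => {
            // Updates to unknown tasks are ignored in this sketch.
            if let Some(t) = tasks.get_mut(task) {
                t.insert(prop.clone(), value.clone());
            }
        }
        Op::Delete { task } => {
            tasks.remove(task);
        }
    }
}

/// Replay `operations` on top of the state at `base_version` and compare
/// the result with the replica's current `tasks`.
fn invariant_holds(base: &Tasks, operations: &[Op], tasks: &Tasks) -> bool {
    let mut replayed = base.clone();
    for op in operations {
        apply(&mut replayed, op);
    }
    &replayed == tasks
}
```

A replica that records every local change both in `tasks` and in `operations` satisfies this check by construction; the check fails as soon as one of the two is updated without the other.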
### Transformation
When the latest version on the server contains operations that are not present in the replica, then the states have diverged.
For example:
```text
o -- version N
w|\a
o o
x| \b
o o
y| \c
o o -- replica's local state
z|
o -- version N+1
```
(diagram notation: `o` designates a state, lower-case letters designate operations, and versions are presented as if they were numbered sequentially)
In this situation, the replica must "rebase" the local operations onto the latest version from the server and try again.
This process is performed using operational transformation (OT).
The result of this transformation is a sequence of operations based on the latest version, and a sequence of operations the replica can apply to its local task database to reach the same state.
Continuing the example above, the resulting operations are shown with `'`:
```text
o -- version N
w|\a
o o
x| \b
o o
y| \c
o o -- replica's intermediate local state
z| |w'
o-N+1 o
a'\ |x'
o o
b'\ |y'
o o
c'\|z'
o -- version N+2
```
The replica applies w' through z' locally, and sends a' through c' to the server as the operations to generate version N+2.
Either path through this graph, a-b-c-w'-x'-y'-z' or a'-b'-c'-w-x-y-z, must generate *precisely* the same final state at version N+2.
Careful selection of the operations and the transformation function ensures this.
See the comments in the source code for the details of how this transformation process is implemented.
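The convergence requirement can be illustrated on a single pair of concurrent operations. This is only a sketch under strong assumptions: the `SetOp` type, its `ts` field, and the "later timestamp wins" rule are invented for illustration and are not TaskChampion's actual operations or transformation function.

```rust
use std::collections::BTreeMap;

/// Hypothetical operation: set `key` to `value` at logical time `ts`.
#[derive(Clone, Debug, PartialEq)]
struct SetOp {
    key: String,
    value: String,
    ts: u64,
}

type State = BTreeMap<String, String>;

/// Apply one operation to a state.
fn apply(state: &mut State, op: &SetOp) {
    state.insert(op.key.clone(), op.value.clone());
}

/// Transform two concurrent operations `a` and `b` that start from the same state.
/// Returns `(a', b')` such that applying `a` then `b'` gives the same state as
/// applying `b` then `a'`. `None` means no operation is needed on that path.
fn transform(a: &SetOp, b: &SetOp) -> (Option<SetOp>, Option<SetOp>) {
    if a.key != b.key {
        // Operations on different keys commute unchanged.
        (Some(a.clone()), Some(b.clone()))
    } else if a.ts > b.ts {
        // `a` is later and wins: it must still be applied after `b`,
        // while `b` is dropped on the path that already applied `a`.
        (Some(a.clone()), None)
    } else {
        // `b` wins (ties are resolved in favour of `b`).
        (None, Some(b.clone()))
    }
}
```

Transforming whole sequences, as in the diagram, amounts to applying such a pairwise function repeatedly across the two sequences.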
## Synchronization Process
To perform a synchronization, the replica first requests the child version of `base_version` from the server (GetChildVersion).
It applies that version to its local `tasks`, rebases its local `operations` as described above, and updates `base_version`.
The replica repeats this process until the server indicates no additional child versions exist.
If there are no un-synchronized local operations, the process is complete.
Otherwise, the replica creates a new version containing its local operations, giving its `base_version` as the parent version, and transmits that to the server (AddVersion).
In most cases, this will succeed, but if another replica has created a new version in the interim, then the new version will conflict with that other replica's new version and the server will respond with the new expected parent version.
In this case, the process repeats.
If the server indicates a conflict twice with the same expected base version, that is an indication that the replica has diverged (something serious has gone wrong).
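The loop described above might be sketched as follows. Everything here is an assumption made for illustration: the `Server` trait, `MemServer`, `Replica`, and `sync` are hypothetical names, version IDs are integers rather than UUIDs, operations are opaque strings, and the rebase step is omitted entirely.

```rust
type VersionId = u128;
/// Stand-in for the nil UUID.
const NIL: VersionId = 0;

#[derive(Clone)]
struct Version {
    version_id: VersionId,
    parent_version_id: VersionId,
    history_segment: Vec<String>,
}

enum AddVersionResult {
    Ok(VersionId),
    ExpectedParentVersion(VersionId),
}

trait Server {
    fn get_child_version(&self, parent: VersionId) -> Option<Version>;
    fn add_version(&mut self, parent: VersionId, ops: Vec<String>) -> AddVersionResult;
}

/// Hypothetical in-memory server used only to exercise the loop.
#[derive(Default)]
struct MemServer {
    versions: Vec<Version>,
    latest: VersionId,
}

impl Server for MemServer {
    fn get_child_version(&self, parent: VersionId) -> Option<Version> {
        self.versions.iter().find(|v| v.parent_version_id == parent).cloned()
    }

    fn add_version(&mut self, parent: VersionId, ops: Vec<String>) -> AddVersionResult {
        if !self.versions.is_empty() && parent != self.latest {
            return AddVersionResult::ExpectedParentVersion(self.latest);
        }
        // Sequential IDs stand in for server-generated UUIDs.
        let version_id = self.versions.len() as VersionId + 1;
        self.versions.push(Version { version_id, parent_version_id: parent, history_segment: ops });
        self.latest = version_id;
        AddVersionResult::Ok(version_id)
    }
}

struct Replica {
    base_version: VersionId,
    operations: Vec<String>, // un-synchronized local operations
    applied: Vec<String>,    // operations received from the server
}

fn sync(replica: &mut Replica, server: &mut dyn Server) -> Result<(), String> {
    let mut last_conflict = None;
    loop {
        // Pull child versions until the server has no more.
        while let Some(v) = server.get_child_version(replica.base_version) {
            // A real replica would also rebase `operations` here.
            replica.applied.extend(v.history_segment);
            replica.base_version = v.version_id;
        }
        if replica.operations.is_empty() {
            return Ok(());
        }
        match server.add_version(replica.base_version, replica.operations.clone()) {
            AddVersionResult::Ok(id) => {
                replica.base_version = id;
                replica.operations.clear();
                return Ok(());
            }
            AddVersionResult::ExpectedParentVersion(expected) => {
                // The same expected parent twice in a row means divergence.
                if last_conflict == Some(expected) {
                    return Err("replica has diverged".to_string());
                }
                last_conflict = Some(expected);
            }
        }
    }
}
```

With two replicas sharing one `MemServer`, the second replica to synchronize first receives the version created by the first, then adds its own version on top of it.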
## Servers
A replica depends on periodic synchronization for performant operation.
Without synchronization, its list of pending operations would grow indefinitely, and tasks could never be expired.
So all replicas, even "singleton" replicas which do not replicate task data with any other replica, must synchronize periodically.
TaskChampion provides a `LocalServer` for this purpose.
It implements the `get_child_version` and `add_version` operations as described, storing data on-disk locally, all within the `task` binary.

docs/src/sync-protocol.md

@ -0,0 +1,92 @@
# Server-Replica Protocol
The server-replica protocol is defined abstractly in terms of request/response transactions from the replica to the server.
This is made concrete in an HTTP representation.
The protocol builds on the model presented in the previous chapter, and in particular on the synchronization process.
## Clients
From the server's perspective, replicas are indistinguishable, so this protocol uses the term "client" to refer generically to all replicas replicating a single task history.
## Server
For each client, the server is responsible for storing the task history, in the form of a branch-free sequence of versions.
For each client, it stores a set of versions as well as the latest version ID, defaulting to the nil UUID.
Each version has a version ID, a parent version ID, and a history segment (opaque data containing the operations for that version).
The server should maintain the following invariants:
1. Given a client c, c.latestVersion is nil or exists in the set of versions.
1. Given versions v1 and v2 for a client, with v1.versionId != v2.versionId and v1.parentVersionId != nil, v1.parentVersionId != v2.parentVersionId.
In other words, versions do not branch.
Note that versions form a linked list beginning with the version stored in the client.
This linked list need not continue back to a version with v.parentVersionId = nil.
It may end at any point when v.parentVersionId is not found in the set of Versions.
This observation allows the server to discard older versions.
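The two invariants can be checked mechanically over the data stored for one client. The sketch below is illustrative only: `Version`, `NIL`, and `invariants_hold` are hypothetical names, integer IDs stand in for UUIDs, and the input is assumed to contain each version ID at most once.

```rust
use std::collections::HashSet;

type VersionId = u128;
/// Stand-in for the nil UUID.
const NIL: VersionId = 0;

struct Version {
    version_id: VersionId,
    parent_version_id: VersionId,
}

/// Check both server invariants for a single client's stored data.
fn invariants_hold(latest_version: VersionId, versions: &[Version]) -> bool {
    // Invariant 1: the latest version is nil or exists in the set of versions.
    let latest_ok = latest_version == NIL
        || versions.iter().any(|v| v.version_id == latest_version);

    // Invariant 2: no two versions share the same non-nil parent (no branching).
    let mut seen_parents = HashSet::new();
    let no_branches = versions
        .iter()
        .filter(|v| v.parent_version_id != NIL)
        .all(|v| seen_parents.insert(v.parent_version_id));

    latest_ok && no_branches
}
```

Note that the check does not require the chain to reach a version with a nil parent, which is what permits discarding older versions.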
## Transactions
### AddVersion
The AddVersion transaction requests that the server add a new version to the client's task history.
The request contains the following:
* parent version ID
* history segment
The server determines whether the new version is acceptable, atomically with respect to other requests for the same client.
If it has no versions for the client, it accepts the version.
If it already has one or more versions for the client, then it accepts the version only if the given parent version ID matches its stored latest version ID.
If the version is accepted, the server generates a new version ID for it.
The version is added to the set of versions for the client, and the client's latest version ID is set to the new version ID.
The new version ID is returned in the response to the client.
If the version is not accepted, the server makes no changes, but responds to the client with a conflict indication containing the latest version ID.
The client may then "rebase" its operations and try again.
Note that if a client receives two conflict responses with the same parent version ID, it is an indication that the client's version history has diverged from that on the server.
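The acceptance rule can be sketched as a single function over one client's data. This is an illustration under assumptions, not the server's implementation: the `Client` and `Version` structs are hypothetical, IDs are integers instead of UUIDs, new IDs are sequential rather than generated, and exclusive access through `&mut Client` stands in for the atomicity requirement.

```rust
type VersionId = u128;

#[derive(Debug, PartialEq)]
enum AddVersionResult {
    /// The version was accepted and given this ID.
    Ok(VersionId),
    /// Conflict: the client must base its version on this one.
    ExpectedParentVersion(VersionId),
}

struct Version {
    version_id: VersionId,
    parent_version_id: VersionId,
    history_segment: Vec<u8>,
}

#[derive(Default)]
struct Client {
    latest_version_id: VersionId,
    versions: Vec<Version>,
}

fn add_version(
    client: &mut Client,
    parent_version_id: VersionId,
    history_segment: Vec<u8>,
) -> AddVersionResult {
    // With no versions stored, any parent is accepted; otherwise the parent
    // must match the latest version ID.
    if !client.versions.is_empty() && client.latest_version_id != parent_version_id {
        // Not accepted: make no changes and report the latest version ID.
        return AddVersionResult::ExpectedParentVersion(client.latest_version_id);
    }
    // Sequential IDs stand in for newly generated UUIDs.
    let version_id = client.versions.len() as VersionId + 1;
    client.versions.push(Version { version_id, parent_version_id, history_segment });
    client.latest_version_id = version_id;
    AddVersionResult::Ok(version_id)
}
```

A second request naming a stale parent leaves the stored versions untouched and returns the ID the client is expected to rebase onto.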
### GetChildVersion
The GetChildVersion transaction is a read-only request for a version.
The request consists of a parent version ID.
The server searches its set of versions for a version with the given parent ID.
If found, it returns the version's
* version ID,
* parent version ID (matching that in the request), and
* history segment.
If not found, the server returns a negative response.
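The lookup, and the way a replica can follow the chain of versions with it, can be sketched as below. The `Version` struct, integer IDs, and the `versions_since` helper are assumptions made for this illustration.

```rust
type VersionId = u128;

#[derive(Clone, Debug, PartialEq)]
struct Version {
    version_id: VersionId,
    parent_version_id: VersionId,
    history_segment: Vec<u8>,
}

/// Find the version whose parent is `parent_version_id`, if any.
fn get_child_version(versions: &[Version], parent_version_id: VersionId) -> Option<&Version> {
    versions.iter().find(|v| v.parent_version_id == parent_version_id)
}

/// Follow the chain of child versions starting from `base`, returning their IDs in order.
fn versions_since(versions: &[Version], base: VersionId) -> Vec<VersionId> {
    let mut ids = Vec::new();
    let mut parent = base;
    // Bounded by the number of versions, so a malformed (cyclic) set cannot loop forever.
    for _ in 0..versions.len() {
        match get_child_version(versions, parent) {
            Some(v) => {
                ids.push(v.version_id);
                parent = v.version_id;
            }
            None => break,
        }
    }
    ids
}
```

The search key is the parent ID, not the version ID, so the chain can be followed even when the version named by `base` itself has been discarded.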
## HTTP Representation
The transactions above are realized for an HTTP server at `<origin>` using the HTTP requests and responses described here.
The `origin` *should* be an HTTPS endpoint on general principle, but nothing in the functionality or security of the protocol depends on connection encryption.
The replica identifies itself to the server using a `clientId` in the form of a UUID.
### AddVersion
The request is a `POST` to `<origin>/client/<clientId>/add-version/<parentVersionId>`.
The request body contains the history segment, optionally encoded using any encoding supported by actix-web.
The content-type must be `application/vnd.taskchampion.history-segment`.
The success response is a 200 OK with an empty body.
The new version ID appears in the `X-Version-Id` header.
On conflict, the response is a 409 CONFLICT with an empty body.
The expected parent version ID appears in the `X-Parent-Version-Id` header.
Other error responses (4xx or 5xx) may be returned and should be treated appropriately to their meanings in the HTTP specification.
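From the client's side, the request and the interpretation of the response might look like the sketch below. It uses no HTTP library: the function names, the `AddVersionOutcome` enum, and the representation of headers as string pairs are all hypothetical, chosen only to show the mapping described above.

```rust
/// Content-type required for the request body.
const HISTORY_SEGMENT_CONTENT_TYPE: &str = "application/vnd.taskchampion.history-segment";

/// Build the URL for an AddVersion request.
fn add_version_url(origin: &str, client_id: &str, parent_version_id: &str) -> String {
    format!(
        "{}/client/{}/add-version/{}",
        origin.trim_end_matches('/'),
        client_id,
        parent_version_id
    )
}

/// Request headers a client sends along with the history segment.
fn add_version_headers() -> Vec<(&'static str, &'static str)> {
    vec![("Content-Type", HISTORY_SEGMENT_CONTENT_TYPE)]
}

#[derive(Debug, PartialEq)]
enum AddVersionOutcome {
    /// 200 OK: the new version ID.
    Added(String),
    /// 409 CONFLICT: the expected parent version ID.
    Conflict(String),
    /// Any other status, or a response missing its expected header.
    Unexpected(u16),
}

/// Case-insensitive header lookup.
fn header<'a>(headers: &[(&'a str, &'a str)], name: &str) -> Option<&'a str> {
    headers
        .iter()
        .find(|pair| pair.0.eq_ignore_ascii_case(name))
        .map(|pair| pair.1)
}

/// Interpret an AddVersion response from its status code and headers.
fn interpret_add_version(status: u16, headers: &[(&str, &str)]) -> AddVersionOutcome {
    match status {
        200 => header(headers, "X-Version-Id")
            .map(|id| AddVersionOutcome::Added(id.to_string()))
            .unwrap_or(AddVersionOutcome::Unexpected(status)),
        409 => header(headers, "X-Parent-Version-Id")
            .map(|id| AddVersionOutcome::Conflict(id.to_string()))
            .unwrap_or(AddVersionOutcome::Unexpected(status)),
        other => AddVersionOutcome::Unexpected(other),
    }
}
```

Treating a 200 or 409 without its header as unexpected is a choice made in this sketch; the protocol text does not say how a client should handle such a response.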
### GetChildVersion
The request is a `GET` to `<origin>/client/<clientId>/get-child-version/<parentVersionId>`.
The response is 404 NOT FOUND if no such version exists.
Otherwise, the response is a 200 OK.
The version's history segment is returned in the response body, with content-type `application/vnd.taskchampion.history-segment`.
The version ID appears in the `X-Version-Id` header, and the parent version ID appears in the `X-Parent-Version-Id` header.
The response body may be encoded, in accordance with any `Accept-Encoding` header in the request.
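A client-side reading of this exchange can be sketched without any HTTP library. The names `get_child_version_url`, `ChildVersion`, and `interpret_get_child_version` are hypothetical, and the response is assumed to have been decoded already into a status code, the `X-Version-Id` header value, and the body bytes.

```rust
/// Build the URL for a GetChildVersion request.
fn get_child_version_url(origin: &str, client_id: &str, parent_version_id: &str) -> String {
    format!(
        "{}/client/{}/get-child-version/{}",
        origin.trim_end_matches('/'),
        client_id,
        parent_version_id
    )
}

#[derive(Debug, PartialEq)]
struct ChildVersion {
    version_id: String,
    history_segment: Vec<u8>,
}

/// Interpret a GetChildVersion response.
/// Returns `Ok(None)` when no child version exists (404 NOT FOUND).
fn interpret_get_child_version(
    status: u16,
    version_id_header: Option<&str>,
    body: &[u8],
) -> Result<Option<ChildVersion>, String> {
    match status {
        404 => Ok(None),
        200 => {
            let version_id = version_id_header
                .ok_or_else(|| "missing X-Version-Id header".to_string())?;
            Ok(Some(ChildVersion {
                version_id: version_id.to_string(),
                history_segment: body.to_vec(),
            }))
        }
        other => Err(format!("unexpected status {}", other)),
    }
}
```

Returning `Ok(None)` for a 404 keeps "no more versions" separate from genuine errors, which is what the replica's pull loop needs in order to know when to stop.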


@ -1,128 +1,7 @@
# Synchronization
# Synchronization and the Sync Server
The [task database](./taskdb.md) also implements synchronization.
Synchronization occurs between disconnected replicas, mediated by a server.
The replicas never communicate directly with one another.
The server does not have access to the task data; it sees only opaque blobs of data with a small amount of metadata.
This section covers *synchronization* of *replicas* containing the same set of tasks.
A replica can perform all operations locally without connecting to a sync server, then share those operations with other replicas when it connects.
Sync is a critical feature of TaskChampion, allowing users to consult and update the same task list on multiple devices, without requiring constant connection.
The synchronization process is a critical part of the task database's functionality, and it cannot function efficiently without occasional synchronization operations
## Operational Transformations
Synchronization is based on [operational transformation](https://en.wikipedia.org/wiki/Operational_transformation).
This section will assume some familiarity with the concept.
## State and Operations
At a given time, the set of tasks in a replica's storage is the essential "state" of that replica.
All modifications to that state occur via operations, as defined in [Replica Storage](./storage.md).
We can draw a network, or graph, with the nodes representing states and the edges representing operations.
For example:
```text
o -- State: {abc-d123: 'get groceries', priority L}
|
| -- Operation: set abc-d123 priority to H
|
o -- State: {abc-d123: 'get groceries', priority H}
```
For those familiar with distributed version control systems, a state is analogous to a revision, while an operation is analogous to a commit.
Fundamentally, synchronization involves all replicas agreeing on a single, linear sequence of operations and the state that those operations create.
Since the replicas are not connected, each may have additional operations that have been applied locally, but which have not yet been agreed on.
The synchronization process uses operational transformation to "linearize" those operations.
This process is analogous (vaguely) to rebasing a sequence of Git commits.
### Versions
Occasionally, database states are given a name (that takes the form of a UUID).
The system as a whole (all replicas) constructs a branch-free sequence of versions and the operations that separate each version from the next.
The version with the nil UUID is implicitly the empty database.
The server stores the operations to change a state from a "parent" version to a "child" version, and provides that information as needed to replicas.
Replicas use this information to update their local task databases, and to generate new versions to send to the server.
Replicas generate a new version to transmit local changes to the server.
The changes are represented as a sequence of operations with the state resulting from the final operation corresponding to the version.
In order to keep the versions in a single sequence, the server will only accept a proposed version from a replica if its parent version matches the latest version on the server.
In the non-conflict case (such as with a single replica), then, a replica's synchronization process involves gathering up the operations it has accumulated since its last synchronization; bundling those operations into a version; and sending that version to the server.
### Replica Invariant
The replica's [storage](./storage.md) contains the current state in `tasks`, the as-yet un-synchronized operations in `operations`, and the last version at which synchronization occurred in `base_version`.
The replica's un-synchronized operations are already reflected in its local `tasks`, so the following invariant holds:
> Applying `operations` to the set of tasks at `base_version` gives a set of tasks identical
> to `tasks`.
### Transformation
When the latest version on the server contains operations that are not present in the replica, then the states have diverged.
For example:
```text
o -- version N
w|\a
o o
x| \b
o o
y| \c
o o -- replica's local state
z|
o -- version N+1
```
(diagram notation: `o` designates a state, lower-case letters designate operations, and versions are presented as if they were numbered sequentially)
In this situation, the replica must "rebase" the local operations onto the latest version from the server and try again.
This process is performed using operational transformation (OT).
The result of this transformation is a sequence of operations based on the latest version, and a sequence of operations the replica can apply to its local task database to reach the same state
Continuing the example above, the resulting operations are shown with `'`:
```text
o -- version N
w|\a
o o
x| \b
o o
y| \c
o o -- replica's intermediate local state
z| |w'
o-N+1 o
a'\ |x'
o o
b'\ |y'
o o
c'\|z'
o -- version N+2
```
The replica applies w' through z' locally, and sends a' through c' to the server as the operations to generate version N+2.
Either path through this graph, a-b-c-w'-x'-y'-z' or a'-b'-c'-w-x-y-z, must generate *precisely* the same final state at version N+2.
Careful selection of the operations and the transformation function ensure this.
See the comments in the source code for the details of how this transformation process is implemented.
## Synchronization Process
To perform a synchronization, the replica first requests the child version of `base_version` from the server (`get_child_version`).
It applies that version to its local `tasks`, rebases its local `operations` as described above, and updates `base_version`.
The replica repeats this process until the server indicates no additional child versions exist.
If there are no un-synchronized local operations, the process is complete.
Otherwise, the replica creates a new version containing its local operations, giving its `base_version` as the parent version, and transmits that to the server (`add_version`).
In most cases, this will succeed, but if another replica has created a new version in the interim, then the new version will conflict with that other replica's new version and the server will respond with the new expected parent version.
In this case, the process repeats.
If the server indicates a conflict twice with the same expected base version, that is an indication that the replica has diverged (something serious has gone wrong).
## Servers
A replica depends on periodic synchronization for performant operation.
Without synchronization, its list of pending operations would grow indefinitely, and tasks could never be expired.
So all replicas, even "singleton" replicas which do not replicate task data with any other replica, must synchronize periodically.
TaskChampion provides a `LocalServer` for this purpose.
It implements the `get_child_version` and `add_version` operations as described, storing data on-disk locally, all within the `task` binary.
This is a complex topic, and the section is broken into several chapters, beginning at the lower levels of the implementation and working up.


@ -7,3 +7,10 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
actix-web = "3.3.0"
failure = "0.1.8"
taskchampion = { path = "../taskchampion" }
futures = "0.3.8"
[dev-dependencies]
actix-rt = "1.1.1"


@ -0,0 +1,188 @@
use crate::api::{
failure_to_ise, ServerState, HISTORY_SEGMENT_CONTENT_TYPE, PARENT_VERSION_ID_HEADER,
VERSION_ID_HEADER,
};
use crate::server::{add_version, AddVersionResult, ClientId, VersionId};
use actix_web::{error, post, web, HttpMessage, HttpRequest, HttpResponse, Result};
use futures::StreamExt;
/// Max history segment size: 100MB
const MAX_SIZE: usize = 100 * 1024 * 1024;
/// Add a new version, after checking prerequisites. The history segment should be transmitted in
/// the request entity body and must have content-type
/// `application/vnd.taskchampion.history-segment`. The content can be encoded in any of the
/// formats supported by actix-web.
///
/// On success, the response is a 200 OK with the new version ID in the `X-Version-Id` header. If
/// the version cannot be added due to a conflict, the response is a 409 CONFLICT with the expected
/// parent version ID in the `X-Parent-Version-Id` header.
///
/// Returns other 4xx or 5xx responses on other errors.
#[post("/client/{client_id}/add-version/{parent_version_id}")]
pub(crate) async fn service(
    req: HttpRequest,
    server_state: web::Data<ServerState>,
    web::Path((client_id, parent_version_id)): web::Path<(ClientId, VersionId)>,
    mut payload: web::Payload,
) -> Result<HttpResponse> {
    // check content-type
    if req.content_type() != HISTORY_SEGMENT_CONTENT_TYPE {
        return Err(error::ErrorBadRequest("Bad content-type"));
    }
    // read the body in its entirety
    let mut body = web::BytesMut::new();
    while let Some(chunk) = payload.next().await {
        let chunk = chunk?;
        // limit max size of in-memory payload
        if (body.len() + chunk.len()) > MAX_SIZE {
            return Err(error::ErrorBadRequest("overflow"));
        }
        body.extend_from_slice(&chunk);
    }
    if body.is_empty() {
        return Err(error::ErrorBadRequest("Empty body"));
    }
    // note that we do not open the transaction until the body has been read
    // completely, to avoid blocking other storage access while that data is
    // in transit.
    let mut txn = server_state.txn().map_err(failure_to_ise)?;
    let client = txn
        .get_client(client_id)
        .map_err(failure_to_ise)?
        .ok_or_else(|| error::ErrorNotFound("no such client"))?;
    let result = add_version(txn, client_id, client, parent_version_id, body.to_vec())
        .map_err(failure_to_ise)?;
    Ok(match result {
        AddVersionResult::Ok(version_id) => HttpResponse::Ok()
            .header(VERSION_ID_HEADER, version_id.to_string())
            .body(""),
        AddVersionResult::ExpectedParentVersion(parent_version_id) => HttpResponse::Conflict()
            .header(PARENT_VERSION_ID_HEADER, parent_version_id.to_string())
            .body(""),
    })
}
#[cfg(test)]
mod test {
use crate::api::ServerState;
use crate::app_scope;
use crate::storage::{InMemoryStorage, Storage};
use actix_web::{http::StatusCode, test, App};
use taskchampion::Uuid;
#[actix_rt::test]
async fn test_success() {
let client_id = Uuid::new_v4();
let version_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let server_box: Box<dyn Storage> = Box::new(InMemoryStorage::new());
// set up the storage contents..
{
let mut txn = server_box.txn().unwrap();
txn.set_client_latest_version_id(client_id, Uuid::nil())
.unwrap();
}
let server_state = ServerState::new(server_box);
let mut app = test::init_service(App::new().service(app_scope(server_state))).await;
let uri = format!("/client/{}/add-version/{}", client_id, parent_version_id);
let req = test::TestRequest::post()
.uri(&uri)
.header(
"Content-Type",
"application/vnd.taskchampion.history-segment",
)
.set_payload(b"abcd".to_vec())
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::OK);
// the returned version ID is random, but let's check that it's not
// the passed parent version ID, at least
let new_version_id = resp.headers().get("X-Version-Id").unwrap();
assert!(new_version_id != &version_id.to_string());
assert_eq!(resp.headers().get("X-Parent-Version-Id"), None);
}
#[actix_rt::test]
async fn test_conflict() {
let client_id = Uuid::new_v4();
let version_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let server_box: Box<dyn Storage> = Box::new(InMemoryStorage::new());
// set up the storage contents..
{
let mut txn = server_box.txn().unwrap();
txn.set_client_latest_version_id(client_id, version_id)
.unwrap();
}
let server_state = ServerState::new(server_box);
let mut app = test::init_service(App::new().service(app_scope(server_state))).await;
let uri = format!("/client/{}/add-version/{}", client_id, parent_version_id);
let req = test::TestRequest::post()
.uri(&uri)
.header(
"Content-Type",
"application/vnd.taskchampion.history-segment",
)
.set_payload(b"abcd".to_vec())
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::CONFLICT);
assert_eq!(resp.headers().get("X-Version-Id"), None);
assert_eq!(
resp.headers().get("X-Parent-Version-Id").unwrap(),
&version_id.to_string()
);
}
#[actix_rt::test]
async fn test_bad_content_type() {
let client_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let server_box: Box<dyn Storage> = Box::new(InMemoryStorage::new());
let server_state = ServerState::new(server_box);
let mut app = test::init_service(App::new().service(app_scope(server_state))).await;
let uri = format!("/client/{}/add-version/{}", client_id, parent_version_id);
let req = test::TestRequest::post()
.uri(&uri)
.header("Content-Type", "not/correct")
.set_payload(b"abcd".to_vec())
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
}
#[actix_rt::test]
async fn test_empty_body() {
let client_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let server_box: Box<dyn Storage> = Box::new(InMemoryStorage::new());
let server_state = ServerState::new(server_box);
let mut app = test::init_service(App::new().service(app_scope(server_state))).await;
let uri = format!("/client/{}/add-version/{}", client_id, parent_version_id);
let req = test::TestRequest::post()
.uri(&uri)
.header(
"Content-Type",
"application/vnd.taskchampion.history-segment",
)
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
}
}


@ -0,0 +1,138 @@
use crate::api::{
failure_to_ise, ServerState, HISTORY_SEGMENT_CONTENT_TYPE, PARENT_VERSION_ID_HEADER,
VERSION_ID_HEADER,
};
use crate::server::{get_child_version, ClientId, VersionId};
use actix_web::{error, get, web, HttpResponse, Result};
/// Get a child version.
///
/// On success, the response is the same sequence of bytes originally sent to the server,
/// with content-type `application/vnd.taskchampion.history-segment`. The `X-Version-Id` and
/// `X-Parent-Version-Id` headers contain the corresponding values.
///
/// If no such child exists, returns a 404 with no content.
/// Returns other 4xx or 5xx responses on other errors.
#[get("/client/{client_id}/get-child-version/{parent_version_id}")]
pub(crate) async fn service(
    server_state: web::Data<ServerState>,
    web::Path((client_id, parent_version_id)): web::Path<(ClientId, VersionId)>,
) -> Result<HttpResponse> {
    let mut txn = server_state.txn().map_err(failure_to_ise)?;
    txn.get_client(client_id)
        .map_err(failure_to_ise)?
        .ok_or_else(|| error::ErrorNotFound("no such client"))?;
    let result = get_child_version(txn, client_id, parent_version_id).map_err(failure_to_ise)?;
    if let Some(result) = result {
        Ok(HttpResponse::Ok()
            .content_type(HISTORY_SEGMENT_CONTENT_TYPE)
            .header(VERSION_ID_HEADER, result.version_id.to_string())
            .header(
                PARENT_VERSION_ID_HEADER,
                result.parent_version_id.to_string(),
            )
            .body(result.history_segment))
    } else {
        Err(error::ErrorNotFound("no such version"))
    }
}
#[cfg(test)]
mod test {
use crate::api::ServerState;
use crate::app_scope;
use crate::storage::{InMemoryStorage, Storage};
use actix_web::{http::StatusCode, test, App};
use taskchampion::Uuid;
#[actix_rt::test]
async fn test_success() {
let client_id = Uuid::new_v4();
let version_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let server_box: Box<dyn Storage> = Box::new(InMemoryStorage::new());
// set up the storage contents..
{
let mut txn = server_box.txn().unwrap();
txn.set_client_latest_version_id(client_id, Uuid::new_v4())
.unwrap();
txn.add_version(client_id, version_id, parent_version_id, b"abcd".to_vec())
.unwrap();
}
let server_state = ServerState::new(server_box);
let mut app = test::init_service(App::new().service(app_scope(server_state))).await;
let uri = format!(
"/client/{}/get-child-version/{}",
client_id, parent_version_id
);
let req = test::TestRequest::get().uri(&uri).to_request();
let mut resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers().get("X-Version-Id").unwrap(),
&version_id.to_string()
);
assert_eq!(
resp.headers().get("X-Parent-Version-Id").unwrap(),
&parent_version_id.to_string()
);
assert_eq!(
resp.headers().get("Content-Type").unwrap(),
&"application/vnd.taskchampion.history-segment".to_string()
);
use futures::StreamExt;
let (bytes, _) = resp.take_body().into_future().await;
assert_eq!(bytes.unwrap().unwrap().as_ref(), b"abcd");
}
#[actix_rt::test]
async fn test_client_not_found() {
let client_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let server_box: Box<dyn Storage> = Box::new(InMemoryStorage::new());
let server_state = ServerState::new(server_box);
let mut app = test::init_service(App::new().service(app_scope(server_state))).await;
let uri = format!(
"/client/{}/get-child-version/{}",
client_id, parent_version_id
);
let req = test::TestRequest::get().uri(&uri).to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
assert_eq!(resp.headers().get("X-Version-Id"), None);
assert_eq!(resp.headers().get("X-Parent-Version-Id"), None);
}
#[actix_rt::test]
async fn test_version_not_found() {
let client_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let server_box: Box<dyn Storage> = Box::new(InMemoryStorage::new());
// create the client, but not the version
{
let mut txn = server_box.txn().unwrap();
txn.set_client_latest_version_id(client_id, Uuid::new_v4())
.unwrap();
}
let server_state = ServerState::new(server_box);
let mut app = test::init_service(App::new().service(app_scope(server_state))).await;
let uri = format!(
"/client/{}/get-child-version/{}",
client_id, parent_version_id
);
let req = test::TestRequest::get().uri(&uri).to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
assert_eq!(resp.headers().get("X-Version-Id"), None);
assert_eq!(resp.headers().get("X-Parent-Version-Id"), None);
}
}


@ -0,0 +1,30 @@
use crate::storage::Storage;
use actix_web::{error, http::StatusCode, web, Scope};
use std::sync::Arc;
mod add_version;
mod get_child_version;
/// The content-type for history segments (opaque blobs of bytes)
pub(crate) const HISTORY_SEGMENT_CONTENT_TYPE: &str =
"application/vnd.taskchampion.history-segment";
/// The header name for the version ID
pub(crate) const VERSION_ID_HEADER: &str = "X-Version-Id";
/// The header name for the parent version ID
pub(crate) const PARENT_VERSION_ID_HEADER: &str = "X-Parent-Version-Id";
/// The type containing a reference to the Storage object in the Actix state.
pub(crate) type ServerState = Arc<Box<dyn Storage>>;
pub(crate) fn api_scope() -> Scope {
    web::scope("")
        .service(get_child_version::service)
        .service(add_version::service)
}
/// Convert a failure::Error to an Actix ISE
fn failure_to_ise(err: failure::Error) -> impl actix_web::ResponseError {
    error::InternalError::new(err, StatusCode::INTERNAL_SERVER_ERROR)
}


@ -1,86 +0,0 @@
#![allow(clippy::new_without_default)]
use std::collections::HashMap;
type Blob = Vec<u8>;
struct User {
// versions, indexed at v-1
versions: Vec<Blob>,
snapshots: HashMap<u64, Blob>,
}
pub struct Server {
users: HashMap<String, User>,
}
pub enum VersionAdd {
// OK, version added
Ok,
// Rejected, must be based on the given version
ExpectedVersion(u64),
}
impl User {
fn new() -> User {
User {
versions: vec![],
snapshots: HashMap::new(),
}
}
fn get_versions(&self, since_version: u64) -> Vec<Blob> {
let last_version = self.versions.len();
if last_version == since_version as usize {
return vec![];
}
self.versions[since_version as usize..last_version].to_vec()
}
fn add_version(&mut self, version: u64, blob: Blob) -> VersionAdd {
// off by one here: client wants to send version 1 first
let expected_version = self.versions.len() as u64 + 1;
if version != expected_version {
return VersionAdd::ExpectedVersion(expected_version);
}
self.versions.push(blob);
VersionAdd::Ok
}
fn add_snapshot(&mut self, version: u64, blob: Blob) {
self.snapshots.insert(version, blob);
}
}
impl Server {
pub fn new() -> Server {
Server {
users: HashMap::new(),
}
}
fn get_user_mut(&mut self, username: &str) -> &mut User {
self.users
.entry(username.to_string())
.or_insert_with(User::new)
}
/// Get a vector of all versions after `since_version`
pub fn get_versions(&self, username: &str, since_version: u64) -> Vec<Blob> {
self.users
.get(username)
.map(|user| user.get_versions(since_version))
.unwrap_or_default()
}
/// Add a new version. If the given version number is incorrect, this responds with the
/// appropriate version and expects the caller to try again.
pub fn add_version(&mut self, username: &str, version: u64, blob: Blob) -> VersionAdd {
self.get_user_mut(username).add_version(version, blob)
}
pub fn add_snapshot(&mut self, username: &str, version: u64, blob: Blob) {
self.get_user_mut(username).add_snapshot(version, blob);
}
}

54
sync-server/src/main.rs Normal file
@ -0,0 +1,54 @@
use crate::storage::{InMemoryStorage, Storage};
use actix_web::{get, web, App, HttpServer, Responder, Scope};
use api::{api_scope, ServerState};
mod api;
mod server;
mod storage;
// TODO: use hawk to sign requests
#[get("/")]
async fn index() -> impl Responder {
// TODO: add version here
"TaskChampion sync server"
}
/// Return a scope defining the URL rules for this server, with access to
/// the given ServerState.
pub(crate) fn app_scope(server_state: ServerState) -> Scope {
web::scope("")
.data(server_state)
.service(index)
.service(api_scope())
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let server_box: Box<dyn Storage> = Box::new(InMemoryStorage::new());
let server_state = ServerState::new(server_box);
HttpServer::new(move || App::new().service(app_scope(server_state.clone())))
.bind("127.0.0.1:8080")?
.run()
.await
}
#[cfg(test)]
mod test {
use super::*;
use crate::api::ServerState;
use crate::storage::{InMemoryStorage, Storage};
use actix_web::{test, App};
#[actix_rt::test]
async fn test_index_get() {
let server_box: Box<dyn Storage> = Box::new(InMemoryStorage::new());
let server_state = ServerState::new(server_box);
let mut app = test::init_service(App::new().service(app_scope(server_state))).await;
let req = test::TestRequest::get().uri("/").to_request();
let resp = test::call_service(&mut app, req).await;
assert!(resp.status().is_success());
}
}
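
`main` builds one `ServerState` and clones it into every worker's `App`. The sketch below shows the same sharing pattern using only the standard library, with plain threads standing in for HTTP workers. `SketchStorage`, `CountingStorage`, `SketchState`, `new_state`, and `run_workers` are hypothetical names and not part of the sync server.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the `Storage` trait. The `Send + Sync` bound is
/// what allows a single instance to be shared between worker threads.
pub trait SketchStorage: Send + Sync {
    fn record_request(&self);
    fn request_count(&self) -> usize;
}

/// A trivial implementation that only counts calls.
pub struct CountingStorage(Mutex<usize>);

impl SketchStorage for CountingStorage {
    fn record_request(&self) {
        *self.0.lock().expect("poisoned lock") += 1;
    }

    fn request_count(&self) -> usize {
        *self.0.lock().expect("poisoned lock")
    }
}

/// Same shape as `ServerState`: a shared, type-erased storage object.
pub type SketchState = Arc<Box<dyn SketchStorage>>;

pub fn new_state() -> SketchState {
    let boxed: Box<dyn SketchStorage> = Box::new(CountingStorage(Mutex::new(0)));
    Arc::new(boxed)
}

/// Spawn `workers` threads, each holding its own clone of the `Arc`, and
/// return the number of requests recorded once all of them have finished.
pub fn run_workers(state: SketchState, workers: usize) -> usize {
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            // Cloning the Arc copies a pointer, not the storage itself.
            let state = state.clone();
            thread::spawn(move || state.record_request())
        })
        .collect();
    for handle in handles {
        handle.join().expect("worker panicked");
    }
    state.request_count()
}
```

Every clone points at the same storage, so a write made through one worker is visible to all the others.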

197
sync-server/src/server.rs Normal file
@ -0,0 +1,197 @@
//! This module implements the core logic of the server: handling transactions, upholding
//! invariants, and so on.
use crate::storage::{Client, StorageTxn};
use failure::Fallible;
use taskchampion::Uuid;
/// The distinguished value for "no version"
pub const NO_VERSION_ID: VersionId = Uuid::nil();
pub(crate) type HistorySegment = Vec<u8>;
pub(crate) type ClientId = Uuid;
pub(crate) type VersionId = Uuid;
/// Response to get_child_version
#[derive(Clone, PartialEq, Debug)]
pub(crate) struct GetVersionResult {
pub(crate) version_id: Uuid,
pub(crate) parent_version_id: Uuid,
pub(crate) history_segment: HistorySegment,
}
pub(crate) fn get_child_version<'a>(
mut txn: Box<dyn StorageTxn + 'a>,
client_id: ClientId,
parent_version_id: VersionId,
) -> Fallible<Option<GetVersionResult>> {
Ok(txn
.get_version_by_parent(client_id, parent_version_id)?
.map(|version| GetVersionResult {
version_id: version.version_id,
parent_version_id: version.parent_version_id,
history_segment: version.history_segment,
}))
}
/// Response to add_version
#[derive(Clone, PartialEq, Debug)]
pub(crate) enum AddVersionResult {
/// OK, version added with the given ID
Ok(VersionId),
/// Rejected; expected a version with the given parent version
ExpectedParentVersion(VersionId),
}
pub(crate) fn add_version<'a>(
mut txn: Box<dyn StorageTxn + 'a>,
client_id: ClientId,
client: Client,
parent_version_id: VersionId,
history_segment: HistorySegment,
) -> Fallible<AddVersionResult> {
// check if this version is acceptable, under the protection of the transaction
if client.latest_version_id != NO_VERSION_ID && parent_version_id != client.latest_version_id {
return Ok(AddVersionResult::ExpectedParentVersion(
client.latest_version_id,
));
}
// invent a version ID
let version_id = Uuid::new_v4();
// update the DB
txn.add_version(client_id, version_id, parent_version_id, history_segment)?;
txn.set_client_latest_version_id(client_id, version_id)?;
txn.commit()?;
Ok(AddVersionResult::Ok(version_id))
}
#[cfg(test)]
mod test {
use super::*;
use crate::storage::{InMemoryStorage, Storage};
#[test]
fn gcv_not_found() -> Fallible<()> {
let storage = InMemoryStorage::new();
let txn = storage.txn()?;
let client_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
assert_eq!(get_child_version(txn, client_id, parent_version_id)?, None);
Ok(())
}
#[test]
fn gcv_found() -> Fallible<()> {
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_id = Uuid::new_v4();
let version_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let history_segment = b"abcd".to_vec();
txn.add_version(
client_id,
version_id,
parent_version_id,
history_segment.clone(),
)?;
assert_eq!(
get_child_version(txn, client_id, parent_version_id)?,
Some(GetVersionResult {
version_id,
parent_version_id,
history_segment,
})
);
Ok(())
}
#[test]
fn av_conflict() -> Fallible<()> {
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let history_segment = b"abcd".to_vec();
let existing_parent_version_id = Uuid::new_v4();
let client = Client {
latest_version_id: existing_parent_version_id,
};
assert_eq!(
add_version(
txn,
client_id,
client,
parent_version_id,
history_segment.clone()
)?,
AddVersionResult::ExpectedParentVersion(existing_parent_version_id)
);
// verify that the storage wasn't updated
txn = storage.txn()?;
assert_eq!(txn.get_client(client_id)?, None);
assert_eq!(
txn.get_version_by_parent(client_id, parent_version_id)?,
None
);
Ok(())
}
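/// Shared body for the add_version success tests. When `latest_version_id_nil` is true, the
/// client has no latest version yet; otherwise its latest version is the new version's parent.
fn test_av_success(latest_version_id_nil: bool) -> Fallible<()> {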
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let history_segment = b"abcd".to_vec();
let client = Client {
latest_version_id: if latest_version_id_nil {
Uuid::nil()
} else {
parent_version_id
},
};
let result = add_version(
txn,
client_id,
client,
parent_version_id,
history_segment.clone(),
)?;
if let AddVersionResult::Ok(new_version_id) = result {
// check that it invented a new version ID
assert!(new_version_id != parent_version_id);
// verify that the storage was updated
txn = storage.txn()?;
let client = txn.get_client(client_id)?.unwrap();
assert_eq!(client.latest_version_id, new_version_id);
let version = txn
.get_version_by_parent(client_id, parent_version_id)?
.unwrap();
assert_eq!(version.version_id, new_version_id);
assert_eq!(version.parent_version_id, parent_version_id);
assert_eq!(version.history_segment, history_segment);
} else {
panic!("did not get Ok from add_version");
}
Ok(())
}
#[test]
fn av_success_with_existing_history() -> Fallible<()> {
test_av_success(false)
}
#[test]
fn av_success_nil_latest_version_id() -> Fallible<()> {
test_av_success(true)
}
}
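
The acceptance rule in `add_version` means a replica must walk the chain of child versions up to the latest one before its own version is accepted. The sketch below models that exchange for a single client, in memory. `SketchServer`, `AddResult`, and `catch_up_and_add` are hypothetical names, IDs are `u128` values with `0` standing in for the nil UUID, and a counter replaces `Uuid::new_v4()`. A real replica would also transform its pending operations against each fetched segment, which is omitted here.

```rust
use std::collections::HashMap;

pub type VersionId = u128;

/// Stand-in for the nil UUID used as "no version".
pub const NO_VERSION_ID: VersionId = 0;

#[derive(Debug, PartialEq)]
pub enum AddResult {
    /// Accepted; the server assigned this version ID.
    Ok(VersionId),
    /// Rejected; the new version must have this parent.
    ExpectedParentVersion(VersionId),
}

/// Single-client, in-memory model of the server logic.
#[derive(Default)]
pub struct SketchServer {
    latest: VersionId,
    /// parent version ID -> (version ID, history segment)
    children: HashMap<VersionId, (VersionId, Vec<u8>)>,
    /// Counter used in place of random UUIDs (an assumption of this sketch).
    next_id: VersionId,
}

impl SketchServer {
    pub fn add_version(&mut self, parent: VersionId, segment: Vec<u8>) -> AddResult {
        // Same rule as above: any parent is accepted while there is no
        // history; afterwards the parent must be the latest version.
        if self.latest != NO_VERSION_ID && parent != self.latest {
            return AddResult::ExpectedParentVersion(self.latest);
        }
        self.next_id += 1;
        let id = self.next_id;
        self.children.insert(parent, (id, segment));
        self.latest = id;
        AddResult::Ok(id)
    }

    pub fn get_child_version(&self, parent: VersionId) -> Option<(VersionId, Vec<u8>)> {
        self.children.get(&parent).cloned()
    }
}

/// Client side: follow child versions from `base` until none is left, then
/// try to add `segment` on top. Returns how many versions were fetched and
/// the server's answer.
pub fn catch_up_and_add(
    server: &mut SketchServer,
    mut base: VersionId,
    segment: Vec<u8>,
) -> (usize, AddResult) {
    let mut fetched = 0;
    while let Some((id, _segment)) = server.get_child_version(base) {
        base = id;
        fetched += 1;
    }
    (fetched, server.add_version(base, segment))
}
```

A client that receives `ExpectedParentVersion` is expected to fetch the missing versions and try again, which is the loop that `catch_up_and_add` performs once.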

@ -0,0 +1,90 @@
use super::{Client, Storage, StorageTxn, Uuid, Version};
use failure::Fallible;
use std::collections::HashMap;
use std::sync::{Mutex, MutexGuard};
struct Inner {
/// Clients, indexed by client_id
clients: HashMap<Uuid, Client>,
/// Versions, indexed by (client_id, parent_version_id)
versions: HashMap<(Uuid, Uuid), Version>,
}
/// In-memory storage for testing and experimentation.
///
/// NOTE: this does not implement transaction rollback.
pub(crate) struct InMemoryStorage(Mutex<Inner>);
impl InMemoryStorage {
pub(crate) fn new() -> Self {
Self(Mutex::new(Inner {
clients: HashMap::new(),
versions: HashMap::new(),
}))
}
}
struct InnerTxn<'a>(MutexGuard<'a, Inner>);
impl Storage for InMemoryStorage {
fn txn<'a>(&'a self) -> Fallible<Box<dyn StorageTxn + 'a>> {
Ok(Box::new(InnerTxn(self.0.lock().expect("poisoned lock"))))
}
}
impl<'a> StorageTxn for InnerTxn<'a> {
fn get_client(&mut self, client_id: Uuid) -> Fallible<Option<Client>> {
Ok(self.0.clients.get(&client_id).cloned())
}
fn set_client_latest_version_id(
&mut self,
client_id: Uuid,
latest_version_id: Uuid,
) -> Fallible<()> {
if let Some(client) = self.0.clients.get_mut(&client_id) {
client.latest_version_id = latest_version_id;
} else {
self.0
.clients
.insert(client_id, Client { latest_version_id });
}
Ok(())
}
fn get_version_by_parent(
&mut self,
client_id: Uuid,
parent_version_id: Uuid,
) -> Fallible<Option<Version>> {
Ok(self
.0
.versions
.get(&(client_id, parent_version_id))
.cloned())
}
fn add_version(
&mut self,
client_id: Uuid,
version_id: Uuid,
parent_version_id: Uuid,
history_segment: Vec<u8>,
) -> Fallible<()> {
// TODO: verify it doesn't exist (`.entry`?)
let version = Version {
version_id,
parent_version_id,
history_segment,
};
self.0
.versions
.insert((client_id, version.parent_version_id), version);
Ok(())
}
fn commit(&mut self) -> Fallible<()> {
Ok(())
}
}
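
The TODO in `add_version` suggests using `.entry` to reject a version whose key already exists. One possible shape for that check is sketched below with the standard `HashMap` entry API. `SketchVersion` and `add_version_checked` are hypothetical names, IDs are `u128` values rather than UUIDs, and the `String` error is a placeholder for whatever error type the storage would actually return.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Hypothetical stand-in for `Version`, with `u128` IDs.
#[derive(Clone, Debug, PartialEq)]
pub struct SketchVersion {
    pub version_id: u128,
    pub parent_version_id: u128,
    pub history_segment: Vec<u8>,
}

/// Insert `version` keyed by (client_id, parent_version_id), refusing to
/// overwrite an existing entry.
pub fn add_version_checked(
    versions: &mut HashMap<(u128, u128), SketchVersion>,
    client_id: u128,
    version: SketchVersion,
) -> Result<(), String> {
    match versions.entry((client_id, version.parent_version_id)) {
        // A child of this parent already exists for this client.
        Entry::Occupied(_) => Err(format!(
            "client {} already has a child of version {}",
            client_id, version.parent_version_id
        )),
        Entry::Vacant(slot) => {
            slot.insert(version);
            Ok(())
        }
    }
}
```

The entry API performs the lookup and the insert with a single hash computation, so the existence check costs almost nothing extra.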

@ -0,0 +1,56 @@
use failure::Fallible;
use taskchampion::Uuid;
mod inmemory;
pub(crate) use inmemory::InMemoryStorage;
#[derive(Clone, PartialEq, Debug)]
pub(crate) struct Client {
pub(crate) latest_version_id: Uuid,
}
#[derive(Clone, PartialEq, Debug)]
pub(crate) struct Version {
pub(crate) version_id: Uuid,
pub(crate) parent_version_id: Uuid,
pub(crate) history_segment: Vec<u8>,
}
pub(crate) trait StorageTxn {
/// Get information about the given client
fn get_client(&mut self, client_id: Uuid) -> Fallible<Option<Client>>;
/// Set the client's latest_version_id (creating the client if necessary)
fn set_client_latest_version_id(
&mut self,
client_id: Uuid,
latest_version_id: Uuid,
) -> Fallible<()>;
/// Get a version, indexed by parent version id
fn get_version_by_parent(
&mut self,
client_id: Uuid,
parent_version_id: Uuid,
) -> Fallible<Option<Version>>;
/// Add a version (that must not already exist)
fn add_version(
&mut self,
client_id: Uuid,
version_id: Uuid,
parent_version_id: Uuid,
history_segment: Vec<u8>,
) -> Fallible<()>;
/// Commit any changes made in the transaction. It is an error to call this more than
/// once. It is safe to skip this call for read-only operations.
fn commit(&mut self) -> Fallible<()>;
}
/// A trait for objects able to act as storage. Most of the interesting behavior is in the
/// [`crate::storage::StorageTxn`] trait.
pub(crate) trait Storage: Send + Sync {
/// Begin a transaction
fn txn<'a>(&'a self) -> Fallible<Box<dyn StorageTxn + 'a>>;
}
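
The `StorageTxn` contract says changes take effect on `commit`, and the in-memory implementation notes that it has no rollback. The sketch below shows one way a lock-backed transaction could provide rollback: writes are staged and copied into the shared map only on commit. `SketchStorage` and `SketchTxn` are hypothetical names, and a simple key-value map stands in for the clients and versions tables.

```rust
use std::collections::HashMap;
use std::sync::{Mutex, MutexGuard};

/// Hypothetical storage: one map behind a mutex.
pub struct SketchStorage(Mutex<HashMap<u32, String>>);

/// A transaction holds the lock for its whole lifetime and keeps its writes
/// in `staged` until `commit` is called.
pub struct SketchTxn<'a> {
    guard: MutexGuard<'a, HashMap<u32, String>>,
    staged: HashMap<u32, String>,
}

impl SketchStorage {
    pub fn new() -> Self {
        Self(Mutex::new(HashMap::new()))
    }

    /// Begin a transaction; blocks until any other transaction has ended.
    pub fn txn(&self) -> SketchTxn<'_> {
        SketchTxn {
            guard: self.0.lock().expect("poisoned lock"),
            staged: HashMap::new(),
        }
    }
}

impl<'a> SketchTxn<'a> {
    /// Read a value, preferring this transaction's own uncommitted writes.
    pub fn get(&self, key: u32) -> Option<String> {
        self.staged
            .get(&key)
            .or_else(|| self.guard.get(&key))
            .cloned()
    }

    /// Stage a write; nothing is visible to later transactions yet.
    pub fn set(&mut self, key: u32, value: &str) {
        self.staged.insert(key, value.to_string());
    }

    /// Apply the staged writes. Taking `self` by value makes a second commit
    /// a compile error, and dropping the transaction without calling this
    /// discards the staged writes, which acts as a rollback.
    pub fn commit(self) {
        let SketchTxn { mut guard, staged } = self;
        guard.extend(staged);
    }
}
```

This differs from the trait above, where `commit` takes `&mut self` and calling it twice is a runtime error. Consuming `self` is simpler for a concrete type, while a boxed trait object would need a `self: Box<Self>` receiver to get the same effect.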