From eadce9f15ad144bfecafd21ee48c7819e28881f5 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Tue, 28 Sep 2021 19:17:20 -0400 Subject: [PATCH 1/7] Add documentation for snapshots --- docs/src/SUMMARY.md | 1 + docs/src/snapshots.md | 48 ++++++++++++++++++++++ docs/src/sync-protocol.md | 85 +++++++++++++++++++++++++++++++++++---- 3 files changed, 126 insertions(+), 8 deletions(-) create mode 100644 docs/src/snapshots.md diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md index 568eb57cf..061dbad5b 100644 --- a/docs/src/SUMMARY.md +++ b/docs/src/SUMMARY.md @@ -19,5 +19,6 @@ * [Tasks](./tasks.md) * [Synchronization and the Sync Server](./sync.md) * [Synchronization Model](./sync-model.md) + * [Snapshots](./snapshots.md) * [Server-Replica Protocol](./sync-protocol.md) * [Planned Functionality](./plans.md) diff --git a/docs/src/snapshots.md b/docs/src/snapshots.md new file mode 100644 index 000000000..1e608dba3 --- /dev/null +++ b/docs/src/snapshots.md @@ -0,0 +1,48 @@ +# Snapshots + +The basic synchronization model described in the previous page has a few shortcomings: + * servers must store an ever-increasing quantity of versions + * a new replica must download all versions since the beginning in order to derive the current state + +Snapshots allow TaskChampion to avoid both of these issues. +A snapshot is a copy of the task database at a specific version. +It is created by a replica, encrypted, and stored on the server. +A new replica can simply download a recent snapshot and apply any additional versions synchronized since that snapshot was made. +Servers can delete and reclaim space used by older versions, as long as newer snapshots are available. + +## Snapshot Heuristics + +A server implementation must answer a few questions: + * How often should snapshots be made? + * When can versions be deleted? + * When can snapshots be deleted? + +A critical invariant is that at least one snapshot must exist for any database that does not have a child of the nil version. +This ensures that a new replica can always derive the latest state. + +Aside from that invariant, the server implementation can vary in its answers to these questions, with the following considerations: + +Snapshots should be made frequently enough that a new replica can initialize quickly. + +Existing replicas will fail to synchronize if they request a child version that has been deleted. +This failure can cause data loss if the replica had local changes. +It's conceivable that replicas may not sync for weeks or months if, for example, they are located on a home computer while the user is on holiday. + +## Requesting New Snapshots + +The server requests snapshots from replicas, indicating an urgency for the request. +Some replicas, such as those running on PCs or servers, can produce a snapshot even at low urgency. +Other replicas, in more restricted environments such as mobile devices, will only produce a snapshot at high urgency. +This saves resources in these restricted environments. + +A snapshot must be made on a replica with no unsynchronized operations. +As such, it only makes sense to request a snapshot in response to a successful AddVersion request. + +## Handling Deleted Versions + +When a replica requests a child version, the response must distinguish two cases: + + 1. No such child version exists because the replica is up-to-date. + 1. No such child version exists because it has been deleted, and the replica must re-initialize itself. + +The details of this logic are covered in the [Server-Replica Protocol](./sync-protocol.md). diff --git a/docs/src/sync-protocol.md b/docs/src/sync-protocol.md index db02d03a2..437f44b1e 100644 --- a/docs/src/sync-protocol.md +++ b/docs/src/sync-protocol.md @@ -7,26 +7,36 @@ The protocol builds on the model presented in the previous chapter, and in parti ## Clients -From the server's perspective, replicas are indistinguishable, so this protocol uses the term "client" to refer generically to all replicas replicating a single task history. +From the server's perspective, replicas accessing the same task history are indistinguishable, so this protocol uses the term "client" to refer generically to all replicas replicating a single task history. Each client is identified and authenticated with a "client key", known only to the server and to the replicas replicating the task history. ## Server For each client, the server is responsible for storing the task history, in the form of a branch-free sequence of versions. +It also stores the latest snapshot, if any exists. + + * versions: a set of {versionId: UUID, parentVersionId: UUID, historySegment: bytes} + * latestVersionId: UUID + * snapshotVersionId: UUID + * snapshot: bytes For each client, it stores a set of versions as well as the latest version ID, defaulting to the nil UUID. Each version has a version ID, a parent version ID, and a history segment (opaque data containing the operations for that version). -The server should maintain the following invariants: +The server should maintain the following invariants for each client: -1. Given a client c, c.latestVersion is nil or exists in the set of versions. -1. Given versions v1 and v2 for a client, with v1.versionId != v2.versionId and v1.parentVersionId != nil, v1.parentVersionId != v2.parentVersionId. +1. latestVersionId is nil or exists in the set of versions. +2. Given versions v1 and v2 for a client, with v1.versionId != v2.versionId and v1.parentVersionId != nil, v1.parentVersionId != v2.parentVersionId. In other words, versions do not branch. +3. If snapshotVersionId is nil, then there is a version with parentVersionId == nil. +4. If snapshotVersionId is not nil, then there is a version with parentVersionId = snapshotVersionId. -Note that versions form a linked list beginning with the version stored in he client. +Note that versions form a linked list beginning with the latestVersionId stored for the client. This linked list need not continue back to a version with v.parentVersionId = nil. It may end at any point when v.parentVersionId is not found in the set of Versions. This observation allows the server to discard older versions. +The third invariant prevents the server from discarding versions if there is no snapshot. +The fourth invariant prevents the server from discarding versions newer than the snapshot. ## Transactions @@ -45,6 +55,7 @@ If it already has one or more versions for the client, then it accepts the versi If the version is accepted, the server generates a new version ID for it. The version is added to the set of versions for the client, the client's latest version ID is set to the new version ID. The new version ID is returned in the response to the client. +The response may also include a request for a snapshot, with associated urgency. If the version is not accepted, the server makes no changes, but responds to the client with a conflict indication containing the latest version ID. The client may then "rebase" its operations and try again. @@ -61,7 +72,32 @@ If found, it returns the version's * parent version ID (matching that in the request), and * history segment. -If not found, the server returns a negative response. +The response is either a version (success, _not-found_, or _gone_, as determined by the first of the following to apply: +* If a version with parentVersionId equal to the requested parentVersionId exists, it is returned. +* If the requested parentVersionId is the nil UUID .. + * ..and snapshotVersionId is nil, the response is _not-found_ (the client has no versions). + * ..and snapshotVersionId is not nil, the response is _gone_ (the first version has been deleted). +* If a version with versionId equal to the requested parentVersionId exists, the response is _not-found_ (the client is up-to-date) +* Otherwise, the response is _gone_ (the requested version has been deleted). + +### AddSnapshot + +The AddSnapshot transaction requests that the server store a new snapshot, generated by the client. +The request contains the following: + + * version ID at which the snapshot was made + * snapshot data (opaque to the server) + +The server should validate that the snapshot is for an existing version and is newer than any existing snapshot. +It may also validate that the snapshot is for a "recent" version (e.g., one of the last 5 versions). +If a snapshot already exists for the given version, the server may keep or discard the new snapshot but should return a success indication to the client. + +The server response is empty. + +### GetSnapshot + +The GetSnapshot transaction requests that the server provide the latest snapshot. +The response contains the snapshot version ID and the snapshot data, if those exist. ## HTTP Representation @@ -79,6 +115,7 @@ The content-type must be `application/vnd.taskchampion.history-segment`. The success response is a 200 OK with an empty body. The new version ID appears in the `X-Version-Id` header. +If included, a snapshot request appears in the `X-Snapshot-Request` header with value `urgency=low` or `urgency=high`. On conflict, the response is a 409 CONFLICT with an empty body. The expected parent version ID appears in the `X-Parent-Version-Id` header. @@ -88,8 +125,40 @@ Other error responses (4xx or 5xx) may be returned and should be treated appropr ### GetChildVersion The request is a `GET` to `/v1/client/get-child-version/`. -The response is 404 NOT FOUND if no such version exists. -Otherwise, the response is a 200 OK. + +The response is determined as described above. +The _not-found_ response is 404 NOT FOUND. +The _gone_ response is 410 GONE. +Neither has a response body. + +On success, the response is a 200 OK. The version's history segment is returned in the response body, with content-type `application/vnd.taskchampion.history-segment`. The version ID appears in the `X-Version-Id` header. The response body may be encoded, in accordance with any `Accept-Encoding` header in the request. + +On failure, a client should treat a 404 NOT FOUND as indicating that it is up-to-date. +Clients should treat a 410 GONE as a synchronization error. +If the client has pending changes to send to the server, based on a now-removed version, then those changes cannot be reconciled and will be lost. +The client should, optionally after consulting the user, download and apply the latest snapshot. + +### AddSnapshot + +The request is a `POST` to `/v1/client/add-snapshot/`. +The request body contains the snapshot data, optionally encoded using any encoding supported by actix-web. +The content-type must be `application/vnd.taskchampion.snapshot`. + +If the version is invalid, as described above, the response should be 400 BAD REQUEST. +The server response should be 200 OK on success. + +### GetSnapshot + +The request is a `GET` to `/v1/client/snapshot`. + +The response is a 200 OK. +The snapshot is returned in the response body, with content-type `application/vnd.taskchampion.snapshot`. +The version ID appears in the `X-Version-Id` header. +The response body may be encoded, in accordance with any `Accept-Encoding` header in the request. + +After downloading and decrypting a snapshot, a client must replace its entire local task database with the content of the snapshot. +Any local operations that had not yet been synchronized must be discarded. +After the snapshot is applied, the client should begin the synchronization process again, starting from the snapshot version. From 8d2be3b495b615788d09b28380ca3f7933e054ed Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Wed, 29 Sep 2021 02:19:57 +0000 Subject: [PATCH 2/7] add get_version to server storage api --- sync-server/src/storage/inmemory.rs | 119 ++++++++++++++++++++++++++-- sync-server/src/storage/mod.rs | 7 ++ sync-server/src/storage/sqlite.rs | 80 +++++++++++++------ 3 files changed, 173 insertions(+), 33 deletions(-) diff --git a/sync-server/src/storage/inmemory.rs b/sync-server/src/storage/inmemory.rs index 143b2e760..7aef5f98c 100644 --- a/sync-server/src/storage/inmemory.rs +++ b/sync-server/src/storage/inmemory.rs @@ -6,8 +6,11 @@ struct Inner { /// Clients, indexed by client_key clients: HashMap, - /// Versions, indexed by (client_key, parent_version_id) + /// Versions, indexed by (client_key, version_id) versions: HashMap<(Uuid, Uuid), Version>, + + /// Child versions, indexed by (client_key, parent_version_id) + children: HashMap<(Uuid, Uuid), Uuid>, } pub struct InMemoryStorage(Mutex); @@ -18,6 +21,7 @@ impl InMemoryStorage { Self(Mutex::new(Inner { clients: HashMap::new(), versions: HashMap::new(), + children: HashMap::new(), })) } } @@ -66,11 +70,23 @@ impl<'a> StorageTxn for InnerTxn<'a> { client_key: Uuid, parent_version_id: Uuid, ) -> anyhow::Result> { - Ok(self - .0 - .versions - .get(&(client_key, parent_version_id)) - .cloned()) + if let Some(parent_version_id) = self.0.children.get(&(client_key, parent_version_id)) { + Ok(self + .0 + .versions + .get(&(client_key, *parent_version_id)) + .cloned()) + } else { + Ok(None) + } + } + + fn get_version( + &mut self, + client_key: Uuid, + version_id: Uuid, + ) -> anyhow::Result> { + Ok(self.0.versions.get(&(client_key, version_id)).cloned()) } fn add_version( @@ -86,9 +102,12 @@ impl<'a> StorageTxn for InnerTxn<'a> { parent_version_id, history_segment, }; + self.0 + .children + .insert((client_key, version.parent_version_id), version.version_id); self.0 .versions - .insert((client_key, version.parent_version_id), version); + .insert((client_key, version.version_id), version); Ok(()) } @@ -96,3 +115,89 @@ impl<'a> StorageTxn for InnerTxn<'a> { Ok(()) } } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_emtpy_dir() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let maybe_client = txn.get_client(Uuid::new_v4())?; + assert!(maybe_client.is_none()); + Ok(()) + } + + #[test] + fn test_get_client_empty() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let maybe_client = txn.get_client(Uuid::new_v4())?; + assert!(maybe_client.is_none()); + Ok(()) + } + + #[test] + fn test_client_storage() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + + let client_key = Uuid::new_v4(); + let latest_version_id = Uuid::new_v4(); + txn.new_client(client_key, latest_version_id)?; + + let client = txn.get_client(client_key)?.unwrap(); + assert_eq!(client.latest_version_id, latest_version_id); + + let latest_version_id = Uuid::new_v4(); + txn.set_client_latest_version_id(client_key, latest_version_id)?; + + let client = txn.get_client(client_key)?.unwrap(); + assert_eq!(client.latest_version_id, latest_version_id); + + Ok(()) + } + + #[test] + fn test_gvbp_empty() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let maybe_version = txn.get_version_by_parent(Uuid::new_v4(), Uuid::new_v4())?; + assert!(maybe_version.is_none()); + Ok(()) + } + + #[test] + fn test_add_version_and_get_version() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + + let client_key = Uuid::new_v4(); + let version_id = Uuid::new_v4(); + let parent_version_id = Uuid::new_v4(); + let history_segment = b"abc".to_vec(); + txn.add_version( + client_key, + version_id, + parent_version_id, + history_segment.clone(), + )?; + + let expected = Version { + version_id, + parent_version_id, + history_segment, + }; + + let version = txn + .get_version_by_parent(client_key, parent_version_id)? + .unwrap(); + assert_eq!(version, expected); + + let version = txn.get_version(client_key, version_id)?.unwrap(); + assert_eq!(version, expected); + + Ok(()) + } +} diff --git a/sync-server/src/storage/mod.rs b/sync-server/src/storage/mod.rs index 30b961fa1..121999892 100644 --- a/sync-server/src/storage/mod.rs +++ b/sync-server/src/storage/mod.rs @@ -43,6 +43,13 @@ pub trait StorageTxn { parent_version_id: Uuid, ) -> anyhow::Result>; + /// Get a version, indexed by its own version id + fn get_version( + &mut self, + client_key: Uuid, + version_id: Uuid, + ) -> anyhow::Result>; + /// Add a version (that must not already exist) fn add_version( &mut self, diff --git a/sync-server/src/storage/sqlite.rs b/sync-server/src/storage/sqlite.rs index 0d4f694b1..e1310a55c 100644 --- a/sync-server/src/storage/sqlite.rs +++ b/sync-server/src/storage/sqlite.rs @@ -71,6 +71,7 @@ impl SqliteStorage { let queries = vec![ "CREATE TABLE IF NOT EXISTS clients (client_key STRING PRIMARY KEY, latest_version_id STRING);", "CREATE TABLE IF NOT EXISTS versions (version_id STRING PRIMARY KEY, client_key STRING, parent_version_id STRING, history_segment BLOB);", + "CREATE INDEX IF NOT EXISTS versions_by_parent ON versions (parent_version_id);", ]; for q in queries { txn.execute(q, []).context("Creating table")?; @@ -100,6 +101,34 @@ impl Txn { .transaction() .map_err(|_e| SqliteError::CreateTransactionFailed) } + + /// Implementation for queries from the versions table + fn get_version_impl( + &mut self, + query: &'static str, + client_key: Uuid, + version_id_arg: Uuid, + ) -> anyhow::Result> { + let t = self.get_txn()?; + let r = t + .query_row( + query, + params![&StoredUuid(version_id_arg), &StoredUuid(client_key)], + |r| { + let version_id: StoredUuid = r.get("version_id")?; + let parent_version_id: StoredUuid = r.get("parent_version_id")?; + + Ok(Version { + version_id: version_id.0, + parent_version_id: parent_version_id.0, + history_segment: r.get("history_segment")?, + }) + }, + ) + .optional() + .context("Get version query")?; + Ok(r) + } } impl StorageTxn for Txn { @@ -148,24 +177,20 @@ impl StorageTxn for Txn { client_key: Uuid, parent_version_id: Uuid, ) -> anyhow::Result> { - let t = self.get_txn()?; - let r = t.query_row( + self.get_version_impl( "SELECT version_id, parent_version_id, history_segment FROM versions WHERE parent_version_id = ? AND client_key = ?", - params![&StoredUuid(parent_version_id), &StoredUuid(client_key)], - |r| { - let version_id: StoredUuid = r.get("version_id")?; - let parent_version_id: StoredUuid = r.get("parent_version_id")?; - - Ok(Version{ - version_id: version_id.0, - parent_version_id: parent_version_id.0, - history_segment: r.get("history_segment")?, - })} - ) - .optional() - .context("Get version query") - ?; - Ok(r) + client_key, + parent_version_id) + } + fn get_version( + &mut self, + client_key: Uuid, + version_id: Uuid, + ) -> anyhow::Result> { + self.get_version_impl( + "SELECT version_id, parent_version_id, history_segment FROM versions WHERE version_id = ? AND client_key = ?", + client_key, + version_id) } fn add_version( @@ -260,7 +285,7 @@ mod test { } #[test] - fn test_add_version_and_gvbp() -> anyhow::Result<()> { + fn test_add_version_and_get_version() -> anyhow::Result<()> { let tmp_dir = TempDir::new()?; let storage = SqliteStorage::new(&tmp_dir.path())?; let mut txn = storage.txn()?; @@ -275,18 +300,21 @@ mod test { parent_version_id, history_segment.clone(), )?; + + let expected = Version { + version_id, + parent_version_id, + history_segment, + }; + let version = txn .get_version_by_parent(client_key, parent_version_id)? .unwrap(); + assert_eq!(version, expected); + + let version = txn.get_version(client_key, version_id)?.unwrap(); + assert_eq!(version, expected); - assert_eq!( - version, - Version { - version_id, - parent_version_id, - history_segment, - } - ); Ok(()) } } From 2570956710f943a0994c5e08388a469c2fc454bb Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Wed, 29 Sep 2021 02:48:39 +0000 Subject: [PATCH 3/7] [breaking] Add snapshot support to server storage This refactors the storage API pretty substantially, and represents a breaking change to the schema used by the sqlite storage --- .changelogs/2021-10-03-server-storage.md | 2 + Cargo.lock | 1 + sync-server/Cargo.toml | 1 + sync-server/src/server.rs | 3 +- sync-server/src/storage/inmemory.rs | 134 +++++++++++++--- sync-server/src/storage/mod.rs | 40 ++++- sync-server/src/storage/sqlite.rs | 196 +++++++++++++++++++---- 7 files changed, 312 insertions(+), 65 deletions(-) create mode 100644 .changelogs/2021-10-03-server-storage.md diff --git a/.changelogs/2021-10-03-server-storage.md b/.changelogs/2021-10-03-server-storage.md new file mode 100644 index 000000000..7834601d5 --- /dev/null +++ b/.changelogs/2021-10-03-server-storage.md @@ -0,0 +1,2 @@ +- The SQLite server storage schema has changed incompatibly, in order to add support for snapshots. + As this is not currently ready for production usage, no migration path is provided except deleting the existing database. diff --git a/Cargo.lock b/Cargo.lock index 4c0d541ac..bb79505da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3016,6 +3016,7 @@ dependencies = [ "actix-rt", "actix-web", "anyhow", + "chrono", "clap", "env_logger 0.8.4", "futures", diff --git a/sync-server/Cargo.toml b/sync-server/Cargo.toml index c0d26656e..25ac111bf 100644 --- a/sync-server/Cargo.toml +++ b/sync-server/Cargo.toml @@ -19,6 +19,7 @@ clap = "^2.33.0" log = "^0.4.14" env_logger = "^0.8.3" rusqlite = { version = "0.25", features = ["bundled"] } +chrono = { version = "^0.4.10", features = ["serde"] } [dev-dependencies] actix-rt = "^1.1.1" diff --git a/sync-server/src/server.rs b/sync-server/src/server.rs index 9bc007cc4..c20d0b955 100644 --- a/sync-server/src/server.rs +++ b/sync-server/src/server.rs @@ -71,7 +71,6 @@ pub(crate) fn add_version<'a>( // update the DB txn.add_version(client_key, version_id, parent_version_id, history_segment)?; - txn.set_client_latest_version_id(client_key, version_id)?; txn.commit()?; Ok(AddVersionResult::Ok(version_id)) @@ -102,6 +101,7 @@ mod test { let parent_version_id = Uuid::new_v4(); let history_segment = b"abcd".to_vec(); + txn.new_client(client_key, version_id)?; txn.add_version( client_key, version_id, @@ -130,6 +130,7 @@ mod test { let existing_parent_version_id = Uuid::new_v4(); let client = Client { latest_version_id: existing_parent_version_id, + snapshot: None, }; assert_eq!( diff --git a/sync-server/src/storage/inmemory.rs b/sync-server/src/storage/inmemory.rs index 7aef5f98c..03b36c47a 100644 --- a/sync-server/src/storage/inmemory.rs +++ b/sync-server/src/storage/inmemory.rs @@ -1,4 +1,4 @@ -use super::{Client, Storage, StorageTxn, Uuid, Version}; +use super::{Client, Snapshot, Storage, StorageTxn, Uuid, Version}; use std::collections::HashMap; use std::sync::{Mutex, MutexGuard}; @@ -6,6 +6,9 @@ struct Inner { /// Clients, indexed by client_key clients: HashMap, + /// Snapshot data, indexed by client key + snapshots: HashMap>, + /// Versions, indexed by (client_key, version_id) versions: HashMap<(Uuid, Uuid), Version>, @@ -20,6 +23,7 @@ impl InMemoryStorage { pub fn new() -> Self { Self(Mutex::new(Inner { clients: HashMap::new(), + snapshots: HashMap::new(), versions: HashMap::new(), children: HashMap::new(), })) @@ -46,23 +50,44 @@ impl<'a> StorageTxn for InnerTxn<'a> { if self.0.clients.get(&client_key).is_some() { return Err(anyhow::anyhow!("Client {} already exists", client_key)); } - self.0 - .clients - .insert(client_key, Client { latest_version_id }); + self.0.clients.insert( + client_key, + Client { + latest_version_id, + snapshot: None, + }, + ); Ok(()) } - fn set_client_latest_version_id( + fn set_snapshot( &mut self, client_key: Uuid, - latest_version_id: Uuid, + snapshot: Snapshot, + data: Vec, ) -> anyhow::Result<()> { - if let Some(client) = self.0.clients.get_mut(&client_key) { - client.latest_version_id = latest_version_id; - Ok(()) - } else { - Err(anyhow::anyhow!("Client {} does not exist", client_key)) + let mut client = self + .0 + .clients + .get_mut(&client_key) + .ok_or_else(|| anyhow::anyhow!("no such client"))?; + client.snapshot = Some(snapshot); + self.0.snapshots.insert(client_key, data); + Ok(()) + } + + fn get_snapshot_data( + &mut self, + client_key: Uuid, + version_id: Uuid, + ) -> anyhow::Result>> { + // sanity check + let client = self.0.clients.get(&client_key); + let client = client.ok_or_else(|| anyhow::anyhow!("no such client"))?; + if Some(&version_id) != client.snapshot.as_ref().map(|snap| &snap.version_id) { + return Err(anyhow::anyhow!("unexpected snapshot_version_id")); } + Ok(self.0.snapshots.get(&client_key).cloned()) } fn get_version_by_parent( @@ -102,12 +127,21 @@ impl<'a> StorageTxn for InnerTxn<'a> { parent_version_id, history_segment, }; + + if let Some(client) = self.0.clients.get_mut(&client_key) { + client.latest_version_id = version_id; + if let Some(ref mut snap) = client.snapshot { + snap.versions_since += 1; + } + } else { + return Err(anyhow::anyhow!("Client {} does not exist", client_key)); + } + self.0 .children - .insert((client_key, version.parent_version_id), version.version_id); - self.0 - .versions - .insert((client_key, version.version_id), version); + .insert((client_key, parent_version_id), version_id); + self.0.versions.insert((client_key, version_id), version); + Ok(()) } @@ -119,15 +153,7 @@ impl<'a> StorageTxn for InnerTxn<'a> { #[cfg(test)] mod test { use super::*; - - #[test] - fn test_emtpy_dir() -> anyhow::Result<()> { - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let maybe_client = txn.get_client(Uuid::new_v4())?; - assert!(maybe_client.is_none()); - Ok(()) - } + use chrono::Utc; #[test] fn test_get_client_empty() -> anyhow::Result<()> { @@ -149,12 +175,25 @@ mod test { let client = txn.get_client(client_key)?.unwrap(); assert_eq!(client.latest_version_id, latest_version_id); + assert!(client.snapshot.is_none()); let latest_version_id = Uuid::new_v4(); - txn.set_client_latest_version_id(client_key, latest_version_id)?; + txn.add_version(client_key, latest_version_id, Uuid::new_v4(), vec![1, 1])?; let client = txn.get_client(client_key)?.unwrap(); assert_eq!(client.latest_version_id, latest_version_id); + assert!(client.snapshot.is_none()); + + let snap = Snapshot { + version_id: Uuid::new_v4(), + timestamp: Utc::now(), + versions_since: 4, + }; + txn.set_snapshot(client_key, snap.clone(), vec![1, 2, 3])?; + + let client = txn.get_client(client_key)?.unwrap(); + assert_eq!(client.latest_version_id, latest_version_id); + assert_eq!(client.snapshot.unwrap(), snap); Ok(()) } @@ -177,6 +216,8 @@ mod test { let version_id = Uuid::new_v4(); let parent_version_id = Uuid::new_v4(); let history_segment = b"abc".to_vec(); + + txn.new_client(client_key, parent_version_id)?; txn.add_version( client_key, version_id, @@ -200,4 +241,47 @@ mod test { Ok(()) } + + #[test] + fn test_snapshots() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + + let client_key = Uuid::new_v4(); + + txn.new_client(client_key, Uuid::new_v4())?; + assert!(txn.get_client(client_key)?.unwrap().snapshot.is_none()); + + let snap = Snapshot { + version_id: Uuid::new_v4(), + timestamp: Utc::now(), + versions_since: 3, + }; + txn.set_snapshot(client_key, snap.clone(), vec![9, 8, 9])?; + + assert_eq!( + txn.get_snapshot_data(client_key, snap.version_id)?.unwrap(), + vec![9, 8, 9] + ); + assert_eq!(txn.get_client(client_key)?.unwrap().snapshot, Some(snap)); + + let snap2 = Snapshot { + version_id: Uuid::new_v4(), + timestamp: Utc::now(), + versions_since: 10, + }; + txn.set_snapshot(client_key, snap2.clone(), vec![0, 2, 4, 6])?; + + assert_eq!( + txn.get_snapshot_data(client_key, snap2.version_id)? + .unwrap(), + vec![0, 2, 4, 6] + ); + assert_eq!(txn.get_client(client_key)?.unwrap().snapshot, Some(snap2)); + + // check that mismatched version is detected + assert!(txn.get_snapshot_data(client_key, Uuid::new_v4()).is_err()); + + Ok(()) + } } diff --git a/sync-server/src/storage/mod.rs b/sync-server/src/storage/mod.rs index 121999892..c52624898 100644 --- a/sync-server/src/storage/mod.rs +++ b/sync-server/src/storage/mod.rs @@ -1,4 +1,4 @@ -use serde::{Deserialize, Serialize}; +use chrono::{DateTime, Utc}; use uuid::Uuid; #[cfg(debug_assertions)] @@ -10,12 +10,27 @@ pub use inmemory::InMemoryStorage; mod sqlite; pub use self::sqlite::SqliteStorage; -#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] +#[derive(Clone, PartialEq, Debug)] pub struct Client { + /// The latest version for this client (may be the nil version) pub latest_version_id: Uuid, + /// Data about the latest snapshot for this client + pub snapshot: Option, } -#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] +#[derive(Clone, PartialEq, Debug)] +pub struct Snapshot { + /// ID of the version at which this snapshot was made + pub version_id: Uuid, + + /// Timestamp at which this snapshot was set + pub timestamp: DateTime, + + /// Number of versions since this snapshot was made + pub versions_since: u32, +} + +#[derive(Clone, PartialEq, Debug)] pub struct Version { pub version_id: Uuid, pub parent_version_id: Uuid, @@ -29,13 +44,22 @@ pub trait StorageTxn { /// Create a new client with the given latest_version_id fn new_client(&mut self, client_key: Uuid, latest_version_id: Uuid) -> anyhow::Result<()>; - /// Set the client's latest_version_id - fn set_client_latest_version_id( + /// Set the client's most recent snapshot. + fn set_snapshot( &mut self, client_key: Uuid, - latest_version_id: Uuid, + snapshot: Snapshot, + data: Vec, ) -> anyhow::Result<()>; + /// Get the data for the most recent snapshot. The version_id + /// is used to verify that the snapshot is for the correct version. + fn get_snapshot_data( + &mut self, + client_key: Uuid, + version_id: Uuid, + ) -> anyhow::Result>>; + /// Get a version, indexed by parent version id fn get_version_by_parent( &mut self, @@ -50,7 +74,9 @@ pub trait StorageTxn { version_id: Uuid, ) -> anyhow::Result>; - /// Add a version (that must not already exist) + /// Add a version (that must not already exist), and + /// - update latest_version_id + /// - increment snapshot.versions_since fn add_version( &mut self, client_key: Uuid, diff --git a/sync-server/src/storage/sqlite.rs b/sync-server/src/storage/sqlite.rs index e1310a55c..47a07014a 100644 --- a/sync-server/src/storage/sqlite.rs +++ b/sync-server/src/storage/sqlite.rs @@ -1,5 +1,6 @@ -use super::{Client, Storage, StorageTxn, Uuid, Version}; +use super::{Client, Snapshot, Storage, StorageTxn, Uuid, Version}; use anyhow::Context; +use chrono::{TimeZone, Utc}; use rusqlite::types::{FromSql, ToSql}; use rusqlite::{params, Connection, OptionalExtension}; use std::path::Path; @@ -30,24 +31,6 @@ impl ToSql for StoredUuid { } } -/// Stores [`Client`] in SQLite -impl FromSql for Client { - fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult { - let o: Client = serde_json::from_str(value.as_str()?) - .map_err(|_| rusqlite::types::FromSqlError::InvalidType)?; - Ok(o) - } -} - -/// Parses Operation stored as JSON in string column -impl ToSql for Client { - fn to_sql(&self) -> rusqlite::Result> { - let s = serde_json::to_string(&self) - .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?; - Ok(s.into()) - } -} - /// An on-disk storage backend which uses SQLite pub struct SqliteStorage { db_file: std::path::PathBuf, @@ -69,12 +52,19 @@ impl SqliteStorage { let txn = con.transaction()?; let queries = vec![ - "CREATE TABLE IF NOT EXISTS clients (client_key STRING PRIMARY KEY, latest_version_id STRING);", + "CREATE TABLE IF NOT EXISTS clients ( + client_key STRING PRIMARY KEY, + latest_version_id STRING, + snapshot_version_id STRING, + versions_since_snapshot INTEGER, + snapshot_timestamp INTEGER, + snapshot BLOB);", "CREATE TABLE IF NOT EXISTS versions (version_id STRING PRIMARY KEY, client_key STRING, parent_version_id STRING, history_segment BLOB);", "CREATE INDEX IF NOT EXISTS versions_by_parent ON versions (parent_version_id);", ]; for q in queries { - txn.execute(q, []).context("Creating table")?; + txn.execute(q, []) + .context("Error while creating SQLite tables")?; } txn.commit()?; } @@ -126,7 +116,7 @@ impl Txn { }, ) .optional() - .context("Get version query")?; + .context("Error getting version")?; Ok(r) } } @@ -136,17 +126,42 @@ impl StorageTxn for Txn { let t = self.get_txn()?; let result: Option = t .query_row( - "SELECT latest_version_id FROM clients WHERE client_key = ? LIMIT 1", + "SELECT + latest_version_id, + snapshot_timestamp, + versions_since_snapshot, + snapshot_version_id + FROM clients + WHERE client_key = ? + LIMIT 1", [&StoredUuid(client_key)], |r| { let latest_version_id: StoredUuid = r.get(0)?; + let snapshot_timestamp: Option = r.get(1)?; + let versions_since_snapshot: Option = r.get(2)?; + let snapshot_version_id: Option = r.get(3)?; + + // if all of the relevant fields are non-NULL, return a snapshot + let snapshot = match ( + snapshot_timestamp, + versions_since_snapshot, + snapshot_version_id, + ) { + (Some(ts), Some(vs), Some(v)) => Some(Snapshot { + version_id: v.0, + timestamp: Utc.timestamp(ts, 0), + versions_since: vs, + }), + _ => None, + }; Ok(Client { latest_version_id: latest_version_id.0, + snapshot, }) }, ) .optional() - .context("Get client query")?; + .context("Error getting client")?; Ok(result) } @@ -158,18 +173,66 @@ impl StorageTxn for Txn { "INSERT OR REPLACE INTO clients (client_key, latest_version_id) VALUES (?, ?)", params![&StoredUuid(client_key), &StoredUuid(latest_version_id)], ) - .context("Create client query")?; + .context("Error creating/updating client")?; t.commit()?; Ok(()) } - fn set_client_latest_version_id( + fn set_snapshot( &mut self, client_key: Uuid, - latest_version_id: Uuid, + snapshot: Snapshot, + data: Vec, ) -> anyhow::Result<()> { - // Implementation is same as new_client - self.new_client(client_key, latest_version_id) + let t = self.get_txn()?; + + t.execute( + "UPDATE clients + SET + snapshot_version_id = ?, + snapshot_timestamp = ?, + versions_since_snapshot = ?, + snapshot = ? + WHERE client_key = ?", + params![ + &StoredUuid(snapshot.version_id), + snapshot.timestamp.timestamp(), + snapshot.versions_since, + data, + &StoredUuid(client_key), + ], + ) + .context("Error creating/updating snapshot")?; + t.commit()?; + Ok(()) + } + + fn get_snapshot_data( + &mut self, + client_key: Uuid, + version_id: Uuid, + ) -> anyhow::Result>> { + let t = self.get_txn()?; + let r = t + .query_row( + "SELECT snapshot, snapshot_version_id FROM clients WHERE client_key = ?", + params![&StoredUuid(client_key)], + |r| { + let v: StoredUuid = r.get("snapshot_version_id")?; + let d: Vec = r.get("snapshot")?; + Ok((v.0, d)) + }, + ) + .optional() + .context("Error getting snapshot")?; + r.map(|(v, d)| { + if v != version_id { + return Err(anyhow::anyhow!("unexpected snapshot_version_id")); + } + + Ok(d) + }) + .transpose() } fn get_version_by_parent( @@ -182,6 +245,7 @@ impl StorageTxn for Txn { client_key, parent_version_id) } + fn get_version( &mut self, client_key: Uuid, @@ -209,9 +273,19 @@ impl StorageTxn for Txn { StoredUuid(client_key), StoredUuid(parent_version_id), history_segment - ], + ] ) - .context("Add version query")?; + .context("Error adding version")?; + t.execute( + "UPDATE clients + SET + latest_version_id = ?, + versions_since_snapshot = versions_since_snapshot + 1 + WHERE client_key = ?", + params![StoredUuid(version_id), StoredUuid(client_key),], + ) + .context("Error updating client for new version")?; + t.commit()?; Ok(()) } @@ -228,6 +302,7 @@ impl StorageTxn for Txn { #[cfg(test)] mod test { use super::*; + use chrono::DateTime; use pretty_assertions::assert_eq; use tempfile::TempDir; @@ -264,12 +339,25 @@ mod test { let client = txn.get_client(client_key)?.unwrap(); assert_eq!(client.latest_version_id, latest_version_id); + assert!(client.snapshot.is_none()); let latest_version_id = Uuid::new_v4(); - txn.set_client_latest_version_id(client_key, latest_version_id)?; + txn.add_version(client_key, latest_version_id, Uuid::new_v4(), vec![1, 1])?; let client = txn.get_client(client_key)?.unwrap(); assert_eq!(client.latest_version_id, latest_version_id); + assert!(client.snapshot.is_none()); + + let snap = Snapshot { + version_id: Uuid::new_v4(), + timestamp: "2014-11-28T12:00:09Z".parse::>().unwrap(), + versions_since: 4, + }; + txn.set_snapshot(client_key, snap.clone(), vec![1, 2, 3])?; + + let client = txn.get_client(client_key)?.unwrap(); + assert_eq!(client.latest_version_id, latest_version_id); + assert_eq!(client.snapshot.unwrap(), snap); Ok(()) } @@ -317,4 +405,48 @@ mod test { Ok(()) } + + #[test] + fn test_snapshots() -> anyhow::Result<()> { + let tmp_dir = TempDir::new()?; + let storage = SqliteStorage::new(&tmp_dir.path())?; + let mut txn = storage.txn()?; + + let client_key = Uuid::new_v4(); + + txn.new_client(client_key, Uuid::new_v4())?; + assert!(txn.get_client(client_key)?.unwrap().snapshot.is_none()); + + let snap = Snapshot { + version_id: Uuid::new_v4(), + timestamp: "2013-10-08T12:00:09Z".parse::>().unwrap(), + versions_since: 3, + }; + txn.set_snapshot(client_key, snap.clone(), vec![9, 8, 9])?; + + assert_eq!( + txn.get_snapshot_data(client_key, snap.version_id)?.unwrap(), + vec![9, 8, 9] + ); + assert_eq!(txn.get_client(client_key)?.unwrap().snapshot, Some(snap)); + + let snap2 = Snapshot { + version_id: Uuid::new_v4(), + timestamp: "2014-11-28T12:00:09Z".parse::>().unwrap(), + versions_since: 10, + }; + txn.set_snapshot(client_key, snap2.clone(), vec![0, 2, 4, 6])?; + + assert_eq!( + txn.get_snapshot_data(client_key, snap2.version_id)? + .unwrap(), + vec![0, 2, 4, 6] + ); + assert_eq!(txn.get_client(client_key)?.unwrap().snapshot, Some(snap2)); + + // check that mismatched version is detected + assert!(txn.get_snapshot_data(client_key, Uuid::new_v4()).is_err()); + + Ok(()) + } } From 53d1f8dbc2df92761b47b7dd221b91577962ff41 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Wed, 29 Sep 2021 22:41:40 +0000 Subject: [PATCH 4/7] update get_child_version to distinguish gone and not-found --- sync-server/src/api/get_child_version.rs | 48 +++++--- sync-server/src/server.rs | 137 ++++++++++++++++++++--- 2 files changed, 152 insertions(+), 33 deletions(-) diff --git a/sync-server/src/api/get_child_version.rs b/sync-server/src/api/get_child_version.rs index 917c65126..fd12e1243 100644 --- a/sync-server/src/api/get_child_version.rs +++ b/sync-server/src/api/get_child_version.rs @@ -2,7 +2,7 @@ use crate::api::{ client_key_header, failure_to_ise, ServerState, HISTORY_SEGMENT_CONTENT_TYPE, PARENT_VERSION_ID_HEADER, VERSION_ID_HEADER, }; -use crate::server::{get_child_version, VersionId}; +use crate::server::{get_child_version, GetVersionResult, VersionId}; use actix_web::{error, get, web, HttpRequest, HttpResponse, Result}; /// Get a child version. @@ -23,27 +23,31 @@ pub(crate) async fn service( let client_key = client_key_header(&req)?; - txn.get_client(client_key) + let client = txn + .get_client(client_key) .map_err(failure_to_ise)? .ok_or_else(|| error::ErrorNotFound("no such client"))?; - let result = get_child_version(txn, client_key, parent_version_id).map_err(failure_to_ise)?; - if let Some(result) = result { - Ok(HttpResponse::Ok() + return match get_child_version(txn, client_key, client, parent_version_id) + .map_err(failure_to_ise)? + { + GetVersionResult::Success { + version_id, + parent_version_id, + history_segment, + } => Ok(HttpResponse::Ok() .content_type(HISTORY_SEGMENT_CONTENT_TYPE) - .header(VERSION_ID_HEADER, result.version_id.to_string()) - .header( - PARENT_VERSION_ID_HEADER, - result.parent_version_id.to_string(), - ) - .body(result.history_segment)) - } else { - Err(error::ErrorNotFound("no such version")) - } + .header(VERSION_ID_HEADER, version_id.to_string()) + .header(PARENT_VERSION_ID_HEADER, parent_version_id.to_string()) + .body(history_segment)), + GetVersionResult::NotFound => Err(error::ErrorNotFound("no such version")), + GetVersionResult::Gone => Err(error::ErrorGone("version has been deleted")), + }; } #[cfg(test)] mod test { + use crate::server::NO_VERSION_ID; use crate::storage::{InMemoryStorage, Storage}; use crate::Server; use actix_web::{http::StatusCode, test, App}; @@ -113,7 +117,7 @@ mod test { } #[actix_rt::test] - async fn test_version_not_found() { + async fn test_version_not_found_and_gone() { let client_key = Uuid::new_v4(); let parent_version_id = Uuid::new_v4(); let storage: Box = Box::new(InMemoryStorage::new()); @@ -126,12 +130,26 @@ mod test { let server = Server::new(storage); let mut app = test::init_service(App::new().service(server.service())).await; + // the child of an unknown parent_version_id is GONE let uri = format!("/v1/client/get-child-version/{}", parent_version_id); let req = test::TestRequest::get() .uri(&uri) .header("X-Client-Key", client_key.to_string()) .to_request(); let resp = test::call_service(&mut app, req).await; + assert_eq!(resp.status(), StatusCode::GONE); + assert_eq!(resp.headers().get("X-Version-Id"), None); + assert_eq!(resp.headers().get("X-Parent-Version-Id"), None); + + // but the child of the nil parent_version_id is NOT FOUND, since + // there is no snapshot. The tests in crate::server test more + // corner cases. + let uri = format!("/v1/client/get-child-version/{}", NO_VERSION_ID); + let req = test::TestRequest::get() + .uri(&uri) + .header("X-Client-Key", client_key.to_string()) + .to_request(); + let resp = test::call_service(&mut app, req).await; assert_eq!(resp.status(), StatusCode::NOT_FOUND); assert_eq!(resp.headers().get("X-Version-Id"), None); assert_eq!(resp.headers().get("X-Parent-Version-Id"), None); diff --git a/sync-server/src/server.rs b/sync-server/src/server.rs index c20d0b955..2b8f92445 100644 --- a/sync-server/src/server.rs +++ b/sync-server/src/server.rs @@ -1,5 +1,6 @@ //! This module implements the core logic of the server: handling transactions, upholding -//! invariants, and so on. +//! invariants, and so on. This does not implement the HTTP-specific portions; those +//! are in [`crate::api`]. See the protocol documentation for details. use crate::storage::{Client, StorageTxn}; use uuid::Uuid; @@ -10,26 +11,53 @@ pub(crate) type HistorySegment = Vec; pub(crate) type ClientKey = Uuid; pub(crate) type VersionId = Uuid; -/// Response to get_child_version +/// Response to get_child_version. See the protocol documentation. #[derive(Clone, PartialEq, Debug)] -pub(crate) struct GetVersionResult { - pub(crate) version_id: Uuid, - pub(crate) parent_version_id: Uuid, - pub(crate) history_segment: HistorySegment, +pub(crate) enum GetVersionResult { + NotFound, + Gone, + Success { + version_id: Uuid, + parent_version_id: Uuid, + history_segment: HistorySegment, + }, } +/// Implementation of the GetChildVersion protocol transaction pub(crate) fn get_child_version<'a>( mut txn: Box, client_key: ClientKey, + client: Client, parent_version_id: VersionId, -) -> anyhow::Result> { - Ok(txn - .get_version_by_parent(client_key, parent_version_id)? - .map(|version| GetVersionResult { +) -> anyhow::Result { + // If a version with parentVersionId equal to the requested parentVersionId exists, it is returned. + if let Some(version) = txn.get_version_by_parent(client_key, parent_version_id)? { + return Ok(GetVersionResult::Success { version_id: version.version_id, parent_version_id: version.parent_version_id, history_segment: version.history_segment, - })) + }); + } + + // If the requested parentVersionId is the nil UUID .. + if parent_version_id == NO_VERSION_ID { + return Ok(match client.snapshot { + // ..and snapshotVersionId is nil, the response is _not-found_ (the client has no + // versions). + None => GetVersionResult::NotFound, + // ..and snapshotVersionId is not nil, the response is _gone_ (the first version has + // been deleted). + Some(_) => GetVersionResult::Gone, + }); + } + + // If a version with versionId equal to the requested parentVersionId exists, the response is _not-found_ (the client is up-to-date) + if txn.get_version(client_key, parent_version_id)?.is_some() { + return Ok(GetVersionResult::NotFound); + } + + // Otherwise, the response is _gone_ (the requested version has been deleted). + Ok(GetVersionResult::Gone) } /// Response to add_version @@ -41,6 +69,7 @@ pub(crate) enum AddVersionResult { ExpectedParentVersion(VersionId), } +/// Implementation of the AddVersion protocol transaction pub(crate) fn add_version<'a>( mut txn: Box, client_key: ClientKey, @@ -79,16 +108,87 @@ pub(crate) fn add_version<'a>( #[cfg(test)] mod test { use super::*; - use crate::storage::{InMemoryStorage, Storage}; + use crate::storage::{InMemoryStorage, Snapshot, Storage}; + use chrono::{TimeZone, Utc}; use pretty_assertions::assert_eq; #[test] - fn gcv_not_found() -> anyhow::Result<()> { + fn gcv_not_found_initial() -> anyhow::Result<()> { let storage = InMemoryStorage::new(); - let txn = storage.txn()?; + let mut txn = storage.txn()?; let client_key = Uuid::new_v4(); + txn.new_client(client_key, NO_VERSION_ID)?; + + // when no snapshot exists, the first version is NotFound + let client = txn.get_client(client_key)?.unwrap(); + assert_eq!( + get_child_version(txn, client_key, client, NO_VERSION_ID)?, + GetVersionResult::NotFound + ); + Ok(()) + } + + #[test] + fn gcv_gone_initial() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_key = Uuid::new_v4(); + + txn.new_client(client_key, Uuid::new_v4())?; + txn.set_snapshot( + client_key, + Snapshot { + version_id: Uuid::new_v4(), + versions_since: 0, + timestamp: Utc.ymd(2001, 9, 9).and_hms(1, 46, 40), + }, + vec![1, 2, 3], + )?; + + // when a snapshot exists, the first version is GONE + let client = txn.get_client(client_key)?.unwrap(); + assert_eq!( + get_child_version(txn, client_key, client, NO_VERSION_ID)?, + GetVersionResult::Gone + ); + Ok(()) + } + + #[test] + fn gcv_not_found_up_to_date() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_key = Uuid::new_v4(); + + // add a parent version, but not the requested child version let parent_version_id = Uuid::new_v4(); - assert_eq!(get_child_version(txn, client_key, parent_version_id)?, None); + txn.new_client(client_key, parent_version_id)?; + txn.add_version(client_key, parent_version_id, NO_VERSION_ID, vec![])?; + + let client = txn.get_client(client_key)?.unwrap(); + assert_eq!( + get_child_version(txn, client_key, client, parent_version_id)?, + GetVersionResult::NotFound + ); + Ok(()) + } + + #[test] + fn gcv_gone() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_key = Uuid::new_v4(); + + // make up a parent version id, but neither that version + // nor its child exists (presumed to have been deleted) + let parent_version_id = Uuid::new_v4(); + txn.new_client(client_key, Uuid::new_v4())?; + + let client = txn.get_client(client_key)?.unwrap(); + assert_eq!( + get_child_version(txn, client_key, client, parent_version_id)?, + GetVersionResult::Gone + ); Ok(()) } @@ -109,13 +209,14 @@ mod test { history_segment.clone(), )?; + let client = txn.get_client(client_key)?.unwrap(); assert_eq!( - get_child_version(txn, client_key, parent_version_id)?, - Some(GetVersionResult { + get_child_version(txn, client_key, client, parent_version_id)?, + GetVersionResult::Success { version_id, parent_version_id, history_segment, - }) + } ); Ok(()) } From e2f79edad6fe805bccd67e8127f3032563870002 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Wed, 29 Sep 2021 23:52:58 +0000 Subject: [PATCH 5/7] add get_snapshot API method --- sync-server/src/api/get_snapshot.rs | 111 ++++++++++++++++++++++++++++ sync-server/src/api/mod.rs | 5 ++ sync-server/src/server.rs | 56 ++++++++++++++ 3 files changed, 172 insertions(+) create mode 100644 sync-server/src/api/get_snapshot.rs diff --git a/sync-server/src/api/get_snapshot.rs b/sync-server/src/api/get_snapshot.rs new file mode 100644 index 000000000..6de0be427 --- /dev/null +++ b/sync-server/src/api/get_snapshot.rs @@ -0,0 +1,111 @@ +use crate::api::{ + client_key_header, failure_to_ise, ServerState, SNAPSHOT_CONTENT_TYPE, VERSION_ID_HEADER, +}; +use crate::server::get_snapshot; +use actix_web::{error, get, web, HttpRequest, HttpResponse, Result}; + +/// Get a snapshot. +/// +/// If a snapshot for this client exists, it is returned with content-type +/// `application/vnd.taskchampion.snapshot`. The `X-Version-Id` header contains the version of the +/// snapshot. +/// +/// If no snapshot exists, returns a 404 with no content. Returns other 4xx or 5xx responses on +/// other errors. +#[get("/v1/client/snapshot")] +pub(crate) async fn service( + req: HttpRequest, + server_state: web::Data, +) -> Result { + let mut txn = server_state.txn().map_err(failure_to_ise)?; + + let client_key = client_key_header(&req)?; + + let client = txn + .get_client(client_key) + .map_err(failure_to_ise)? + .ok_or_else(|| error::ErrorNotFound("no such client"))?; + + if let Some((version_id, data)) = + get_snapshot(txn, client_key, client).map_err(failure_to_ise)? + { + Ok(HttpResponse::Ok() + .content_type(SNAPSHOT_CONTENT_TYPE) + .header(VERSION_ID_HEADER, version_id.to_string()) + .body(data)) + } else { + Err(error::ErrorNotFound("no snapshot")) + } +} + +#[cfg(test)] +mod test { + use crate::storage::{InMemoryStorage, Snapshot, Storage}; + use crate::Server; + use actix_web::{http::StatusCode, test, App}; + use chrono::{TimeZone, Utc}; + use pretty_assertions::assert_eq; + use uuid::Uuid; + + #[actix_rt::test] + async fn test_not_found() { + let client_key = Uuid::new_v4(); + let storage: Box = Box::new(InMemoryStorage::new()); + + // set up the storage contents.. + { + let mut txn = storage.txn().unwrap(); + txn.new_client(client_key, Uuid::new_v4()).unwrap(); + } + + let server = Server::new(storage); + let mut app = test::init_service(App::new().service(server.service())).await; + + let uri = "/v1/client/snapshot"; + let req = test::TestRequest::get() + .uri(uri) + .header("X-Client-Key", client_key.to_string()) + .to_request(); + let resp = test::call_service(&mut app, req).await; + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + + #[actix_rt::test] + async fn test_success() { + let client_key = Uuid::new_v4(); + let version_id = Uuid::new_v4(); + let snapshot_data = vec![1, 2, 3, 4]; + let storage: Box = Box::new(InMemoryStorage::new()); + + // set up the storage contents.. + { + let mut txn = storage.txn().unwrap(); + txn.new_client(client_key, Uuid::new_v4()).unwrap(); + txn.set_snapshot( + client_key, + Snapshot { + version_id, + versions_since: 3, + timestamp: Utc.ymd(2001, 9, 9).and_hms(1, 46, 40), + }, + snapshot_data.clone(), + ) + .unwrap(); + } + + let server = Server::new(storage); + let mut app = test::init_service(App::new().service(server.service())).await; + + let uri = "/v1/client/snapshot"; + let req = test::TestRequest::get() + .uri(uri) + .header("X-Client-Key", client_key.to_string()) + .to_request(); + let mut resp = test::call_service(&mut app, req).await; + assert_eq!(resp.status(), StatusCode::OK); + + use futures::StreamExt; + let (bytes, _) = resp.take_body().into_future().await; + assert_eq!(bytes.unwrap().unwrap().as_ref(), snapshot_data); + } +} diff --git a/sync-server/src/api/mod.rs b/sync-server/src/api/mod.rs index 917dda83e..87666a35a 100644 --- a/sync-server/src/api/mod.rs +++ b/sync-server/src/api/mod.rs @@ -5,11 +5,15 @@ use std::sync::Arc; mod add_version; mod get_child_version; +mod get_snapshot; /// The content-type for history segments (opaque blobs of bytes) pub(crate) const HISTORY_SEGMENT_CONTENT_TYPE: &str = "application/vnd.taskchampion.history-segment"; +/// The content-type for snapshots (opaque blobs of bytes) +pub(crate) const SNAPSHOT_CONTENT_TYPE: &str = "application/vnd.taskchampion.snapshot"; + /// The header name for version ID pub(crate) const VERSION_ID_HEADER: &str = "X-Version-Id"; @@ -26,6 +30,7 @@ pub(crate) fn api_scope() -> Scope { web::scope("") .service(get_child_version::service) .service(add_version::service) + .service(get_snapshot::service) } /// Convert a failure::Error to an Actix ISE diff --git a/sync-server/src/server.rs b/sync-server/src/server.rs index 2b8f92445..0ed3f4e7b 100644 --- a/sync-server/src/server.rs +++ b/sync-server/src/server.rs @@ -105,6 +105,20 @@ pub(crate) fn add_version<'a>( Ok(AddVersionResult::Ok(version_id)) } +/// Implementation of the GetSnapshot protocol transaction +pub(crate) fn get_snapshot<'a>( + mut txn: Box, + client_key: ClientKey, + client: Client, +) -> anyhow::Result)>> { + Ok(if let Some(snap) = client.snapshot { + txn.get_snapshot_data(client_key, snap.version_id)? + .map(|data| (snap.version_id, data)) + } else { + None + }) +} + #[cfg(test)] mod test { use super::*; @@ -302,4 +316,46 @@ mod test { fn av_success_nil_latest_version_id() -> anyhow::Result<()> { test_av_success(false) } + + #[test] + fn get_snapshot_found() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_key = Uuid::new_v4(); + let data = vec![1, 2, 3]; + let snapshot_version_id = Uuid::new_v4(); + + txn.new_client(client_key, snapshot_version_id)?; + txn.set_snapshot( + client_key, + Snapshot { + version_id: snapshot_version_id, + versions_since: 3, + timestamp: Utc.ymd(2001, 9, 9).and_hms(1, 46, 40), + }, + data.clone(), + )?; + + let client = txn.get_client(client_key)?.unwrap(); + assert_eq!( + get_snapshot(txn, client_key, client)?, + Some((snapshot_version_id, data.clone())) + ); + + Ok(()) + } + + #[test] + fn get_snapshot_not_found() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_key = Uuid::new_v4(); + + txn.new_client(client_key, NO_VERSION_ID)?; + let client = txn.get_client(client_key)?.unwrap(); + + assert_eq!(get_snapshot(txn, client_key, client)?, None); + + Ok(()) + } } From d1da8eee52d51f8752fbc92362f7963417a3b4a1 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Thu, 30 Sep 2021 02:45:59 +0000 Subject: [PATCH 6/7] Add add_snapshot API method --- sync-server/src/api/add_snapshot.rs | 191 +++++++++++++++++ sync-server/src/api/mod.rs | 2 + sync-server/src/lib.rs | 7 + sync-server/src/server.rs | 308 +++++++++++++++++++++++++++- 4 files changed, 507 insertions(+), 1 deletion(-) create mode 100644 sync-server/src/api/add_snapshot.rs diff --git a/sync-server/src/api/add_snapshot.rs b/sync-server/src/api/add_snapshot.rs new file mode 100644 index 000000000..49de42cf5 --- /dev/null +++ b/sync-server/src/api/add_snapshot.rs @@ -0,0 +1,191 @@ +use crate::api::{client_key_header, failure_to_ise, ServerState, SNAPSHOT_CONTENT_TYPE}; +use crate::server::{add_snapshot, VersionId, NO_VERSION_ID}; +use actix_web::{error, post, web, HttpMessage, HttpRequest, HttpResponse, Result}; +use futures::StreamExt; + +/// Max snapshot size: 100MB +const MAX_SIZE: usize = 100 * 1024 * 1024; + +/// Add a new snapshot, after checking prerequisites. The snapshot should be transmitted in the +/// request entity body and must have content-type `application/vnd.taskchampion.snapshot`. The +/// content can be encoded in any of the formats supported by actix-web. +/// +/// On success, the response is a 200 OK. Even in a 200 OK, the snapshot may not appear in a +/// subsequent `GetSnapshot` call. +/// +/// Returns other 4xx or 5xx responses on other errors. +#[post("/v1/client/add-snapshot/{version_id}")] +pub(crate) async fn service( + req: HttpRequest, + server_state: web::Data, + web::Path((version_id,)): web::Path<(VersionId,)>, + mut payload: web::Payload, +) -> Result { + // check content-type + if req.content_type() != SNAPSHOT_CONTENT_TYPE { + return Err(error::ErrorBadRequest("Bad content-type")); + } + + let client_key = client_key_header(&req)?; + + // read the body in its entirety + let mut body = web::BytesMut::new(); + while let Some(chunk) = payload.next().await { + let chunk = chunk?; + // limit max size of in-memory payload + if (body.len() + chunk.len()) > MAX_SIZE { + return Err(error::ErrorBadRequest("Snapshot over maximum allowed size")); + } + body.extend_from_slice(&chunk); + } + + if body.is_empty() { + return Err(error::ErrorBadRequest("No snapshot supplied")); + } + + // note that we do not open the transaction until the body has been read + // completely, to avoid blocking other storage access while that data is + // in transit. + let mut txn = server_state.txn().map_err(failure_to_ise)?; + + // get, or create, the client + let client = match txn.get_client(client_key).map_err(failure_to_ise)? { + Some(client) => client, + None => { + txn.new_client(client_key, NO_VERSION_ID) + .map_err(failure_to_ise)?; + txn.get_client(client_key).map_err(failure_to_ise)?.unwrap() + } + }; + + add_snapshot(txn, client_key, client, version_id, body.to_vec()).map_err(failure_to_ise)?; + Ok(HttpResponse::Ok().body("")) +} + +#[cfg(test)] +mod test { + use super::*; + use crate::storage::{InMemoryStorage, Storage}; + use crate::Server; + use actix_web::{http::StatusCode, test, App}; + use pretty_assertions::assert_eq; + use uuid::Uuid; + + #[actix_rt::test] + async fn test_success() -> anyhow::Result<()> { + let client_key = Uuid::new_v4(); + let version_id = Uuid::new_v4(); + let storage: Box = Box::new(InMemoryStorage::new()); + + // set up the storage contents.. + { + let mut txn = storage.txn().unwrap(); + txn.new_client(client_key, version_id).unwrap(); + txn.add_version(client_key, version_id, NO_VERSION_ID, vec![])?; + } + + let server = Server::new(storage); + let mut app = test::init_service(App::new().service(server.service())).await; + + let uri = format!("/v1/client/add-snapshot/{}", version_id); + let req = test::TestRequest::post() + .uri(&uri) + .header("Content-Type", "application/vnd.taskchampion.snapshot") + .header("X-Client-Key", client_key.to_string()) + .set_payload(b"abcd".to_vec()) + .to_request(); + let resp = test::call_service(&mut app, req).await; + assert_eq!(resp.status(), StatusCode::OK); + + // read back that snapshot + let uri = "/v1/client/snapshot"; + let req = test::TestRequest::get() + .uri(uri) + .header("X-Client-Key", client_key.to_string()) + .to_request(); + let mut resp = test::call_service(&mut app, req).await; + assert_eq!(resp.status(), StatusCode::OK); + + use futures::StreamExt; + let (bytes, _) = resp.take_body().into_future().await; + assert_eq!(bytes.unwrap().unwrap().as_ref(), b"abcd"); + + Ok(()) + } + + #[actix_rt::test] + async fn test_not_added_200() { + let client_key = Uuid::new_v4(); + let version_id = Uuid::new_v4(); + let storage: Box = Box::new(InMemoryStorage::new()); + + // set up the storage contents.. + { + let mut txn = storage.txn().unwrap(); + txn.new_client(client_key, NO_VERSION_ID).unwrap(); + } + + let server = Server::new(storage); + let mut app = test::init_service(App::new().service(server.service())).await; + + // add a snapshot for a nonexistent version + let uri = format!("/v1/client/add-snapshot/{}", version_id); + let req = test::TestRequest::post() + .uri(&uri) + .header("Content-Type", "application/vnd.taskchampion.snapshot") + .header("X-Client-Key", client_key.to_string()) + .set_payload(b"abcd".to_vec()) + .to_request(); + let resp = test::call_service(&mut app, req).await; + assert_eq!(resp.status(), StatusCode::OK); + + // read back, seeing no snapshot + let uri = "/v1/client/snapshot"; + let req = test::TestRequest::get() + .uri(uri) + .header("X-Client-Key", client_key.to_string()) + .to_request(); + let resp = test::call_service(&mut app, req).await; + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + + #[actix_rt::test] + async fn test_bad_content_type() { + let client_key = Uuid::new_v4(); + let version_id = Uuid::new_v4(); + let storage: Box = Box::new(InMemoryStorage::new()); + let server = Server::new(storage); + let mut app = test::init_service(App::new().service(server.service())).await; + + let uri = format!("/v1/client/add-snapshot/{}", version_id); + let req = test::TestRequest::post() + .uri(&uri) + .header("Content-Type", "not/correct") + .header("X-Client-Key", client_key.to_string()) + .set_payload(b"abcd".to_vec()) + .to_request(); + let resp = test::call_service(&mut app, req).await; + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + #[actix_rt::test] + async fn test_empty_body() { + let client_key = Uuid::new_v4(); + let version_id = Uuid::new_v4(); + let storage: Box = Box::new(InMemoryStorage::new()); + let server = Server::new(storage); + let mut app = test::init_service(App::new().service(server.service())).await; + + let uri = format!("/v1/client/add-snapshot/{}", version_id); + let req = test::TestRequest::post() + .uri(&uri) + .header( + "Content-Type", + "application/vnd.taskchampion.history-segment", + ) + .header("X-Client-Key", client_key.to_string()) + .to_request(); + let resp = test::call_service(&mut app, req).await; + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } +} diff --git a/sync-server/src/api/mod.rs b/sync-server/src/api/mod.rs index 87666a35a..1c8dd48d5 100644 --- a/sync-server/src/api/mod.rs +++ b/sync-server/src/api/mod.rs @@ -3,6 +3,7 @@ use crate::storage::Storage; use actix_web::{error, http::StatusCode, web, HttpRequest, Result, Scope}; use std::sync::Arc; +mod add_snapshot; mod add_version; mod get_child_version; mod get_snapshot; @@ -31,6 +32,7 @@ pub(crate) fn api_scope() -> Scope { .service(get_child_version::service) .service(add_version::service) .service(get_snapshot::service) + .service(add_snapshot::service) } /// Convert a failure::Error to an Actix ISE diff --git a/sync-server/src/lib.rs b/sync-server/src/lib.rs index c219f2e1b..c817da29e 100644 --- a/sync-server/src/lib.rs +++ b/sync-server/src/lib.rs @@ -35,3 +35,10 @@ impl Server { .service(api_scope()) } } + +#[cfg(test)] +mod test { + pub(crate) fn init_logging() { + let _ = env_logger::builder().is_test(true).try_init(); + } +} diff --git a/sync-server/src/server.rs b/sync-server/src/server.rs index 0ed3f4e7b..3a4455847 100644 --- a/sync-server/src/server.rs +++ b/sync-server/src/server.rs @@ -1,12 +1,18 @@ //! This module implements the core logic of the server: handling transactions, upholding //! invariants, and so on. This does not implement the HTTP-specific portions; those //! are in [`crate::api`]. See the protocol documentation for details. -use crate::storage::{Client, StorageTxn}; +use crate::storage::{Client, Snapshot, StorageTxn}; +use chrono::Utc; use uuid::Uuid; /// The distinguished value for "no version" pub const NO_VERSION_ID: VersionId = Uuid::nil(); +/// Number of versions to search back from the latest to find the +/// version for a newly-added snapshot. Snapshots for versions older +/// than this will be rejected. +const SNAPSHOT_SEARCH_LEN: i32 = 5; + pub(crate) type HistorySegment = Vec; pub(crate) type ClientKey = Uuid; pub(crate) type VersionId = Uuid; @@ -105,6 +111,90 @@ pub(crate) fn add_version<'a>( Ok(AddVersionResult::Ok(version_id)) } +/// Implementation of the AddSnapshot protocol transaction +pub(crate) fn add_snapshot<'a>( + mut txn: Box, + client_key: ClientKey, + client: Client, + version_id: VersionId, + data: Vec, +) -> anyhow::Result<()> { + log::debug!( + "add_snapshot(client_key: {}, version_id: {})", + client_key, + version_id, + ); + + // NOTE: if the snapshot is rejected, this function logs about it and returns + // Ok(()), as there's no reason to report an errot to the client / user. + + let last_snapshot = client.snapshot.map(|snap| snap.version_id); + if Some(version_id) == last_snapshot { + log::debug!( + "rejecting snapshot for version {}: already exists", + version_id + ); + return Ok(()); + } + + // look for this version in the history of this client, starting at the latest version, and + // only iterating for a limited number of versions. + let mut search_len = SNAPSHOT_SEARCH_LEN; + let mut vid = client.latest_version_id; + + loop { + if vid == version_id && version_id != NO_VERSION_ID { + // the new snapshot is for a recent version, so proceed + break; + } + + if Some(vid) == last_snapshot { + // the new snapshot is older than the last snapshot, so ignore it + log::debug!( + "rejecting snapshot for version {}: newer snapshot already exists or no such version", + version_id + ); + return Ok(()); + } + + search_len -= 1; + if search_len <= 0 || vid == NO_VERSION_ID { + // this should not happen in normal operation, so warn about it + log::warn!( + "rejecting snapshot for version {}: version is too old or no such version", + version_id + ); + return Ok(()); + } + + // get the parent version ID + if let Some(parent) = txn.get_version(client_key, vid)? { + vid = parent.parent_version_id; + } else { + // this version does not exist; "this should not happen" but if it does, + // we don't need a snapshot earlier than the missing version. + log::warn!( + "rejecting snapshot for version {}: newer versions have already been deleted", + version_id + ); + return Ok(()); + } + } + + log::warn!("accepting snapshot for version {}", version_id); + txn.set_snapshot( + client_key, + Snapshot { + version_id, + timestamp: Utc::now(), + versions_since: 0, + }, + data, + )?; + txn.commit()?; + Ok(()) +} + /// Implementation of the GetSnapshot protocol transaction pub(crate) fn get_snapshot<'a>( mut txn: Box, @@ -123,11 +213,14 @@ pub(crate) fn get_snapshot<'a>( mod test { use super::*; use crate::storage::{InMemoryStorage, Snapshot, Storage}; + use crate::test::init_logging; use chrono::{TimeZone, Utc}; use pretty_assertions::assert_eq; #[test] fn gcv_not_found_initial() -> anyhow::Result<()> { + init_logging(); + let storage = InMemoryStorage::new(); let mut txn = storage.txn()?; let client_key = Uuid::new_v4(); @@ -144,6 +237,8 @@ mod test { #[test] fn gcv_gone_initial() -> anyhow::Result<()> { + init_logging(); + let storage = InMemoryStorage::new(); let mut txn = storage.txn()?; let client_key = Uuid::new_v4(); @@ -170,6 +265,8 @@ mod test { #[test] fn gcv_not_found_up_to_date() -> anyhow::Result<()> { + init_logging(); + let storage = InMemoryStorage::new(); let mut txn = storage.txn()?; let client_key = Uuid::new_v4(); @@ -189,6 +286,8 @@ mod test { #[test] fn gcv_gone() -> anyhow::Result<()> { + init_logging(); + let storage = InMemoryStorage::new(); let mut txn = storage.txn()?; let client_key = Uuid::new_v4(); @@ -208,6 +307,8 @@ mod test { #[test] fn gcv_found() -> anyhow::Result<()> { + init_logging(); + let storage = InMemoryStorage::new(); let mut txn = storage.txn()?; let client_key = Uuid::new_v4(); @@ -237,6 +338,8 @@ mod test { #[test] fn av_conflict() -> anyhow::Result<()> { + init_logging(); + let storage = InMemoryStorage::new(); let mut txn = storage.txn()?; let client_key = Uuid::new_v4(); @@ -265,6 +368,8 @@ mod test { } fn test_av_success(latest_version_id_nil: bool) -> anyhow::Result<()> { + init_logging(); + let storage = InMemoryStorage::new(); let mut txn = storage.txn()?; let client_key = Uuid::new_v4(); @@ -317,8 +422,207 @@ mod test { test_av_success(false) } + #[test] + fn add_snapshot_success_latest() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_key = Uuid::new_v4(); + let version_id = Uuid::new_v4(); + + // set up a task DB with one version in it + txn.new_client(client_key, version_id)?; + txn.add_version(client_key, version_id, NO_VERSION_ID, vec![])?; + + // add a snapshot for that version + let client = txn.get_client(client_key)?.unwrap(); + add_snapshot(txn, client_key, client, version_id, vec![1, 2, 3])?; + + // verify the snapshot + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + let snapshot = client.snapshot.unwrap(); + assert_eq!(snapshot.version_id, version_id); + assert_eq!(snapshot.versions_since, 0); + assert_eq!( + txn.get_snapshot_data(client_key, version_id).unwrap(), + Some(vec![1, 2, 3]) + ); + + Ok(()) + } + + #[test] + fn add_snapshot_success_older() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_key = Uuid::new_v4(); + let version_id_1 = Uuid::new_v4(); + let version_id_2 = Uuid::new_v4(); + + // set up a task DB with two versions in it + txn.new_client(client_key, version_id_2)?; + txn.add_version(client_key, version_id_1, NO_VERSION_ID, vec![])?; + txn.add_version(client_key, version_id_2, version_id_1, vec![])?; + + // add a snapshot for version 1 + let client = txn.get_client(client_key)?.unwrap(); + add_snapshot(txn, client_key, client, version_id_1, vec![1, 2, 3])?; + + // verify the snapshot + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + let snapshot = client.snapshot.unwrap(); + assert_eq!(snapshot.version_id, version_id_1); + assert_eq!(snapshot.versions_since, 0); + assert_eq!( + txn.get_snapshot_data(client_key, version_id_1).unwrap(), + Some(vec![1, 2, 3]) + ); + + Ok(()) + } + + #[test] + fn add_snapshot_fails_no_such() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_key = Uuid::new_v4(); + let version_id_1 = Uuid::new_v4(); + let version_id_2 = Uuid::new_v4(); + + // set up a task DB with two versions in it + txn.new_client(client_key, version_id_2)?; + txn.add_version(client_key, version_id_1, NO_VERSION_ID, vec![])?; + txn.add_version(client_key, version_id_2, version_id_1, vec![])?; + + // add a snapshot for unknown version + let client = txn.get_client(client_key)?.unwrap(); + let version_id_unk = Uuid::new_v4(); + add_snapshot(txn, client_key, client, version_id_unk, vec![1, 2, 3])?; + + // verify the snapshot does not exist + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + assert!(client.snapshot.is_none()); + + Ok(()) + } + + #[test] + fn add_snapshot_fails_too_old() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_key = Uuid::new_v4(); + let mut version_id = Uuid::new_v4(); + let mut parent_version_id = Uuid::nil(); + let mut version_ids = vec![]; + + // set up a task DB with 10 versions in it (oldest to newest) + txn.new_client(client_key, Uuid::nil())?; + for _ in 0..10 { + txn.add_version(client_key, version_id, parent_version_id, vec![])?; + version_ids.push(version_id); + parent_version_id = version_id; + version_id = Uuid::new_v4(); + } + + // add a snapshot for the earliest of those + let client = txn.get_client(client_key)?.unwrap(); + add_snapshot(txn, client_key, client, version_ids[0], vec![1, 2, 3])?; + + // verify the snapshot does not exist + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + assert!(client.snapshot.is_none()); + + Ok(()) + } + + #[test] + fn add_snapshot_fails_newer_exists() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_key = Uuid::new_v4(); + let mut version_id = Uuid::new_v4(); + let mut parent_version_id = Uuid::nil(); + let mut version_ids = vec![]; + + // set up a task DB with 5 versions in it (oldest to newest) and a snapshot of the middle + // one + txn.new_client(client_key, Uuid::nil())?; + for _ in 0..5 { + txn.add_version(client_key, version_id, parent_version_id, vec![])?; + version_ids.push(version_id); + parent_version_id = version_id; + version_id = Uuid::new_v4(); + } + txn.set_snapshot( + client_key, + Snapshot { + version_id: version_ids[2], + versions_since: 2, + timestamp: Utc.ymd(2001, 9, 9).and_hms(1, 46, 40), + }, + vec![1, 2, 3], + )?; + + // add a snapshot for the earliest of those + let client = txn.get_client(client_key)?.unwrap(); + add_snapshot(txn, client_key, client, version_ids[0], vec![9, 9, 9])?; + + println!("{:?}", version_ids); + + // verify the snapshot was not replaced + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + let snapshot = client.snapshot.unwrap(); + assert_eq!(snapshot.version_id, version_ids[2]); + assert_eq!(snapshot.versions_since, 2); + assert_eq!( + txn.get_snapshot_data(client_key, version_ids[2]).unwrap(), + Some(vec![1, 2, 3]) + ); + + Ok(()) + } + + #[test] + fn add_snapshot_fails_nil_version() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_key = Uuid::new_v4(); + + // just set up the client + txn.new_client(client_key, NO_VERSION_ID)?; + + // add a snapshot for the nil version + let client = txn.get_client(client_key)?.unwrap(); + add_snapshot(txn, client_key, client, NO_VERSION_ID, vec![9, 9, 9])?; + + // verify the snapshot does not exist + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + assert!(client.snapshot.is_none()); + + Ok(()) + } + #[test] fn get_snapshot_found() -> anyhow::Result<()> { + init_logging(); + let storage = InMemoryStorage::new(); let mut txn = storage.txn()?; let client_key = Uuid::new_v4(); @@ -347,6 +651,8 @@ mod test { #[test] fn get_snapshot_not_found() -> anyhow::Result<()> { + init_logging(); + let storage = InMemoryStorage::new(); let mut txn = storage.txn()?; let client_key = Uuid::new_v4(); From 7bb6ea6865e101041ae0d154a1e9f2ae970d8bc2 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Sun, 3 Oct 2021 02:14:36 +0000 Subject: [PATCH 7/7] Request snapshots in AddVersion --- sync-server/src/api/add_version.rs | 42 +++- sync-server/src/api/mod.rs | 3 + sync-server/src/server.rs | 361 +++++++++++++++++++++++------ 3 files changed, 328 insertions(+), 78 deletions(-) diff --git a/sync-server/src/api/add_version.rs b/sync-server/src/api/add_version.rs index c6d7314d0..92d86ef52 100644 --- a/sync-server/src/api/add_version.rs +++ b/sync-server/src/api/add_version.rs @@ -1,8 +1,8 @@ use crate::api::{ client_key_header, failure_to_ise, ServerState, HISTORY_SEGMENT_CONTENT_TYPE, - PARENT_VERSION_ID_HEADER, VERSION_ID_HEADER, + PARENT_VERSION_ID_HEADER, SNAPSHOT_REQUEST_HEADER, VERSION_ID_HEADER, }; -use crate::server::{add_version, AddVersionResult, VersionId, NO_VERSION_ID}; +use crate::server::{add_version, AddVersionResult, SnapshotUrgency, VersionId, NO_VERSION_ID}; use actix_web::{error, post, web, HttpMessage, HttpRequest, HttpResponse, Result}; use futures::StreamExt; @@ -18,6 +18,9 @@ const MAX_SIZE: usize = 100 * 1024 * 1024; /// the version cannot be added due to a conflict, the response is a 409 CONFLICT with the expected /// parent version ID in the `X-Parent-Version-Id` header. /// +/// If included, a snapshot request appears in the `X-Snapshot-Request` header with value +/// `urgency=low` or `urgency=high`. +/// /// Returns other 4xx or 5xx responses on other errors. #[post("/v1/client/add-version/{parent_version_id}")] pub(crate) async fn service( @@ -63,15 +66,30 @@ pub(crate) async fn service( } }; - let result = add_version(txn, client_key, client, parent_version_id, body.to_vec()) - .map_err(failure_to_ise)?; + let (result, snap_urgency) = + add_version(txn, client_key, client, parent_version_id, body.to_vec()) + .map_err(failure_to_ise)?; + Ok(match result { - AddVersionResult::Ok(version_id) => HttpResponse::Ok() - .header(VERSION_ID_HEADER, version_id.to_string()) - .body(""), - AddVersionResult::ExpectedParentVersion(parent_version_id) => HttpResponse::Conflict() - .header(PARENT_VERSION_ID_HEADER, parent_version_id.to_string()) - .body(""), + AddVersionResult::Ok(version_id) => { + let mut rb = HttpResponse::Ok(); + rb.header(VERSION_ID_HEADER, version_id.to_string()); + match snap_urgency { + SnapshotUrgency::None => {} + SnapshotUrgency::Low => { + rb.header(SNAPSHOT_REQUEST_HEADER, "urgency=low"); + } + SnapshotUrgency::High => { + rb.header(SNAPSHOT_REQUEST_HEADER, "urgency=high"); + } + }; + rb.finish() + } + AddVersionResult::ExpectedParentVersion(parent_version_id) => { + let mut rb = HttpResponse::Conflict(); + rb.header(PARENT_VERSION_ID_HEADER, parent_version_id.to_string()); + rb.finish() + } }) } @@ -117,6 +135,10 @@ mod test { let new_version_id = resp.headers().get("X-Version-Id").unwrap(); assert!(new_version_id != &version_id.to_string()); + // Shapshot should be requested, since there is no existing snapshot + let snapshot_request = resp.headers().get("X-Snapshot-Request").unwrap(); + assert_eq!(snapshot_request, "urgency=high"); + assert_eq!(resp.headers().get("X-Parent-Version-Id"), None); } diff --git a/sync-server/src/api/mod.rs b/sync-server/src/api/mod.rs index 1c8dd48d5..b3157438a 100644 --- a/sync-server/src/api/mod.rs +++ b/sync-server/src/api/mod.rs @@ -24,6 +24,9 @@ pub(crate) const CLIENT_KEY_HEADER: &str = "X-Client-Key"; /// The header name for parent version ID pub(crate) const PARENT_VERSION_ID_HEADER: &str = "X-Parent-Version-Id"; +/// The header name for parent version ID +pub(crate) const SNAPSHOT_REQUEST_HEADER: &str = "X-Snapshot-Request"; + /// The type containing a reference to the Storage object in the Actix state. pub(crate) type ServerState = Arc; diff --git a/sync-server/src/server.rs b/sync-server/src/server.rs index 3a4455847..69a65a088 100644 --- a/sync-server/src/server.rs +++ b/sync-server/src/server.rs @@ -13,6 +13,12 @@ pub const NO_VERSION_ID: VersionId = Uuid::nil(); /// than this will be rejected. const SNAPSHOT_SEARCH_LEN: i32 = 5; +/// Maximum number of days between snapshots +const SNAPSHOT_DAYS: i64 = 14; + +/// Maximum number of versions between snapshots +const SNAPSHOT_VERSIONS: u32 = 30; + pub(crate) type HistorySegment = Vec; pub(crate) type ClientKey = Uuid; pub(crate) type VersionId = Uuid; @@ -75,6 +81,43 @@ pub(crate) enum AddVersionResult { ExpectedParentVersion(VersionId), } +/// Urgency of a snapshot for a client; used to create the `X-Snapshot-Request` header. +#[derive(PartialEq, Debug, Clone, Copy, Eq, PartialOrd, Ord)] +pub(crate) enum SnapshotUrgency { + /// Don't need a snapshot right now. + None, + + /// A snapshot would be good, but can wait for other replicas to provide it. + Low, + + /// A snapshot is needed right now. + High, +} + +impl SnapshotUrgency { + /// Calculate the urgency for a snapshot based on its age in days + fn for_days(days: i64) -> Self { + if days >= SNAPSHOT_DAYS * 3 / 2 { + SnapshotUrgency::High + } else if days >= SNAPSHOT_DAYS { + SnapshotUrgency::Low + } else { + SnapshotUrgency::None + } + } + + /// Calculate the urgency for a snapshot based on its age in versions + fn for_versions_since(versions_since: u32) -> Self { + if versions_since >= SNAPSHOT_VERSIONS * 3 / 2 { + SnapshotUrgency::High + } else if versions_since >= SNAPSHOT_VERSIONS { + SnapshotUrgency::Low + } else { + SnapshotUrgency::None + } + } +} + /// Implementation of the AddVersion protocol transaction pub(crate) fn add_version<'a>( mut txn: Box, @@ -82,7 +125,7 @@ pub(crate) fn add_version<'a>( client: Client, parent_version_id: VersionId, history_segment: HistorySegment, -) -> anyhow::Result { +) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> { log::debug!( "add_version(client_key: {}, parent_version_id: {})", client_key, @@ -92,8 +135,9 @@ pub(crate) fn add_version<'a>( // check if this version is acceptable, under the protection of the transaction if client.latest_version_id != NO_VERSION_ID && parent_version_id != client.latest_version_id { log::debug!("add_version request rejected: mismatched latest_version_id"); - return Ok(AddVersionResult::ExpectedParentVersion( - client.latest_version_id, + return Ok(( + AddVersionResult::ExpectedParentVersion(client.latest_version_id), + SnapshotUrgency::None, )); } @@ -108,7 +152,26 @@ pub(crate) fn add_version<'a>( txn.add_version(client_key, version_id, parent_version_id, history_segment)?; txn.commit()?; - Ok(AddVersionResult::Ok(version_id)) + // calculate the urgency + let time_urgency = match client.snapshot { + None => SnapshotUrgency::High, + Some(Snapshot { timestamp, .. }) => { + SnapshotUrgency::for_days((Utc::now() - timestamp).num_days()) + } + }; + + println!("{:?}", client.snapshot); + let version_urgency = match client.snapshot { + None => SnapshotUrgency::High, + Some(Snapshot { versions_since, .. }) => { + SnapshotUrgency::for_versions_since(versions_since) + } + }; + + Ok(( + AddVersionResult::Ok(version_id), + std::cmp::max(time_urgency, version_urgency), + )) } /// Implementation of the AddSnapshot protocol transaction @@ -214,11 +277,44 @@ mod test { use super::*; use crate::storage::{InMemoryStorage, Snapshot, Storage}; use crate::test::init_logging; - use chrono::{TimeZone, Utc}; + use chrono::{Duration, TimeZone, Utc}; use pretty_assertions::assert_eq; #[test] - fn gcv_not_found_initial() -> anyhow::Result<()> { + fn snapshot_urgency_max() { + use SnapshotUrgency::*; + assert_eq!(std::cmp::max(None, None), None); + assert_eq!(std::cmp::max(None, Low), Low); + assert_eq!(std::cmp::max(None, High), High); + assert_eq!(std::cmp::max(Low, None), Low); + assert_eq!(std::cmp::max(Low, Low), Low); + assert_eq!(std::cmp::max(Low, High), High); + assert_eq!(std::cmp::max(High, None), High); + assert_eq!(std::cmp::max(High, Low), High); + assert_eq!(std::cmp::max(High, High), High); + } + + #[test] + fn snapshot_urgency_for_days() { + use SnapshotUrgency::*; + assert_eq!(SnapshotUrgency::for_days(0), None); + assert_eq!(SnapshotUrgency::for_days(SNAPSHOT_DAYS), Low); + assert_eq!(SnapshotUrgency::for_days(SNAPSHOT_DAYS * 2), High); + } + + #[test] + fn snapshot_urgency_for_versions_since() { + use SnapshotUrgency::*; + assert_eq!(SnapshotUrgency::for_versions_since(0), None); + assert_eq!(SnapshotUrgency::for_versions_since(SNAPSHOT_VERSIONS), Low); + assert_eq!( + SnapshotUrgency::for_versions_since(SNAPSHOT_VERSIONS * 2), + High + ); + } + + #[test] + fn get_child_version_not_found_initial() -> anyhow::Result<()> { init_logging(); let storage = InMemoryStorage::new(); @@ -236,7 +332,7 @@ mod test { } #[test] - fn gcv_gone_initial() -> anyhow::Result<()> { + fn get_child_version_gone_initial() -> anyhow::Result<()> { init_logging(); let storage = InMemoryStorage::new(); @@ -264,7 +360,7 @@ mod test { } #[test] - fn gcv_not_found_up_to_date() -> anyhow::Result<()> { + fn get_child_version_not_found_up_to_date() -> anyhow::Result<()> { init_logging(); let storage = InMemoryStorage::new(); @@ -285,7 +381,7 @@ mod test { } #[test] - fn gcv_gone() -> anyhow::Result<()> { + fn get_child_version_gone() -> anyhow::Result<()> { init_logging(); let storage = InMemoryStorage::new(); @@ -306,7 +402,7 @@ mod test { } #[test] - fn gcv_found() -> anyhow::Result<()> { + fn get_child_version_found() -> anyhow::Result<()> { init_logging(); let storage = InMemoryStorage::new(); @@ -336,90 +432,221 @@ mod test { Ok(()) } - #[test] - fn av_conflict() -> anyhow::Result<()> { + /// Utility setup function for add_version tests + fn av_setup( + storage: &InMemoryStorage, + num_versions: u32, + snapshot_version: Option, + snapshot_days_ago: Option, + ) -> anyhow::Result<(Uuid, Vec)> { init_logging(); - - let storage = InMemoryStorage::new(); let mut txn = storage.txn()?; let client_key = Uuid::new_v4(); - let parent_version_id = Uuid::new_v4(); - let history_segment = b"abcd".to_vec(); - let existing_parent_version_id = Uuid::new_v4(); - let client = Client { - latest_version_id: existing_parent_version_id, - snapshot: None, - }; + let mut versions = vec![]; + let mut version_id = Uuid::nil(); + txn.new_client(client_key, Uuid::nil())?; + for vnum in 0..num_versions { + let parent_version_id = version_id; + version_id = Uuid::new_v4(); + versions.push(version_id); + txn.add_version( + client_key, + version_id, + parent_version_id, + vec![0, 0, vnum as u8], + )?; + if Some(vnum) == snapshot_version { + txn.set_snapshot( + client_key, + Snapshot { + version_id, + versions_since: 0, + timestamp: Utc::now() - Duration::days(snapshot_days_ago.unwrap_or(0)), + }, + vec![vnum as u8], + )?; + } + } + + Ok((client_key, versions)) + } + + /// Utility function to check the results of an add_version call + fn av_success_check( + storage: &InMemoryStorage, + client_key: Uuid, + existing_versions: &[Uuid], + result: (AddVersionResult, SnapshotUrgency), + expected_history: Vec, + expected_urgency: SnapshotUrgency, + ) -> anyhow::Result<()> { + if let AddVersionResult::Ok(new_version_id) = result.0 { + // check that it invented a new version ID + for v in existing_versions { + assert_ne!(&new_version_id, v); + } + + // verify that the storage was updated + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + assert_eq!(client.latest_version_id, new_version_id); + + let parent_version_id = existing_versions.last().cloned().unwrap_or_else(Uuid::nil); + let version = txn.get_version(client_key, new_version_id)?.unwrap(); + assert_eq!(version.version_id, new_version_id); + assert_eq!(version.parent_version_id, parent_version_id); + assert_eq!(version.history_segment, expected_history); + } else { + panic!("did not get Ok from add_version: {:?}", result); + } + + assert_eq!(result.1, expected_urgency); + + Ok(()) + } + + #[test] + fn add_version_conflict() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + let (client_key, versions) = av_setup(&storage, 3, None, None)?; + + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + + // try to add a child of a version other than the latest assert_eq!( - add_version(txn, client_key, client, parent_version_id, history_segment)?, - AddVersionResult::ExpectedParentVersion(existing_parent_version_id) + add_version(txn, client_key, client, versions[1], vec![3, 6, 9])?.0, + AddVersionResult::ExpectedParentVersion(versions[2]) ); // verify that the storage wasn't updated txn = storage.txn()?; - assert_eq!(txn.get_client(client_key)?, None); assert_eq!( - txn.get_version_by_parent(client_key, parent_version_id)?, - None + txn.get_client(client_key)?.unwrap().latest_version_id, + versions[2] ); + assert_eq!(txn.get_version_by_parent(client_key, versions[2])?, None); Ok(()) } - fn test_av_success(latest_version_id_nil: bool) -> anyhow::Result<()> { - init_logging(); - + #[test] + fn add_version_with_existing_history() -> anyhow::Result<()> { let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_key = Uuid::new_v4(); - let parent_version_id = Uuid::new_v4(); - let history_segment = b"abcd".to_vec(); - let latest_version_id = if latest_version_id_nil { - Uuid::nil() - } else { - parent_version_id - }; + let (client_key, versions) = av_setup(&storage, 1, None, None)?; - txn.new_client(client_key, latest_version_id)?; + let mut txn = storage.txn()?; let client = txn.get_client(client_key)?.unwrap(); - let result = add_version( - txn, - client_key, - client, - parent_version_id, - history_segment.clone(), - )?; - if let AddVersionResult::Ok(new_version_id) = result { - // check that it invented a new version ID - assert!(new_version_id != parent_version_id); + let result = add_version(txn, client_key, client, versions[0], vec![3, 6, 9])?; - // verify that the storage was updated - txn = storage.txn()?; - let client = txn.get_client(client_key)?.unwrap(); - assert_eq!(client.latest_version_id, new_version_id); - let version = txn - .get_version_by_parent(client_key, parent_version_id)? - .unwrap(); - assert_eq!(version.version_id, new_version_id); - assert_eq!(version.parent_version_id, parent_version_id); - assert_eq!(version.history_segment, history_segment); - } else { - panic!("did not get Ok from add_version"); - } + av_success_check( + &storage, + client_key, + &versions, + result, + vec![3, 6, 9], + // urgency=high because there are no snapshots yet + SnapshotUrgency::High, + )?; Ok(()) } #[test] - fn av_success_with_existing_history() -> anyhow::Result<()> { - test_av_success(true) + fn add_version_with_no_history() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + let (client_key, versions) = av_setup(&storage, 0, None, None)?; + + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + + let parent_version_id = Uuid::nil(); + let result = add_version(txn, client_key, client, parent_version_id, vec![3, 6, 9])?; + + av_success_check( + &storage, + client_key, + &versions, + result, + vec![3, 6, 9], + // urgency=high because there are no snapshots yet + SnapshotUrgency::High, + )?; + + Ok(()) } #[test] - fn av_success_nil_latest_version_id() -> anyhow::Result<()> { - test_av_success(false) + fn add_version_success_recent_snapshot() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + let (client_key, versions) = av_setup(&storage, 1, Some(0), None)?; + + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + + let result = add_version(txn, client_key, client, versions[0], vec![1, 2, 3])?; + + av_success_check( + &storage, + client_key, + &versions, + result, + vec![1, 2, 3], + // no snapshot request since the previous version has a snapshot + SnapshotUrgency::None, + )?; + + Ok(()) + } + + #[test] + fn add_version_success_aged_snapshot() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + // one snapshot, but it was 50 days ago + let (client_key, versions) = av_setup(&storage, 1, Some(0), Some(50))?; + + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + + let result = add_version(txn, client_key, client, versions[0], vec![1, 2, 3])?; + + av_success_check( + &storage, + client_key, + &versions, + result, + vec![1, 2, 3], + // urgency=high due to days since the snapshot + SnapshotUrgency::High, + )?; + + Ok(()) + } + + #[test] + fn add_version_success_snapshot_many_versions_ago() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + // one snapshot, but it was 50 versions ago + let (client_key, versions) = av_setup(&storage, 50, Some(0), None)?; + + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + + let result = add_version(txn, client_key, client, versions[49], vec![1, 2, 3])?; + + av_success_check( + &storage, + client_key, + &versions, + result, + vec![1, 2, 3], + // urgency=high due to number of versions since the snapshot + SnapshotUrgency::High, + )?; + + Ok(()) } #[test] @@ -580,8 +807,6 @@ mod test { let client = txn.get_client(client_key)?.unwrap(); add_snapshot(txn, client_key, client, version_ids[0], vec![9, 9, 9])?; - println!("{:?}", version_ids); - // verify the snapshot was not replaced let mut txn = storage.txn()?; let client = txn.get_client(client_key)?.unwrap();