From a81c84d7c75a3ab83c29a5c0044835d3bd84b00b Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Wed, 25 Nov 2020 17:52:47 -0500 Subject: [PATCH] refactor sync to new model --- taskchampion/src/replica.rs | 4 +- taskchampion/src/server/mod.rs | 2 +- taskchampion/src/server/test.rs | 120 +++++++++++------------ taskchampion/src/server/types.rs | 52 +++++++--- taskchampion/src/taskdb.rs | 98 ++++++++++-------- taskchampion/src/taskstorage/inmemory.rs | 14 +-- taskchampion/src/taskstorage/kv.rs | 36 ++++--- taskchampion/src/taskstorage/mod.rs | 10 +- 8 files changed, 193 insertions(+), 143 deletions(-) diff --git a/taskchampion/src/replica.rs b/taskchampion/src/replica.rs index 0ca2e67f6..f141e2348 100644 --- a/taskchampion/src/replica.rs +++ b/taskchampion/src/replica.rs @@ -145,8 +145,8 @@ impl Replica { } /// Synchronize this replica against the given server. - pub fn sync(&mut self, username: &str, server: &mut dyn Server) -> Fallible<()> { - self.taskdb.sync(username, server) + pub fn sync(&mut self, server: &mut dyn Server) -> Fallible<()> { + self.taskdb.sync(server) } /// Perform "garbage collection" on this replica. In particular, this renumbers the working diff --git a/taskchampion/src/server/mod.rs b/taskchampion/src/server/mod.rs index 9a6214425..06009b127 100644 --- a/taskchampion/src/server/mod.rs +++ b/taskchampion/src/server/mod.rs @@ -4,4 +4,4 @@ pub(crate) mod test; mod signing; mod types; -pub use types::{Blob, Server, VersionAdd}; +pub use types::*; diff --git a/taskchampion/src/server/test.rs b/taskchampion/src/server/test.rs index e9a85f17d..3175c2aa5 100644 --- a/taskchampion/src/server/test.rs +++ b/taskchampion/src/server/test.rs @@ -1,84 +1,78 @@ -use crate::server::{Blob, Server, VersionAdd}; +use crate::server::{ + AddVersionResult, GetVersionResult, HistorySegment, Server, VersionId, NO_VERSION_ID, +}; use failure::Fallible; use std::collections::HashMap; +use uuid::Uuid; -pub(crate) struct TestServer { - users: HashMap, +struct Version { + version_id: VersionId, + parent_version_id: VersionId, + history_segment: HistorySegment, } -struct User { - // versions, indexed at v-1 - versions: Vec, - snapshots: HashMap, +pub(crate) struct TestServer { + latest_version_id: VersionId, + // NOTE: indexed by parent_version_id! + versions: HashMap, } impl TestServer { + /// A test server has no notion of clients, signatures, encryption, etc. pub fn new() -> TestServer { TestServer { - users: HashMap::new(), + latest_version_id: NO_VERSION_ID, + versions: HashMap::new(), } } - - fn get_user_mut(&mut self, username: &str) -> &mut User { - self.users - .entry(username.to_string()) - .or_insert_with(User::new) - } } impl Server for TestServer { - /// Get a vector of all versions after `since_version` - fn get_versions(&self, username: &str, since_version: u64) -> Fallible> { - if let Some(user) = self.users.get(username) { - user.get_versions(since_version) - } else { - Ok(vec![]) - } - } - /// Add a new version. If the given version number is incorrect, this responds with the /// appropriate version and expects the caller to try again. - fn add_version(&mut self, username: &str, version: u64, blob: Blob) -> Fallible { - self.get_user_mut(username).add_version(version, blob) + fn add_version( + &mut self, + parent_version_id: VersionId, + history_segment: HistorySegment, + ) -> Fallible { + // no client lookup + // no signature validation + + // check the parent_version_id for linearity + if self.latest_version_id != NO_VERSION_ID { + if parent_version_id != self.latest_version_id { + return Ok(AddVersionResult::ExpectedParentVersion( + self.latest_version_id, + )); + } + } + + // invent a new ID for this version + let version_id = Uuid::new_v4(); + + self.versions.insert( + parent_version_id, + Version { + version_id, + parent_version_id, + history_segment, + }, + ); + self.latest_version_id = version_id; + + Ok(AddVersionResult::Ok(version_id)) } - fn add_snapshot(&mut self, username: &str, version: u64, blob: Blob) -> Fallible<()> { - self.get_user_mut(username).add_snapshot(version, blob) - } -} - -impl User { - fn new() -> User { - User { - versions: vec![], - snapshots: HashMap::new(), - } - } - - fn get_versions(&self, since_version: u64) -> Fallible> { - let last_version = self.versions.len(); - if last_version == since_version as usize { - return Ok(vec![]); - } - Ok(self.versions[since_version as usize..last_version] - .iter() - .map(|r| r.clone()) - .collect::>()) - } - - fn add_version(&mut self, version: u64, blob: Blob) -> Fallible { - // of by one here: client wants to send version 1 first - let expected_version = self.versions.len() as u64 + 1; - if version != expected_version { - return Ok(VersionAdd::ExpectedVersion(expected_version)); - } - self.versions.push(blob); - - Ok(VersionAdd::Ok) - } - - fn add_snapshot(&mut self, version: u64, blob: Blob) -> Fallible<()> { - self.snapshots.insert(version, blob); - Ok(()) + /// Get a vector of all versions after `since_version` + fn get_child_version(&self, parent_version_id: VersionId) -> Fallible { + if let Some(version) = self.versions.get(&parent_version_id) { + Ok(GetVersionResult::Version { + version_id: version.version_id, + parent_version_id: version.parent_version_id, + history_segment: version.history_segment.clone(), + }) + } else { + Ok(GetVersionResult::NoSuchVersion) + } } } diff --git a/taskchampion/src/server/types.rs b/taskchampion/src/server/types.rs index 567997cac..9d95f588b 100644 --- a/taskchampion/src/server/types.rs +++ b/taskchampion/src/server/types.rs @@ -1,26 +1,46 @@ use failure::Fallible; +use uuid::Uuid; -/// A Blob is a hunk of encoded data that is sent to the server. The server does not interpret -/// this data at all. -pub type Blob = Vec; +/// Versions are referred to with sha2 hashes. +pub type VersionId = Uuid; + +/// The distinguished value for "no version" +pub const NO_VERSION_ID: VersionId = Uuid::nil(); + +/// A segment in the history of this task database, in the form of a sequence of operations. This +/// data is pre-encoded, and from the protocol level appears as a sequence of bytes. +pub type HistorySegment = Vec; /// VersionAdd is the response type from [`crate:server::Server::add_version`]. -pub enum VersionAdd { - /// OK, version added - Ok, - /// Rejected, must be based on the the given version - ExpectedVersion(u64), +pub enum AddVersionResult { + /// OK, version added with the given ID + Ok(VersionId), + /// Rejected; expected a version with the given parent version + ExpectedParentVersion(VersionId), +} + +/// A version as downloaded from the server +pub enum GetVersionResult { + /// No such version exists + NoSuchVersion, + + /// The requested version + Version { + version_id: VersionId, + parent_version_id: VersionId, + history_segment: HistorySegment, + }, } /// A value implementing this trait can act as a server against which a replica can sync. pub trait Server { - /// Get a vector of all versions after `since_version` - fn get_versions(&self, username: &str, since_version: u64) -> Fallible>; + /// Add a new version. + fn add_version( + &mut self, + parent_version_id: VersionId, + history_segment: HistorySegment, + ) -> Fallible; - /// Add a new version. If the given version number is incorrect, this responds with the - /// appropriate version and expects the caller to try again. - fn add_version(&mut self, username: &str, version: u64, blob: Blob) -> Fallible; - - /// TODO: undefined yet - fn add_snapshot(&mut self, username: &str, version: u64, blob: Blob) -> Fallible<()>; + /// Get the version with the given parent VersionId + fn get_child_version(&self, parent_version_id: VersionId) -> Fallible; } diff --git a/taskchampion/src/taskdb.rs b/taskchampion/src/taskdb.rs index 65e321888..86523717d 100644 --- a/taskchampion/src/taskdb.rs +++ b/taskchampion/src/taskdb.rs @@ -1,5 +1,5 @@ use crate::errors::Error; -use crate::server::{Server, VersionAdd}; +use crate::server::{AddVersionResult, GetVersionResult, Server}; use crate::taskstorage::{Operation, TaskMap, TaskStorage, TaskStorageTxn}; use failure::Fallible; use serde::{Deserialize, Serialize}; @@ -13,7 +13,6 @@ pub struct TaskDB { #[derive(Serialize, Deserialize, Debug)] struct Version { - version: u64, operations: Vec, } @@ -165,21 +164,34 @@ impl TaskDB { } /// Sync to the given server, pulling remote changes and pushing local changes. - pub fn sync(&mut self, username: &str, server: &mut dyn Server) -> Fallible<()> { + pub fn sync(&mut self, server: &mut dyn Server) -> Fallible<()> { let mut txn = self.storage.txn()?; // retry synchronizing until the server accepts our version (this allows for races between // replicas trying to sync to the same server) loop { - // first pull changes and "rebase" on top of them - let new_versions = server.get_versions(username, txn.base_version()?)?; - for version_blob in new_versions { - let version_str = str::from_utf8(&version_blob).unwrap(); - let version: Version = serde_json::from_str(version_str).unwrap(); - assert_eq!(version.version, txn.base_version()? + 1); - println!("applying version {:?} from server", version.version); + let mut base_version_id = txn.base_version()?; - TaskDB::apply_version(txn.as_mut(), version)?; + // first pull changes and "rebase" on top of them + loop { + if let GetVersionResult::Version { + version_id, + history_segment, + .. + } = server.get_child_version(base_version_id)? + { + let version_str = str::from_utf8(&history_segment).unwrap(); + let version: Version = serde_json::from_str(version_str).unwrap(); + println!("applying version {:?} from server", version_id); + + // apply this verison and update base_version in storage + TaskDB::apply_version(txn.as_mut(), version)?; + txn.set_base_version(version_id)?; + base_version_id = version_id; + } else { + // at the moment, no more child versions, so we can try adding our own + break; + } } let operations: Vec = txn.operations()?.to_vec(); @@ -189,18 +201,23 @@ impl TaskDB { } // now make a version of our local changes and push those - let new_version = Version { - version: txn.base_version()? + 1, - operations, - }; - let new_version_str = serde_json::to_string(&new_version).unwrap(); - println!("sending version {:?} to server", new_version.version); - if let VersionAdd::Ok = - server.add_version(username, new_version.version, new_version_str.into())? - { - txn.set_base_version(new_version.version)?; - txn.set_operations(vec![])?; - break; + let new_version = Version { operations }; + let history_segment = serde_json::to_string(&new_version).unwrap().into(); + println!("sending new version to server"); + match server.add_version(base_version_id, history_segment)? { + AddVersionResult::Ok(new_version_id) => { + println!("version {:?} received by server", new_version_id); + txn.set_base_version(new_version_id)?; + txn.set_operations(vec![])?; + break; + } + AddVersionResult::ExpectedParentVersion(parent_version_id) => { + println!( + "new version rejected; must be based on {:?}", + parent_version_id + ); + // ..continue the outer loop + } } } @@ -256,7 +273,6 @@ impl TaskDB { } local_operations = new_local_ops; } - txn.set_base_version(version.version)?; txn.set_operations(local_operations)?; Ok(()) } @@ -518,10 +534,10 @@ mod tests { let mut server = TestServer::new(); let mut db1 = newdb(); - db1.sync("me", &mut server).unwrap(); + db1.sync(&mut server).unwrap(); let mut db2 = newdb(); - db2.sync("me", &mut server).unwrap(); + db2.sync(&mut server).unwrap(); // make some changes in parallel to db1 and db2.. let uuid1 = Uuid::new_v4(); @@ -545,9 +561,9 @@ mod tests { .unwrap(); // and synchronize those around - db1.sync("me", &mut server).unwrap(); - db2.sync("me", &mut server).unwrap(); - db1.sync("me", &mut server).unwrap(); + db1.sync(&mut server).unwrap(); + db2.sync(&mut server).unwrap(); + db1.sync(&mut server).unwrap(); assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); // now make updates to the same task on both sides @@ -567,9 +583,9 @@ mod tests { .unwrap(); // and synchronize those around - db1.sync("me", &mut server).unwrap(); - db2.sync("me", &mut server).unwrap(); - db1.sync("me", &mut server).unwrap(); + db1.sync(&mut server).unwrap(); + db2.sync(&mut server).unwrap(); + db1.sync(&mut server).unwrap(); assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); } @@ -578,10 +594,10 @@ mod tests { let mut server = TestServer::new(); let mut db1 = newdb(); - db1.sync("me", &mut server).unwrap(); + db1.sync(&mut server).unwrap(); let mut db2 = newdb(); - db2.sync("me", &mut server).unwrap(); + db2.sync(&mut server).unwrap(); // create and update a task.. let uuid = Uuid::new_v4(); @@ -595,9 +611,9 @@ mod tests { .unwrap(); // and synchronize those around - db1.sync("me", &mut server).unwrap(); - db2.sync("me", &mut server).unwrap(); - db1.sync("me", &mut server).unwrap(); + db1.sync(&mut server).unwrap(); + db2.sync(&mut server).unwrap(); + db1.sync(&mut server).unwrap(); assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); // delete and re-create the task on db1 @@ -620,9 +636,9 @@ mod tests { }) .unwrap(); - db1.sync("me", &mut server).unwrap(); - db2.sync("me", &mut server).unwrap(); - db1.sync("me", &mut server).unwrap(); + db1.sync(&mut server).unwrap(); + db2.sync(&mut server).unwrap(); + db1.sync(&mut server).unwrap(); assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); } @@ -678,7 +694,7 @@ mod tests { println!(" {:?} (ignored)", e); } }, - Action::Sync => db.sync("me", &mut server).unwrap(), + Action::Sync => db.sync(&mut server).unwrap(), } } diff --git a/taskchampion/src/taskstorage/inmemory.rs b/taskchampion/src/taskstorage/inmemory.rs index 8fb3da3a1..66a871ed8 100644 --- a/taskchampion/src/taskstorage/inmemory.rs +++ b/taskchampion/src/taskstorage/inmemory.rs @@ -1,6 +1,8 @@ #![allow(clippy::new_without_default)] -use crate::taskstorage::{Operation, TaskMap, TaskStorage, TaskStorageTxn}; +use crate::taskstorage::{ + Operation, TaskMap, TaskStorage, TaskStorageTxn, VersionId, DEFAULT_BASE_VERSION, +}; use failure::Fallible; use std::collections::hash_map::Entry; use std::collections::HashMap; @@ -9,7 +11,7 @@ use uuid::Uuid; #[derive(PartialEq, Debug, Clone)] struct Data { tasks: HashMap, - base_version: u64, + base_version: VersionId, operations: Vec, working_set: Vec>, } @@ -79,11 +81,11 @@ impl<'t> TaskStorageTxn for Txn<'t> { Ok(self.data_ref().tasks.keys().copied().collect()) } - fn base_version(&mut self) -> Fallible { - Ok(self.data_ref().base_version) + fn base_version(&mut self) -> Fallible { + Ok(self.data_ref().base_version.clone()) } - fn set_base_version(&mut self, version: u64) -> Fallible<()> { + fn set_base_version(&mut self, version: VersionId) -> Fallible<()> { self.mut_data_ref().base_version = version; Ok(()) } @@ -138,7 +140,7 @@ impl InMemoryStorage { InMemoryStorage { data: Data { tasks: HashMap::new(), - base_version: 0, + base_version: DEFAULT_BASE_VERSION.into(), operations: vec![], working_set: vec![None], }, diff --git a/taskchampion/src/taskstorage/kv.rs b/taskchampion/src/taskstorage/kv.rs index 18e18fb19..52947a5c0 100644 --- a/taskchampion/src/taskstorage/kv.rs +++ b/taskchampion/src/taskstorage/kv.rs @@ -1,4 +1,6 @@ -use crate::taskstorage::{Operation, TaskMap, TaskStorage, TaskStorageTxn}; +use crate::taskstorage::{ + Operation, TaskMap, TaskStorage, TaskStorageTxn, VersionId, DEFAULT_BASE_VERSION, +}; use failure::Fallible; use kv::msgpack::Msgpack; use kv::{Bucket, Config, Error, Integer, Serde, Store, ValueBuf}; @@ -48,6 +50,7 @@ pub struct KVStorage<'t> { store: Store, tasks_bucket: Bucket<'t, Key, ValueBuf>>, numbers_bucket: Bucket<'t, Integer, ValueBuf>>, + uuids_bucket: Bucket<'t, Integer, ValueBuf>>, operations_bucket: Bucket<'t, Integer, ValueBuf>>, working_set_bucket: Bucket<'t, Integer, ValueBuf>>, } @@ -61,6 +64,7 @@ impl<'t> KVStorage<'t> { let mut config = Config::default(directory); config.bucket("tasks", None); config.bucket("numbers", None); + config.bucket("uuids", None); config.bucket("operations", None); config.bucket("working_set", None); let store = Store::new(config)?; @@ -71,6 +75,9 @@ impl<'t> KVStorage<'t> { // this bucket contains various u64s, indexed by constants above let numbers_bucket = store.int_bucket::>>(Some("numbers"))?; + // this bucket contains various Uuids, indexed by constants above + let uuids_bucket = store.int_bucket::>>(Some("uuids"))?; + // this bucket contains operations, numbered consecutively; the NEXT_OPERATION number gives // the index of the next operation to insert let operations_bucket = @@ -85,6 +92,7 @@ impl<'t> KVStorage<'t> { store, tasks_bucket, numbers_bucket, + uuids_bucket, operations_bucket, working_set_bucket, }) @@ -122,6 +130,9 @@ impl<'t> Txn<'t> { fn numbers_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf>> { &self.storage.numbers_bucket } + fn uuids_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf>> { + &self.storage.uuids_bucket + } fn operations_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf>> { &self.storage.operations_bucket } @@ -193,26 +204,26 @@ impl<'t> TaskStorageTxn for Txn<'t> { .collect()) } - fn base_version(&mut self) -> Fallible { - let bucket = self.numbers_bucket(); + fn base_version(&mut self) -> Fallible { + let bucket = self.uuids_bucket(); let base_version = match self.kvtxn().get(bucket, BASE_VERSION.into()) { Ok(buf) => buf, - Err(Error::NotFound) => return Ok(0), + Err(Error::NotFound) => return Ok(DEFAULT_BASE_VERSION.into()), Err(e) => return Err(e.into()), } .inner()? .to_serde(); - Ok(base_version) + Ok(base_version as VersionId) } - fn set_base_version(&mut self, version: u64) -> Fallible<()> { - let numbers_bucket = self.numbers_bucket(); + fn set_base_version(&mut self, version: VersionId) -> Fallible<()> { + let uuids_bucket = self.uuids_bucket(); let kvtxn = self.kvtxn(); kvtxn.set( - numbers_bucket, + uuids_bucket, BASE_VERSION.into(), - Msgpack::to_value_buf(version)?, + Msgpack::to_value_buf(version as Uuid)?, )?; Ok(()) } @@ -528,7 +539,7 @@ mod test { let mut storage = KVStorage::new(&tmp_dir.path())?; { let mut txn = storage.txn()?; - assert_eq!(txn.base_version()?, 0); + assert_eq!(txn.base_version()?, DEFAULT_BASE_VERSION); } Ok(()) } @@ -537,14 +548,15 @@ mod test { fn test_base_version_setting() -> Fallible<()> { let tmp_dir = TempDir::new("test")?; let mut storage = KVStorage::new(&tmp_dir.path())?; + let u = Uuid::new_v4(); { let mut txn = storage.txn()?; - txn.set_base_version(3)?; + txn.set_base_version(u)?; txn.commit()?; } { let mut txn = storage.txn()?; - assert_eq!(txn.base_version()?, 3); + assert_eq!(txn.base_version()?, u); } Ok(()) } diff --git a/taskchampion/src/taskstorage/mod.rs b/taskchampion/src/taskstorage/mod.rs index 8f25781c5..4ddd7df75 100644 --- a/taskchampion/src/taskstorage/mod.rs +++ b/taskchampion/src/taskstorage/mod.rs @@ -23,6 +23,12 @@ fn taskmap_with(mut properties: Vec<(String, String)>) -> TaskMap { rv } +/// The type of VersionIds +pub use crate::server::VersionId; + +/// The default for base_version. +pub(crate) const DEFAULT_BASE_VERSION: Uuid = crate::server::NO_VERSION_ID; + /// A TaskStorage transaction, in which storage operations are performed. /// /// # Concurrency @@ -58,10 +64,10 @@ pub trait TaskStorageTxn { fn all_task_uuids(&mut self) -> Fallible>; /// Get the current base_version for this storage -- the last version synced from the server. - fn base_version(&mut self) -> Fallible; + fn base_version(&mut self) -> Fallible; /// Set the current base_version for this storage. - fn set_base_version(&mut self, version: u64) -> Fallible<()>; + fn set_base_version(&mut self, version: VersionId) -> Fallible<()>; /// Get the current set of outstanding operations (operations that have not been sync'd to the /// server yet)