diff --git a/Cargo.lock b/Cargo.lock index eaf24ef..98e1ada 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1424,6 +1424,7 @@ dependencies = [ "env_logger", "log", "pretty_assertions", + "thiserror", "uuid", ] diff --git a/core/Cargo.toml b/core/Cargo.toml index 4a2f614..0944bdd 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -9,6 +9,7 @@ license = "MIT" [dependencies] uuid.workspace = true anyhow.workspace = true +thiserror.workspace = true log.workspace = true env_logger.workspace = true chrono.workspace = true diff --git a/core/src/error.rs b/core/src/error.rs new file mode 100644 index 0000000..79e1f05 --- /dev/null +++ b/core/src/error.rs @@ -0,0 +1,13 @@ +/// An error from the [`crate::Server`] type. +/// +/// This type represents only circumstances outside the realm of the protocol, and not the specific +/// results descriebd in the protocol documentation. +#[derive(Debug, thiserror::Error)] +pub enum ServerError { + /// There is no client with the given ClientId. + #[error("No such client")] + NoSuchClient, + + #[error(transparent)] + Other(#[from] anyhow::Error), +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 99c6778..807996a 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -7,26 +7,17 @@ //! This crate uses an abstract storage backend. Note that this does not implement the //! HTTP-specific portions of the protocol, nor provide any storage implementations. //! -//! ## API Methods +//! ## Usage //! -//! The following API methods are implemented. These methods are documented in more detail in -//! the protocol documentation. -//! -//! * [`add_version`] -//! * [`get_child_version`] -//! * [`add_snapshot`] -//! * [`get_snapshot`] -//! -//! Each API method takes: -//! -//! * [`StorageTxn`] to access storage. Methods which modify storage will commit the transaction before returning. -//! * [`ServerConfig`] providing basic configuration for the server's behavior. -//! * `client_id` and a [`Client`] providing the client metadata. +//! To use, create a new [`Server`] instance and call the relevant protocol API methods. The +//! arguments and return values correspond closely to the protocol documentation. +mod error; mod inmemory; mod server; mod storage; +pub use error::*; pub use inmemory::*; pub use server::*; pub use storage::*; diff --git a/core/src/server.rs b/core/src/server.rs index cc99b86..ea871a5 100644 --- a/core/src/server.rs +++ b/core/src/server.rs @@ -1,4 +1,5 @@ -use crate::storage::{Client, Snapshot, StorageTxn}; +use crate::error::ServerError; +use crate::storage::{Snapshot, Storage, StorageTxn}; use chrono::Utc; use uuid::Uuid; @@ -32,15 +33,6 @@ impl Default for ServerConfig { } } -impl ServerConfig { - pub fn from_args(snapshot_days: i64, snapshot_versions: u32) -> anyhow::Result { - Ok(ServerConfig { - snapshot_days, - snapshot_versions, - }) - } -} - /// Response to get_child_version. See the protocol documentation. #[derive(Clone, PartialEq, Debug)] pub enum GetVersionResult { @@ -53,40 +45,6 @@ pub enum GetVersionResult { }, } -/// Implementation of the GetChildVersion protocol transaction. -pub fn get_child_version<'a>( - mut txn: Box, - _config: &ServerConfig, - client_id: ClientId, - client: Client, - parent_version_id: VersionId, -) -> anyhow::Result { - // If a version with parentVersionId equal to the requested parentVersionId exists, it is returned. - if let Some(version) = txn.get_version_by_parent(client_id, parent_version_id)? { - return Ok(GetVersionResult::Success { - version_id: version.version_id, - parent_version_id: version.parent_version_id, - history_segment: version.history_segment, - }); - } - - // Return NotFound if an AddVersion with this parent_version_id would succeed, and otherwise - // return Gone. - // - // AddVersion will succeed if either - // - the requested parent version is the latest version; or - // - there is no latest version, meaning there are no versions stored for this client - Ok( - if client.latest_version_id == parent_version_id - || client.latest_version_id == NIL_VERSION_ID - { - GetVersionResult::NotFound - } else { - GetVersionResult::Gone - }, - ) -} - /// Response to add_version #[derive(Clone, PartialEq, Debug)] pub enum AddVersionResult { @@ -133,172 +91,301 @@ impl SnapshotUrgency { } } -/// Implementation of the AddVersion protocol transaction -pub fn add_version<'a>( - mut txn: Box, - config: &ServerConfig, - client_id: ClientId, - client: Client, - parent_version_id: VersionId, - history_segment: HistorySegment, -) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> { - log::debug!( - "add_version(client_id: {}, parent_version_id: {})", - client_id, - parent_version_id, - ); - - // check if this version is acceptable, under the protection of the transaction - if client.latest_version_id != NIL_VERSION_ID && parent_version_id != client.latest_version_id { - log::debug!("add_version request rejected: mismatched latest_version_id"); - return Ok(( - AddVersionResult::ExpectedParentVersion(client.latest_version_id), - SnapshotUrgency::None, - )); - } - - // invent a version ID - let version_id = Uuid::new_v4(); - log::debug!( - "add_version request accepted: new version_id: {}", - version_id - ); - - // update the DB - txn.add_version(client_id, version_id, parent_version_id, history_segment)?; - txn.commit()?; - - // calculate the urgency - let time_urgency = match client.snapshot { - None => SnapshotUrgency::High, - Some(Snapshot { timestamp, .. }) => { - SnapshotUrgency::for_days(config, (Utc::now() - timestamp).num_days()) - } - }; - - let version_urgency = match client.snapshot { - None => SnapshotUrgency::High, - Some(Snapshot { versions_since, .. }) => { - SnapshotUrgency::for_versions_since(config, versions_since) - } - }; - - Ok(( - AddVersionResult::Ok(version_id), - std::cmp::max(time_urgency, version_urgency), - )) +/// A server implementing the TaskChampion sync protocol. +pub struct Server { + config: ServerConfig, + storage: Box, } -/// Implementation of the AddSnapshot protocol transaction -pub fn add_snapshot<'a>( - mut txn: Box, - _config: &ServerConfig, - client_id: ClientId, - client: Client, - version_id: VersionId, - data: Vec, -) -> anyhow::Result<()> { - log::debug!( - "add_snapshot(client_id: {}, version_id: {})", - client_id, - version_id, - ); - - // NOTE: if the snapshot is rejected, this function logs about it and returns - // Ok(()), as there's no reason to report an errot to the client / user. - - let last_snapshot = client.snapshot.map(|snap| snap.version_id); - if Some(version_id) == last_snapshot { - log::debug!( - "rejecting snapshot for version {}: already exists", - version_id - ); - return Ok(()); +impl Server { + pub fn new(config: ServerConfig, storage: ST) -> Self { + Self { + config, + storage: Box::new(storage), + } } - // look for this version in the history of this client, starting at the latest version, and - // only iterating for a limited number of versions. - let mut search_len = SNAPSHOT_SEARCH_LEN; - let mut vid = client.latest_version_id; + /// Implementation of the GetChildVersion protocol transaction. + pub fn get_child_version( + &self, + client_id: ClientId, + parent_version_id: VersionId, + ) -> Result { + let mut txn = self.storage.txn()?; + let client = txn + .get_client(client_id)? + .ok_or(ServerError::NoSuchClient)?; - loop { - if vid == version_id && version_id != NIL_VERSION_ID { - // the new snapshot is for a recent version, so proceed - break; + // If a version with parentVersionId equal to the requested parentVersionId exists, it is + // returned. + if let Some(version) = txn.get_version_by_parent(client_id, parent_version_id)? { + return Ok(GetVersionResult::Success { + version_id: version.version_id, + parent_version_id: version.parent_version_id, + history_segment: version.history_segment, + }); } - if Some(vid) == last_snapshot { - // the new snapshot is older than the last snapshot, so ignore it - log::debug!( - "rejecting snapshot for version {}: newer snapshot already exists or no such version", - version_id - ); + // Return NotFound if an AddVersion with this parent_version_id would succeed, and + // otherwise return Gone. + // + // AddVersion will succeed if either + // - the requested parent version is the latest version; or + // - there is no latest version, meaning there are no versions stored for this client + Ok( + if client.latest_version_id == parent_version_id + || client.latest_version_id == NIL_VERSION_ID + { + GetVersionResult::NotFound + } else { + GetVersionResult::Gone + }, + ) + } + + /// Implementation of the AddVersion protocol transaction + pub fn add_version( + &self, + client_id: ClientId, + parent_version_id: VersionId, + history_segment: HistorySegment, + ) -> Result<(AddVersionResult, SnapshotUrgency), ServerError> { + log::debug!("add_version(client_id: {client_id}, parent_version_id: {parent_version_id})"); + + let mut txn = self.storage.txn()?; + let client = txn + .get_client(client_id)? + .ok_or(ServerError::NoSuchClient)?; + + // check if this version is acceptable, under the protection of the transaction + if client.latest_version_id != NIL_VERSION_ID + && parent_version_id != client.latest_version_id + { + log::debug!("add_version request rejected: mismatched latest_version_id"); + return Ok(( + AddVersionResult::ExpectedParentVersion(client.latest_version_id), + SnapshotUrgency::None, + )); + } + + // invent a version ID + let version_id = Uuid::new_v4(); + log::debug!("add_version request accepted: new version_id: {version_id}"); + + // update the DB + txn.add_version(client_id, version_id, parent_version_id, history_segment)?; + txn.commit()?; + + // calculate the urgency + let time_urgency = match client.snapshot { + None => SnapshotUrgency::High, + Some(Snapshot { timestamp, .. }) => { + SnapshotUrgency::for_days(&self.config, (Utc::now() - timestamp).num_days()) + } + }; + + let version_urgency = match client.snapshot { + None => SnapshotUrgency::High, + Some(Snapshot { versions_since, .. }) => { + SnapshotUrgency::for_versions_since(&self.config, versions_since) + } + }; + + Ok(( + AddVersionResult::Ok(version_id), + std::cmp::max(time_urgency, version_urgency), + )) + } + + /// Implementation of the AddSnapshot protocol transaction + pub fn add_snapshot( + &self, + client_id: ClientId, + version_id: VersionId, + data: Vec, + ) -> Result<(), ServerError> { + log::debug!("add_snapshot(client_id: {client_id}, version_id: {version_id})"); + + let mut txn = self.storage.txn()?; + let client = txn + .get_client(client_id)? + .ok_or(ServerError::NoSuchClient)?; + + // NOTE: if the snapshot is rejected, this function logs about it and returns + // Ok(()), as there's no reason to report an errot to the client / user. + + let last_snapshot = client.snapshot.map(|snap| snap.version_id); + if Some(version_id) == last_snapshot { + log::debug!("rejecting snapshot for version {version_id}: already exists"); return Ok(()); } - search_len -= 1; - if search_len <= 0 || vid == NIL_VERSION_ID { - // this should not happen in normal operation, so warn about it - log::warn!( - "rejecting snapshot for version {}: version is too old or no such version", - version_id - ); - return Ok(()); + // look for this version in the history of this client, starting at the latest version, and + // only iterating for a limited number of versions. + let mut search_len = SNAPSHOT_SEARCH_LEN; + let mut vid = client.latest_version_id; + + loop { + if vid == version_id && version_id != NIL_VERSION_ID { + // the new snapshot is for a recent version, so proceed + break; + } + + if Some(vid) == last_snapshot { + // the new snapshot is older than the last snapshot, so ignore it + log::debug!("rejecting snapshot for version {version_id}: newer snapshot already exists or no such version"); + return Ok(()); + } + + search_len -= 1; + if search_len <= 0 || vid == NIL_VERSION_ID { + // this should not happen in normal operation, so warn about it + log::warn!("rejecting snapshot for version {version_id}: version is too old or no such version"); + return Ok(()); + } + + // get the parent version ID + if let Some(parent) = txn.get_version(client_id, vid)? { + vid = parent.parent_version_id; + } else { + // this version does not exist; "this should not happen" but if it does, + // we don't need a snapshot earlier than the missing version. + log::warn!("rejecting snapshot for version {version_id}: newer versions have already been deleted"); + return Ok(()); + } } - // get the parent version ID - if let Some(parent) = txn.get_version(client_id, vid)? { - vid = parent.parent_version_id; + log::debug!("accepting snapshot for version {version_id}"); + txn.set_snapshot( + client_id, + Snapshot { + version_id, + timestamp: Utc::now(), + versions_since: 0, + }, + data, + )?; + txn.commit()?; + Ok(()) + } + + /// Implementation of the GetSnapshot protocol transaction + pub fn get_snapshot( + &self, + client_id: ClientId, + ) -> Result)>, ServerError> { + let mut txn = self.storage.txn()?; + let client = txn + .get_client(client_id)? + .ok_or(ServerError::NoSuchClient)?; + + Ok(if let Some(snap) = client.snapshot { + txn.get_snapshot_data(client_id, snap.version_id)? + .map(|data| (snap.version_id, data)) } else { - // this version does not exist; "this should not happen" but if it does, - // we don't need a snapshot earlier than the missing version. - log::warn!( - "rejecting snapshot for version {}: newer versions have already been deleted", - version_id - ); - return Ok(()); - } + None + }) } - log::debug!("accepting snapshot for version {}", version_id); - txn.set_snapshot( - client_id, - Snapshot { - version_id, - timestamp: Utc::now(), - versions_since: 0, - }, - data, - )?; - txn.commit()?; - Ok(()) -} - -/// Implementation of the GetSnapshot protocol transaction -pub fn get_snapshot<'a>( - mut txn: Box, - _config: &ServerConfig, - client_id: ClientId, - client: Client, -) -> anyhow::Result)>> { - Ok(if let Some(snap) = client.snapshot { - txn.get_snapshot_data(client_id, snap.version_id)? - .map(|data| (snap.version_id, data)) - } else { - None - }) + /// Convenience method to get a transaction for the embedded storage. + pub fn txn(&self) -> Result, ServerError> { + Ok(self.storage.txn()?) + } } #[cfg(test)] mod test { use super::*; use crate::inmemory::InMemoryStorage; - use crate::storage::{Snapshot, Storage}; + use crate::storage::{Snapshot, Storage, StorageTxn}; use chrono::{Duration, TimeZone, Utc}; use pretty_assertions::assert_eq; - fn init_logging() { + fn setup(init: INIT) -> anyhow::Result<(Server, RES)> + where + INIT: FnOnce(&mut dyn StorageTxn) -> anyhow::Result, + { let _ = env_logger::builder().is_test(true).try_init(); + let storage = InMemoryStorage::new(); + let res = init(storage.txn()?.as_mut())?; + Ok((Server::new(ServerConfig::default(), storage), res)) + } + + /// Utility setup function for add_version tests + fn av_setup( + num_versions: u32, + snapshot_version: Option, + snapshot_days_ago: Option, + ) -> anyhow::Result<(Server, Uuid, Vec)> { + let (server, (client_id, versions)) = setup(|txn| { + let client_id = Uuid::new_v4(); + let mut versions = vec![]; + + let mut version_id = Uuid::nil(); + txn.new_client(client_id, Uuid::nil())?; + debug_assert!(num_versions < u8::MAX.into()); + for vnum in 0..num_versions { + let parent_version_id = version_id; + version_id = Uuid::new_v4(); + versions.push(version_id); + txn.add_version( + client_id, + version_id, + parent_version_id, + // Generate some unique data for this version. + vec![0, 0, vnum as u8], + )?; + if Some(vnum) == snapshot_version { + txn.set_snapshot( + client_id, + Snapshot { + version_id, + versions_since: 0, + timestamp: Utc::now() - Duration::days(snapshot_days_ago.unwrap_or(0)), + }, + // Generate some unique data for this snapshot. + vec![vnum as u8], + )?; + } + } + + Ok((client_id, versions)) + })?; + Ok((server, client_id, versions)) + } + + /// Utility function to check the results of an add_version call + fn av_success_check( + server: &Server, + client_id: Uuid, + existing_versions: &[Uuid], + (add_version_result, snapshot_urgency): (AddVersionResult, SnapshotUrgency), + expected_history: Vec, + expected_urgency: SnapshotUrgency, + ) -> anyhow::Result<()> { + if let AddVersionResult::Ok(new_version_id) = add_version_result { + // check that it invented a new version ID + for v in existing_versions { + assert_ne!(&new_version_id, v); + } + + // verify that the storage was updated + let mut txn = server.txn()?; + let client = txn.get_client(client_id)?.unwrap(); + assert_eq!(client.latest_version_id, new_version_id); + + let parent_version_id = existing_versions.last().cloned().unwrap_or_else(Uuid::nil); + let version = txn.get_version(client_id, new_version_id)?.unwrap(); + assert_eq!(version.version_id, new_version_id); + assert_eq!(version.parent_version_id, parent_version_id); + assert_eq!(version.history_segment, expected_history); + } else { + panic!("did not get Ok from add_version: {:?}", add_version_result); + } + + assert_eq!(snapshot_urgency, expected_urgency); + + Ok(()) } #[test] @@ -347,23 +434,15 @@ mod test { #[test] fn get_child_version_not_found_initial_nil() -> anyhow::Result<()> { - init_logging(); - - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - txn.new_client(client_id, NIL_VERSION_ID)?; + let (server, client_id) = setup(|txn| { + let client_id = Uuid::new_v4(); + txn.new_client(client_id, NIL_VERSION_ID)?; + Ok(client_id) + })?; // when no latest version exists, the first version is NotFound - let client = txn.get_client(client_id)?.unwrap(); assert_eq!( - get_child_version( - txn, - &ServerConfig::default(), - client_id, - client, - NIL_VERSION_ID - )?, + server.get_child_version(client_id, NIL_VERSION_ID)?, GetVersionResult::NotFound ); Ok(()) @@ -371,25 +450,17 @@ mod test { #[test] fn get_child_version_not_found_initial_continuing() -> anyhow::Result<()> { - init_logging(); + let (server, client_id) = setup(|txn| { + let client_id = Uuid::new_v4(); + txn.new_client(client_id, NIL_VERSION_ID)?; - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - - txn.new_client(client_id, NIL_VERSION_ID)?; + Ok(client_id) + })?; // when no latest version exists, _any_ child version is NOT_FOUND. This allows syncs to // start to a new server even if the client already has been uploading to another service. - let client = txn.get_client(client_id)?.unwrap(); assert_eq!( - get_child_version( - txn, - &ServerConfig::default(), - client_id, - client, - Uuid::new_v4(), - )?, + server.get_child_version(client_id, Uuid::new_v4(),)?, GetVersionResult::NotFound ); Ok(()) @@ -397,26 +468,18 @@ mod test { #[test] fn get_child_version_not_found_up_to_date() -> anyhow::Result<()> { - init_logging(); + let (server, (client_id, parent_version_id)) = setup(|txn| { + // add a parent version, but not the requested child version + let client_id = Uuid::new_v4(); + let parent_version_id = Uuid::new_v4(); + txn.new_client(client_id, parent_version_id)?; + txn.add_version(client_id, parent_version_id, NIL_VERSION_ID, vec![])?; - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); + Ok((client_id, parent_version_id)) + })?; - // add a parent version, but not the requested child version - let parent_version_id = Uuid::new_v4(); - txn.new_client(client_id, parent_version_id)?; - txn.add_version(client_id, parent_version_id, NIL_VERSION_ID, vec![])?; - - let client = txn.get_client(client_id)?.unwrap(); assert_eq!( - get_child_version( - txn, - &ServerConfig::default(), - client_id, - client, - parent_version_id - )?, + server.get_child_version(client_id, parent_version_id)?, GetVersionResult::NotFound ); Ok(()) @@ -424,26 +487,19 @@ mod test { #[test] fn get_child_version_gone_not_latest() -> anyhow::Result<()> { - init_logging(); + let (server, client_id) = setup(|txn| { + let client_id = Uuid::new_v4(); - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); + // Add a parent version, but not the requested parent version + let parent_version_id = Uuid::new_v4(); + txn.new_client(client_id, parent_version_id)?; + txn.add_version(client_id, parent_version_id, NIL_VERSION_ID, vec![])?; - // Add a parent version, but not the requested parent version - let parent_version_id = Uuid::new_v4(); - txn.new_client(client_id, parent_version_id)?; - txn.add_version(client_id, parent_version_id, NIL_VERSION_ID, vec![])?; + Ok(client_id) + })?; - let client = txn.get_client(client_id)?.unwrap(); assert_eq!( - get_child_version( - txn, - &ServerConfig::default(), - client_id, - client, - Uuid::new_v4(), - )?, + server.get_child_version(client_id, Uuid::new_v4(),)?, GetVersionResult::Gone ); Ok(()) @@ -451,32 +507,24 @@ mod test { #[test] fn get_child_version_found() -> anyhow::Result<()> { - init_logging(); + let (server, (client_id, version_id, parent_version_id, history_segment)) = setup(|txn| { + let client_id = Uuid::new_v4(); + let version_id = Uuid::new_v4(); + let parent_version_id = Uuid::new_v4(); + let history_segment = b"abcd".to_vec(); - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - let version_id = Uuid::new_v4(); - let parent_version_id = Uuid::new_v4(); - let history_segment = b"abcd".to_vec(); - - txn.new_client(client_id, version_id)?; - txn.add_version( - client_id, - version_id, - parent_version_id, - history_segment.clone(), - )?; - - let client = txn.get_client(client_id)?.unwrap(); - assert_eq!( - get_child_version( - txn, - &ServerConfig::default(), + txn.new_client(client_id, version_id)?; + txn.add_version( client_id, - client, - parent_version_id - )?, + version_id, + parent_version_id, + history_segment.clone(), + )?; + + Ok((client_id, version_id, parent_version_id, history_segment)) + })?; + assert_eq!( + server.get_child_version(client_id, parent_version_id)?, GetVersionResult::Success { version_id, parent_version_id, @@ -486,104 +534,18 @@ mod test { Ok(()) } - /// Utility setup function for add_version tests - fn av_setup( - storage: &InMemoryStorage, - num_versions: u32, - snapshot_version: Option, - snapshot_days_ago: Option, - ) -> anyhow::Result<(Uuid, Vec)> { - init_logging(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - let mut versions = vec![]; - - let mut version_id = Uuid::nil(); - txn.new_client(client_id, Uuid::nil())?; - for vnum in 0..num_versions { - let parent_version_id = version_id; - version_id = Uuid::new_v4(); - versions.push(version_id); - txn.add_version( - client_id, - version_id, - parent_version_id, - vec![0, 0, vnum as u8], - )?; - if Some(vnum) == snapshot_version { - txn.set_snapshot( - client_id, - Snapshot { - version_id, - versions_since: 0, - timestamp: Utc::now() - Duration::days(snapshot_days_ago.unwrap_or(0)), - }, - vec![vnum as u8], - )?; - } - } - - Ok((client_id, versions)) - } - - /// Utility function to check the results of an add_version call - fn av_success_check( - storage: &InMemoryStorage, - client_id: Uuid, - existing_versions: &[Uuid], - result: (AddVersionResult, SnapshotUrgency), - expected_history: Vec, - expected_urgency: SnapshotUrgency, - ) -> anyhow::Result<()> { - if let AddVersionResult::Ok(new_version_id) = result.0 { - // check that it invented a new version ID - for v in existing_versions { - assert_ne!(&new_version_id, v); - } - - // verify that the storage was updated - let mut txn = storage.txn()?; - let client = txn.get_client(client_id)?.unwrap(); - assert_eq!(client.latest_version_id, new_version_id); - - let parent_version_id = existing_versions.last().cloned().unwrap_or_else(Uuid::nil); - let version = txn.get_version(client_id, new_version_id)?.unwrap(); - assert_eq!(version.version_id, new_version_id); - assert_eq!(version.parent_version_id, parent_version_id); - assert_eq!(version.history_segment, expected_history); - } else { - panic!("did not get Ok from add_version: {:?}", result); - } - - assert_eq!(result.1, expected_urgency); - - Ok(()) - } - #[test] fn add_version_conflict() -> anyhow::Result<()> { - let storage = InMemoryStorage::new(); - let (client_id, versions) = av_setup(&storage, 3, None, None)?; - - let mut txn = storage.txn()?; - let client = txn.get_client(client_id)?.unwrap(); + let (server, client_id, versions) = av_setup(3, None, None)?; // try to add a child of a version other than the latest assert_eq!( - add_version( - txn, - &ServerConfig::default(), - client_id, - client, - versions[1], - vec![3, 6, 9] - )? - .0, + server.add_version(client_id, versions[1], vec![3, 6, 9])?.0, AddVersionResult::ExpectedParentVersion(versions[2]) ); // verify that the storage wasn't updated - txn = storage.txn()?; + let mut txn = server.txn()?; assert_eq!( txn.get_client(client_id)?.unwrap().latest_version_id, versions[2] @@ -595,23 +557,12 @@ mod test { #[test] fn add_version_with_existing_history() -> anyhow::Result<()> { - let storage = InMemoryStorage::new(); - let (client_id, versions) = av_setup(&storage, 1, None, None)?; + let (server, client_id, versions) = av_setup(1, None, None)?; - let mut txn = storage.txn()?; - let client = txn.get_client(client_id)?.unwrap(); - - let result = add_version( - txn, - &ServerConfig::default(), - client_id, - client, - versions[0], - vec![3, 6, 9], - )?; + let result = server.add_version(client_id, versions[0], vec![3, 6, 9])?; av_success_check( - &storage, + &server, client_id, &versions, result, @@ -625,24 +576,13 @@ mod test { #[test] fn add_version_with_no_history() -> anyhow::Result<()> { - let storage = InMemoryStorage::new(); - let (client_id, versions) = av_setup(&storage, 0, None, None)?; - - let mut txn = storage.txn()?; - let client = txn.get_client(client_id)?.unwrap(); + let (server, client_id, versions) = av_setup(0, None, None)?; let parent_version_id = Uuid::nil(); - let result = add_version( - txn, - &ServerConfig::default(), - client_id, - client, - parent_version_id, - vec![3, 6, 9], - )?; + let result = server.add_version(client_id, parent_version_id, vec![3, 6, 9])?; av_success_check( - &storage, + &server, client_id, &versions, result, @@ -656,23 +596,12 @@ mod test { #[test] fn add_version_success_recent_snapshot() -> anyhow::Result<()> { - let storage = InMemoryStorage::new(); - let (client_id, versions) = av_setup(&storage, 1, Some(0), None)?; + let (server, client_id, versions) = av_setup(1, Some(0), None)?; - let mut txn = storage.txn()?; - let client = txn.get_client(client_id)?.unwrap(); - - let result = add_version( - txn, - &ServerConfig::default(), - client_id, - client, - versions[0], - vec![1, 2, 3], - )?; + let result = server.add_version(client_id, versions[0], vec![1, 2, 3])?; av_success_check( - &storage, + &server, client_id, &versions, result, @@ -686,24 +615,13 @@ mod test { #[test] fn add_version_success_aged_snapshot() -> anyhow::Result<()> { - let storage = InMemoryStorage::new(); // one snapshot, but it was 50 days ago - let (client_id, versions) = av_setup(&storage, 1, Some(0), Some(50))?; + let (server, client_id, versions) = av_setup(1, Some(0), Some(50))?; - let mut txn = storage.txn()?; - let client = txn.get_client(client_id)?.unwrap(); - - let result = add_version( - txn, - &ServerConfig::default(), - client_id, - client, - versions[0], - vec![1, 2, 3], - )?; + let result = server.add_version(client_id, versions[0], vec![1, 2, 3])?; av_success_check( - &storage, + &server, client_id, &versions, result, @@ -717,27 +635,14 @@ mod test { #[test] fn add_version_success_snapshot_many_versions_ago() -> anyhow::Result<()> { - let storage = InMemoryStorage::new(); // one snapshot, but it was 50 versions ago - let (client_id, versions) = av_setup(&storage, 50, Some(0), None)?; + let (mut server, client_id, versions) = av_setup(50, Some(0), None)?; + server.config.snapshot_versions = 30; - let mut txn = storage.txn()?; - let client = txn.get_client(client_id)?.unwrap(); - - let result = add_version( - txn, - &ServerConfig { - snapshot_versions: 30, - ..ServerConfig::default() - }, - client_id, - client, - versions[49], - vec![1, 2, 3], - )?; + let result = server.add_version(client_id, versions[49], vec![1, 2, 3])?; av_success_check( - &storage, + &server, client_id, &versions, result, @@ -751,30 +656,21 @@ mod test { #[test] fn add_snapshot_success_latest() -> anyhow::Result<()> { - init_logging(); + let (server, (client_id, version_id)) = setup(|txn| { + let client_id = Uuid::new_v4(); + let version_id = Uuid::new_v4(); - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - let version_id = Uuid::new_v4(); + // set up a task DB with one version in it + txn.new_client(client_id, version_id)?; + txn.add_version(client_id, version_id, NIL_VERSION_ID, vec![])?; - // set up a task DB with one version in it - txn.new_client(client_id, version_id)?; - txn.add_version(client_id, version_id, NIL_VERSION_ID, vec![])?; - - // add a snapshot for that version - let client = txn.get_client(client_id)?.unwrap(); - add_snapshot( - txn, - &ServerConfig::default(), - client_id, - client, - version_id, - vec![1, 2, 3], - )?; + // add a snapshot for that version + Ok((client_id, version_id)) + })?; + server.add_snapshot(client_id, version_id, vec![1, 2, 3])?; // verify the snapshot - let mut txn = storage.txn()?; + let mut txn = server.txn()?; let client = txn.get_client(client_id)?.unwrap(); let snapshot = client.snapshot.unwrap(); assert_eq!(snapshot.version_id, version_id); @@ -789,32 +685,23 @@ mod test { #[test] fn add_snapshot_success_older() -> anyhow::Result<()> { - init_logging(); + let (server, (client_id, version_id_1)) = setup(|txn| { + let client_id = Uuid::new_v4(); + let version_id_1 = Uuid::new_v4(); + let version_id_2 = Uuid::new_v4(); - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - let version_id_1 = Uuid::new_v4(); - let version_id_2 = Uuid::new_v4(); - - // set up a task DB with two versions in it - txn.new_client(client_id, version_id_2)?; - txn.add_version(client_id, version_id_1, NIL_VERSION_ID, vec![])?; - txn.add_version(client_id, version_id_2, version_id_1, vec![])?; + // set up a task DB with two versions in it + txn.new_client(client_id, version_id_2)?; + txn.add_version(client_id, version_id_1, NIL_VERSION_ID, vec![])?; + txn.add_version(client_id, version_id_2, version_id_1, vec![])?; + Ok((client_id, version_id_1)) + })?; // add a snapshot for version 1 - let client = txn.get_client(client_id)?.unwrap(); - add_snapshot( - txn, - &ServerConfig::default(), - client_id, - client, - version_id_1, - vec![1, 2, 3], - )?; + server.add_snapshot(client_id, version_id_1, vec![1, 2, 3])?; // verify the snapshot - let mut txn = storage.txn()?; + let mut txn = server.txn()?; let client = txn.get_client(client_id)?.unwrap(); let snapshot = client.snapshot.unwrap(); assert_eq!(snapshot.version_id, version_id_1); @@ -829,33 +716,25 @@ mod test { #[test] fn add_snapshot_fails_no_such() -> anyhow::Result<()> { - init_logging(); + let (server, client_id) = setup(|txn| { + let client_id = Uuid::new_v4(); + let version_id_1 = Uuid::new_v4(); + let version_id_2 = Uuid::new_v4(); - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - let version_id_1 = Uuid::new_v4(); - let version_id_2 = Uuid::new_v4(); + // set up a task DB with two versions in it + txn.new_client(client_id, version_id_2)?; + txn.add_version(client_id, version_id_1, NIL_VERSION_ID, vec![])?; + txn.add_version(client_id, version_id_2, version_id_1, vec![])?; - // set up a task DB with two versions in it - txn.new_client(client_id, version_id_2)?; - txn.add_version(client_id, version_id_1, NIL_VERSION_ID, vec![])?; - txn.add_version(client_id, version_id_2, version_id_1, vec![])?; + // add a snapshot for unknown version + Ok(client_id) + })?; - // add a snapshot for unknown version - let client = txn.get_client(client_id)?.unwrap(); let version_id_unk = Uuid::new_v4(); - add_snapshot( - txn, - &ServerConfig::default(), - client_id, - client, - version_id_unk, - vec![1, 2, 3], - )?; + server.add_snapshot(client_id, version_id_unk, vec![1, 2, 3])?; // verify the snapshot does not exist - let mut txn = storage.txn()?; + let mut txn = server.txn()?; let client = txn.get_client(client_id)?.unwrap(); assert!(client.snapshot.is_none()); @@ -864,37 +743,28 @@ mod test { #[test] fn add_snapshot_fails_too_old() -> anyhow::Result<()> { - init_logging(); + let (server, (client_id, version_ids)) = setup(|txn| { + let client_id = Uuid::new_v4(); + let mut version_id = Uuid::new_v4(); + let mut parent_version_id = Uuid::nil(); + let mut version_ids = vec![]; - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - let mut version_id = Uuid::new_v4(); - let mut parent_version_id = Uuid::nil(); - let mut version_ids = vec![]; + // set up a task DB with 10 versions in it (oldest to newest) + txn.new_client(client_id, Uuid::nil())?; + for _ in 0..10 { + txn.add_version(client_id, version_id, parent_version_id, vec![])?; + version_ids.push(version_id); + parent_version_id = version_id; + version_id = Uuid::new_v4(); + } - // set up a task DB with 10 versions in it (oldest to newest) - txn.new_client(client_id, Uuid::nil())?; - for _ in 0..10 { - txn.add_version(client_id, version_id, parent_version_id, vec![])?; - version_ids.push(version_id); - parent_version_id = version_id; - version_id = Uuid::new_v4(); - } - - // add a snapshot for the earliest of those - let client = txn.get_client(client_id)?.unwrap(); - add_snapshot( - txn, - &ServerConfig::default(), - client_id, - client, - version_ids[0], - vec![1, 2, 3], - )?; + // add a snapshot for the earliest of those + Ok((client_id, version_ids)) + })?; + server.add_snapshot(client_id, version_ids[0], vec![1, 2, 3])?; // verify the snapshot does not exist - let mut txn = storage.txn()?; + let mut txn = server.txn()?; let client = txn.get_client(client_id)?.unwrap(); assert!(client.snapshot.is_none()); @@ -903,47 +773,39 @@ mod test { #[test] fn add_snapshot_fails_newer_exists() -> anyhow::Result<()> { - init_logging(); + let (server, (client_id, version_ids)) = setup(|txn| { + let client_id = Uuid::new_v4(); + let mut version_id = Uuid::new_v4(); + let mut parent_version_id = Uuid::nil(); + let mut version_ids = vec![]; - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - let mut version_id = Uuid::new_v4(); - let mut parent_version_id = Uuid::nil(); - let mut version_ids = vec![]; + // set up a task DB with 5 versions in it (oldest to newest) and a snapshot of the + // middle one + txn.new_client(client_id, Uuid::nil())?; + for _ in 0..5 { + txn.add_version(client_id, version_id, parent_version_id, vec![])?; + version_ids.push(version_id); + parent_version_id = version_id; + version_id = Uuid::new_v4(); + } + txn.set_snapshot( + client_id, + Snapshot { + version_id: version_ids[2], + versions_since: 2, + timestamp: Utc.with_ymd_and_hms(2001, 9, 9, 1, 46, 40).unwrap(), + }, + vec![1, 2, 3], + )?; - // set up a task DB with 5 versions in it (oldest to newest) and a snapshot of the middle - // one - txn.new_client(client_id, Uuid::nil())?; - for _ in 0..5 { - txn.add_version(client_id, version_id, parent_version_id, vec![])?; - version_ids.push(version_id); - parent_version_id = version_id; - version_id = Uuid::new_v4(); - } - txn.set_snapshot( - client_id, - Snapshot { - version_id: version_ids[2], - versions_since: 2, - timestamp: Utc.with_ymd_and_hms(2001, 9, 9, 1, 46, 40).unwrap(), - }, - vec![1, 2, 3], - )?; + // add a snapshot for the earliest of those + Ok((client_id, version_ids)) + })?; - // add a snapshot for the earliest of those - let client = txn.get_client(client_id)?.unwrap(); - add_snapshot( - txn, - &ServerConfig::default(), - client_id, - client, - version_ids[0], - vec![9, 9, 9], - )?; + server.add_snapshot(client_id, version_ids[0], vec![9, 9, 9])?; // verify the snapshot was not replaced - let mut txn = storage.txn()?; + let mut txn = server.txn()?; let client = txn.get_client(client_id)?.unwrap(); let snapshot = client.snapshot.unwrap(); assert_eq!(snapshot.version_id, version_ids[2]); @@ -958,28 +820,20 @@ mod test { #[test] fn add_snapshot_fails_nil_version() -> anyhow::Result<()> { - init_logging(); + let (server, client_id) = setup(|txn| { + let client_id = Uuid::new_v4(); - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); + // just set up the client + txn.new_client(client_id, NIL_VERSION_ID)?; - // just set up the client - txn.new_client(client_id, NIL_VERSION_ID)?; + // add a snapshot for the nil version + Ok(client_id) + })?; - // add a snapshot for the nil version - let client = txn.get_client(client_id)?.unwrap(); - add_snapshot( - txn, - &ServerConfig::default(), - client_id, - client, - NIL_VERSION_ID, - vec![9, 9, 9], - )?; + server.add_snapshot(client_id, NIL_VERSION_ID, vec![9, 9, 9])?; // verify the snapshot does not exist - let mut txn = storage.txn()?; + let mut txn = server.txn()?; let client = txn.get_client(client_id)?.unwrap(); assert!(client.snapshot.is_none()); @@ -988,28 +842,25 @@ mod test { #[test] fn get_snapshot_found() -> anyhow::Result<()> { - init_logging(); + let (server, (client_id, data, snapshot_version_id)) = setup(|txn| { + let client_id = Uuid::new_v4(); + let data = vec![1, 2, 3]; + let snapshot_version_id = Uuid::new_v4(); - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - let data = vec![1, 2, 3]; - let snapshot_version_id = Uuid::new_v4(); - - txn.new_client(client_id, snapshot_version_id)?; - txn.set_snapshot( - client_id, - Snapshot { - version_id: snapshot_version_id, - versions_since: 3, - timestamp: Utc.with_ymd_and_hms(2001, 9, 9, 1, 46, 40).unwrap(), - }, - data.clone(), - )?; - - let client = txn.get_client(client_id)?.unwrap(); + txn.new_client(client_id, snapshot_version_id)?; + txn.set_snapshot( + client_id, + Snapshot { + version_id: snapshot_version_id, + versions_since: 3, + timestamp: Utc.with_ymd_and_hms(2001, 9, 9, 1, 46, 40).unwrap(), + }, + data.clone(), + )?; + Ok((client_id, data, snapshot_version_id)) + })?; assert_eq!( - get_snapshot(txn, &ServerConfig::default(), client_id, client)?, + server.get_snapshot(client_id)?, Some((snapshot_version_id, data)) ); @@ -1018,19 +869,13 @@ mod test { #[test] fn get_snapshot_not_found() -> anyhow::Result<()> { - init_logging(); + let (server, client_id) = setup(|txn| { + let client_id = Uuid::new_v4(); + txn.new_client(client_id, NIL_VERSION_ID)?; + Ok(client_id) + })?; - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - - txn.new_client(client_id, NIL_VERSION_ID)?; - let client = txn.get_client(client_id)?.unwrap(); - - assert_eq!( - get_snapshot(txn, &ServerConfig::default(), client_id, client)?, - None - ); + assert_eq!(server.get_snapshot(client_id)?, None); Ok(()) } diff --git a/server/src/api/add_snapshot.rs b/server/src/api/add_snapshot.rs index 9f0e249..b5d93cf 100644 --- a/server/src/api/add_snapshot.rs +++ b/server/src/api/add_snapshot.rs @@ -1,8 +1,8 @@ -use crate::api::{client_id_header, failure_to_ise, ServerState, SNAPSHOT_CONTENT_TYPE}; +use crate::api::{client_id_header, server_error_to_actix, ServerState, SNAPSHOT_CONTENT_TYPE}; use actix_web::{error, post, web, HttpMessage, HttpRequest, HttpResponse, Result}; use futures::StreamExt; use std::sync::Arc; -use taskchampion_sync_server_core::{add_snapshot, VersionId, NIL_VERSION_ID}; +use taskchampion_sync_server_core::VersionId; /// Max snapshot size: 100MB const MAX_SIZE: usize = 100 * 1024 * 1024; @@ -46,48 +46,27 @@ pub(crate) async fn service( return Err(error::ErrorBadRequest("No snapshot supplied")); } - // note that we do not open the transaction until the body has been read - // completely, to avoid blocking other storage access while that data is - // in transit. - let mut txn = server_state.storage.txn().map_err(failure_to_ise)?; - - // get, or create, the client - let client = match txn.get_client(client_id).map_err(failure_to_ise)? { - Some(client) => client, - None => { - txn.new_client(client_id, NIL_VERSION_ID) - .map_err(failure_to_ise)?; - txn.get_client(client_id).map_err(failure_to_ise)?.unwrap() - } - }; - - add_snapshot( - txn, - &server_state.config, - client_id, - client, - version_id, - body.to_vec(), - ) - .map_err(failure_to_ise)?; + server_state + .server + .add_snapshot(client_id, version_id, body.to_vec()) + .map_err(server_error_to_actix)?; Ok(HttpResponse::Ok().body("")) } #[cfg(test)] mod test { - use super::*; use crate::api::CLIENT_ID_HEADER; - use crate::Server; + use crate::WebServer; use actix_web::{http::StatusCode, test, App}; use pretty_assertions::assert_eq; - use taskchampion_sync_server_core::{InMemoryStorage, Storage}; + use taskchampion_sync_server_core::{InMemoryStorage, Storage, NIL_VERSION_ID}; use uuid::Uuid; #[actix_rt::test] async fn test_success() -> anyhow::Result<()> { let client_id = Uuid::new_v4(); let version_id = Uuid::new_v4(); - let storage: Box = Box::new(InMemoryStorage::new()); + let storage = InMemoryStorage::new(); // set up the storage contents.. { @@ -96,7 +75,7 @@ mod test { txn.add_version(client_id, version_id, NIL_VERSION_ID, vec![])?; } - let server = Server::new(Default::default(), storage); + let server = WebServer::new(Default::default(), storage); let app = App::new().configure(|sc| server.config(sc)); let app = test::init_service(app).await; @@ -130,7 +109,7 @@ mod test { async fn test_not_added_200() { let client_id = Uuid::new_v4(); let version_id = Uuid::new_v4(); - let storage: Box = Box::new(InMemoryStorage::new()); + let storage = InMemoryStorage::new(); // set up the storage contents.. { @@ -138,7 +117,7 @@ mod test { txn.new_client(client_id, NIL_VERSION_ID).unwrap(); } - let server = Server::new(Default::default(), storage); + let server = WebServer::new(Default::default(), storage); let app = App::new().configure(|sc| server.config(sc)); let app = test::init_service(app).await; @@ -167,8 +146,8 @@ mod test { async fn test_bad_content_type() { let client_id = Uuid::new_v4(); let version_id = Uuid::new_v4(); - let storage: Box = Box::new(InMemoryStorage::new()); - let server = Server::new(Default::default(), storage); + let storage = InMemoryStorage::new(); + let server = WebServer::new(Default::default(), storage); let app = App::new().configure(|sc| server.config(sc)); let app = test::init_service(app).await; @@ -187,8 +166,8 @@ mod test { async fn test_empty_body() { let client_id = Uuid::new_v4(); let version_id = Uuid::new_v4(); - let storage: Box = Box::new(InMemoryStorage::new()); - let server = Server::new(Default::default(), storage); + let storage = InMemoryStorage::new(); + let server = WebServer::new(Default::default(), storage); let app = App::new().configure(|sc| server.config(sc)); let app = test::init_service(app).await; diff --git a/server/src/api/add_version.rs b/server/src/api/add_version.rs index f416753..82dd89c 100644 --- a/server/src/api/add_version.rs +++ b/server/src/api/add_version.rs @@ -1,12 +1,13 @@ use crate::api::{ - client_id_header, failure_to_ise, ServerState, HISTORY_SEGMENT_CONTENT_TYPE, - PARENT_VERSION_ID_HEADER, SNAPSHOT_REQUEST_HEADER, VERSION_ID_HEADER, + client_id_header, failure_to_ise, server_error_to_actix, ServerState, + HISTORY_SEGMENT_CONTENT_TYPE, PARENT_VERSION_ID_HEADER, SNAPSHOT_REQUEST_HEADER, + VERSION_ID_HEADER, }; use actix_web::{error, post, web, HttpMessage, HttpRequest, HttpResponse, Result}; use futures::StreamExt; use std::sync::Arc; use taskchampion_sync_server_core::{ - add_version, AddVersionResult, SnapshotUrgency, VersionId, NIL_VERSION_ID, + AddVersionResult, ServerError, SnapshotUrgency, VersionId, NIL_VERSION_ID, }; /// Max history segment size: 100MB @@ -56,58 +57,47 @@ pub(crate) async fn service( return Err(error::ErrorBadRequest("Empty body")); } - // note that we do not open the transaction until the body has been read - // completely, to avoid blocking other storage access while that data is - // in transit. - let mut txn = server_state.storage.txn().map_err(failure_to_ise)?; - - // get, or create, the client - let client = match txn.get_client(client_id).map_err(failure_to_ise)? { - Some(client) => client, - None => { - txn.new_client(client_id, NIL_VERSION_ID) - .map_err(failure_to_ise)?; - txn.get_client(client_id).map_err(failure_to_ise)?.unwrap() - } - }; - - let (result, snap_urgency) = add_version( - txn, - &server_state.config, - client_id, - client, - parent_version_id, - body.to_vec(), - ) - .map_err(failure_to_ise)?; - - Ok(match result { - AddVersionResult::Ok(version_id) => { - let mut rb = HttpResponse::Ok(); - rb.append_header((VERSION_ID_HEADER, version_id.to_string())); - match snap_urgency { - SnapshotUrgency::None => {} - SnapshotUrgency::Low => { - rb.append_header((SNAPSHOT_REQUEST_HEADER, "urgency=low")); - } - SnapshotUrgency::High => { - rb.append_header((SNAPSHOT_REQUEST_HEADER, "urgency=high")); - } - }; - rb.finish() - } - AddVersionResult::ExpectedParentVersion(parent_version_id) => { - let mut rb = HttpResponse::Conflict(); - rb.append_header((PARENT_VERSION_ID_HEADER, parent_version_id.to_string())); - rb.finish() - } - }) + loop { + return match server_state + .server + .add_version(client_id, parent_version_id, body.to_vec()) + { + Ok((AddVersionResult::Ok(version_id), snap_urgency)) => { + let mut rb = HttpResponse::Ok(); + rb.append_header((VERSION_ID_HEADER, version_id.to_string())); + match snap_urgency { + SnapshotUrgency::None => {} + SnapshotUrgency::Low => { + rb.append_header((SNAPSHOT_REQUEST_HEADER, "urgency=low")); + } + SnapshotUrgency::High => { + rb.append_header((SNAPSHOT_REQUEST_HEADER, "urgency=high")); + } + }; + Ok(rb.finish()) + } + Ok((AddVersionResult::ExpectedParentVersion(parent_version_id), _)) => { + let mut rb = HttpResponse::Conflict(); + rb.append_header((PARENT_VERSION_ID_HEADER, parent_version_id.to_string())); + Ok(rb.finish()) + } + Err(ServerError::NoSuchClient) => { + // Create a new client and repeat the `add_version` call. + let mut txn = server_state.server.txn().map_err(server_error_to_actix)?; + txn.new_client(client_id, NIL_VERSION_ID) + .map_err(failure_to_ise)?; + txn.commit().map_err(failure_to_ise)?; + continue; + } + Err(e) => Err(server_error_to_actix(e)), + }; + } } #[cfg(test)] mod test { use crate::api::CLIENT_ID_HEADER; - use crate::Server; + use crate::WebServer; use actix_web::{http::StatusCode, test, App}; use pretty_assertions::assert_eq; use taskchampion_sync_server_core::{InMemoryStorage, Storage}; @@ -118,7 +108,7 @@ mod test { let client_id = Uuid::new_v4(); let version_id = Uuid::new_v4(); let parent_version_id = Uuid::new_v4(); - let storage: Box = Box::new(InMemoryStorage::new()); + let storage = InMemoryStorage::new(); // set up the storage contents.. { @@ -126,7 +116,7 @@ mod test { txn.new_client(client_id, Uuid::nil()).unwrap(); } - let server = Server::new(Default::default(), storage); + let server = WebServer::new(Default::default(), storage); let app = App::new().configure(|sc| server.config(sc)); let app = test::init_service(app).await; @@ -155,12 +145,55 @@ mod test { assert_eq!(resp.headers().get("X-Parent-Version-Id"), None); } + #[actix_rt::test] + async fn test_auto_add_client() { + let client_id = Uuid::new_v4(); + let version_id = Uuid::new_v4(); + let parent_version_id = Uuid::new_v4(); + let server = WebServer::new(Default::default(), InMemoryStorage::new()); + let app = App::new().configure(|sc| server.config(sc)); + let app = test::init_service(app).await; + + let uri = format!("/v1/client/add-version/{}", parent_version_id); + let req = test::TestRequest::post() + .uri(&uri) + .append_header(( + "Content-Type", + "application/vnd.taskchampion.history-segment", + )) + .append_header((CLIENT_ID_HEADER, client_id.to_string())) + .set_payload(b"abcd".to_vec()) + .to_request(); + let resp = test::call_service(&app, req).await; + assert_eq!(resp.status(), StatusCode::OK); + + // the returned version ID is random, but let's check that it's not + // the passed parent version ID, at least + let new_version_id = resp.headers().get("X-Version-Id").unwrap(); + let new_version_id = Uuid::parse_str(new_version_id.to_str().unwrap()).unwrap(); + assert!(new_version_id != version_id); + + // Shapshot should be requested, since there is no existing snapshot + let snapshot_request = resp.headers().get("X-Snapshot-Request").unwrap(); + assert_eq!(snapshot_request, "urgency=high"); + + assert_eq!(resp.headers().get("X-Parent-Version-Id"), None); + + // Check that the client really was created + { + let mut txn = server.server_state.server.txn().unwrap(); + let client = txn.get_client(client_id).unwrap().unwrap(); + assert_eq!(client.latest_version_id, new_version_id); + assert_eq!(client.snapshot, None); + } + } + #[actix_rt::test] async fn test_conflict() { let client_id = Uuid::new_v4(); let version_id = Uuid::new_v4(); let parent_version_id = Uuid::new_v4(); - let storage: Box = Box::new(InMemoryStorage::new()); + let storage = InMemoryStorage::new(); // set up the storage contents.. { @@ -168,7 +201,7 @@ mod test { txn.new_client(client_id, version_id).unwrap(); } - let server = Server::new(Default::default(), storage); + let server = WebServer::new(Default::default(), storage); let app = App::new().configure(|sc| server.config(sc)); let app = test::init_service(app).await; @@ -195,8 +228,8 @@ mod test { async fn test_bad_content_type() { let client_id = Uuid::new_v4(); let parent_version_id = Uuid::new_v4(); - let storage: Box = Box::new(InMemoryStorage::new()); - let server = Server::new(Default::default(), storage); + let storage = InMemoryStorage::new(); + let server = WebServer::new(Default::default(), storage); let app = App::new().configure(|sc| server.config(sc)); let app = test::init_service(app).await; @@ -215,8 +248,8 @@ mod test { async fn test_empty_body() { let client_id = Uuid::new_v4(); let parent_version_id = Uuid::new_v4(); - let storage: Box = Box::new(InMemoryStorage::new()); - let server = Server::new(Default::default(), storage); + let storage = InMemoryStorage::new(); + let server = WebServer::new(Default::default(), storage); let app = App::new().configure(|sc| server.config(sc)); let app = test::init_service(app).await; diff --git a/server/src/api/get_child_version.rs b/server/src/api/get_child_version.rs index d73f0fc..f1b0a0a 100644 --- a/server/src/api/get_child_version.rs +++ b/server/src/api/get_child_version.rs @@ -1,10 +1,10 @@ use crate::api::{ - client_id_header, failure_to_ise, ServerState, HISTORY_SEGMENT_CONTENT_TYPE, + client_id_header, server_error_to_actix, ServerState, HISTORY_SEGMENT_CONTENT_TYPE, PARENT_VERSION_ID_HEADER, VERSION_ID_HEADER, }; use actix_web::{error, get, web, HttpRequest, HttpResponse, Result}; use std::sync::Arc; -use taskchampion_sync_server_core::{get_child_version, GetVersionResult, VersionId}; +use taskchampion_sync_server_core::{GetVersionResult, ServerError, VersionId}; /// Get a child version. /// @@ -21,43 +21,35 @@ pub(crate) async fn service( path: web::Path, ) -> Result { let parent_version_id = path.into_inner(); - - let mut txn = server_state.storage.txn().map_err(failure_to_ise)?; - let client_id = client_id_header(&req)?; - let client = txn - .get_client(client_id) - .map_err(failure_to_ise)? - .ok_or_else(|| error::ErrorNotFound("no such client"))?; - - return match get_child_version( - txn, - &server_state.config, - client_id, - client, - parent_version_id, - ) - .map_err(failure_to_ise)? + return match server_state + .server + .get_child_version(client_id, parent_version_id) { - GetVersionResult::Success { + Ok(GetVersionResult::Success { version_id, parent_version_id, history_segment, - } => Ok(HttpResponse::Ok() + }) => Ok(HttpResponse::Ok() .content_type(HISTORY_SEGMENT_CONTENT_TYPE) .append_header((VERSION_ID_HEADER, version_id.to_string())) .append_header((PARENT_VERSION_ID_HEADER, parent_version_id.to_string())) .body(history_segment)), - GetVersionResult::NotFound => Err(error::ErrorNotFound("no such version")), - GetVersionResult::Gone => Err(error::ErrorGone("version has been deleted")), + Ok(GetVersionResult::NotFound) => Err(error::ErrorNotFound("no such version")), + Ok(GetVersionResult::Gone) => Err(error::ErrorGone("version has been deleted")), + // Note that the HTTP client cannot differentiate `NotFound` and `NoSuchClient`, as both + // are a 404 NOT FOUND response. In either case, the HTTP client will typically attempt + // to add a new version, which may create the new client at the same time. + Err(ServerError::NoSuchClient) => Err(error::ErrorNotFound("no such client")), + Err(e) => Err(server_error_to_actix(e)), }; } #[cfg(test)] mod test { use crate::api::CLIENT_ID_HEADER; - use crate::Server; + use crate::WebServer; use actix_web::{http::StatusCode, test, App}; use pretty_assertions::assert_eq; use taskchampion_sync_server_core::{InMemoryStorage, Storage, NIL_VERSION_ID}; @@ -68,7 +60,7 @@ mod test { let client_id = Uuid::new_v4(); let version_id = Uuid::new_v4(); let parent_version_id = Uuid::new_v4(); - let storage: Box = Box::new(InMemoryStorage::new()); + let storage = InMemoryStorage::new(); // set up the storage contents.. { @@ -78,7 +70,7 @@ mod test { .unwrap(); } - let server = Server::new(Default::default(), storage); + let server = WebServer::new(Default::default(), storage); let app = App::new().configure(|sc| server.config(sc)); let app = test::init_service(app).await; @@ -111,8 +103,8 @@ mod test { async fn test_client_not_found() { let client_id = Uuid::new_v4(); let parent_version_id = Uuid::new_v4(); - let storage: Box = Box::new(InMemoryStorage::new()); - let server = Server::new(Default::default(), storage); + let storage = InMemoryStorage::new(); + let server = WebServer::new(Default::default(), storage); let app = App::new().configure(|sc| server.config(sc)); let app = test::init_service(app).await; @@ -131,7 +123,7 @@ mod test { async fn test_version_not_found_and_gone() { let client_id = Uuid::new_v4(); let test_version_id = Uuid::new_v4(); - let storage: Box = Box::new(InMemoryStorage::new()); + let storage = InMemoryStorage::new(); // create the client and a single version. { @@ -140,7 +132,7 @@ mod test { txn.add_version(client_id, test_version_id, NIL_VERSION_ID, b"vers".to_vec()) .unwrap(); } - let server = Server::new(Default::default(), storage); + let server = WebServer::new(Default::default(), storage); let app = App::new().configure(|sc| server.config(sc)); let app = test::init_service(app).await; diff --git a/server/src/api/get_snapshot.rs b/server/src/api/get_snapshot.rs index b179a4b..47a728d 100644 --- a/server/src/api/get_snapshot.rs +++ b/server/src/api/get_snapshot.rs @@ -1,9 +1,8 @@ use crate::api::{ - client_id_header, failure_to_ise, ServerState, SNAPSHOT_CONTENT_TYPE, VERSION_ID_HEADER, + client_id_header, server_error_to_actix, ServerState, SNAPSHOT_CONTENT_TYPE, VERSION_ID_HEADER, }; use actix_web::{error, get, web, HttpRequest, HttpResponse, Result}; use std::sync::Arc; -use taskchampion_sync_server_core::get_snapshot; /// Get a snapshot. /// @@ -18,17 +17,12 @@ pub(crate) async fn service( req: HttpRequest, server_state: web::Data>, ) -> Result { - let mut txn = server_state.storage.txn().map_err(failure_to_ise)?; - let client_id = client_id_header(&req)?; - let client = txn - .get_client(client_id) - .map_err(failure_to_ise)? - .ok_or_else(|| error::ErrorNotFound("no such client"))?; - - if let Some((version_id, data)) = - get_snapshot(txn, &server_state.config, client_id, client).map_err(failure_to_ise)? + if let Some((version_id, data)) = server_state + .server + .get_snapshot(client_id) + .map_err(server_error_to_actix)? { Ok(HttpResponse::Ok() .content_type(SNAPSHOT_CONTENT_TYPE) @@ -42,7 +36,7 @@ pub(crate) async fn service( #[cfg(test)] mod test { use crate::api::CLIENT_ID_HEADER; - use crate::Server; + use crate::WebServer; use actix_web::{http::StatusCode, test, App}; use chrono::{TimeZone, Utc}; use pretty_assertions::assert_eq; @@ -52,7 +46,7 @@ mod test { #[actix_rt::test] async fn test_not_found() { let client_id = Uuid::new_v4(); - let storage: Box = Box::new(InMemoryStorage::new()); + let storage = InMemoryStorage::new(); // set up the storage contents.. { @@ -60,7 +54,7 @@ mod test { txn.new_client(client_id, Uuid::new_v4()).unwrap(); } - let server = Server::new(Default::default(), storage); + let server = WebServer::new(Default::default(), storage); let app = App::new().configure(|sc| server.config(sc)); let app = test::init_service(app).await; @@ -78,7 +72,7 @@ mod test { let client_id = Uuid::new_v4(); let version_id = Uuid::new_v4(); let snapshot_data = vec![1, 2, 3, 4]; - let storage: Box = Box::new(InMemoryStorage::new()); + let storage = InMemoryStorage::new(); // set up the storage contents.. { @@ -96,7 +90,7 @@ mod test { .unwrap(); } - let server = Server::new(Default::default(), storage); + let server = WebServer::new(Default::default(), storage); let app = App::new().configure(|sc| server.config(sc)); let app = test::init_service(app).await; diff --git a/server/src/api/mod.rs b/server/src/api/mod.rs index 4489369..25812da 100644 --- a/server/src/api/mod.rs +++ b/server/src/api/mod.rs @@ -1,5 +1,5 @@ -use actix_web::{error, http::StatusCode, web, HttpRequest, Result, Scope}; -use taskchampion_sync_server_core::{ClientId, ServerConfig, Storage}; +use actix_web::{error, web, HttpRequest, Result, Scope}; +use taskchampion_sync_server_core::{ClientId, Server, ServerError}; mod add_snapshot; mod add_version; @@ -27,8 +27,7 @@ pub(crate) const SNAPSHOT_REQUEST_HEADER: &str = "X-Snapshot-Request"; /// The type containing a reference to the persistent state for the server pub(crate) struct ServerState { - pub(crate) storage: Box, - pub(crate) config: ServerConfig, + pub(crate) server: Server, } pub(crate) fn api_scope() -> Scope { @@ -39,9 +38,17 @@ pub(crate) fn api_scope() -> Scope { .service(add_snapshot::service) } -/// Convert a failure::Error to an Actix ISE -fn failure_to_ise(err: anyhow::Error) -> impl actix_web::ResponseError { - error::InternalError::new(err, StatusCode::INTERNAL_SERVER_ERROR) +/// Convert a `anyhow::Error` to an Actix ISE +fn failure_to_ise(err: anyhow::Error) -> actix_web::Error { + error::ErrorInternalServerError(err) +} + +/// Convert a ServerError to an Actix error +fn server_error_to_actix(err: ServerError) -> actix_web::Error { + match err { + ServerError::NoSuchClient => error::ErrorNotFound(err), + ServerError::Other(err) => error::ErrorInternalServerError(err), + } } /// Get the client id diff --git a/server/src/bin/taskchampion-sync-server.rs b/server/src/bin/taskchampion-sync-server.rs index 23ccb78..1057b45 100644 --- a/server/src/bin/taskchampion-sync-server.rs +++ b/server/src/bin/taskchampion-sync-server.rs @@ -3,7 +3,7 @@ use actix_web::{middleware::Logger, App, HttpServer}; use clap::{arg, builder::ValueParser, value_parser, Command}; use std::ffi::OsString; -use taskchampion_sync_server::Server; +use taskchampion_sync_server::WebServer; use taskchampion_sync_server_core::ServerConfig; use taskchampion_sync_server_storage_sqlite::SqliteStorage; @@ -44,8 +44,11 @@ async fn main() -> anyhow::Result<()> { let snapshot_versions: u32 = *matches.get_one("snapshot-versions").unwrap(); let snapshot_days: i64 = *matches.get_one("snapshot-days").unwrap(); - let config = ServerConfig::from_args(snapshot_days, snapshot_versions)?; - let server = Server::new(config, Box::new(SqliteStorage::new(data_dir)?)); + let config = ServerConfig { + snapshot_days, + snapshot_versions, + }; + let server = WebServer::new(config, SqliteStorage::new(data_dir)?); log::info!("Serving on port {}", port); HttpServer::new(move || { @@ -67,7 +70,7 @@ mod test { #[actix_rt::test] async fn test_index_get() { - let server = Server::new(Default::default(), Box::new(InMemoryStorage::new())); + let server = WebServer::new(Default::default(), InMemoryStorage::new()); let app = App::new().configure(|sc| server.config(sc)); let app = test::init_service(app).await; diff --git a/server/src/lib.rs b/server/src/lib.rs index 67c8956..2d81e58 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -5,7 +5,7 @@ mod api; use actix_web::{get, middleware, web, Responder}; use api::{api_scope, ServerState}; use std::sync::Arc; -use taskchampion_sync_server_core::{ServerConfig, Storage}; +use taskchampion_sync_server_core::{Server, ServerConfig, Storage}; #[get("/")] async fn index() -> impl Responder { @@ -14,15 +14,17 @@ async fn index() -> impl Responder { /// A Server represents a sync server. #[derive(Clone)] -pub struct Server { +pub struct WebServer { server_state: Arc, } -impl Server { +impl WebServer { /// Create a new sync server with the given storage implementation. - pub fn new(config: ServerConfig, storage: Box) -> Self { + pub fn new(config: ServerConfig, storage: ST) -> Self { Self { - server_state: Arc::new(ServerState { config, storage }), + server_state: Arc::new(ServerState { + server: Server::new(config, storage), + }), } } @@ -49,7 +51,7 @@ mod test { #[actix_rt::test] async fn test_cache_control() { - let server = Server::new(Default::default(), Box::new(InMemoryStorage::new())); + let server = WebServer::new(Default::default(), InMemoryStorage::new()); let app = App::new().configure(|sc| server.config(sc)); let app = test::init_service(app).await; diff --git a/server/src/server.rs b/server/src/server.rs deleted file mode 100644 index e224679..0000000 --- a/server/src/server.rs +++ /dev/null @@ -1,1036 +0,0 @@ -//! This module implements the core logic of the server: handling transactions, upholding -//! invariants, and so on. This does not implement the HTTP-specific portions; those -//! are in [`crate::api`]. See the protocol documentation for details. -use crate::storage::{Client, Snapshot, StorageTxn}; -use chrono::Utc; -use uuid::Uuid; - -/// The distinguished value for "no version" -pub const NIL_VERSION_ID: VersionId = Uuid::nil(); - -/// Number of versions to search back from the latest to find the -/// version for a newly-added snapshot. Snapshots for versions older -/// than this will be rejected. -const SNAPSHOT_SEARCH_LEN: i32 = 5; - -pub(crate) type HistorySegment = Vec; -pub(crate) type ClientId = Uuid; -pub(crate) type VersionId = Uuid; - -/// ServerConfig contains configuration parameters for the server. -pub struct ServerConfig { - /// Target number of days between snapshots. - pub snapshot_days: i64, - - /// Target number of versions between snapshots. - pub snapshot_versions: u32, -} - -impl Default for ServerConfig { - fn default() -> Self { - ServerConfig { - snapshot_days: 14, - snapshot_versions: 100, - } - } -} - -impl ServerConfig { - pub fn from_args(snapshot_days: i64, snapshot_versions: u32) -> anyhow::Result { - Ok(ServerConfig { - snapshot_days, - snapshot_versions, - }) - } -} - -/// Response to get_child_version. See the protocol documentation. -#[derive(Clone, PartialEq, Debug)] -pub(crate) enum GetVersionResult { - NotFound, - Gone, - Success { - version_id: Uuid, - parent_version_id: Uuid, - history_segment: HistorySegment, - }, -} - -/// Implementation of the GetChildVersion protocol transaction -pub(crate) fn get_child_version<'a>( - mut txn: Box, - _config: &ServerConfig, - client_id: ClientId, - client: Client, - parent_version_id: VersionId, -) -> anyhow::Result { - // If a version with parentVersionId equal to the requested parentVersionId exists, it is returned. - if let Some(version) = txn.get_version_by_parent(client_id, parent_version_id)? { - return Ok(GetVersionResult::Success { - version_id: version.version_id, - parent_version_id: version.parent_version_id, - history_segment: version.history_segment, - }); - } - - // Return NotFound if an AddVersion with this parent_version_id would succeed, and otherwise - // return Gone. - // - // AddVersion will succeed if either - // - the requested parent version is the latest version; or - // - there is no latest version, meaning there are no versions stored for this client - Ok( - if client.latest_version_id == parent_version_id - || client.latest_version_id == NIL_VERSION_ID - { - GetVersionResult::NotFound - } else { - GetVersionResult::Gone - }, - ) -} - -/// Response to add_version -#[derive(Clone, PartialEq, Debug)] -pub(crate) enum AddVersionResult { - /// OK, version added with the given ID - Ok(VersionId), - /// Rejected; expected a version with the given parent version - ExpectedParentVersion(VersionId), -} - -/// Urgency of a snapshot for a client; used to create the `X-Snapshot-Request` header. -#[derive(PartialEq, Debug, Clone, Copy, Eq, PartialOrd, Ord)] -pub(crate) enum SnapshotUrgency { - /// Don't need a snapshot right now. - None, - - /// A snapshot would be good, but can wait for other replicas to provide it. - Low, - - /// A snapshot is needed right now. - High, -} - -impl SnapshotUrgency { - /// Calculate the urgency for a snapshot based on its age in days - fn for_days(config: &ServerConfig, days: i64) -> Self { - if days >= config.snapshot_days * 3 / 2 { - SnapshotUrgency::High - } else if days >= config.snapshot_days { - SnapshotUrgency::Low - } else { - SnapshotUrgency::None - } - } - - /// Calculate the urgency for a snapshot based on its age in versions - fn for_versions_since(config: &ServerConfig, versions_since: u32) -> Self { - if versions_since >= config.snapshot_versions * 3 / 2 { - SnapshotUrgency::High - } else if versions_since >= config.snapshot_versions { - SnapshotUrgency::Low - } else { - SnapshotUrgency::None - } - } -} - -/// Implementation of the AddVersion protocol transaction -pub(crate) fn add_version<'a>( - mut txn: Box, - config: &ServerConfig, - client_id: ClientId, - client: Client, - parent_version_id: VersionId, - history_segment: HistorySegment, -) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> { - log::debug!( - "add_version(client_id: {}, parent_version_id: {})", - client_id, - parent_version_id, - ); - - // check if this version is acceptable, under the protection of the transaction - if client.latest_version_id != NIL_VERSION_ID && parent_version_id != client.latest_version_id { - log::debug!("add_version request rejected: mismatched latest_version_id"); - return Ok(( - AddVersionResult::ExpectedParentVersion(client.latest_version_id), - SnapshotUrgency::None, - )); - } - - // invent a version ID - let version_id = Uuid::new_v4(); - log::debug!( - "add_version request accepted: new version_id: {}", - version_id - ); - - // update the DB - txn.add_version(client_id, version_id, parent_version_id, history_segment)?; - txn.commit()?; - - // calculate the urgency - let time_urgency = match client.snapshot { - None => SnapshotUrgency::High, - Some(Snapshot { timestamp, .. }) => { - SnapshotUrgency::for_days(config, (Utc::now() - timestamp).num_days()) - } - }; - - let version_urgency = match client.snapshot { - None => SnapshotUrgency::High, - Some(Snapshot { versions_since, .. }) => { - SnapshotUrgency::for_versions_since(config, versions_since) - } - }; - - Ok(( - AddVersionResult::Ok(version_id), - std::cmp::max(time_urgency, version_urgency), - )) -} - -/// Implementation of the AddSnapshot protocol transaction -pub(crate) fn add_snapshot<'a>( - mut txn: Box, - _config: &ServerConfig, - client_id: ClientId, - client: Client, - version_id: VersionId, - data: Vec, -) -> anyhow::Result<()> { - log::debug!( - "add_snapshot(client_id: {}, version_id: {})", - client_id, - version_id, - ); - - // NOTE: if the snapshot is rejected, this function logs about it and returns - // Ok(()), as there's no reason to report an errot to the client / user. - - let last_snapshot = client.snapshot.map(|snap| snap.version_id); - if Some(version_id) == last_snapshot { - log::debug!( - "rejecting snapshot for version {}: already exists", - version_id - ); - return Ok(()); - } - - // look for this version in the history of this client, starting at the latest version, and - // only iterating for a limited number of versions. - let mut search_len = SNAPSHOT_SEARCH_LEN; - let mut vid = client.latest_version_id; - - loop { - if vid == version_id && version_id != NIL_VERSION_ID { - // the new snapshot is for a recent version, so proceed - break; - } - - if Some(vid) == last_snapshot { - // the new snapshot is older than the last snapshot, so ignore it - log::debug!( - "rejecting snapshot for version {}: newer snapshot already exists or no such version", - version_id - ); - return Ok(()); - } - - search_len -= 1; - if search_len <= 0 || vid == NIL_VERSION_ID { - // this should not happen in normal operation, so warn about it - log::warn!( - "rejecting snapshot for version {}: version is too old or no such version", - version_id - ); - return Ok(()); - } - - // get the parent version ID - if let Some(parent) = txn.get_version(client_id, vid)? { - vid = parent.parent_version_id; - } else { - // this version does not exist; "this should not happen" but if it does, - // we don't need a snapshot earlier than the missing version. - log::warn!( - "rejecting snapshot for version {}: newer versions have already been deleted", - version_id - ); - return Ok(()); - } - } - - log::debug!("accepting snapshot for version {}", version_id); - txn.set_snapshot( - client_id, - Snapshot { - version_id, - timestamp: Utc::now(), - versions_since: 0, - }, - data, - )?; - txn.commit()?; - Ok(()) -} - -/// Implementation of the GetSnapshot protocol transaction -pub(crate) fn get_snapshot<'a>( - mut txn: Box, - _config: &ServerConfig, - client_id: ClientId, - client: Client, -) -> anyhow::Result)>> { - Ok(if let Some(snap) = client.snapshot { - txn.get_snapshot_data(client_id, snap.version_id)? - .map(|data| (snap.version_id, data)) - } else { - None - }) -} - -#[cfg(test)] -mod test { - use super::*; - use crate::storage::{InMemoryStorage, Snapshot, Storage}; - use crate::test::init_logging; - use chrono::{Duration, TimeZone, Utc}; - use pretty_assertions::assert_eq; - - #[test] - fn snapshot_urgency_max() { - use SnapshotUrgency::*; - assert_eq!(std::cmp::max(None, None), None); - assert_eq!(std::cmp::max(None, Low), Low); - assert_eq!(std::cmp::max(None, High), High); - assert_eq!(std::cmp::max(Low, None), Low); - assert_eq!(std::cmp::max(Low, Low), Low); - assert_eq!(std::cmp::max(Low, High), High); - assert_eq!(std::cmp::max(High, None), High); - assert_eq!(std::cmp::max(High, Low), High); - assert_eq!(std::cmp::max(High, High), High); - } - - #[test] - fn snapshot_urgency_for_days() { - use SnapshotUrgency::*; - let config = ServerConfig::default(); - assert_eq!(SnapshotUrgency::for_days(&config, 0), None); - assert_eq!( - SnapshotUrgency::for_days(&config, config.snapshot_days), - Low - ); - assert_eq!( - SnapshotUrgency::for_days(&config, config.snapshot_days * 2), - High - ); - } - - #[test] - fn snapshot_urgency_for_versions_since() { - use SnapshotUrgency::*; - let config = ServerConfig::default(); - assert_eq!(SnapshotUrgency::for_versions_since(&config, 0), None); - assert_eq!( - SnapshotUrgency::for_versions_since(&config, config.snapshot_versions), - Low - ); - assert_eq!( - SnapshotUrgency::for_versions_since(&config, config.snapshot_versions * 2), - High - ); - } - - #[test] - fn get_child_version_not_found_initial_nil() -> anyhow::Result<()> { - init_logging(); - - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - txn.new_client(client_id, NIL_VERSION_ID)?; - - // when no latest version exists, the first version is NotFound - let client = txn.get_client(client_id)?.unwrap(); - assert_eq!( - get_child_version( - txn, - &ServerConfig::default(), - client_id, - client, - NIL_VERSION_ID - )?, - GetVersionResult::NotFound - ); - Ok(()) - } - - #[test] - fn get_child_version_not_found_initial_continuing() -> anyhow::Result<()> { - init_logging(); - - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - - txn.new_client(client_id, NIL_VERSION_ID)?; - - // when no latest version exists, _any_ child version is NOT_FOUND. This allows syncs to - // start to a new server even if the client already has been uploading to another service. - let client = txn.get_client(client_id)?.unwrap(); - assert_eq!( - get_child_version( - txn, - &ServerConfig::default(), - client_id, - client, - Uuid::new_v4(), - )?, - GetVersionResult::NotFound - ); - Ok(()) - } - - #[test] - fn get_child_version_not_found_up_to_date() -> anyhow::Result<()> { - init_logging(); - - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - - // add a parent version, but not the requested child version - let parent_version_id = Uuid::new_v4(); - txn.new_client(client_id, parent_version_id)?; - txn.add_version(client_id, parent_version_id, NIL_VERSION_ID, vec![])?; - - let client = txn.get_client(client_id)?.unwrap(); - assert_eq!( - get_child_version( - txn, - &ServerConfig::default(), - client_id, - client, - parent_version_id - )?, - GetVersionResult::NotFound - ); - Ok(()) - } - - #[test] - fn get_child_version_gone_not_latest() -> anyhow::Result<()> { - init_logging(); - - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - - // Add a parent version, but not the requested parent version - let parent_version_id = Uuid::new_v4(); - txn.new_client(client_id, parent_version_id)?; - txn.add_version(client_id, parent_version_id, NIL_VERSION_ID, vec![])?; - - let client = txn.get_client(client_id)?.unwrap(); - assert_eq!( - get_child_version( - txn, - &ServerConfig::default(), - client_id, - client, - Uuid::new_v4(), - )?, - GetVersionResult::Gone - ); - Ok(()) - } - - #[test] - fn get_child_version_found() -> anyhow::Result<()> { - init_logging(); - - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - let version_id = Uuid::new_v4(); - let parent_version_id = Uuid::new_v4(); - let history_segment = b"abcd".to_vec(); - - txn.new_client(client_id, version_id)?; - txn.add_version( - client_id, - version_id, - parent_version_id, - history_segment.clone(), - )?; - - let client = txn.get_client(client_id)?.unwrap(); - assert_eq!( - get_child_version( - txn, - &ServerConfig::default(), - client_id, - client, - parent_version_id - )?, - GetVersionResult::Success { - version_id, - parent_version_id, - history_segment, - } - ); - Ok(()) - } - - /// Utility setup function for add_version tests - fn av_setup( - storage: &InMemoryStorage, - num_versions: u32, - snapshot_version: Option, - snapshot_days_ago: Option, - ) -> anyhow::Result<(Uuid, Vec)> { - init_logging(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - let mut versions = vec![]; - - let mut version_id = Uuid::nil(); - txn.new_client(client_id, Uuid::nil())?; - for vnum in 0..num_versions { - let parent_version_id = version_id; - version_id = Uuid::new_v4(); - versions.push(version_id); - txn.add_version( - client_id, - version_id, - parent_version_id, - vec![0, 0, vnum as u8], - )?; - if Some(vnum) == snapshot_version { - txn.set_snapshot( - client_id, - Snapshot { - version_id, - versions_since: 0, - timestamp: Utc::now() - Duration::days(snapshot_days_ago.unwrap_or(0)), - }, - vec![vnum as u8], - )?; - } - } - - Ok((client_id, versions)) - } - - /// Utility function to check the results of an add_version call - fn av_success_check( - storage: &InMemoryStorage, - client_id: Uuid, - existing_versions: &[Uuid], - result: (AddVersionResult, SnapshotUrgency), - expected_history: Vec, - expected_urgency: SnapshotUrgency, - ) -> anyhow::Result<()> { - if let AddVersionResult::Ok(new_version_id) = result.0 { - // check that it invented a new version ID - for v in existing_versions { - assert_ne!(&new_version_id, v); - } - - // verify that the storage was updated - let mut txn = storage.txn()?; - let client = txn.get_client(client_id)?.unwrap(); - assert_eq!(client.latest_version_id, new_version_id); - - let parent_version_id = existing_versions.last().cloned().unwrap_or_else(Uuid::nil); - let version = txn.get_version(client_id, new_version_id)?.unwrap(); - assert_eq!(version.version_id, new_version_id); - assert_eq!(version.parent_version_id, parent_version_id); - assert_eq!(version.history_segment, expected_history); - } else { - panic!("did not get Ok from add_version: {:?}", result); - } - - assert_eq!(result.1, expected_urgency); - - Ok(()) - } - - #[test] - fn add_version_conflict() -> anyhow::Result<()> { - let storage = InMemoryStorage::new(); - let (client_id, versions) = av_setup(&storage, 3, None, None)?; - - let mut txn = storage.txn()?; - let client = txn.get_client(client_id)?.unwrap(); - - // try to add a child of a version other than the latest - assert_eq!( - add_version( - txn, - &ServerConfig::default(), - client_id, - client, - versions[1], - vec![3, 6, 9] - )? - .0, - AddVersionResult::ExpectedParentVersion(versions[2]) - ); - - // verify that the storage wasn't updated - txn = storage.txn()?; - assert_eq!( - txn.get_client(client_id)?.unwrap().latest_version_id, - versions[2] - ); - assert_eq!(txn.get_version_by_parent(client_id, versions[2])?, None); - - Ok(()) - } - - #[test] - fn add_version_with_existing_history() -> anyhow::Result<()> { - let storage = InMemoryStorage::new(); - let (client_id, versions) = av_setup(&storage, 1, None, None)?; - - let mut txn = storage.txn()?; - let client = txn.get_client(client_id)?.unwrap(); - - let result = add_version( - txn, - &ServerConfig::default(), - client_id, - client, - versions[0], - vec![3, 6, 9], - )?; - - av_success_check( - &storage, - client_id, - &versions, - result, - vec![3, 6, 9], - // urgency=high because there are no snapshots yet - SnapshotUrgency::High, - )?; - - Ok(()) - } - - #[test] - fn add_version_with_no_history() -> anyhow::Result<()> { - let storage = InMemoryStorage::new(); - let (client_id, versions) = av_setup(&storage, 0, None, None)?; - - let mut txn = storage.txn()?; - let client = txn.get_client(client_id)?.unwrap(); - - let parent_version_id = Uuid::nil(); - let result = add_version( - txn, - &ServerConfig::default(), - client_id, - client, - parent_version_id, - vec![3, 6, 9], - )?; - - av_success_check( - &storage, - client_id, - &versions, - result, - vec![3, 6, 9], - // urgency=high because there are no snapshots yet - SnapshotUrgency::High, - )?; - - Ok(()) - } - - #[test] - fn add_version_success_recent_snapshot() -> anyhow::Result<()> { - let storage = InMemoryStorage::new(); - let (client_id, versions) = av_setup(&storage, 1, Some(0), None)?; - - let mut txn = storage.txn()?; - let client = txn.get_client(client_id)?.unwrap(); - - let result = add_version( - txn, - &ServerConfig::default(), - client_id, - client, - versions[0], - vec![1, 2, 3], - )?; - - av_success_check( - &storage, - client_id, - &versions, - result, - vec![1, 2, 3], - // no snapshot request since the previous version has a snapshot - SnapshotUrgency::None, - )?; - - Ok(()) - } - - #[test] - fn add_version_success_aged_snapshot() -> anyhow::Result<()> { - let storage = InMemoryStorage::new(); - // one snapshot, but it was 50 days ago - let (client_id, versions) = av_setup(&storage, 1, Some(0), Some(50))?; - - let mut txn = storage.txn()?; - let client = txn.get_client(client_id)?.unwrap(); - - let result = add_version( - txn, - &ServerConfig::default(), - client_id, - client, - versions[0], - vec![1, 2, 3], - )?; - - av_success_check( - &storage, - client_id, - &versions, - result, - vec![1, 2, 3], - // urgency=high due to days since the snapshot - SnapshotUrgency::High, - )?; - - Ok(()) - } - - #[test] - fn add_version_success_snapshot_many_versions_ago() -> anyhow::Result<()> { - let storage = InMemoryStorage::new(); - // one snapshot, but it was 50 versions ago - let (client_id, versions) = av_setup(&storage, 50, Some(0), None)?; - - let mut txn = storage.txn()?; - let client = txn.get_client(client_id)?.unwrap(); - - let result = add_version( - txn, - &ServerConfig { - snapshot_versions: 30, - ..ServerConfig::default() - }, - client_id, - client, - versions[49], - vec![1, 2, 3], - )?; - - av_success_check( - &storage, - client_id, - &versions, - result, - vec![1, 2, 3], - // urgency=high due to number of versions since the snapshot - SnapshotUrgency::High, - )?; - - Ok(()) - } - - #[test] - fn add_snapshot_success_latest() -> anyhow::Result<()> { - init_logging(); - - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - let version_id = Uuid::new_v4(); - - // set up a task DB with one version in it - txn.new_client(client_id, version_id)?; - txn.add_version(client_id, version_id, NIL_VERSION_ID, vec![])?; - - // add a snapshot for that version - let client = txn.get_client(client_id)?.unwrap(); - add_snapshot( - txn, - &ServerConfig::default(), - client_id, - client, - version_id, - vec![1, 2, 3], - )?; - - // verify the snapshot - let mut txn = storage.txn()?; - let client = txn.get_client(client_id)?.unwrap(); - let snapshot = client.snapshot.unwrap(); - assert_eq!(snapshot.version_id, version_id); - assert_eq!(snapshot.versions_since, 0); - assert_eq!( - txn.get_snapshot_data(client_id, version_id).unwrap(), - Some(vec![1, 2, 3]) - ); - - Ok(()) - } - - #[test] - fn add_snapshot_success_older() -> anyhow::Result<()> { - init_logging(); - - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - let version_id_1 = Uuid::new_v4(); - let version_id_2 = Uuid::new_v4(); - - // set up a task DB with two versions in it - txn.new_client(client_id, version_id_2)?; - txn.add_version(client_id, version_id_1, NIL_VERSION_ID, vec![])?; - txn.add_version(client_id, version_id_2, version_id_1, vec![])?; - - // add a snapshot for version 1 - let client = txn.get_client(client_id)?.unwrap(); - add_snapshot( - txn, - &ServerConfig::default(), - client_id, - client, - version_id_1, - vec![1, 2, 3], - )?; - - // verify the snapshot - let mut txn = storage.txn()?; - let client = txn.get_client(client_id)?.unwrap(); - let snapshot = client.snapshot.unwrap(); - assert_eq!(snapshot.version_id, version_id_1); - assert_eq!(snapshot.versions_since, 0); - assert_eq!( - txn.get_snapshot_data(client_id, version_id_1).unwrap(), - Some(vec![1, 2, 3]) - ); - - Ok(()) - } - - #[test] - fn add_snapshot_fails_no_such() -> anyhow::Result<()> { - init_logging(); - - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - let version_id_1 = Uuid::new_v4(); - let version_id_2 = Uuid::new_v4(); - - // set up a task DB with two versions in it - txn.new_client(client_id, version_id_2)?; - txn.add_version(client_id, version_id_1, NIL_VERSION_ID, vec![])?; - txn.add_version(client_id, version_id_2, version_id_1, vec![])?; - - // add a snapshot for unknown version - let client = txn.get_client(client_id)?.unwrap(); - let version_id_unk = Uuid::new_v4(); - add_snapshot( - txn, - &ServerConfig::default(), - client_id, - client, - version_id_unk, - vec![1, 2, 3], - )?; - - // verify the snapshot does not exist - let mut txn = storage.txn()?; - let client = txn.get_client(client_id)?.unwrap(); - assert!(client.snapshot.is_none()); - - Ok(()) - } - - #[test] - fn add_snapshot_fails_too_old() -> anyhow::Result<()> { - init_logging(); - - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - let mut version_id = Uuid::new_v4(); - let mut parent_version_id = Uuid::nil(); - let mut version_ids = vec![]; - - // set up a task DB with 10 versions in it (oldest to newest) - txn.new_client(client_id, Uuid::nil())?; - for _ in 0..10 { - txn.add_version(client_id, version_id, parent_version_id, vec![])?; - version_ids.push(version_id); - parent_version_id = version_id; - version_id = Uuid::new_v4(); - } - - // add a snapshot for the earliest of those - let client = txn.get_client(client_id)?.unwrap(); - add_snapshot( - txn, - &ServerConfig::default(), - client_id, - client, - version_ids[0], - vec![1, 2, 3], - )?; - - // verify the snapshot does not exist - let mut txn = storage.txn()?; - let client = txn.get_client(client_id)?.unwrap(); - assert!(client.snapshot.is_none()); - - Ok(()) - } - - #[test] - fn add_snapshot_fails_newer_exists() -> anyhow::Result<()> { - init_logging(); - - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - let mut version_id = Uuid::new_v4(); - let mut parent_version_id = Uuid::nil(); - let mut version_ids = vec![]; - - // set up a task DB with 5 versions in it (oldest to newest) and a snapshot of the middle - // one - txn.new_client(client_id, Uuid::nil())?; - for _ in 0..5 { - txn.add_version(client_id, version_id, parent_version_id, vec![])?; - version_ids.push(version_id); - parent_version_id = version_id; - version_id = Uuid::new_v4(); - } - txn.set_snapshot( - client_id, - Snapshot { - version_id: version_ids[2], - versions_since: 2, - timestamp: Utc.with_ymd_and_hms(2001, 9, 9, 1, 46, 40).unwrap(), - }, - vec![1, 2, 3], - )?; - - // add a snapshot for the earliest of those - let client = txn.get_client(client_id)?.unwrap(); - add_snapshot( - txn, - &ServerConfig::default(), - client_id, - client, - version_ids[0], - vec![9, 9, 9], - )?; - - // verify the snapshot was not replaced - let mut txn = storage.txn()?; - let client = txn.get_client(client_id)?.unwrap(); - let snapshot = client.snapshot.unwrap(); - assert_eq!(snapshot.version_id, version_ids[2]); - assert_eq!(snapshot.versions_since, 2); - assert_eq!( - txn.get_snapshot_data(client_id, version_ids[2]).unwrap(), - Some(vec![1, 2, 3]) - ); - - Ok(()) - } - - #[test] - fn add_snapshot_fails_nil_version() -> anyhow::Result<()> { - init_logging(); - - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - - // just set up the client - txn.new_client(client_id, NIL_VERSION_ID)?; - - // add a snapshot for the nil version - let client = txn.get_client(client_id)?.unwrap(); - add_snapshot( - txn, - &ServerConfig::default(), - client_id, - client, - NIL_VERSION_ID, - vec![9, 9, 9], - )?; - - // verify the snapshot does not exist - let mut txn = storage.txn()?; - let client = txn.get_client(client_id)?.unwrap(); - assert!(client.snapshot.is_none()); - - Ok(()) - } - - #[test] - fn get_snapshot_found() -> anyhow::Result<()> { - init_logging(); - - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - let data = vec![1, 2, 3]; - let snapshot_version_id = Uuid::new_v4(); - - txn.new_client(client_id, snapshot_version_id)?; - txn.set_snapshot( - client_id, - Snapshot { - version_id: snapshot_version_id, - versions_since: 3, - timestamp: Utc.with_ymd_and_hms(2001, 9, 9, 1, 46, 40).unwrap(), - }, - data.clone(), - )?; - - let client = txn.get_client(client_id)?.unwrap(); - assert_eq!( - get_snapshot(txn, &ServerConfig::default(), client_id, client)?, - Some((snapshot_version_id, data)) - ); - - Ok(()) - } - - #[test] - fn get_snapshot_not_found() -> anyhow::Result<()> { - init_logging(); - - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - - txn.new_client(client_id, NIL_VERSION_ID)?; - let client = txn.get_client(client_id)?.unwrap(); - - assert_eq!( - get_snapshot(txn, &ServerConfig::default(), client_id, client)?, - None - ); - - Ok(()) - } -}