diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index a675293..7051784 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -62,11 +62,24 @@ jobs: override: true minimal: true - - uses: actions-rs/cargo@v1.0.3 + - name: taskchampion-sync-server + uses: actions-rs/cargo@v1.0.3 with: command: rustdoc args: -p taskchampion-sync-server --all-features -- -Z unstable-options --check -Dwarnings + - name: taskchampion-sync-server-core + uses: actions-rs/cargo@v1.0.3 + with: + command: rustdoc + args: -p taskchampion-sync-server-core --all-features -- -Z unstable-options --check -Dwarnings + + - name: taskchampion-sync-server-storage-sqlite + uses: actions-rs/cargo@v1.0.3 + with: + command: rustdoc + args: -p taskchampion-sync-server-storage-sqlite --all-features -- -Z unstable-options --check -Dwarnings + fmt: runs-on: ubuntu-latest name: "Formatting" diff --git a/Cargo.lock b/Cargo.lock index c98cfdb..b8685a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -687,9 +687,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-macro" @@ -1406,9 +1406,36 @@ dependencies = [ "futures", "log", "pretty_assertions", - "rusqlite", "serde", "serde_json", + "taskchampion-sync-server-core", + "taskchampion-sync-server-storage-sqlite", + "tempfile", + "thiserror", + "uuid", +] + +[[package]] +name = "taskchampion-sync-server-core" +version = "0.4.1" +dependencies = [ + "anyhow", + "chrono", + "env_logger", + "log", + "pretty_assertions", + "uuid", +] + +[[package]] +name = "taskchampion-sync-server-storage-sqlite" +version = "0.4.1" +dependencies = [ + "anyhow", + "chrono", + "pretty_assertions", + "rusqlite", + "taskchampion-sync-server-core", "tempfile", "thiserror", "uuid", @@ -1429,18 +1456,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.0" +version = "2.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15291287e9bff1bc6f9ff3409ed9af665bec7a5fc8ac079ea96be07bca0e2668" +checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "2.0.0" +version = "2.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22efd00f33f93fa62848a7cab956c3d38c8d43095efda1decfc2b3a5dc0b8972" +checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 52ca601..ef67faf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,11 +1,12 @@ -[package] -name = "taskchampion-sync-server" -version = "0.4.1" -authors = ["Dustin J. Mitchell "] -edition = "2021" -publish = false +[workspace] +resolver = "2" +members = [ + "core", + "server", + "sqlite", +] -[dependencies] +[workspace.dependencies] uuid = { version = "^1.11.0", features = ["serde", "v4"] } actix-web = "^4.9.0" anyhow = "1.0" @@ -18,8 +19,6 @@ log = "^0.4.17" env_logger = "^0.11.5" rusqlite = { version = "0.32", features = ["bundled"] } chrono = { version = "^0.4.38", features = ["serde"] } - -[dev-dependencies] actix-rt = "2" tempfile = "3" pretty_assertions = "1" diff --git a/README.md b/README.md index b20d0bd..03327f1 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,12 @@ This repository was spun off from Taskwarrior itself after the 3.0.0 release. It is still under development and currently best described as a reference implementation of the Taskchampion sync protocol. +It is comprised of three crates: + + - `taskchampion-sync-server-core` implements the core of the protocol + - `taskchmpaion-sync-server-sqlite` implements an SQLite backend for the core + - `taskchampion-sync-server` implements a simple HTTP server for the protocol + ## Installation ### From binary diff --git a/core/Cargo.toml b/core/Cargo.toml new file mode 100644 index 0000000..61c0a9d --- /dev/null +++ b/core/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "taskchampion-sync-server-core" +version = "0.4.1" +authors = ["Dustin J. Mitchell "] +edition = "2021" + +[dependencies] +uuid.workspace = true +anyhow.workspace = true +log.workspace = true +env_logger.workspace = true +chrono.workspace = true + +[dev-dependencies] +pretty_assertions.workspace = true diff --git a/core/README.md b/core/README.md new file mode 100644 index 0000000..f6a35ff --- /dev/null +++ b/core/README.md @@ -0,0 +1,8 @@ +# taskchampion-sync-server-core + +This crate implements the core logic of the taskchampion sync protocol. + +This should be considered a reference implementation, with [the protocol +documentation](https://gothenburgbitfactory.org/taskchampion/sync-protocol.html). +representing the authoritative definition of the protocol. Other +implementations are encouraged. diff --git a/src/storage/inmemory.rs b/core/src/inmemory.rs similarity index 98% rename from src/storage/inmemory.rs rename to core/src/inmemory.rs index fcb5ac6..23cea4a 100644 --- a/src/storage/inmemory.rs +++ b/core/src/inmemory.rs @@ -1,6 +1,7 @@ -use super::{Client, Snapshot, Storage, StorageTxn, Uuid, Version}; +use super::{Client, Snapshot, Storage, StorageTxn, Version}; use std::collections::HashMap; use std::sync::{Mutex, MutexGuard}; +use uuid::Uuid; struct Inner { /// Clients, indexed by client_id @@ -16,6 +17,11 @@ struct Inner { children: HashMap<(Uuid, Uuid), Uuid>, } +/// In-memory storage for testing and experimentation. +/// +/// This is not for production use, but supports testing of sync server implementations. +/// +/// NOTE: this does not implement transaction rollback. pub struct InMemoryStorage(Mutex); impl InMemoryStorage { @@ -32,9 +38,6 @@ impl InMemoryStorage { struct InnerTxn<'a>(MutexGuard<'a, Inner>); -/// In-memory storage for testing and experimentation. -/// -/// NOTE: this does not implement transaction rollback. impl Storage for InMemoryStorage { fn txn<'a>(&'a self) -> anyhow::Result> { Ok(Box::new(InnerTxn(self.0.lock().expect("poisoned lock")))) diff --git a/core/src/lib.rs b/core/src/lib.rs new file mode 100644 index 0000000..99c6778 --- /dev/null +++ b/core/src/lib.rs @@ -0,0 +1,32 @@ +//! This crate implements the core logic of the taskchampion sync protocol. +//! +//! This should be considered a reference implementation, with [the protocol +//! documentation](https://gothenburgbitfactory.org/taskchampion/sync-protocol.html). representing +//! the authoritative definition of the protocol. Other implementations are encouraged. +//! +//! This crate uses an abstract storage backend. Note that this does not implement the +//! HTTP-specific portions of the protocol, nor provide any storage implementations. +//! +//! ## API Methods +//! +//! The following API methods are implemented. These methods are documented in more detail in +//! the protocol documentation. +//! +//! * [`add_version`] +//! * [`get_child_version`] +//! * [`add_snapshot`] +//! * [`get_snapshot`] +//! +//! Each API method takes: +//! +//! * [`StorageTxn`] to access storage. Methods which modify storage will commit the transaction before returning. +//! * [`ServerConfig`] providing basic configuration for the server's behavior. +//! * `client_id` and a [`Client`] providing the client metadata. + +mod inmemory; +mod server; +mod storage; + +pub use inmemory::*; +pub use server::*; +pub use storage::*; diff --git a/core/src/server.rs b/core/src/server.rs new file mode 100644 index 0000000..cc99b86 --- /dev/null +++ b/core/src/server.rs @@ -0,0 +1,1037 @@ +use crate::storage::{Client, Snapshot, StorageTxn}; +use chrono::Utc; +use uuid::Uuid; + +/// The distinguished value for "no version" +pub const NIL_VERSION_ID: VersionId = Uuid::nil(); + +/// Number of versions to search back from the latest to find the +/// version for a newly-added snapshot. Snapshots for versions older +/// than this will be rejected. +const SNAPSHOT_SEARCH_LEN: i32 = 5; + +pub type HistorySegment = Vec; +pub type ClientId = Uuid; +pub type VersionId = Uuid; + +/// ServerConfig contains configuration parameters for the server. +pub struct ServerConfig { + /// Target number of days between snapshots. + pub snapshot_days: i64, + + /// Target number of versions between snapshots. + pub snapshot_versions: u32, +} + +impl Default for ServerConfig { + fn default() -> Self { + ServerConfig { + snapshot_days: 14, + snapshot_versions: 100, + } + } +} + +impl ServerConfig { + pub fn from_args(snapshot_days: i64, snapshot_versions: u32) -> anyhow::Result { + Ok(ServerConfig { + snapshot_days, + snapshot_versions, + }) + } +} + +/// Response to get_child_version. See the protocol documentation. +#[derive(Clone, PartialEq, Debug)] +pub enum GetVersionResult { + NotFound, + Gone, + Success { + version_id: Uuid, + parent_version_id: Uuid, + history_segment: HistorySegment, + }, +} + +/// Implementation of the GetChildVersion protocol transaction. +pub fn get_child_version<'a>( + mut txn: Box, + _config: &ServerConfig, + client_id: ClientId, + client: Client, + parent_version_id: VersionId, +) -> anyhow::Result { + // If a version with parentVersionId equal to the requested parentVersionId exists, it is returned. + if let Some(version) = txn.get_version_by_parent(client_id, parent_version_id)? { + return Ok(GetVersionResult::Success { + version_id: version.version_id, + parent_version_id: version.parent_version_id, + history_segment: version.history_segment, + }); + } + + // Return NotFound if an AddVersion with this parent_version_id would succeed, and otherwise + // return Gone. + // + // AddVersion will succeed if either + // - the requested parent version is the latest version; or + // - there is no latest version, meaning there are no versions stored for this client + Ok( + if client.latest_version_id == parent_version_id + || client.latest_version_id == NIL_VERSION_ID + { + GetVersionResult::NotFound + } else { + GetVersionResult::Gone + }, + ) +} + +/// Response to add_version +#[derive(Clone, PartialEq, Debug)] +pub enum AddVersionResult { + /// OK, version added with the given ID + Ok(VersionId), + /// Rejected; expected a version with the given parent version + ExpectedParentVersion(VersionId), +} + +/// Urgency of a snapshot for a client; used to create the `X-Snapshot-Request` header. +#[derive(PartialEq, Debug, Clone, Copy, Eq, PartialOrd, Ord)] +pub enum SnapshotUrgency { + /// Don't need a snapshot right now. + None, + + /// A snapshot would be good, but can wait for other replicas to provide it. + Low, + + /// A snapshot is needed right now. + High, +} + +impl SnapshotUrgency { + /// Calculate the urgency for a snapshot based on its age in days + fn for_days(config: &ServerConfig, days: i64) -> Self { + if days >= config.snapshot_days * 3 / 2 { + SnapshotUrgency::High + } else if days >= config.snapshot_days { + SnapshotUrgency::Low + } else { + SnapshotUrgency::None + } + } + + /// Calculate the urgency for a snapshot based on its age in versions + fn for_versions_since(config: &ServerConfig, versions_since: u32) -> Self { + if versions_since >= config.snapshot_versions * 3 / 2 { + SnapshotUrgency::High + } else if versions_since >= config.snapshot_versions { + SnapshotUrgency::Low + } else { + SnapshotUrgency::None + } + } +} + +/// Implementation of the AddVersion protocol transaction +pub fn add_version<'a>( + mut txn: Box, + config: &ServerConfig, + client_id: ClientId, + client: Client, + parent_version_id: VersionId, + history_segment: HistorySegment, +) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> { + log::debug!( + "add_version(client_id: {}, parent_version_id: {})", + client_id, + parent_version_id, + ); + + // check if this version is acceptable, under the protection of the transaction + if client.latest_version_id != NIL_VERSION_ID && parent_version_id != client.latest_version_id { + log::debug!("add_version request rejected: mismatched latest_version_id"); + return Ok(( + AddVersionResult::ExpectedParentVersion(client.latest_version_id), + SnapshotUrgency::None, + )); + } + + // invent a version ID + let version_id = Uuid::new_v4(); + log::debug!( + "add_version request accepted: new version_id: {}", + version_id + ); + + // update the DB + txn.add_version(client_id, version_id, parent_version_id, history_segment)?; + txn.commit()?; + + // calculate the urgency + let time_urgency = match client.snapshot { + None => SnapshotUrgency::High, + Some(Snapshot { timestamp, .. }) => { + SnapshotUrgency::for_days(config, (Utc::now() - timestamp).num_days()) + } + }; + + let version_urgency = match client.snapshot { + None => SnapshotUrgency::High, + Some(Snapshot { versions_since, .. }) => { + SnapshotUrgency::for_versions_since(config, versions_since) + } + }; + + Ok(( + AddVersionResult::Ok(version_id), + std::cmp::max(time_urgency, version_urgency), + )) +} + +/// Implementation of the AddSnapshot protocol transaction +pub fn add_snapshot<'a>( + mut txn: Box, + _config: &ServerConfig, + client_id: ClientId, + client: Client, + version_id: VersionId, + data: Vec, +) -> anyhow::Result<()> { + log::debug!( + "add_snapshot(client_id: {}, version_id: {})", + client_id, + version_id, + ); + + // NOTE: if the snapshot is rejected, this function logs about it and returns + // Ok(()), as there's no reason to report an errot to the client / user. + + let last_snapshot = client.snapshot.map(|snap| snap.version_id); + if Some(version_id) == last_snapshot { + log::debug!( + "rejecting snapshot for version {}: already exists", + version_id + ); + return Ok(()); + } + + // look for this version in the history of this client, starting at the latest version, and + // only iterating for a limited number of versions. + let mut search_len = SNAPSHOT_SEARCH_LEN; + let mut vid = client.latest_version_id; + + loop { + if vid == version_id && version_id != NIL_VERSION_ID { + // the new snapshot is for a recent version, so proceed + break; + } + + if Some(vid) == last_snapshot { + // the new snapshot is older than the last snapshot, so ignore it + log::debug!( + "rejecting snapshot for version {}: newer snapshot already exists or no such version", + version_id + ); + return Ok(()); + } + + search_len -= 1; + if search_len <= 0 || vid == NIL_VERSION_ID { + // this should not happen in normal operation, so warn about it + log::warn!( + "rejecting snapshot for version {}: version is too old or no such version", + version_id + ); + return Ok(()); + } + + // get the parent version ID + if let Some(parent) = txn.get_version(client_id, vid)? { + vid = parent.parent_version_id; + } else { + // this version does not exist; "this should not happen" but if it does, + // we don't need a snapshot earlier than the missing version. + log::warn!( + "rejecting snapshot for version {}: newer versions have already been deleted", + version_id + ); + return Ok(()); + } + } + + log::debug!("accepting snapshot for version {}", version_id); + txn.set_snapshot( + client_id, + Snapshot { + version_id, + timestamp: Utc::now(), + versions_since: 0, + }, + data, + )?; + txn.commit()?; + Ok(()) +} + +/// Implementation of the GetSnapshot protocol transaction +pub fn get_snapshot<'a>( + mut txn: Box, + _config: &ServerConfig, + client_id: ClientId, + client: Client, +) -> anyhow::Result)>> { + Ok(if let Some(snap) = client.snapshot { + txn.get_snapshot_data(client_id, snap.version_id)? + .map(|data| (snap.version_id, data)) + } else { + None + }) +} + +#[cfg(test)] +mod test { + use super::*; + use crate::inmemory::InMemoryStorage; + use crate::storage::{Snapshot, Storage}; + use chrono::{Duration, TimeZone, Utc}; + use pretty_assertions::assert_eq; + + fn init_logging() { + let _ = env_logger::builder().is_test(true).try_init(); + } + + #[test] + fn snapshot_urgency_max() { + use SnapshotUrgency::*; + assert_eq!(std::cmp::max(None, None), None); + assert_eq!(std::cmp::max(None, Low), Low); + assert_eq!(std::cmp::max(None, High), High); + assert_eq!(std::cmp::max(Low, None), Low); + assert_eq!(std::cmp::max(Low, Low), Low); + assert_eq!(std::cmp::max(Low, High), High); + assert_eq!(std::cmp::max(High, None), High); + assert_eq!(std::cmp::max(High, Low), High); + assert_eq!(std::cmp::max(High, High), High); + } + + #[test] + fn snapshot_urgency_for_days() { + use SnapshotUrgency::*; + let config = ServerConfig::default(); + assert_eq!(SnapshotUrgency::for_days(&config, 0), None); + assert_eq!( + SnapshotUrgency::for_days(&config, config.snapshot_days), + Low + ); + assert_eq!( + SnapshotUrgency::for_days(&config, config.snapshot_days * 2), + High + ); + } + + #[test] + fn snapshot_urgency_for_versions_since() { + use SnapshotUrgency::*; + let config = ServerConfig::default(); + assert_eq!(SnapshotUrgency::for_versions_since(&config, 0), None); + assert_eq!( + SnapshotUrgency::for_versions_since(&config, config.snapshot_versions), + Low + ); + assert_eq!( + SnapshotUrgency::for_versions_since(&config, config.snapshot_versions * 2), + High + ); + } + + #[test] + fn get_child_version_not_found_initial_nil() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_id = Uuid::new_v4(); + txn.new_client(client_id, NIL_VERSION_ID)?; + + // when no latest version exists, the first version is NotFound + let client = txn.get_client(client_id)?.unwrap(); + assert_eq!( + get_child_version( + txn, + &ServerConfig::default(), + client_id, + client, + NIL_VERSION_ID + )?, + GetVersionResult::NotFound + ); + Ok(()) + } + + #[test] + fn get_child_version_not_found_initial_continuing() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_id = Uuid::new_v4(); + + txn.new_client(client_id, NIL_VERSION_ID)?; + + // when no latest version exists, _any_ child version is NOT_FOUND. This allows syncs to + // start to a new server even if the client already has been uploading to another service. + let client = txn.get_client(client_id)?.unwrap(); + assert_eq!( + get_child_version( + txn, + &ServerConfig::default(), + client_id, + client, + Uuid::new_v4(), + )?, + GetVersionResult::NotFound + ); + Ok(()) + } + + #[test] + fn get_child_version_not_found_up_to_date() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_id = Uuid::new_v4(); + + // add a parent version, but not the requested child version + let parent_version_id = Uuid::new_v4(); + txn.new_client(client_id, parent_version_id)?; + txn.add_version(client_id, parent_version_id, NIL_VERSION_ID, vec![])?; + + let client = txn.get_client(client_id)?.unwrap(); + assert_eq!( + get_child_version( + txn, + &ServerConfig::default(), + client_id, + client, + parent_version_id + )?, + GetVersionResult::NotFound + ); + Ok(()) + } + + #[test] + fn get_child_version_gone_not_latest() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_id = Uuid::new_v4(); + + // Add a parent version, but not the requested parent version + let parent_version_id = Uuid::new_v4(); + txn.new_client(client_id, parent_version_id)?; + txn.add_version(client_id, parent_version_id, NIL_VERSION_ID, vec![])?; + + let client = txn.get_client(client_id)?.unwrap(); + assert_eq!( + get_child_version( + txn, + &ServerConfig::default(), + client_id, + client, + Uuid::new_v4(), + )?, + GetVersionResult::Gone + ); + Ok(()) + } + + #[test] + fn get_child_version_found() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_id = Uuid::new_v4(); + let version_id = Uuid::new_v4(); + let parent_version_id = Uuid::new_v4(); + let history_segment = b"abcd".to_vec(); + + txn.new_client(client_id, version_id)?; + txn.add_version( + client_id, + version_id, + parent_version_id, + history_segment.clone(), + )?; + + let client = txn.get_client(client_id)?.unwrap(); + assert_eq!( + get_child_version( + txn, + &ServerConfig::default(), + client_id, + client, + parent_version_id + )?, + GetVersionResult::Success { + version_id, + parent_version_id, + history_segment, + } + ); + Ok(()) + } + + /// Utility setup function for add_version tests + fn av_setup( + storage: &InMemoryStorage, + num_versions: u32, + snapshot_version: Option, + snapshot_days_ago: Option, + ) -> anyhow::Result<(Uuid, Vec)> { + init_logging(); + let mut txn = storage.txn()?; + let client_id = Uuid::new_v4(); + let mut versions = vec![]; + + let mut version_id = Uuid::nil(); + txn.new_client(client_id, Uuid::nil())?; + for vnum in 0..num_versions { + let parent_version_id = version_id; + version_id = Uuid::new_v4(); + versions.push(version_id); + txn.add_version( + client_id, + version_id, + parent_version_id, + vec![0, 0, vnum as u8], + )?; + if Some(vnum) == snapshot_version { + txn.set_snapshot( + client_id, + Snapshot { + version_id, + versions_since: 0, + timestamp: Utc::now() - Duration::days(snapshot_days_ago.unwrap_or(0)), + }, + vec![vnum as u8], + )?; + } + } + + Ok((client_id, versions)) + } + + /// Utility function to check the results of an add_version call + fn av_success_check( + storage: &InMemoryStorage, + client_id: Uuid, + existing_versions: &[Uuid], + result: (AddVersionResult, SnapshotUrgency), + expected_history: Vec, + expected_urgency: SnapshotUrgency, + ) -> anyhow::Result<()> { + if let AddVersionResult::Ok(new_version_id) = result.0 { + // check that it invented a new version ID + for v in existing_versions { + assert_ne!(&new_version_id, v); + } + + // verify that the storage was updated + let mut txn = storage.txn()?; + let client = txn.get_client(client_id)?.unwrap(); + assert_eq!(client.latest_version_id, new_version_id); + + let parent_version_id = existing_versions.last().cloned().unwrap_or_else(Uuid::nil); + let version = txn.get_version(client_id, new_version_id)?.unwrap(); + assert_eq!(version.version_id, new_version_id); + assert_eq!(version.parent_version_id, parent_version_id); + assert_eq!(version.history_segment, expected_history); + } else { + panic!("did not get Ok from add_version: {:?}", result); + } + + assert_eq!(result.1, expected_urgency); + + Ok(()) + } + + #[test] + fn add_version_conflict() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + let (client_id, versions) = av_setup(&storage, 3, None, None)?; + + let mut txn = storage.txn()?; + let client = txn.get_client(client_id)?.unwrap(); + + // try to add a child of a version other than the latest + assert_eq!( + add_version( + txn, + &ServerConfig::default(), + client_id, + client, + versions[1], + vec![3, 6, 9] + )? + .0, + AddVersionResult::ExpectedParentVersion(versions[2]) + ); + + // verify that the storage wasn't updated + txn = storage.txn()?; + assert_eq!( + txn.get_client(client_id)?.unwrap().latest_version_id, + versions[2] + ); + assert_eq!(txn.get_version_by_parent(client_id, versions[2])?, None); + + Ok(()) + } + + #[test] + fn add_version_with_existing_history() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + let (client_id, versions) = av_setup(&storage, 1, None, None)?; + + let mut txn = storage.txn()?; + let client = txn.get_client(client_id)?.unwrap(); + + let result = add_version( + txn, + &ServerConfig::default(), + client_id, + client, + versions[0], + vec![3, 6, 9], + )?; + + av_success_check( + &storage, + client_id, + &versions, + result, + vec![3, 6, 9], + // urgency=high because there are no snapshots yet + SnapshotUrgency::High, + )?; + + Ok(()) + } + + #[test] + fn add_version_with_no_history() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + let (client_id, versions) = av_setup(&storage, 0, None, None)?; + + let mut txn = storage.txn()?; + let client = txn.get_client(client_id)?.unwrap(); + + let parent_version_id = Uuid::nil(); + let result = add_version( + txn, + &ServerConfig::default(), + client_id, + client, + parent_version_id, + vec![3, 6, 9], + )?; + + av_success_check( + &storage, + client_id, + &versions, + result, + vec![3, 6, 9], + // urgency=high because there are no snapshots yet + SnapshotUrgency::High, + )?; + + Ok(()) + } + + #[test] + fn add_version_success_recent_snapshot() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + let (client_id, versions) = av_setup(&storage, 1, Some(0), None)?; + + let mut txn = storage.txn()?; + let client = txn.get_client(client_id)?.unwrap(); + + let result = add_version( + txn, + &ServerConfig::default(), + client_id, + client, + versions[0], + vec![1, 2, 3], + )?; + + av_success_check( + &storage, + client_id, + &versions, + result, + vec![1, 2, 3], + // no snapshot request since the previous version has a snapshot + SnapshotUrgency::None, + )?; + + Ok(()) + } + + #[test] + fn add_version_success_aged_snapshot() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + // one snapshot, but it was 50 days ago + let (client_id, versions) = av_setup(&storage, 1, Some(0), Some(50))?; + + let mut txn = storage.txn()?; + let client = txn.get_client(client_id)?.unwrap(); + + let result = add_version( + txn, + &ServerConfig::default(), + client_id, + client, + versions[0], + vec![1, 2, 3], + )?; + + av_success_check( + &storage, + client_id, + &versions, + result, + vec![1, 2, 3], + // urgency=high due to days since the snapshot + SnapshotUrgency::High, + )?; + + Ok(()) + } + + #[test] + fn add_version_success_snapshot_many_versions_ago() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + // one snapshot, but it was 50 versions ago + let (client_id, versions) = av_setup(&storage, 50, Some(0), None)?; + + let mut txn = storage.txn()?; + let client = txn.get_client(client_id)?.unwrap(); + + let result = add_version( + txn, + &ServerConfig { + snapshot_versions: 30, + ..ServerConfig::default() + }, + client_id, + client, + versions[49], + vec![1, 2, 3], + )?; + + av_success_check( + &storage, + client_id, + &versions, + result, + vec![1, 2, 3], + // urgency=high due to number of versions since the snapshot + SnapshotUrgency::High, + )?; + + Ok(()) + } + + #[test] + fn add_snapshot_success_latest() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_id = Uuid::new_v4(); + let version_id = Uuid::new_v4(); + + // set up a task DB with one version in it + txn.new_client(client_id, version_id)?; + txn.add_version(client_id, version_id, NIL_VERSION_ID, vec![])?; + + // add a snapshot for that version + let client = txn.get_client(client_id)?.unwrap(); + add_snapshot( + txn, + &ServerConfig::default(), + client_id, + client, + version_id, + vec![1, 2, 3], + )?; + + // verify the snapshot + let mut txn = storage.txn()?; + let client = txn.get_client(client_id)?.unwrap(); + let snapshot = client.snapshot.unwrap(); + assert_eq!(snapshot.version_id, version_id); + assert_eq!(snapshot.versions_since, 0); + assert_eq!( + txn.get_snapshot_data(client_id, version_id).unwrap(), + Some(vec![1, 2, 3]) + ); + + Ok(()) + } + + #[test] + fn add_snapshot_success_older() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_id = Uuid::new_v4(); + let version_id_1 = Uuid::new_v4(); + let version_id_2 = Uuid::new_v4(); + + // set up a task DB with two versions in it + txn.new_client(client_id, version_id_2)?; + txn.add_version(client_id, version_id_1, NIL_VERSION_ID, vec![])?; + txn.add_version(client_id, version_id_2, version_id_1, vec![])?; + + // add a snapshot for version 1 + let client = txn.get_client(client_id)?.unwrap(); + add_snapshot( + txn, + &ServerConfig::default(), + client_id, + client, + version_id_1, + vec![1, 2, 3], + )?; + + // verify the snapshot + let mut txn = storage.txn()?; + let client = txn.get_client(client_id)?.unwrap(); + let snapshot = client.snapshot.unwrap(); + assert_eq!(snapshot.version_id, version_id_1); + assert_eq!(snapshot.versions_since, 0); + assert_eq!( + txn.get_snapshot_data(client_id, version_id_1).unwrap(), + Some(vec![1, 2, 3]) + ); + + Ok(()) + } + + #[test] + fn add_snapshot_fails_no_such() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_id = Uuid::new_v4(); + let version_id_1 = Uuid::new_v4(); + let version_id_2 = Uuid::new_v4(); + + // set up a task DB with two versions in it + txn.new_client(client_id, version_id_2)?; + txn.add_version(client_id, version_id_1, NIL_VERSION_ID, vec![])?; + txn.add_version(client_id, version_id_2, version_id_1, vec![])?; + + // add a snapshot for unknown version + let client = txn.get_client(client_id)?.unwrap(); + let version_id_unk = Uuid::new_v4(); + add_snapshot( + txn, + &ServerConfig::default(), + client_id, + client, + version_id_unk, + vec![1, 2, 3], + )?; + + // verify the snapshot does not exist + let mut txn = storage.txn()?; + let client = txn.get_client(client_id)?.unwrap(); + assert!(client.snapshot.is_none()); + + Ok(()) + } + + #[test] + fn add_snapshot_fails_too_old() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_id = Uuid::new_v4(); + let mut version_id = Uuid::new_v4(); + let mut parent_version_id = Uuid::nil(); + let mut version_ids = vec![]; + + // set up a task DB with 10 versions in it (oldest to newest) + txn.new_client(client_id, Uuid::nil())?; + for _ in 0..10 { + txn.add_version(client_id, version_id, parent_version_id, vec![])?; + version_ids.push(version_id); + parent_version_id = version_id; + version_id = Uuid::new_v4(); + } + + // add a snapshot for the earliest of those + let client = txn.get_client(client_id)?.unwrap(); + add_snapshot( + txn, + &ServerConfig::default(), + client_id, + client, + version_ids[0], + vec![1, 2, 3], + )?; + + // verify the snapshot does not exist + let mut txn = storage.txn()?; + let client = txn.get_client(client_id)?.unwrap(); + assert!(client.snapshot.is_none()); + + Ok(()) + } + + #[test] + fn add_snapshot_fails_newer_exists() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_id = Uuid::new_v4(); + let mut version_id = Uuid::new_v4(); + let mut parent_version_id = Uuid::nil(); + let mut version_ids = vec![]; + + // set up a task DB with 5 versions in it (oldest to newest) and a snapshot of the middle + // one + txn.new_client(client_id, Uuid::nil())?; + for _ in 0..5 { + txn.add_version(client_id, version_id, parent_version_id, vec![])?; + version_ids.push(version_id); + parent_version_id = version_id; + version_id = Uuid::new_v4(); + } + txn.set_snapshot( + client_id, + Snapshot { + version_id: version_ids[2], + versions_since: 2, + timestamp: Utc.with_ymd_and_hms(2001, 9, 9, 1, 46, 40).unwrap(), + }, + vec![1, 2, 3], + )?; + + // add a snapshot for the earliest of those + let client = txn.get_client(client_id)?.unwrap(); + add_snapshot( + txn, + &ServerConfig::default(), + client_id, + client, + version_ids[0], + vec![9, 9, 9], + )?; + + // verify the snapshot was not replaced + let mut txn = storage.txn()?; + let client = txn.get_client(client_id)?.unwrap(); + let snapshot = client.snapshot.unwrap(); + assert_eq!(snapshot.version_id, version_ids[2]); + assert_eq!(snapshot.versions_since, 2); + assert_eq!( + txn.get_snapshot_data(client_id, version_ids[2]).unwrap(), + Some(vec![1, 2, 3]) + ); + + Ok(()) + } + + #[test] + fn add_snapshot_fails_nil_version() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_id = Uuid::new_v4(); + + // just set up the client + txn.new_client(client_id, NIL_VERSION_ID)?; + + // add a snapshot for the nil version + let client = txn.get_client(client_id)?.unwrap(); + add_snapshot( + txn, + &ServerConfig::default(), + client_id, + client, + NIL_VERSION_ID, + vec![9, 9, 9], + )?; + + // verify the snapshot does not exist + let mut txn = storage.txn()?; + let client = txn.get_client(client_id)?.unwrap(); + assert!(client.snapshot.is_none()); + + Ok(()) + } + + #[test] + fn get_snapshot_found() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_id = Uuid::new_v4(); + let data = vec![1, 2, 3]; + let snapshot_version_id = Uuid::new_v4(); + + txn.new_client(client_id, snapshot_version_id)?; + txn.set_snapshot( + client_id, + Snapshot { + version_id: snapshot_version_id, + versions_since: 3, + timestamp: Utc.with_ymd_and_hms(2001, 9, 9, 1, 46, 40).unwrap(), + }, + data.clone(), + )?; + + let client = txn.get_client(client_id)?.unwrap(); + assert_eq!( + get_snapshot(txn, &ServerConfig::default(), client_id, client)?, + Some((snapshot_version_id, data)) + ); + + Ok(()) + } + + #[test] + fn get_snapshot_not_found() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_id = Uuid::new_v4(); + + txn.new_client(client_id, NIL_VERSION_ID)?; + let client = txn.get_client(client_id)?.unwrap(); + + assert_eq!( + get_snapshot(txn, &ServerConfig::default(), client_id, client)?, + None + ); + + Ok(()) + } +} diff --git a/src/storage/mod.rs b/core/src/storage.rs similarity index 82% rename from src/storage/mod.rs rename to core/src/storage.rs index a2e468f..7845dd6 100644 --- a/src/storage/mod.rs +++ b/core/src/storage.rs @@ -1,15 +1,7 @@ use chrono::{DateTime, Utc}; use uuid::Uuid; -#[cfg(debug_assertions)] -mod inmemory; - -#[cfg(debug_assertions)] -pub use inmemory::InMemoryStorage; - -mod sqlite; -pub use self::sqlite::SqliteStorage; - +/// A representation of stored metadata about a client. #[derive(Clone, PartialEq, Eq, Debug)] pub struct Client { /// The latest version for this client (may be the nil version) @@ -18,6 +10,7 @@ pub struct Client { pub snapshot: Option, } +/// Metadata about a snapshot, not including the snapshot data itself. #[derive(Clone, PartialEq, Eq, Debug)] pub struct Snapshot { /// ID of the version at which this snapshot was made @@ -32,11 +25,19 @@ pub struct Snapshot { #[derive(Clone, PartialEq, Eq, Debug)] pub struct Version { + /// The uuid identifying this version. pub version_id: Uuid, + /// The uuid identifying this version's parent. pub parent_version_id: Uuid, + /// The data carried in this version. pub history_segment: Vec, } +/// A transaction in the storage backend. +/// +/// Transactions must be sequentially consistent. That is, the results of transactions performed +/// in storage must be as if each were executed sequentially in some order. In particular, the +/// `Client.latest_version` must not change between a call to `get_client` and `add_version`. pub trait StorageTxn { /// Get information about the given client fn get_client(&mut self, client_id: Uuid) -> anyhow::Result>; diff --git a/server/Cargo.toml b/server/Cargo.toml new file mode 100644 index 0000000..f631ac5 --- /dev/null +++ b/server/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "taskchampion-sync-server" +version = "0.4.1" +authors = ["Dustin J. Mitchell "] +edition = "2021" +publish = false + +[dependencies] +taskchampion-sync-server-core = { path = "../core" } +taskchampion-sync-server-storage-sqlite = { path = "../sqlite" } +uuid.workspace = true +actix-web.workspace = true +anyhow.workspace = true +thiserror.workspace = true +futures.workspace = true +serde_json.workspace = true +serde.workspace = true +clap.workspace = true +log.workspace = true +env_logger.workspace = true +chrono.workspace = true + +[dev-dependencies] +actix-rt.workspace = true +tempfile.workspace = true +pretty_assertions.workspace = true diff --git a/src/api/add_snapshot.rs b/server/src/api/add_snapshot.rs similarity index 98% rename from src/api/add_snapshot.rs rename to server/src/api/add_snapshot.rs index 5ac42f5..9f0e249 100644 --- a/src/api/add_snapshot.rs +++ b/server/src/api/add_snapshot.rs @@ -1,8 +1,8 @@ use crate::api::{client_id_header, failure_to_ise, ServerState, SNAPSHOT_CONTENT_TYPE}; -use crate::server::{add_snapshot, VersionId, NIL_VERSION_ID}; use actix_web::{error, post, web, HttpMessage, HttpRequest, HttpResponse, Result}; use futures::StreamExt; use std::sync::Arc; +use taskchampion_sync_server_core::{add_snapshot, VersionId, NIL_VERSION_ID}; /// Max snapshot size: 100MB const MAX_SIZE: usize = 100 * 1024 * 1024; @@ -77,10 +77,10 @@ pub(crate) async fn service( mod test { use super::*; use crate::api::CLIENT_ID_HEADER; - use crate::storage::{InMemoryStorage, Storage}; use crate::Server; use actix_web::{http::StatusCode, test, App}; use pretty_assertions::assert_eq; + use taskchampion_sync_server_core::{InMemoryStorage, Storage}; use uuid::Uuid; #[actix_rt::test] diff --git a/src/api/add_version.rs b/server/src/api/add_version.rs similarity index 97% rename from src/api/add_version.rs rename to server/src/api/add_version.rs index d3d693b..f416753 100644 --- a/src/api/add_version.rs +++ b/server/src/api/add_version.rs @@ -2,10 +2,12 @@ use crate::api::{ client_id_header, failure_to_ise, ServerState, HISTORY_SEGMENT_CONTENT_TYPE, PARENT_VERSION_ID_HEADER, SNAPSHOT_REQUEST_HEADER, VERSION_ID_HEADER, }; -use crate::server::{add_version, AddVersionResult, SnapshotUrgency, VersionId, NIL_VERSION_ID}; use actix_web::{error, post, web, HttpMessage, HttpRequest, HttpResponse, Result}; use futures::StreamExt; use std::sync::Arc; +use taskchampion_sync_server_core::{ + add_version, AddVersionResult, SnapshotUrgency, VersionId, NIL_VERSION_ID, +}; /// Max history segment size: 100MB const MAX_SIZE: usize = 100 * 1024 * 1024; @@ -105,10 +107,10 @@ pub(crate) async fn service( #[cfg(test)] mod test { use crate::api::CLIENT_ID_HEADER; - use crate::storage::{InMemoryStorage, Storage}; use crate::Server; use actix_web::{http::StatusCode, test, App}; use pretty_assertions::assert_eq; + use taskchampion_sync_server_core::{InMemoryStorage, Storage}; use uuid::Uuid; #[actix_rt::test] diff --git a/src/api/get_child_version.rs b/server/src/api/get_child_version.rs similarity index 97% rename from src/api/get_child_version.rs rename to server/src/api/get_child_version.rs index c8f43df..d73f0fc 100644 --- a/src/api/get_child_version.rs +++ b/server/src/api/get_child_version.rs @@ -2,9 +2,9 @@ use crate::api::{ client_id_header, failure_to_ise, ServerState, HISTORY_SEGMENT_CONTENT_TYPE, PARENT_VERSION_ID_HEADER, VERSION_ID_HEADER, }; -use crate::server::{get_child_version, GetVersionResult, VersionId}; use actix_web::{error, get, web, HttpRequest, HttpResponse, Result}; use std::sync::Arc; +use taskchampion_sync_server_core::{get_child_version, GetVersionResult, VersionId}; /// Get a child version. /// @@ -57,11 +57,10 @@ pub(crate) async fn service( #[cfg(test)] mod test { use crate::api::CLIENT_ID_HEADER; - use crate::server::NIL_VERSION_ID; - use crate::storage::{InMemoryStorage, Storage}; use crate::Server; use actix_web::{http::StatusCode, test, App}; use pretty_assertions::assert_eq; + use taskchampion_sync_server_core::{InMemoryStorage, Storage, NIL_VERSION_ID}; use uuid::Uuid; #[actix_rt::test] diff --git a/src/api/get_snapshot.rs b/server/src/api/get_snapshot.rs similarity index 96% rename from src/api/get_snapshot.rs rename to server/src/api/get_snapshot.rs index e5a4dc5..b179a4b 100644 --- a/src/api/get_snapshot.rs +++ b/server/src/api/get_snapshot.rs @@ -1,9 +1,9 @@ use crate::api::{ client_id_header, failure_to_ise, ServerState, SNAPSHOT_CONTENT_TYPE, VERSION_ID_HEADER, }; -use crate::server::get_snapshot; use actix_web::{error, get, web, HttpRequest, HttpResponse, Result}; use std::sync::Arc; +use taskchampion_sync_server_core::get_snapshot; /// Get a snapshot. /// @@ -42,11 +42,11 @@ pub(crate) async fn service( #[cfg(test)] mod test { use crate::api::CLIENT_ID_HEADER; - use crate::storage::{InMemoryStorage, Snapshot, Storage}; use crate::Server; use actix_web::{http::StatusCode, test, App}; use chrono::{TimeZone, Utc}; use pretty_assertions::assert_eq; + use taskchampion_sync_server_core::{InMemoryStorage, Snapshot, Storage}; use uuid::Uuid; #[actix_rt::test] diff --git a/src/api/mod.rs b/server/src/api/mod.rs similarity index 95% rename from src/api/mod.rs rename to server/src/api/mod.rs index bb5001f..4489369 100644 --- a/src/api/mod.rs +++ b/server/src/api/mod.rs @@ -1,7 +1,5 @@ -use crate::server::ClientId; -use crate::storage::Storage; -use crate::ServerConfig; use actix_web::{error, http::StatusCode, web, HttpRequest, Result, Scope}; +use taskchampion_sync_server_core::{ClientId, ServerConfig, Storage}; mod add_snapshot; mod add_version; diff --git a/src/bin/taskchampion-sync-server.rs b/server/src/bin/taskchampion-sync-server.rs similarity index 93% rename from src/bin/taskchampion-sync-server.rs rename to server/src/bin/taskchampion-sync-server.rs index e25ce2c..23ccb78 100644 --- a/src/bin/taskchampion-sync-server.rs +++ b/server/src/bin/taskchampion-sync-server.rs @@ -3,8 +3,9 @@ use actix_web::{middleware::Logger, App, HttpServer}; use clap::{arg, builder::ValueParser, value_parser, Command}; use std::ffi::OsString; -use taskchampion_sync_server::storage::SqliteStorage; -use taskchampion_sync_server::{Server, ServerConfig}; +use taskchampion_sync_server::Server; +use taskchampion_sync_server_core::ServerConfig; +use taskchampion_sync_server_storage_sqlite::SqliteStorage; #[actix_web::main] async fn main() -> anyhow::Result<()> { @@ -62,7 +63,7 @@ async fn main() -> anyhow::Result<()> { mod test { use super::*; use actix_web::{test, App}; - use taskchampion_sync_server::storage::InMemoryStorage; + use taskchampion_sync_server_core::InMemoryStorage; #[actix_rt::test] async fn test_index_get() { diff --git a/src/lib.rs b/server/src/lib.rs similarity index 88% rename from src/lib.rs rename to server/src/lib.rs index a4d52bf..67c8956 100644 --- a/src/lib.rs +++ b/server/src/lib.rs @@ -1,15 +1,11 @@ #![deny(clippy::all)] mod api; -mod server; -pub mod storage; -use crate::storage::Storage; use actix_web::{get, middleware, web, Responder}; use api::{api_scope, ServerState}; use std::sync::Arc; - -pub use server::ServerConfig; +use taskchampion_sync_server_core::{ServerConfig, Storage}; #[get("/")] async fn index() -> impl Responder { @@ -47,13 +43,9 @@ impl Server { #[cfg(test)] mod test { use super::*; - use crate::storage::InMemoryStorage; use actix_web::{test, App}; use pretty_assertions::assert_eq; - - pub(crate) fn init_logging() { - let _ = env_logger::builder().is_test(true).try_init(); - } + use taskchampion_sync_server_core::InMemoryStorage; #[actix_rt::test] async fn test_cache_control() { diff --git a/src/server.rs b/server/src/server.rs similarity index 100% rename from src/server.rs rename to server/src/server.rs diff --git a/sqlite/Cargo.toml b/sqlite/Cargo.toml new file mode 100644 index 0000000..96d402b --- /dev/null +++ b/sqlite/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "taskchampion-sync-server-storage-sqlite" +version = "0.4.1" +authors = ["Dustin J. Mitchell "] +edition = "2021" + +[dependencies] +taskchampion-sync-server-core = { path = "../core" } +uuid.workspace = true +anyhow.workspace = true +thiserror.workspace = true +rusqlite.workspace = true +chrono.workspace = true + +[dev-dependencies] +tempfile.workspace = true +pretty_assertions.workspace = true diff --git a/sqlite/README.md b/sqlite/README.md new file mode 100644 index 0000000..9a629c1 --- /dev/null +++ b/sqlite/README.md @@ -0,0 +1,4 @@ +# taskchampion-sync-server-storage-sqlite + +This crate implements a SQLite storage backend for the +`taskchampion-sync-server-core`. diff --git a/src/storage/sqlite.rs b/sqlite/src/lib.rs similarity index 97% rename from src/storage/sqlite.rs rename to sqlite/src/lib.rs index 3e05b3c..edd0be1 100644 --- a/src/storage/sqlite.rs +++ b/sqlite/src/lib.rs @@ -1,9 +1,11 @@ -use super::{Client, Snapshot, Storage, StorageTxn, Uuid, Version}; +//! Tihs crate implements a SQLite storage backend for the TaskChampion sync server. use anyhow::Context; use chrono::{TimeZone, Utc}; use rusqlite::types::{FromSql, ToSql}; use rusqlite::{params, Connection, OptionalExtension}; use std::path::Path; +use taskchampion_sync_server_core::{Client, Snapshot, Storage, StorageTxn, Version}; +use uuid::Uuid; #[derive(Debug, thiserror::Error)] enum SqliteError { @@ -31,7 +33,7 @@ impl ToSql for StoredUuid { } } -/// An on-disk storage backend which uses SQLite +/// An on-disk storage backend which uses SQLite. pub struct SqliteStorage { db_file: std::path::PathBuf, } @@ -41,6 +43,10 @@ impl SqliteStorage { Ok(Connection::open(&self.db_file)?) } + /// Create a new instance using a database at the given directory. + /// + /// The database will be stored in a file named `taskchampion-sync-server.sqlite3` in the given + /// directory. pub fn new>(directory: P) -> anyhow::Result { std::fs::create_dir_all(&directory) .with_context(|| format!("Failed to create `{}`.", directory.as_ref().display()))?;