diff --git a/.changelogs/2021-10-03-server-storage.md b/.changelogs/2021-10-03-server-storage.md new file mode 100644 index 000000000..7834601d5 --- /dev/null +++ b/.changelogs/2021-10-03-server-storage.md @@ -0,0 +1,2 @@ +- The SQLite server storage schema has changed incompatibly, in order to add support for snapshots. + As this is not currently ready for production usage, no migration path is provided except deleting the existing database. diff --git a/Cargo.lock b/Cargo.lock index 4c0d541ac..bb79505da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3016,6 +3016,7 @@ dependencies = [ "actix-rt", "actix-web", "anyhow", + "chrono", "clap", "env_logger 0.8.4", "futures", diff --git a/sync-server/Cargo.toml b/sync-server/Cargo.toml index c0d26656e..25ac111bf 100644 --- a/sync-server/Cargo.toml +++ b/sync-server/Cargo.toml @@ -19,6 +19,7 @@ clap = "^2.33.0" log = "^0.4.14" env_logger = "^0.8.3" rusqlite = { version = "0.25", features = ["bundled"] } +chrono = { version = "^0.4.10", features = ["serde"] } [dev-dependencies] actix-rt = "^1.1.1" diff --git a/sync-server/src/server.rs b/sync-server/src/server.rs index 9bc007cc4..c20d0b955 100644 --- a/sync-server/src/server.rs +++ b/sync-server/src/server.rs @@ -71,7 +71,6 @@ pub(crate) fn add_version<'a>( // update the DB txn.add_version(client_key, version_id, parent_version_id, history_segment)?; - txn.set_client_latest_version_id(client_key, version_id)?; txn.commit()?; Ok(AddVersionResult::Ok(version_id)) @@ -102,6 +101,7 @@ mod test { let parent_version_id = Uuid::new_v4(); let history_segment = b"abcd".to_vec(); + txn.new_client(client_key, version_id)?; txn.add_version( client_key, version_id, @@ -130,6 +130,7 @@ mod test { let existing_parent_version_id = Uuid::new_v4(); let client = Client { latest_version_id: existing_parent_version_id, + snapshot: None, }; assert_eq!( diff --git a/sync-server/src/storage/inmemory.rs b/sync-server/src/storage/inmemory.rs index 7aef5f98c..03b36c47a 100644 --- a/sync-server/src/storage/inmemory.rs +++ b/sync-server/src/storage/inmemory.rs @@ -1,4 +1,4 @@ -use super::{Client, Storage, StorageTxn, Uuid, Version}; +use super::{Client, Snapshot, Storage, StorageTxn, Uuid, Version}; use std::collections::HashMap; use std::sync::{Mutex, MutexGuard}; @@ -6,6 +6,9 @@ struct Inner { /// Clients, indexed by client_key clients: HashMap, + /// Snapshot data, indexed by client key + snapshots: HashMap>, + /// Versions, indexed by (client_key, version_id) versions: HashMap<(Uuid, Uuid), Version>, @@ -20,6 +23,7 @@ impl InMemoryStorage { pub fn new() -> Self { Self(Mutex::new(Inner { clients: HashMap::new(), + snapshots: HashMap::new(), versions: HashMap::new(), children: HashMap::new(), })) @@ -46,23 +50,44 @@ impl<'a> StorageTxn for InnerTxn<'a> { if self.0.clients.get(&client_key).is_some() { return Err(anyhow::anyhow!("Client {} already exists", client_key)); } - self.0 - .clients - .insert(client_key, Client { latest_version_id }); + self.0.clients.insert( + client_key, + Client { + latest_version_id, + snapshot: None, + }, + ); Ok(()) } - fn set_client_latest_version_id( + fn set_snapshot( &mut self, client_key: Uuid, - latest_version_id: Uuid, + snapshot: Snapshot, + data: Vec, ) -> anyhow::Result<()> { - if let Some(client) = self.0.clients.get_mut(&client_key) { - client.latest_version_id = latest_version_id; - Ok(()) - } else { - Err(anyhow::anyhow!("Client {} does not exist", client_key)) + let mut client = self + .0 + .clients + .get_mut(&client_key) + .ok_or_else(|| anyhow::anyhow!("no such client"))?; + client.snapshot = Some(snapshot); + self.0.snapshots.insert(client_key, data); + Ok(()) + } + + fn get_snapshot_data( + &mut self, + client_key: Uuid, + version_id: Uuid, + ) -> anyhow::Result>> { + // sanity check + let client = self.0.clients.get(&client_key); + let client = client.ok_or_else(|| anyhow::anyhow!("no such client"))?; + if Some(&version_id) != client.snapshot.as_ref().map(|snap| &snap.version_id) { + return Err(anyhow::anyhow!("unexpected snapshot_version_id")); } + Ok(self.0.snapshots.get(&client_key).cloned()) } fn get_version_by_parent( @@ -102,12 +127,21 @@ impl<'a> StorageTxn for InnerTxn<'a> { parent_version_id, history_segment, }; + + if let Some(client) = self.0.clients.get_mut(&client_key) { + client.latest_version_id = version_id; + if let Some(ref mut snap) = client.snapshot { + snap.versions_since += 1; + } + } else { + return Err(anyhow::anyhow!("Client {} does not exist", client_key)); + } + self.0 .children - .insert((client_key, version.parent_version_id), version.version_id); - self.0 - .versions - .insert((client_key, version.version_id), version); + .insert((client_key, parent_version_id), version_id); + self.0.versions.insert((client_key, version_id), version); + Ok(()) } @@ -119,15 +153,7 @@ impl<'a> StorageTxn for InnerTxn<'a> { #[cfg(test)] mod test { use super::*; - - #[test] - fn test_emtpy_dir() -> anyhow::Result<()> { - let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let maybe_client = txn.get_client(Uuid::new_v4())?; - assert!(maybe_client.is_none()); - Ok(()) - } + use chrono::Utc; #[test] fn test_get_client_empty() -> anyhow::Result<()> { @@ -149,12 +175,25 @@ mod test { let client = txn.get_client(client_key)?.unwrap(); assert_eq!(client.latest_version_id, latest_version_id); + assert!(client.snapshot.is_none()); let latest_version_id = Uuid::new_v4(); - txn.set_client_latest_version_id(client_key, latest_version_id)?; + txn.add_version(client_key, latest_version_id, Uuid::new_v4(), vec![1, 1])?; let client = txn.get_client(client_key)?.unwrap(); assert_eq!(client.latest_version_id, latest_version_id); + assert!(client.snapshot.is_none()); + + let snap = Snapshot { + version_id: Uuid::new_v4(), + timestamp: Utc::now(), + versions_since: 4, + }; + txn.set_snapshot(client_key, snap.clone(), vec![1, 2, 3])?; + + let client = txn.get_client(client_key)?.unwrap(); + assert_eq!(client.latest_version_id, latest_version_id); + assert_eq!(client.snapshot.unwrap(), snap); Ok(()) } @@ -177,6 +216,8 @@ mod test { let version_id = Uuid::new_v4(); let parent_version_id = Uuid::new_v4(); let history_segment = b"abc".to_vec(); + + txn.new_client(client_key, parent_version_id)?; txn.add_version( client_key, version_id, @@ -200,4 +241,47 @@ mod test { Ok(()) } + + #[test] + fn test_snapshots() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + + let client_key = Uuid::new_v4(); + + txn.new_client(client_key, Uuid::new_v4())?; + assert!(txn.get_client(client_key)?.unwrap().snapshot.is_none()); + + let snap = Snapshot { + version_id: Uuid::new_v4(), + timestamp: Utc::now(), + versions_since: 3, + }; + txn.set_snapshot(client_key, snap.clone(), vec![9, 8, 9])?; + + assert_eq!( + txn.get_snapshot_data(client_key, snap.version_id)?.unwrap(), + vec![9, 8, 9] + ); + assert_eq!(txn.get_client(client_key)?.unwrap().snapshot, Some(snap)); + + let snap2 = Snapshot { + version_id: Uuid::new_v4(), + timestamp: Utc::now(), + versions_since: 10, + }; + txn.set_snapshot(client_key, snap2.clone(), vec![0, 2, 4, 6])?; + + assert_eq!( + txn.get_snapshot_data(client_key, snap2.version_id)? + .unwrap(), + vec![0, 2, 4, 6] + ); + assert_eq!(txn.get_client(client_key)?.unwrap().snapshot, Some(snap2)); + + // check that mismatched version is detected + assert!(txn.get_snapshot_data(client_key, Uuid::new_v4()).is_err()); + + Ok(()) + } } diff --git a/sync-server/src/storage/mod.rs b/sync-server/src/storage/mod.rs index 121999892..c52624898 100644 --- a/sync-server/src/storage/mod.rs +++ b/sync-server/src/storage/mod.rs @@ -1,4 +1,4 @@ -use serde::{Deserialize, Serialize}; +use chrono::{DateTime, Utc}; use uuid::Uuid; #[cfg(debug_assertions)] @@ -10,12 +10,27 @@ pub use inmemory::InMemoryStorage; mod sqlite; pub use self::sqlite::SqliteStorage; -#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] +#[derive(Clone, PartialEq, Debug)] pub struct Client { + /// The latest version for this client (may be the nil version) pub latest_version_id: Uuid, + /// Data about the latest snapshot for this client + pub snapshot: Option, } -#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] +#[derive(Clone, PartialEq, Debug)] +pub struct Snapshot { + /// ID of the version at which this snapshot was made + pub version_id: Uuid, + + /// Timestamp at which this snapshot was set + pub timestamp: DateTime, + + /// Number of versions since this snapshot was made + pub versions_since: u32, +} + +#[derive(Clone, PartialEq, Debug)] pub struct Version { pub version_id: Uuid, pub parent_version_id: Uuid, @@ -29,13 +44,22 @@ pub trait StorageTxn { /// Create a new client with the given latest_version_id fn new_client(&mut self, client_key: Uuid, latest_version_id: Uuid) -> anyhow::Result<()>; - /// Set the client's latest_version_id - fn set_client_latest_version_id( + /// Set the client's most recent snapshot. + fn set_snapshot( &mut self, client_key: Uuid, - latest_version_id: Uuid, + snapshot: Snapshot, + data: Vec, ) -> anyhow::Result<()>; + /// Get the data for the most recent snapshot. The version_id + /// is used to verify that the snapshot is for the correct version. + fn get_snapshot_data( + &mut self, + client_key: Uuid, + version_id: Uuid, + ) -> anyhow::Result>>; + /// Get a version, indexed by parent version id fn get_version_by_parent( &mut self, @@ -50,7 +74,9 @@ pub trait StorageTxn { version_id: Uuid, ) -> anyhow::Result>; - /// Add a version (that must not already exist) + /// Add a version (that must not already exist), and + /// - update latest_version_id + /// - increment snapshot.versions_since fn add_version( &mut self, client_key: Uuid, diff --git a/sync-server/src/storage/sqlite.rs b/sync-server/src/storage/sqlite.rs index e1310a55c..47a07014a 100644 --- a/sync-server/src/storage/sqlite.rs +++ b/sync-server/src/storage/sqlite.rs @@ -1,5 +1,6 @@ -use super::{Client, Storage, StorageTxn, Uuid, Version}; +use super::{Client, Snapshot, Storage, StorageTxn, Uuid, Version}; use anyhow::Context; +use chrono::{TimeZone, Utc}; use rusqlite::types::{FromSql, ToSql}; use rusqlite::{params, Connection, OptionalExtension}; use std::path::Path; @@ -30,24 +31,6 @@ impl ToSql for StoredUuid { } } -/// Stores [`Client`] in SQLite -impl FromSql for Client { - fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult { - let o: Client = serde_json::from_str(value.as_str()?) - .map_err(|_| rusqlite::types::FromSqlError::InvalidType)?; - Ok(o) - } -} - -/// Parses Operation stored as JSON in string column -impl ToSql for Client { - fn to_sql(&self) -> rusqlite::Result> { - let s = serde_json::to_string(&self) - .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?; - Ok(s.into()) - } -} - /// An on-disk storage backend which uses SQLite pub struct SqliteStorage { db_file: std::path::PathBuf, @@ -69,12 +52,19 @@ impl SqliteStorage { let txn = con.transaction()?; let queries = vec![ - "CREATE TABLE IF NOT EXISTS clients (client_key STRING PRIMARY KEY, latest_version_id STRING);", + "CREATE TABLE IF NOT EXISTS clients ( + client_key STRING PRIMARY KEY, + latest_version_id STRING, + snapshot_version_id STRING, + versions_since_snapshot INTEGER, + snapshot_timestamp INTEGER, + snapshot BLOB);", "CREATE TABLE IF NOT EXISTS versions (version_id STRING PRIMARY KEY, client_key STRING, parent_version_id STRING, history_segment BLOB);", "CREATE INDEX IF NOT EXISTS versions_by_parent ON versions (parent_version_id);", ]; for q in queries { - txn.execute(q, []).context("Creating table")?; + txn.execute(q, []) + .context("Error while creating SQLite tables")?; } txn.commit()?; } @@ -126,7 +116,7 @@ impl Txn { }, ) .optional() - .context("Get version query")?; + .context("Error getting version")?; Ok(r) } } @@ -136,17 +126,42 @@ impl StorageTxn for Txn { let t = self.get_txn()?; let result: Option = t .query_row( - "SELECT latest_version_id FROM clients WHERE client_key = ? LIMIT 1", + "SELECT + latest_version_id, + snapshot_timestamp, + versions_since_snapshot, + snapshot_version_id + FROM clients + WHERE client_key = ? + LIMIT 1", [&StoredUuid(client_key)], |r| { let latest_version_id: StoredUuid = r.get(0)?; + let snapshot_timestamp: Option = r.get(1)?; + let versions_since_snapshot: Option = r.get(2)?; + let snapshot_version_id: Option = r.get(3)?; + + // if all of the relevant fields are non-NULL, return a snapshot + let snapshot = match ( + snapshot_timestamp, + versions_since_snapshot, + snapshot_version_id, + ) { + (Some(ts), Some(vs), Some(v)) => Some(Snapshot { + version_id: v.0, + timestamp: Utc.timestamp(ts, 0), + versions_since: vs, + }), + _ => None, + }; Ok(Client { latest_version_id: latest_version_id.0, + snapshot, }) }, ) .optional() - .context("Get client query")?; + .context("Error getting client")?; Ok(result) } @@ -158,18 +173,66 @@ impl StorageTxn for Txn { "INSERT OR REPLACE INTO clients (client_key, latest_version_id) VALUES (?, ?)", params![&StoredUuid(client_key), &StoredUuid(latest_version_id)], ) - .context("Create client query")?; + .context("Error creating/updating client")?; t.commit()?; Ok(()) } - fn set_client_latest_version_id( + fn set_snapshot( &mut self, client_key: Uuid, - latest_version_id: Uuid, + snapshot: Snapshot, + data: Vec, ) -> anyhow::Result<()> { - // Implementation is same as new_client - self.new_client(client_key, latest_version_id) + let t = self.get_txn()?; + + t.execute( + "UPDATE clients + SET + snapshot_version_id = ?, + snapshot_timestamp = ?, + versions_since_snapshot = ?, + snapshot = ? + WHERE client_key = ?", + params![ + &StoredUuid(snapshot.version_id), + snapshot.timestamp.timestamp(), + snapshot.versions_since, + data, + &StoredUuid(client_key), + ], + ) + .context("Error creating/updating snapshot")?; + t.commit()?; + Ok(()) + } + + fn get_snapshot_data( + &mut self, + client_key: Uuid, + version_id: Uuid, + ) -> anyhow::Result>> { + let t = self.get_txn()?; + let r = t + .query_row( + "SELECT snapshot, snapshot_version_id FROM clients WHERE client_key = ?", + params![&StoredUuid(client_key)], + |r| { + let v: StoredUuid = r.get("snapshot_version_id")?; + let d: Vec = r.get("snapshot")?; + Ok((v.0, d)) + }, + ) + .optional() + .context("Error getting snapshot")?; + r.map(|(v, d)| { + if v != version_id { + return Err(anyhow::anyhow!("unexpected snapshot_version_id")); + } + + Ok(d) + }) + .transpose() } fn get_version_by_parent( @@ -182,6 +245,7 @@ impl StorageTxn for Txn { client_key, parent_version_id) } + fn get_version( &mut self, client_key: Uuid, @@ -209,9 +273,19 @@ impl StorageTxn for Txn { StoredUuid(client_key), StoredUuid(parent_version_id), history_segment - ], + ] ) - .context("Add version query")?; + .context("Error adding version")?; + t.execute( + "UPDATE clients + SET + latest_version_id = ?, + versions_since_snapshot = versions_since_snapshot + 1 + WHERE client_key = ?", + params![StoredUuid(version_id), StoredUuid(client_key),], + ) + .context("Error updating client for new version")?; + t.commit()?; Ok(()) } @@ -228,6 +302,7 @@ impl StorageTxn for Txn { #[cfg(test)] mod test { use super::*; + use chrono::DateTime; use pretty_assertions::assert_eq; use tempfile::TempDir; @@ -264,12 +339,25 @@ mod test { let client = txn.get_client(client_key)?.unwrap(); assert_eq!(client.latest_version_id, latest_version_id); + assert!(client.snapshot.is_none()); let latest_version_id = Uuid::new_v4(); - txn.set_client_latest_version_id(client_key, latest_version_id)?; + txn.add_version(client_key, latest_version_id, Uuid::new_v4(), vec![1, 1])?; let client = txn.get_client(client_key)?.unwrap(); assert_eq!(client.latest_version_id, latest_version_id); + assert!(client.snapshot.is_none()); + + let snap = Snapshot { + version_id: Uuid::new_v4(), + timestamp: "2014-11-28T12:00:09Z".parse::>().unwrap(), + versions_since: 4, + }; + txn.set_snapshot(client_key, snap.clone(), vec![1, 2, 3])?; + + let client = txn.get_client(client_key)?.unwrap(); + assert_eq!(client.latest_version_id, latest_version_id); + assert_eq!(client.snapshot.unwrap(), snap); Ok(()) } @@ -317,4 +405,48 @@ mod test { Ok(()) } + + #[test] + fn test_snapshots() -> anyhow::Result<()> { + let tmp_dir = TempDir::new()?; + let storage = SqliteStorage::new(&tmp_dir.path())?; + let mut txn = storage.txn()?; + + let client_key = Uuid::new_v4(); + + txn.new_client(client_key, Uuid::new_v4())?; + assert!(txn.get_client(client_key)?.unwrap().snapshot.is_none()); + + let snap = Snapshot { + version_id: Uuid::new_v4(), + timestamp: "2013-10-08T12:00:09Z".parse::>().unwrap(), + versions_since: 3, + }; + txn.set_snapshot(client_key, snap.clone(), vec![9, 8, 9])?; + + assert_eq!( + txn.get_snapshot_data(client_key, snap.version_id)?.unwrap(), + vec![9, 8, 9] + ); + assert_eq!(txn.get_client(client_key)?.unwrap().snapshot, Some(snap)); + + let snap2 = Snapshot { + version_id: Uuid::new_v4(), + timestamp: "2014-11-28T12:00:09Z".parse::>().unwrap(), + versions_since: 10, + }; + txn.set_snapshot(client_key, snap2.clone(), vec![0, 2, 4, 6])?; + + assert_eq!( + txn.get_snapshot_data(client_key, snap2.version_id)? + .unwrap(), + vec![0, 2, 4, 6] + ); + assert_eq!(txn.get_client(client_key)?.unwrap().snapshot, Some(snap2)); + + // check that mismatched version is detected + assert!(txn.get_snapshot_data(client_key, Uuid::new_v4()).is_err()); + + Ok(()) + } }