diff --git a/Cargo.lock b/Cargo.lock index c4ba8516d..75eabf642 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2228,8 +2228,11 @@ dependencies = [ "futures", "kv", "log", + "rusqlite", "serde", + "serde_json", "tempfile", + "thiserror", "uuid", ] diff --git a/sync-server/Cargo.toml b/sync-server/Cargo.toml index 2a42298be..c9d665ba0 100644 --- a/sync-server/Cargo.toml +++ b/sync-server/Cargo.toml @@ -10,12 +10,15 @@ edition = "2018" uuid = { version = "^0.8.2", features = ["serde", "v4"] } actix-web = "^3.3.2" anyhow = "1.0" +thiserror = "1.0" futures = "^0.3.8" serde = "^1.0.125" +serde_json = "^1.0" kv = {version = "^0.10.0", features = ["msgpack-value"]} clap = "^2.33.0" log = "^0.4.14" env_logger = "^0.8.3" +rusqlite = { version = "0.25", features = ["bundled"] } [dev-dependencies] actix-rt = "^2.2.0" diff --git a/sync-server/src/storage/mod.rs b/sync-server/src/storage/mod.rs index 1d1cbe139..41fb400e9 100644 --- a/sync-server/src/storage/mod.rs +++ b/sync-server/src/storage/mod.rs @@ -6,6 +6,9 @@ mod inmemory; #[cfg(test)] pub(crate) use inmemory::InMemoryStorage; +mod sqlite; +pub(crate) use self::sqlite::SqliteStorage; + mod kv; pub(crate) use self::kv::KvStorage; diff --git a/sync-server/src/storage/sqlite.rs b/sync-server/src/storage/sqlite.rs new file mode 100644 index 000000000..a6eaa0df4 --- /dev/null +++ b/sync-server/src/storage/sqlite.rs @@ -0,0 +1,270 @@ +use super::{Client, Storage, StorageTxn, Uuid, Version}; +use rusqlite::types::{FromSql, ToSql}; +use rusqlite::{params, Connection, OptionalExtension}; +use std::path::Path; +use std::sync::{Arc, Mutex}; +use anyhow::Context; + +#[derive(Debug, thiserror::Error)] +enum SqliteError { + #[error("SQLite transaction already committted")] + TransactionAlreadyCommitted, +} + + +/// Newtype to allow implementing `FromSql` for foreign `uuid::Uuid` +struct StoredUuid(Uuid); + +/// Conversion from Uuid stored as a string (rusqlite's uuid feature stores as binary blob) +impl FromSql for StoredUuid { + fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult { + let u = Uuid::parse_str(value.as_str()?) + .map_err(|_| rusqlite::types::FromSqlError::InvalidType)?; + Ok(StoredUuid(u)) + } +} + +/// Store Uuid as string in database +impl ToSql for StoredUuid { + fn to_sql(&self) -> rusqlite::Result> { + let s = self.0.to_string(); + Ok(s.into()) + } +} + +/// Stores [`Client`] in SQLite +impl FromSql for Client { + fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult { + let o: Client = serde_json::from_str(value.as_str()?) + .map_err(|_| rusqlite::types::FromSqlError::InvalidType)?; + Ok(o) + } +} + +/// Parsers Operation stored as JSON in string column +impl ToSql for Client { + fn to_sql(&self) -> rusqlite::Result> { + let s = serde_json::to_string(&self) + .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?; + Ok(s.into()) + } +} + + + +/// DB Key for versions: concatenation of client_key and parent_version_id +type VersionDbKey = [u8; 32]; + +fn version_db_key(client_key: Uuid, parent_version_id: Uuid) -> VersionDbKey { + let mut key = [0u8; 32]; + key[..16].clone_from_slice(client_key.as_bytes()); + key[16..].clone_from_slice(parent_version_id.as_bytes()); + key +} + +/// Key for clients: just the client_key +type ClientDbKey = [u8; 16]; + +fn client_db_key(client_key: Uuid) -> ClientDbKey { + *client_key.as_bytes() +} + +/// An on-disk storage backend which uses SQLite +pub(crate) struct SqliteStorage { + db_file: std::path::PathBuf, +} + +impl SqliteStorage { + fn new_connection(&self) -> anyhow::Result { + Ok(Connection::open(&self.db_file)?) + } + + pub fn new>(directory: P) -> anyhow::Result { + let db_file = directory.as_ref().join("taskchampion-sync-server.sqlite3"); + + let o = SqliteStorage { db_file }; + + { + let mut con = o.new_connection()?; + let txn = con.transaction()?; + + let queries = vec![ + "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY AUTOINCREMENT, data STRING);", + ]; + for q in queries { + txn.execute(q, []).context("Creating table")?; + } + txn.commit()?; + } + + Ok(o) + } +} + +impl Storage for SqliteStorage { + fn txn<'a>(&'a self) -> anyhow::Result> { + let mut con = self.new_connection()?; + let mut t = Txn{con, txn: None}; + Ok(Box::new(t)) + } +} + +struct Txn<'t> { + con: Connection, + txn: Option<&'t rusqlite::Transaction<'t>>, +} + +impl <'t>Txn<'t> { + fn get_txn(&mut self) -> Result<&'t rusqlite::Transaction, SqliteError> { + Ok(&self.con.transaction().unwrap()) + } +} + + +impl <'t>StorageTxn for Txn<'t> { + fn get_client(&mut self, client_key: Uuid) -> anyhow::Result> { + let t = self.get_txn()?; + let result: Option = t + .query_row( + "SELECT data FROM clients WHERE client_key = ? LIMIT 1", + [&StoredUuid(client_key)], + |r| r.get("data"), + ) + .optional()?; + + Ok(result) + } + + fn new_client(&mut self, client_key: Uuid, latest_version_id: Uuid) -> anyhow::Result<()> { + let t = self.get_txn()?; + + let client = Client{ latest_version_id }; + t.execute( + "INSERT OR REPLACE INTO clients (client_key, latest_version_id) VALUES (?, ?)", + params![&StoredUuid(latest_version_id), &client], + ) + .context("Create client query")?; + Ok(()) + } + + fn set_client_latest_version_id( + &mut self, + client_key: Uuid, + latest_version_id: Uuid, + ) -> anyhow::Result<()> { + // Implementation is same as new_client + self.new_client(client_key, latest_version_id) + } + + fn get_version_by_parent( + &mut self, + client_key: Uuid, + parent_version_id: Uuid, + ) -> anyhow::Result> { + todo!() + } + + fn add_version( + &mut self, + client_key: Uuid, + version_id: Uuid, + parent_version_id: Uuid, + history_segment: Vec, + ) -> anyhow::Result<()> { + let version = Version { + version_id, + parent_version_id, + history_segment, + }; + todo!(); + Ok(()) + } + + fn commit(&mut self) -> anyhow::Result<()> { + let t = self + .txn + .take() + .ok_or(SqliteError::TransactionAlreadyCommitted)?; + t.commit().context("Committing transaction")?; + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + use tempfile::TempDir; + + #[test] + fn test_get_client_empty() -> anyhow::Result<()> { + let tmp_dir = TempDir::new()?; + let storage = SqliteStorage::new(&tmp_dir.path())?; + let mut txn = storage.txn()?; + let maybe_client = txn.get_client(Uuid::new_v4())?; + assert!(maybe_client.is_none()); + Ok(()) + } + + #[test] + fn test_client_storage() -> anyhow::Result<()> { + let tmp_dir = TempDir::new()?; + let storage = SqliteStorage::new(&tmp_dir.path())?; + let mut txn = storage.txn()?; + + let client_key = Uuid::new_v4(); + let latest_version_id = Uuid::new_v4(); + txn.new_client(client_key, latest_version_id)?; + + let client = txn.get_client(client_key)?.unwrap(); + assert_eq!(client.latest_version_id, latest_version_id); + + let latest_version_id = Uuid::new_v4(); + txn.set_client_latest_version_id(client_key, latest_version_id)?; + + let client = txn.get_client(client_key)?.unwrap(); + assert_eq!(client.latest_version_id, latest_version_id); + + Ok(()) + } + + #[test] + fn test_gvbp_empty() -> anyhow::Result<()> { + let tmp_dir = TempDir::new()?; + let storage = SqliteStorage::new(&tmp_dir.path())?; + let mut txn = storage.txn()?; + let maybe_version = txn.get_version_by_parent(Uuid::new_v4(), Uuid::new_v4())?; + assert!(maybe_version.is_none()); + Ok(()) + } + + #[test] + fn test_add_version_and_gvbp() -> anyhow::Result<()> { + let tmp_dir = TempDir::new()?; + let storage = SqliteStorage::new(&tmp_dir.path())?; + let mut txn = storage.txn()?; + + let client_key = Uuid::new_v4(); + let version_id = Uuid::new_v4(); + let parent_version_id = Uuid::new_v4(); + let history_segment = b"abc".to_vec(); + txn.add_version( + client_key, + version_id, + parent_version_id, + history_segment.clone(), + )?; + let version = txn + .get_version_by_parent(client_key, parent_version_id)? + .unwrap(); + + assert_eq!( + version, + Version { + version_id, + parent_version_id, + history_segment, + } + ); + Ok(()) + } +}