diff --git a/Cargo.lock b/Cargo.lock index 6b649fe16..5d4662bb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2217,8 +2217,12 @@ version = "0.1.0" dependencies = [ "actix-rt", "actix-web", + "clap", "failure", "futures", + "kv", + "serde", + "tempdir", "uuid", ] diff --git a/docs/src/usage.md b/docs/src/usage.md index 37123bfd2..f60e07e07 100644 --- a/docs/src/usage.md +++ b/docs/src/usage.md @@ -30,5 +30,5 @@ The following configuration parameters are available: ## `taskchampion-sync-server` Run `taskchampion-sync-server` to start the sync server. -It serves on port 8080 on all interfaces, using an in-memory database (meaning that all data is lost when the process exits). -Requests for previously-unknown clients automatically create the client. +Use `--port` to specify the port it should listen on, and `--data-dir` to specify the directory which it should store its data. +It only serves HTTP; the expectation is that a frontend proxy will be used for HTTPS support. diff --git a/sync-server/Cargo.toml b/sync-server/Cargo.toml index a72fd691c..7047a3028 100644 --- a/sync-server/Cargo.toml +++ b/sync-server/Cargo.toml @@ -11,6 +11,10 @@ uuid = { version = "^0.8.1", features = ["serde", "v4"] } actix-web = "^3.3.0" failure = "^0.1.8" futures = "^0.3.8" +serde = "^1.0.104" +kv = {version = "^0.10.0", features = ["msgpack-value"]} +clap = "^2.33.0" [dev-dependencies] actix-rt = "^1.1.1" +tempdir = "^0.3.7" diff --git a/sync-server/src/main.rs b/sync-server/src/main.rs index 7b91c5e3c..17821e36e 100644 --- a/sync-server/src/main.rs +++ b/sync-server/src/main.rs @@ -1,6 +1,8 @@ -use crate::storage::{InMemoryStorage, Storage}; +use crate::storage::{KVStorage, Storage}; use actix_web::{get, web, App, HttpServer, Responder, Scope}; use api::{api_scope, ServerState}; +use clap::Arg; +use failure::Fallible; mod api; mod server; @@ -24,14 +26,44 @@ pub(crate) fn app_scope(server_state: ServerState) -> Scope { } #[actix_web::main] -async fn main() -> std::io::Result<()> { - let server_box: Box = Box::new(InMemoryStorage::new()); +async fn main() -> Fallible<()> { + let matches = clap::App::new("taskchampion-sync-server") + .version("0.1.0") + .about("Server for TaskChampion") + .arg( + Arg::with_name("port") + .short("p") + .long("port") + .value_name("PORT") + .help("Port on which to serve") + .default_value("8080") + .takes_value(true) + .required(true), + ) + .arg( + Arg::with_name("data-dir") + .short("d") + .long("data-dir") + .value_name("DIR") + .help("Directory in which to store data") + .default_value("/var/lib/taskchampion-sync-server") + .takes_value(true) + .required(true), + ) + .get_matches(); + + let data_dir = matches.value_of("data-dir").unwrap(); + let port = matches.value_of("port").unwrap(); + + let server_box: Box = Box::new(KVStorage::new(data_dir)?); let server_state = ServerState::new(server_box); + println!("Serving on port {}", port); HttpServer::new(move || App::new().service(app_scope(server_state.clone()))) - .bind("127.0.0.1:8080")? + .bind(format!("0.0.0.0:{}", port))? .run() - .await + .await?; + Ok(()) } #[cfg(test)] diff --git a/sync-server/src/storage/kv.rs b/sync-server/src/storage/kv.rs new file mode 100644 index 000000000..c704955c4 --- /dev/null +++ b/sync-server/src/storage/kv.rs @@ -0,0 +1,244 @@ +#![allow(dead_code, unused_variables)] // temporary +use super::{Client, Storage, StorageTxn, Uuid, Version}; +use failure::Fallible; +use kv::msgpack::Msgpack; +use kv::{Bucket, Config, Error, Serde, Store, ValueBuf}; +use std::path::Path; + +/// Key for versions: concatenation of client_id and parent_version_id +type VersionKey = [u8; 32]; + +fn version_key(client_id: Uuid, parent_version_id: Uuid) -> VersionKey { + let mut key = [0u8; 32]; + key[..16].clone_from_slice(client_id.as_bytes()); + key[16..].clone_from_slice(parent_version_id.as_bytes()); + key +} + +/// Key for clients: just the client_id +type ClientKey = [u8; 16]; + +fn client_key(client_id: Uuid) -> ClientKey { + *client_id.as_bytes() +} + +/// KVStorage is an on-disk storage backend which uses LMDB via the `kv` crate. +pub(crate) struct KVStorage<'t> { + store: Store, + clients_bucket: Bucket<'t, ClientKey, ValueBuf>>, + versions_bucket: Bucket<'t, VersionKey, ValueBuf>>, +} + +impl<'t> KVStorage<'t> { + pub fn new>(directory: P) -> Fallible> { + let mut config = Config::default(directory); + config.bucket("clients", None); + config.bucket("versions", None); + + let store = Store::new(config)?; + + let clients_bucket = + store.bucket::>>(Some("clients"))?; + let versions_bucket = + store.bucket::>>(Some("versions"))?; + + Ok(KVStorage { + store, + clients_bucket, + versions_bucket, + }) + } +} + +impl<'t> Storage for KVStorage<'t> { + fn txn<'a>(&'a self) -> Fallible> { + Ok(Box::new(Txn { + storage: self, + txn: Some(self.store.write_txn()?), + })) + } +} + +struct Txn<'t> { + storage: &'t KVStorage<'t>, + txn: Option>, +} + +impl<'t> Txn<'t> { + // get the underlying kv Txn + fn kvtxn(&mut self) -> &mut kv::Txn<'t> { + if let Some(ref mut txn) = self.txn { + txn + } else { + panic!("cannot use transaction after commit"); + } + } + + fn clients_bucket(&self) -> &'t Bucket<'t, ClientKey, ValueBuf>> { + &self.storage.clients_bucket + } + fn versions_bucket(&self) -> &'t Bucket<'t, VersionKey, ValueBuf>> { + &self.storage.versions_bucket + } +} + +impl<'t> StorageTxn for Txn<'t> { + fn get_client(&mut self, client_id: Uuid) -> Fallible> { + let key = client_key(client_id); + let bucket = self.clients_bucket(); + let kvtxn = self.kvtxn(); + + let client = match kvtxn.get(&bucket, key) { + Ok(buf) => buf, + Err(Error::NotFound) => return Ok(None), + Err(e) => return Err(e.into()), + } + .inner()? + .to_serde(); + Ok(Some(client)) + } + + fn new_client(&mut self, client_id: Uuid, latest_version_id: Uuid) -> Fallible<()> { + let key = client_key(client_id); + let bucket = self.clients_bucket(); + let kvtxn = self.kvtxn(); + let client = Client { latest_version_id }; + kvtxn.set(&bucket, key, Msgpack::to_value_buf(client)?)?; + Ok(()) + } + + fn set_client_latest_version_id( + &mut self, + client_id: Uuid, + latest_version_id: Uuid, + ) -> Fallible<()> { + // implementation is the same as new_client.. + self.new_client(client_id, latest_version_id) + } + + fn get_version_by_parent( + &mut self, + client_id: Uuid, + parent_version_id: Uuid, + ) -> Fallible> { + let key = version_key(client_id, parent_version_id); + let bucket = self.versions_bucket(); + let kvtxn = self.kvtxn(); + let version = match kvtxn.get(&bucket, key) { + Ok(buf) => buf, + Err(Error::NotFound) => return Ok(None), + Err(e) => return Err(e.into()), + } + .inner()? + .to_serde(); + Ok(Some(version)) + } + + fn add_version( + &mut self, + client_id: Uuid, + version_id: Uuid, + parent_version_id: Uuid, + history_segment: Vec, + ) -> Fallible<()> { + let key = version_key(client_id, parent_version_id); + let bucket = self.versions_bucket(); + let kvtxn = self.kvtxn(); + let version = Version { + version_id, + parent_version_id, + history_segment, + }; + kvtxn.set(&bucket, key, Msgpack::to_value_buf(version)?)?; + Ok(()) + } + + fn commit(&mut self) -> Fallible<()> { + if let Some(kvtxn) = self.txn.take() { + kvtxn.commit()?; + } else { + panic!("transaction already committed"); + } + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + use failure::Fallible; + use tempdir::TempDir; + + #[test] + fn test_get_client_empty() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let storage = KVStorage::new(&tmp_dir.path())?; + let mut txn = storage.txn()?; + let maybe_client = txn.get_client(Uuid::new_v4())?; + assert!(maybe_client.is_none()); + Ok(()) + } + + #[test] + fn test_client_storage() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let storage = KVStorage::new(&tmp_dir.path())?; + let mut txn = storage.txn()?; + + let client_id = Uuid::new_v4(); + let latest_version_id = Uuid::new_v4(); + txn.new_client(client_id, latest_version_id)?; + + let client = txn.get_client(client_id)?.unwrap(); + assert_eq!(client.latest_version_id, latest_version_id); + + let latest_version_id = Uuid::new_v4(); + txn.set_client_latest_version_id(client_id, latest_version_id)?; + + let client = txn.get_client(client_id)?.unwrap(); + assert_eq!(client.latest_version_id, latest_version_id); + + Ok(()) + } + + #[test] + fn test_gvbp_empty() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let storage = KVStorage::new(&tmp_dir.path())?; + let mut txn = storage.txn()?; + let maybe_version = txn.get_version_by_parent(Uuid::new_v4(), Uuid::new_v4())?; + assert!(maybe_version.is_none()); + Ok(()) + } + + #[test] + fn test_add_version_and_gvbp() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let storage = KVStorage::new(&tmp_dir.path())?; + let mut txn = storage.txn()?; + + let client_id = Uuid::new_v4(); + let version_id = Uuid::new_v4(); + let parent_version_id = Uuid::new_v4(); + let history_segment = b"abc".to_vec(); + txn.add_version( + client_id, + version_id, + parent_version_id, + history_segment.clone(), + )?; + let version = txn + .get_version_by_parent(client_id, parent_version_id)? + .unwrap(); + + assert_eq!( + version, + Version { + version_id, + parent_version_id, + history_segment, + } + ); + Ok(()) + } +} diff --git a/sync-server/src/storage/mod.rs b/sync-server/src/storage/mod.rs index 1462a3769..2a95917b9 100644 --- a/sync-server/src/storage/mod.rs +++ b/sync-server/src/storage/mod.rs @@ -1,15 +1,21 @@ use failure::Fallible; +use serde::{Deserialize, Serialize}; use uuid::Uuid; +#[cfg(test)] mod inmemory; +#[cfg(test)] pub(crate) use inmemory::InMemoryStorage; -#[derive(Clone, PartialEq, Debug)] +mod kv; +pub(crate) use self::kv::KVStorage; + +#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] pub(crate) struct Client { pub(crate) latest_version_id: Uuid, } -#[derive(Clone, PartialEq, Debug)] +#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] pub(crate) struct Version { pub(crate) version_id: Uuid, pub(crate) parent_version_id: Uuid,