diff --git a/Cargo.lock b/Cargo.lock index d48a1eaeb..2aa865cc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -163,12 +163,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "kv" -version = "0.9.3" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "failure 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "lmdb-rkv 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)", + "rmp-serde 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", + "thiserror 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", "toml 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -270,6 +271,18 @@ dependencies = [ "proc-macro2 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "rand" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "fuchsia-cprng 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "rdrand 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rand" version = "0.6.5" @@ -429,6 +442,25 @@ dependencies = [ "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "rmp" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "num-traits 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rmp-serde" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "rmp 0.8.9 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rustc-demangle" version = "0.1.16" @@ -511,13 +543,24 @@ dependencies = [ "chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", - "kv 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)", + "kv 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", + "lmdb-rkv 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)", "proptest 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)", + "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tempdir" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "remove_dir_all 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "tempfile" version = "3.1.0" @@ -539,6 +582,24 @@ dependencies = [ "unicode-width 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "thiserror" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "thiserror-impl 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.12 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "time" version = "0.1.42" @@ -635,7 +696,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum fuchsia-cprng 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" "checksum getrandom 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "e7db7ca94ed4cd01190ceee0d8a8052f08a247aa1b469a7f68c6a3b71afcf407" "checksum itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "501266b7edd0174f8530248f87f99c88fbe60ca4ef3dd486835b8d8d53136f7f" -"checksum kv 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)" = "db74e838988c38867eac475ff9793b34ee520618c73cad9dc5a450caa4f5a5e6" +"checksum kv 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "cb79e59d356a5ae85b13990bbb3649a293d64df1ca6e7890822076186527a9f7" "checksum lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" "checksum libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)" = "d515b1f41455adea1313a4a2ac8a8a477634fbae63cc6100e3aebb207ce61558" "checksum lmdb-rkv 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "605061e5465304475be2041f19967a900175ea1b6d8f47fbab84a84fb8c48452" @@ -648,6 +709,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum proptest 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)" = "cf147e022eacf0c8a054ab864914a7602618adba841d800a9a9868a5237a529f" "checksum quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9274b940887ce9addde99c4eee6b5c44cc494b182b97e73dc8ffdcb3397fd3f0" "checksum quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "053a8c8bcc71fcce321828dc897a98ab9760bef03a4fc36693c231e5b3216cfe" +"checksum rand 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" "checksum rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "6d71dacdc3c88c1fde3885a3be3fbab9f35724e6ce99467f7d9c5026132184ca" "checksum rand 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "3ae1b169243eaf61759b8475a998f0a385e42042370f3a7dbaf35246eacc8412" "checksum rand_chacha 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "556d3a1ca6600bfcbab7c7c91ccb085ac7fbbcd70e008a98742e7847f4f7bcef" @@ -666,6 +728,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)" = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" "checksum regex-syntax 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)" = "11a7e20d1cce64ef2fed88b66d347f88bd9babb82845b2b858f3edbf59a4f716" "checksum remove_dir_all 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4a83fa3702a688b9359eccba92d153ac33fd2e8462f9e0e3fdf155239ea7792e" +"checksum rmp 0.8.9 (registry+https://github.com/rust-lang/crates.io-index)" = "0f10b46df14cf1ee1ac7baa4d2fbc2c52c0622a4b82fa8740e37bc452ac0184f" +"checksum rmp-serde 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4a31c0798045f039ace94e0166f76478b3ba83116ec7c9d4bc934c5b13b8df21" "checksum rustc-demangle 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "4c691c0e608126e00913e33f0ccf3727d5fc84573623b8d65b2df340b5201783" "checksum rusty-fork 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "3dd93264e10c577503e926bd1430193eeb5d21b059148910082245309b424fae" "checksum ryu 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bfa8506c1de11c9c4e4c38863ccbe02a305c8188e85a05a784c9e11e1c3910c8" @@ -675,8 +739,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum strsim 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" "checksum syn 1.0.12 (registry+https://github.com/rust-lang/crates.io-index)" = "ddc157159e2a7df58cd67b1cace10b8ed256a404fb0070593f137d8ba6bef4de" "checksum synstructure 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "67656ea1dc1b41b1451851562ea232ec2e5a80242139f7e679ceccfb5d61f545" +"checksum tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" "checksum tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" "checksum textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +"checksum thiserror 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)" = "6f357d1814b33bc2dc221243f8424104bfe72dbe911d5b71b3816a2dff1c977e" +"checksum thiserror-impl 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)" = "eb2e25d25307eb8436894f727aba8f65d07adf02e5b35a13cebed48bd282bfef" "checksum time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "db8dcfca086c1143c9270ac42a2bbd8a7ee477b78ac8e45b19abfb0cbede4b6f" "checksum toml 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)" = "01d1404644c8b12b16bfcffa4322403a91a451584daaaa7c28d3152e6cbc98cf" "checksum unicode-width 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "caaa9d531767d1ff2150b9332433f32a24622147e5ebb1f26409d5da67afd479" diff --git a/Cargo.toml b/Cargo.toml index c65c7e6fa..9cc649f10 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,9 @@ serde_json = "1.0" chrono = { version = "0.4.10", features = ["serde"] } failure = {version = "0.1.5", features = ["derive"] } clap = "~2.33.0" -kv = "0.9.3" +kv = {version = "0.10.0", features = ["msgpack-value"]} +lmdb-rkv = {version = "0.12.3"} [dev-dependencies] proptest = "0.9.4" +tempdir = "0.3.7" diff --git a/src/lib.rs b/src/lib.rs index 5c5f3aba6..8566989cc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ // TODO: remove this eventually when there's an API #![allow(dead_code)] +#![allow(unused_variables)] #[macro_use] extern crate failure; diff --git a/src/replica.rs b/src/replica.rs index dc07b3290..c2df51fc8 100644 --- a/src/replica.rs +++ b/src/replica.rs @@ -47,17 +47,17 @@ impl Replica { } /// Get all tasks as an iterator of (&Uuid, &HashMap) - pub fn all_tasks<'a>(&'a self) -> Fallible + 'a> { + pub fn all_tasks<'a>(&'a mut self) -> Fallible> { self.taskdb.all_tasks() } /// Get the UUIDs of all tasks - pub fn all_task_uuids<'a>(&'a self) -> Fallible + 'a> { + pub fn all_task_uuids<'a>(&'a mut self) -> Fallible> { self.taskdb.all_task_uuids() } /// Get an existing task by its UUID - pub fn get_task(&self, uuid: &Uuid) -> Fallible> { + pub fn get_task(&mut self, uuid: &Uuid) -> Fallible> { self.taskdb.get_task(&uuid) } } @@ -102,7 +102,7 @@ mod tests { #[test] fn get_does_not_exist() { - let rep = Replica::new(DB::new_inmemory().into()); + let mut rep = Replica::new(DB::new_inmemory().into()); let uuid = Uuid::new_v4(); assert_eq!(rep.get_task(&uuid).unwrap(), None); } diff --git a/src/taskdb.rs b/src/taskdb.rs index ba448dafc..20a8b74c3 100644 --- a/src/taskdb.rs +++ b/src/taskdb.rs @@ -1,14 +1,12 @@ use crate::errors::Error; use crate::operation::Operation; use crate::server::{Server, VersionAdd}; -use crate::taskstorage::{TaskMap, TaskStorage}; +use crate::taskstorage::{TaskMap, TaskStorage, TaskStorageTxn}; use failure::Fallible; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; use std::str; use uuid::Uuid; -#[derive(Debug)] pub struct DB { storage: Box, } @@ -35,23 +33,25 @@ impl DB { /// nothing and return an error (but leave the DB in a consistent state). pub fn apply(&mut self, op: Operation) -> Fallible<()> { // TODO: differentiate error types here? - if let err @ Err(_) = self.apply_op(&op) { + let mut txn = self.storage.txn()?; + if let err @ Err(_) = DB::apply_op(txn.as_mut(), &op) { return err; } - self.storage.add_operation(op)?; + txn.add_operation(op)?; + txn.commit()?; Ok(()) } - fn apply_op(&mut self, op: &Operation) -> Fallible<()> { + fn apply_op(txn: &mut dyn TaskStorageTxn, op: &Operation) -> Fallible<()> { match op { &Operation::Create { uuid } => { // insert if the task does not already exist - if !self.storage.create_task(uuid, HashMap::new())? { + if !txn.create_task(uuid)? { return Err(Error::DBError(format!("Task {} already exists", uuid)).into()); } } &Operation::Delete { ref uuid } => { - if !self.storage.delete_task(uuid)? { + if !txn.delete_task(uuid)? { return Err(Error::DBError(format!("Task {} does not exist", uuid)).into()); } } @@ -62,13 +62,13 @@ impl DB { timestamp: _, } => { // update if this task exists, otherwise ignore - if let Some(task) = self.storage.get_task(uuid)? { + if let Some(task) = txn.get_task(uuid)? { let mut task = task.clone(); match value { Some(ref val) => task.insert(property.to_string(), val.clone()), None => task.remove(property), }; - self.storage.set_task(uuid.clone(), task)?; + txn.set_task(uuid.clone(), task)?; } else { return Err(Error::DBError(format!("Task {} does not exist", uuid)).into()); } @@ -78,42 +78,43 @@ impl DB { Ok(()) } - /// Get all tasks. This is not a terribly efficient operation. - pub fn all_tasks<'a>(&'a self) -> Fallible + 'a> { - Ok(self - .all_task_uuids()? - // TODO: don't unwrap result (just option) - .map(move |u| (u, self.get_task(&u).unwrap().unwrap()))) + /// Get all tasks. + pub fn all_tasks<'a>(&'a mut self) -> Fallible> { + let mut txn = self.storage.txn()?; + txn.all_tasks() } /// Get the UUIDs of all tasks - pub fn all_task_uuids<'a>(&'a self) -> Fallible + 'a> { - self.storage.get_task_uuids() + pub fn all_task_uuids<'a>(&'a mut self) -> Fallible> { + let mut txn = self.storage.txn()?; + txn.all_task_uuids() } /// Get a single task, by uuid. - pub fn get_task(&self, uuid: &Uuid) -> Fallible> { - self.storage.get_task(uuid) + pub fn get_task(&mut self, uuid: &Uuid) -> Fallible> { + let mut txn = self.storage.txn()?; + txn.get_task(uuid) } /// Sync to the given server, pulling remote changes and pushing local changes. pub fn sync(&mut self, username: &str, server: &mut Server) -> Fallible<()> { + let mut txn = self.storage.txn()?; + // retry synchronizing until the server accepts our version (this allows for races between // replicas trying to sync to the same server) loop { // first pull changes and "rebase" on top of them - let new_versions = server.get_versions(username, self.storage.base_version()?); + let new_versions = server.get_versions(username, txn.base_version()?); for version_blob in new_versions { let version_str = str::from_utf8(&version_blob).unwrap(); let version: Version = serde_json::from_str(version_str).unwrap(); - assert_eq!(version.version, self.storage.base_version()? + 1); + assert_eq!(version.version, txn.base_version()? + 1); println!("applying version {:?} from server", version.version); - self.apply_version(version)?; + DB::apply_version(txn.as_mut(), version)?; } - let operations: Vec = - self.storage.operations()?.map(|o| o.clone()).collect(); + let operations: Vec = txn.operations()?.iter().map(|o| o.clone()).collect(); if operations.len() == 0 { // nothing to sync back to the server.. break; @@ -121,7 +122,7 @@ impl DB { // now make a version of our local changes and push those let new_version = Version { - version: self.storage.base_version()? + 1, + version: txn.base_version()? + 1, operations: operations, }; let new_version_str = serde_json::to_string(&new_version).unwrap(); @@ -129,15 +130,16 @@ impl DB { if let VersionAdd::Ok = server.add_version(username, new_version.version, new_version_str.into()) { - self.storage.local_operations_synced(new_version.version)?; + txn.local_operations_synced(new_version.version)?; break; } } + txn.commit()?; Ok(()) } - fn apply_version(&mut self, mut version: Version) -> Fallible<()> { + fn apply_version(txn: &mut dyn TaskStorageTxn, mut version: Version) -> Fallible<()> { // The situation here is that the server has already applied all server operations, and we // have already applied all local operations, so states have diverged by several // operations. We need to figure out what operations to apply locally and on the server in @@ -163,8 +165,7 @@ impl DB { // This is slightly complicated by the fact that the transform function can return None, // indicating no operation is required. If this happens for a local op, we can just omit // it. If it happens for server op, then we must copy the remaining local ops. - let mut local_operations: Vec = - self.storage.operations()?.map(|o| o.clone()).collect(); + let mut local_operations: Vec = txn.operations()?; for server_op in version.operations.drain(..) { let mut new_local_ops = Vec::with_capacity(local_operations.len()); let mut svr_op = Some(server_op); @@ -180,40 +181,41 @@ impl DB { } } if let Some(o) = svr_op { - if let Err(e) = self.apply_op(&o) { + if let Err(e) = DB::apply_op(txn, &o) { println!("Invalid operation when syncing: {} (ignored)", e); } } local_operations = new_local_ops; } - self.storage - .update_version(version.version, local_operations)?; + txn.update_version(version.version, local_operations)?; Ok(()) } // functions for supporting tests - pub fn sorted_tasks(&self) -> Vec<(Uuid, Vec<(String, String)>)> { + pub fn sorted_tasks(&mut self) -> Vec<(Uuid, Vec<(String, String)>)> { let mut res: Vec<(Uuid, Vec<(String, String)>)> = self .all_tasks() .unwrap() + .iter() .map(|(u, t)| { let mut t = t .iter() .map(|(p, v)| (p.clone(), v.clone())) .collect::>(); t.sort(); - (u, t) + (u.clone(), t) }) .collect(); res.sort(); res } - pub fn operations(&self) -> Vec { - self.storage - .operations() + pub fn operations(&mut self) -> Vec { + let mut txn = self.storage.txn().unwrap(); + txn.operations() .unwrap() + .iter() .map(|o| o.clone()) .collect() } @@ -223,6 +225,7 @@ impl DB { mod tests { use super::*; use chrono::Utc; + use std::collections::HashMap; use uuid::Uuid; #[test] diff --git a/src/taskstorage/inmemory.rs b/src/taskstorage/inmemory.rs index 790cd341d..e85dcd054 100644 --- a/src/taskstorage/inmemory.rs +++ b/src/taskstorage/inmemory.rs @@ -1,106 +1,145 @@ use crate::operation::Operation; -use crate::taskstorage::{TaskMap, TaskStorage}; +use crate::taskstorage::{TaskMap, TaskStorage, TaskStorageTxn}; use failure::Fallible; use std::collections::hash_map::Entry; use std::collections::HashMap; use uuid::Uuid; #[derive(PartialEq, Debug, Clone)] -pub struct InMemoryStorage { - // The current state, with all pending operations applied +struct Data { tasks: HashMap, - - // The version at which `operations` begins base_version: u64, - - // Operations applied since `base_version`, in order. - // - // INVARIANT: Given a snapshot at `base_version`, applying these operations produces `tasks`. operations: Vec, } -impl InMemoryStorage { - pub fn new() -> InMemoryStorage { - InMemoryStorage { - tasks: HashMap::new(), - base_version: 0, - operations: vec![], +struct Txn<'t> { + storage: &'t mut InMemoryStorage, + new_data: Option, +} + +impl<'t> Txn<'t> { + fn mut_data_ref(&mut self) -> &mut Data { + if self.new_data.is_none() { + self.new_data = Some(self.storage.data.clone()); + } + if let Some(ref mut data) = self.new_data { + data + } else { + unreachable!(); + } + } + + fn data_ref(&mut self) -> &Data { + if let Some(ref data) = self.new_data { + data + } else { + &self.storage.data } } } -impl TaskStorage for InMemoryStorage { - /// Get an (immutable) task, if it is in the storage - fn get_task(&self, uuid: &Uuid) -> Fallible> { - match self.tasks.get(uuid) { +impl<'t> TaskStorageTxn for Txn<'t> { + fn get_task(&mut self, uuid: &Uuid) -> Fallible> { + match self.data_ref().tasks.get(uuid) { None => Ok(None), Some(t) => Ok(Some(t.clone())), } } - /// Create a task, only if it does not already exist. Returns true if - /// the task was created (did not already exist). - fn create_task(&mut self, uuid: Uuid, task: TaskMap) -> Fallible { - if let ent @ Entry::Vacant(_) = self.tasks.entry(uuid) { - ent.or_insert(task); + fn create_task(&mut self, uuid: Uuid) -> Fallible { + if let ent @ Entry::Vacant(_) = self.mut_data_ref().tasks.entry(uuid) { + ent.or_insert(TaskMap::new()); Ok(true) } else { Ok(false) } } - /// Set a task, overwriting any existing task. fn set_task(&mut self, uuid: Uuid, task: TaskMap) -> Fallible<()> { - self.tasks.insert(uuid, task); + self.mut_data_ref().tasks.insert(uuid, task); Ok(()) } - /// Delete a task, if it exists. Returns true if the task was deleted (already existed) fn delete_task(&mut self, uuid: &Uuid) -> Fallible { - if let Some(_) = self.tasks.remove(uuid) { + if let Some(_) = self.mut_data_ref().tasks.remove(uuid) { Ok(true) } else { Ok(false) } } - fn get_task_uuids<'a>(&'a self) -> Fallible + 'a>> { - Ok(Box::new(self.tasks.keys().map(|u| u.clone()))) + fn all_tasks<'a>(&mut self) -> Fallible> { + Ok(self + .data_ref() + .tasks + .iter() + .map(|(u, t)| (u.clone(), t.clone())) + .collect()) + } + + fn all_task_uuids<'a>(&mut self) -> Fallible> { + Ok(self.data_ref().tasks.keys().map(|u| u.clone()).collect()) } - /// Add an operation to the list of operations in the storage. Note that this merely *stores* - /// the operation; it is up to the TaskDB to apply it. fn add_operation(&mut self, op: Operation) -> Fallible<()> { - self.operations.push(op); + self.mut_data_ref().operations.push(op); Ok(()) } - /// Get the current base_version for this storage -- the last version synced from the server. - fn base_version(&self) -> Fallible { - Ok(self.base_version) + fn base_version(&mut self) -> Fallible { + Ok(self.data_ref().base_version) } - /// Get the current set of outstanding operations (operations that have not been sync'd to the - /// server yet) - fn operations<'a>(&'a self) -> Fallible + 'a>> { - Ok(Box::new(self.operations.iter())) + fn operations(&mut self) -> Fallible> { + Ok(self.data_ref().operations.clone()) } - /// Apply the next version from the server. This replaces the existing base_version and - /// operations. It's up to the caller (TaskDB) to ensure this is done consistently. fn update_version(&mut self, version: u64, new_operations: Vec) -> Fallible<()> { // ensure that we are applying the versions in order.. - assert_eq!(version, self.base_version + 1); - self.base_version = version; - self.operations = new_operations; + assert_eq!(version, self.data_ref().base_version + 1); + self.mut_data_ref().base_version = version; + self.mut_data_ref().operations = new_operations; Ok(()) } - /// Record the outstanding operations as synced to the server in the given version. fn local_operations_synced(&mut self, version: u64) -> Fallible<()> { - assert_eq!(version, self.base_version + 1); - self.base_version = version; - self.operations = vec![]; + assert_eq!(version, self.data_ref().base_version + 1); + self.mut_data_ref().base_version = version; + self.mut_data_ref().operations = vec![]; + Ok(()) + } + + fn commit(&mut self) -> Fallible<()> { + // copy the new_data back into storage to commit the transaction + if let Some(data) = self.new_data.take() { + self.storage.data = data; + } Ok(()) } } + +#[derive(PartialEq, Debug, Clone)] +pub struct InMemoryStorage { + data: Data, +} + +impl InMemoryStorage { + pub fn new() -> InMemoryStorage { + InMemoryStorage { + data: Data { + tasks: HashMap::new(), + base_version: 0, + operations: vec![], + }, + } + } +} + +impl TaskStorage for InMemoryStorage { + fn txn<'a>(&'a mut self) -> Fallible> { + Ok(Box::new(Txn { + storage: self, + new_data: None, + })) + } +} diff --git a/src/taskstorage/kv.rs b/src/taskstorage/kv.rs new file mode 100644 index 000000000..4e56f697d --- /dev/null +++ b/src/taskstorage/kv.rs @@ -0,0 +1,621 @@ +use crate::operation::Operation; +use crate::taskstorage::{TaskMap, TaskStorage, TaskStorageTxn}; +use failure::Fallible; +use kv::msgpack::Msgpack; +use kv::{Bucket, Config, Error, Integer, Serde, Store, ValueBuf}; +use std::convert::TryInto; +use std::path::Path; +use uuid::Uuid; + +/// A representation of a UUID as a key. This is just a newtype wrapping the 128-bit packed form +/// of a UUID. +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] +struct Key(uuid::Bytes); + +impl From<&[u8]> for Key { + fn from(bytes: &[u8]) -> Key { + let key = Key(bytes.try_into().unwrap()); + key + } +} + +impl From<&Uuid> for Key { + fn from(uuid: &Uuid) -> Key { + let key = Key(uuid.as_bytes().clone()); + key + } +} + +impl From for Key { + fn from(uuid: Uuid) -> Key { + let key = Key(uuid.as_bytes().clone()); + key + } +} + +impl From for Uuid { + fn from(key: Key) -> Uuid { + Uuid::from_bytes(key.0) + } +} + +impl AsRef<[u8]> for Key { + fn as_ref(&self) -> &[u8] { + &self.0[..] + } +} + +/// KVStorage is an on-disk storage backend which uses LMDB via the `kv` crate. +pub struct KVStorage<'t> { + store: Store, + tasks_bucket: Bucket<'t, Key, ValueBuf>>, + numbers_bucket: Bucket<'t, Integer, ValueBuf>>, + operations_bucket: Bucket<'t, Integer, ValueBuf>>, +} + +const BASE_VERSION: u64 = 1; +const NEXT_OPERATION: u64 = 2; + +impl<'t> KVStorage<'t> { + pub fn new(directory: &Path) -> Fallible { + let mut config = Config::default(directory); + config.bucket("tasks", None); + config.bucket("numbers", None); + config.bucket("operations", None); + let store = Store::new(config)?; + + // tasks are stored indexed by uuid + let tasks_bucket = store.bucket::>>(Some("tasks"))?; + + // this bucket contains various u64s, indexed by constants above + let numbers_bucket = store.int_bucket::>>(Some("numbers"))?; + + // this bucket contains operations, numbered consecutively + let operations_bucket = + store.int_bucket::>>(Some("operations"))?; + + Ok(KVStorage { + store, + tasks_bucket, + numbers_bucket, + operations_bucket, + }) + } +} + +impl<'t> TaskStorage for KVStorage<'t> { + fn txn<'a>(&'a mut self) -> Fallible> { + Ok(Box::new(Txn { + storage: self, + txn: Some(self.store.write_txn()?), + })) + } +} + +struct Txn<'t> { + storage: &'t KVStorage<'t>, + txn: Option>, +} + +impl<'t> Txn<'t> { + // get the underlying kv Txn + fn kvtxn<'a>(&mut self) -> &mut kv::Txn<'t> { + if let Some(ref mut txn) = self.txn { + txn + } else { + panic!("cannot use transaction after commit"); + } + } + + // Access to buckets + fn tasks_bucket(&self) -> &'t Bucket<'t, Key, ValueBuf>> { + &self.storage.tasks_bucket + } + fn numbers_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf>> { + &self.storage.numbers_bucket + } + fn operations_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf>> { + &self.storage.operations_bucket + } +} + +impl<'t> TaskStorageTxn for Txn<'t> { + fn get_task(&mut self, uuid: &Uuid) -> Fallible> { + let bucket = self.tasks_bucket(); + let buf = match self.kvtxn().get(bucket, uuid.into()) { + Ok(buf) => buf, + Err(Error::NotFound) => return Ok(None), + Err(e) => return Err(e.into()), + }; + let value = buf.inner()?.to_serde(); + Ok(Some(value)) + } + + fn create_task(&mut self, uuid: Uuid) -> Fallible { + let bucket = self.tasks_bucket(); + let kvtxn = self.kvtxn(); + match kvtxn.get(bucket, uuid.into()) { + Err(Error::NotFound) => { + kvtxn.set(bucket, uuid.into(), Msgpack::to_value_buf(TaskMap::new())?)?; + Ok(true) + } + Err(e) => Err(e.into()), + Ok(_) => Ok(false), + } + } + + fn set_task(&mut self, uuid: Uuid, task: TaskMap) -> Fallible<()> { + let bucket = self.tasks_bucket(); + let kvtxn = self.kvtxn(); + kvtxn.set(bucket, uuid.into(), Msgpack::to_value_buf(task)?)?; + Ok(()) + } + + fn delete_task(&mut self, uuid: &Uuid) -> Fallible { + let bucket = self.tasks_bucket(); + let kvtxn = self.kvtxn(); + match kvtxn.del(bucket, uuid.into()) { + Err(Error::NotFound) => Ok(false), + Err(e) => Err(e.into()), + Ok(_) => Ok(true), + } + } + + fn all_tasks(&mut self) -> Fallible> { + let bucket = self.tasks_bucket(); + let kvtxn = self.kvtxn(); + let curs = kvtxn.read_cursor(bucket)?; + let all_tasks: Result, Error> = kvtxn + .read_cursor(bucket)? + .iter() + .map(|(k, v)| Ok((k.into(), v.inner()?.to_serde()))) + .collect(); + Ok(all_tasks?) + } + + fn all_task_uuids(&mut self) -> Fallible> { + let bucket = self.tasks_bucket(); + let kvtxn = self.kvtxn(); + let curs = kvtxn.read_cursor(bucket)?; + Ok(kvtxn + .read_cursor(bucket)? + .iter() + .map(|(k, _)| k.into()) + .collect()) + } + + fn add_operation(&mut self, op: Operation) -> Fallible<()> { + let numbers_bucket = self.numbers_bucket(); + let operations_bucket = self.operations_bucket(); + let kvtxn = self.kvtxn(); + + let next_op = match kvtxn.get(numbers_bucket, NEXT_OPERATION.into()) { + Ok(buf) => buf.inner()?.to_serde(), + Err(Error::NotFound) => 0, + Err(e) => return Err(e.into()), + }; + + kvtxn.set( + operations_bucket, + next_op.into(), + Msgpack::to_value_buf(op)?, + )?; + kvtxn.set( + numbers_bucket, + NEXT_OPERATION.into(), + Msgpack::to_value_buf(next_op + 1)?, + )?; + Ok(()) + } + + fn base_version(&mut self) -> Fallible { + let bucket = self.numbers_bucket(); + let base_version = match self.kvtxn().get(bucket, BASE_VERSION.into()) { + Ok(buf) => buf, + Err(Error::NotFound) => return Ok(0), + Err(e) => return Err(e.into()), + } + .inner()? + .to_serde(); + Ok(base_version) + } + + fn operations(&mut self) -> Fallible> { + let bucket = self.operations_bucket(); + let kvtxn = self.kvtxn(); + let curs = kvtxn.read_cursor(bucket)?; + let all_ops: Result, Error> = kvtxn + .read_cursor(bucket)? + .iter() + .map(|(i, v)| Ok((i.into(), v.inner()?.to_serde()))) + .collect(); + let mut all_ops = all_ops?; + // sort by key.. + all_ops.sort_by(|a, b| a.0.cmp(&b.0)); + // and return the values.. + Ok(all_ops.iter().map(|(_, v)| v.clone()).collect()) + } + + fn update_version(&mut self, version: u64, new_operations: Vec) -> Fallible<()> { + let numbers_bucket = self.numbers_bucket(); + let operations_bucket = self.operations_bucket(); + let kvtxn = self.kvtxn(); + + kvtxn.clear_db(operations_bucket)?; + + let mut i = 0u64; + for op in new_operations { + kvtxn.set(operations_bucket, i.into(), Msgpack::to_value_buf(op)?)?; + i += 1; + } + + kvtxn.set( + numbers_bucket, + BASE_VERSION.into(), + Msgpack::to_value_buf(version)?, + )?; + + kvtxn.set( + numbers_bucket, + NEXT_OPERATION.into(), + Msgpack::to_value_buf(i)?, + )?; + + Ok(()) + } + + fn local_operations_synced(&mut self, version: u64) -> Fallible<()> { + let numbers_bucket = self.numbers_bucket(); + let operations_bucket = self.operations_bucket(); + let kvtxn = self.kvtxn(); + + kvtxn.clear_db(operations_bucket)?; + + kvtxn.set( + numbers_bucket, + BASE_VERSION.into(), + Msgpack::to_value_buf(version)?, + )?; + + kvtxn.set( + numbers_bucket, + NEXT_OPERATION.into(), + Msgpack::to_value_buf(0)?, + )?; + + Ok(()) + } + + fn commit(&mut self) -> Fallible<()> { + if let Some(kvtxn) = self.txn.take() { + kvtxn.commit()?; + } else { + panic!("transaction already committed"); + } + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::taskstorage::taskmap_with; + use failure::Fallible; + use tempdir::TempDir; + + #[test] + fn test_create() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut storage = KVStorage::new(&tmp_dir.path())?; + let uuid = Uuid::new_v4(); + { + let mut txn = storage.txn()?; + assert!(txn.create_task(uuid.clone())?); + txn.commit()?; + } + { + let mut txn = storage.txn()?; + let task = txn.get_task(&uuid)?; + assert_eq!(task, Some(taskmap_with(vec![]))); + } + Ok(()) + } + + #[test] + fn test_create_exists() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut storage = KVStorage::new(&tmp_dir.path())?; + let uuid = Uuid::new_v4(); + { + let mut txn = storage.txn()?; + assert!(txn.create_task(uuid.clone())?); + txn.commit()?; + } + { + let mut txn = storage.txn()?; + assert!(!txn.create_task(uuid.clone())?); + txn.commit()?; + } + Ok(()) + } + + #[test] + fn test_get_missing() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut storage = KVStorage::new(&tmp_dir.path())?; + let uuid = Uuid::new_v4(); + { + let mut txn = storage.txn()?; + let task = txn.get_task(&uuid)?; + assert_eq!(task, None); + } + Ok(()) + } + + #[test] + fn test_set_task() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut storage = KVStorage::new(&tmp_dir.path())?; + let uuid = Uuid::new_v4(); + { + let mut txn = storage.txn()?; + txn.set_task( + uuid.clone(), + taskmap_with(vec![("k".to_string(), "v".to_string())]), + )?; + txn.commit()?; + } + { + let mut txn = storage.txn()?; + let task = txn.get_task(&uuid)?; + assert_eq!( + task, + Some(taskmap_with(vec![("k".to_string(), "v".to_string())])) + ); + } + Ok(()) + } + + #[test] + fn test_delete_task_missing() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut storage = KVStorage::new(&tmp_dir.path())?; + let uuid = Uuid::new_v4(); + { + let mut txn = storage.txn()?; + assert!(!txn.delete_task(&uuid)?); + } + Ok(()) + } + + #[test] + fn test_delete_task_exists() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut storage = KVStorage::new(&tmp_dir.path())?; + let uuid = Uuid::new_v4(); + { + let mut txn = storage.txn()?; + assert!(txn.create_task(uuid.clone())?); + txn.commit()?; + } + { + let mut txn = storage.txn()?; + assert!(txn.delete_task(&uuid)?); + } + Ok(()) + } + + #[test] + fn test_all_tasks_empty() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut storage = KVStorage::new(&tmp_dir.path())?; + { + let mut txn = storage.txn()?; + let tasks = txn.all_tasks()?; + assert_eq!(tasks, vec![]); + } + Ok(()) + } + + #[test] + fn test_all_tasks_and_uuids() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut storage = KVStorage::new(&tmp_dir.path())?; + let uuid1 = Uuid::new_v4(); + let uuid2 = Uuid::new_v4(); + { + let mut txn = storage.txn()?; + assert!(txn.create_task(uuid1.clone())?); + txn.set_task( + uuid1.clone(), + taskmap_with(vec![("num".to_string(), "1".to_string())]), + )?; + assert!(txn.create_task(uuid2.clone())?); + txn.set_task( + uuid2.clone(), + taskmap_with(vec![("num".to_string(), "2".to_string())]), + )?; + txn.commit()?; + } + { + let mut txn = storage.txn()?; + let mut tasks = txn.all_tasks()?; + + // order is nondeterministic, so sort by uuid + tasks.sort_by(|a, b| a.0.cmp(&b.0)); + + let mut exp = vec![ + ( + uuid1.clone(), + taskmap_with(vec![("num".to_string(), "1".to_string())]), + ), + ( + uuid2.clone(), + taskmap_with(vec![("num".to_string(), "2".to_string())]), + ), + ]; + exp.sort_by(|a, b| a.0.cmp(&b.0)); + + assert_eq!(tasks, exp); + } + { + let mut txn = storage.txn()?; + let mut uuids = txn.all_task_uuids()?; + uuids.sort(); + + let mut exp = vec![uuid1.clone(), uuid2.clone()]; + exp.sort(); + + assert_eq!(uuids, exp); + } + Ok(()) + } + + #[test] + fn test_base_version_default() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut storage = KVStorage::new(&tmp_dir.path())?; + { + let mut txn = storage.txn()?; + assert_eq!(txn.base_version()?, 0); + } + Ok(()) + } + + #[test] + fn test_operations() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut storage = KVStorage::new(&tmp_dir.path())?; + let uuid1 = Uuid::new_v4(); + let uuid2 = Uuid::new_v4(); + + // create some operations + { + let mut txn = storage.txn()?; + txn.add_operation(Operation::Create { uuid: uuid1 })?; + txn.add_operation(Operation::Create { uuid: uuid2 })?; + txn.commit()?; + } + + // read them back + { + let mut txn = storage.txn()?; + let ops = txn.operations()?; + assert_eq!( + ops, + vec![ + Operation::Create { uuid: uuid1 }, + Operation::Create { uuid: uuid2 }, + ] + ); + } + + // report them sync'd to the server + { + let mut txn = storage.txn()?; + txn.local_operations_synced(1)?; + txn.commit()?; + } + + // check that the operations are gone and the base version is incremented + { + let mut txn = storage.txn()?; + let ops = txn.operations()?; + assert_eq!(ops, vec![]); + assert_eq!(txn.base_version()?, 1); + } + + // create some more operations (to test adding operations after clearing) + { + let mut txn = storage.txn()?; + txn.add_operation(Operation::Delete { uuid: uuid2 })?; + txn.add_operation(Operation::Delete { uuid: uuid1 })?; + txn.commit()?; + } + + // read them back + { + let mut txn = storage.txn()?; + let ops = txn.operations()?; + assert_eq!( + ops, + vec![ + Operation::Delete { uuid: uuid2 }, + Operation::Delete { uuid: uuid1 }, + ] + ); + } + Ok(()) + } + + #[test] + fn test_update_version() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut storage = KVStorage::new(&tmp_dir.path())?; + let uuid1 = Uuid::new_v4(); + let uuid2 = Uuid::new_v4(); + let uuid3 = Uuid::new_v4(); + let uuid4 = Uuid::new_v4(); + + // create some operations + { + let mut txn = storage.txn()?; + txn.add_operation(Operation::Create { uuid: uuid1 })?; + txn.add_operation(Operation::Create { uuid: uuid2 })?; + txn.add_operation(Operation::Create { uuid: uuid3 })?; + txn.add_operation(Operation::Delete { uuid: uuid2 })?; + txn.commit()?; + } + + // update version from the server.. + { + let mut txn = storage.txn()?; + txn.update_version( + 1, + vec![ + Operation::Create { uuid: uuid2 }, + Operation::Delete { uuid: uuid2 }, + ], + )?; + txn.commit()?; + } + + // check that the operations are updated and the base version is incremented + { + let mut txn = storage.txn()?; + let ops = txn.operations()?; + assert_eq!( + ops, + vec![ + Operation::Create { uuid: uuid2 }, + Operation::Delete { uuid: uuid2 }, + ] + ); + assert_eq!(txn.base_version()?, 1); + } + + // create some more operations (to test adding operations after updating) + { + let mut txn = storage.txn()?; + txn.add_operation(Operation::Create { uuid: uuid4 })?; + txn.add_operation(Operation::Delete { uuid: uuid4 })?; + txn.commit()?; + } + + // read them back + { + let mut txn = storage.txn()?; + let ops = txn.operations()?; + assert_eq!( + ops, + vec![ + Operation::Create { uuid: uuid2 }, + Operation::Delete { uuid: uuid2 }, + Operation::Create { uuid: uuid4 }, + Operation::Delete { uuid: uuid4 }, + ] + ); + } + Ok(()) + } +} diff --git a/src/taskstorage/lmdb.rs b/src/taskstorage/lmdb.rs deleted file mode 100644 index 1a7d75c7f..000000000 --- a/src/taskstorage/lmdb.rs +++ /dev/null @@ -1,95 +0,0 @@ -use crate::operation::Operation; -use crate::taskstorage::{TaskMap, TaskStorage}; -use kv::{Config, Error, Manager, ValueRef}; -use uuid::Uuid; - -pub struct KVStorage { - // TODO: make the manager global with lazy-static - manager: Manager, - config: Config, -} - -impl KVStorage { - pub fn new(directory: &str) -> KVStorage { - let mut config = Config::default(directory); - config.bucket("base_version", None); - config.bucket("operations", None); - config.bucket("tasks", None); - KVStorage { - manager: Manager::new(), - config, - } - } -} - -impl TaskStorage for KVStorage { - /// Get an (immutable) task, if it is in the storage - fn get_task(&self, uuid: &Uuid) -> Option { - match self.tasks.get(uuid) { - None => None, - Some(t) => Some(t.clone()), - } - } - - /// Create a task, only if it does not already exist. Returns true if - /// the task was created (did not already exist). - fn create_task(&mut self, uuid: Uuid, task: TaskMap) -> bool { - if let ent @ Entry::Vacant(_) = self.tasks.entry(uuid) { - ent.or_insert(task); - true - } else { - false - } - } - - /// Set a task, overwriting any existing task. - fn set_task(&mut self, uuid: Uuid, task: TaskMap) { - self.tasks.insert(uuid, task); - } - - /// Delete a task, if it exists. Returns true if the task was deleted (already existed) - fn delete_task(&mut self, uuid: &Uuid) -> bool { - if let Some(_) = self.tasks.remove(uuid) { - true - } else { - false - } - } - - fn get_task_uuids<'a>(&'a self) -> Box + 'a> { - Box::new(self.tasks.keys().map(|u| u.clone())) - } - - /// Add an operation to the list of operations in the storage. Note that this merely *stores* - /// the operation; it is up to the TaskDB to apply it. - fn add_operation(&mut self, op: Operation) { - self.operations.push(op); - } - - /// Get the current base_version for this storage -- the last version synced from the server. - fn base_version(&self) -> u64 { - return self.base_version; - } - - /// Get the current set of outstanding operations (operations that have not been sync'd to the - /// server yet) - fn operations<'a>(&'a self) -> Box + 'a> { - Box::new(self.operations.iter()) - } - - /// Apply the next version from the server. This replaces the existing base_version and - /// operations. It's up to the caller (TaskDB) to ensure this is done consistently. - fn update_version(&mut self, version: u64, new_operations: Vec) { - // ensure that we are applying the versions in order.. - assert_eq!(version, self.base_version + 1); - self.base_version = version; - self.operations = new_operations; - } - - /// Record the outstanding operations as synced to the server in the given version. - fn local_operations_synced(&mut self, version: u64) { - assert_eq!(version, self.base_version + 1); - self.base_version = version; - self.operations = vec![]; - } -} diff --git a/src/taskstorage/mod.rs b/src/taskstorage/mod.rs index 3f339ac29..d7a175596 100644 --- a/src/taskstorage/mod.rs +++ b/src/taskstorage/mod.rs @@ -1,51 +1,91 @@ use crate::Operation; use failure::Fallible; use std::collections::HashMap; -use std::fmt; use uuid::Uuid; mod inmemory; +mod kv; pub use inmemory::InMemoryStorage; /// An in-memory representation of a task as a simple hashmap pub type TaskMap = HashMap; -/// A trait for objects able to act as backing storage for a TaskDB. This API is optimized to be -/// easy to implement, with all of the semantic meaning of the data located in the TaskDB -/// implementation, which is the sole consumer of this trait. -pub trait TaskStorage: fmt::Debug { +#[cfg(test)] +fn taskmap_with(mut properties: Vec<(String, String)>) -> TaskMap { + let mut rv = TaskMap::new(); + for (p, v) in properties.drain(..) { + rv.insert(p, v); + } + rv +} + +/// A TaskStorage transaction, in which storage operations are performed. +/// Serializable consistency is maintained, and implementations do not optimize +/// for concurrent access so some may simply apply a mutex to limit access to +/// one transaction at a time. Transactions are aborted if they are dropped. +/// It's safe to drop transactions that did not modify any data. +pub trait TaskStorageTxn { /// Get an (immutable) task, if it is in the storage - fn get_task(&self, uuid: &Uuid) -> Fallible>; + fn get_task(&mut self, uuid: &Uuid) -> Fallible>; - /// Create a task, only if it does not already exist. Returns true if + /// Create an (empty) task, only if it does not already exist. Returns true if /// the task was created (did not already exist). - fn create_task(&mut self, uuid: Uuid, task: TaskMap) -> Fallible; + fn create_task(&mut self, uuid: Uuid) -> Fallible; - /// Set a task, overwriting any existing task. + /// Set a task, overwriting any existing task. If the task does not exist, this implicitly + /// creates it (use `get_task` to check first, if necessary). fn set_task(&mut self, uuid: Uuid, task: TaskMap) -> Fallible<()>; /// Delete a task, if it exists. Returns true if the task was deleted (already existed) fn delete_task(&mut self, uuid: &Uuid) -> Fallible; + /// Get the uuids and bodies of all tasks in the storage, in undefined order. + fn all_tasks<'a>(&mut self) -> Fallible>; + /// Get the uuids of all tasks in the storage, in undefined order. - fn get_task_uuids<'a>(&'a self) -> Fallible + 'a>>; + fn all_task_uuids<'a>(&mut self) -> Fallible>; /// Add an operation to the list of operations in the storage. Note that this merely *stores* - /// the operation; it is up to the TaskDB to apply it. + /// the operation; it is up to the DB to apply it. fn add_operation(&mut self, op: Operation) -> Fallible<()>; /// Get the current base_version for this storage -- the last version synced from the server. - fn base_version(&self) -> Fallible; + fn base_version(&mut self) -> Fallible; /// Get the current set of outstanding operations (operations that have not been sync'd to the /// server yet) - fn operations<'a>(&'a self) -> Fallible + 'a>>; + fn operations<'a>(&mut self) -> Fallible>; /// Apply the next version from the server. This replaces the existing base_version and - /// operations. It's up to the caller (TaskDB) to ensure this is done consistently. + /// operations. It's up to the caller (DB) to ensure this is done consistently. fn update_version(&mut self, version: u64, new_operations: Vec) -> Fallible<()>; - /// Record the outstanding operations as synced to the server in the given version. + /// Record the outstanding operations as synced to the server in the given version: set + /// the base_version to the given value, and empty the operations list. fn local_operations_synced(&mut self, version: u64) -> Fallible<()>; + + /// Commit any changes made in the transaction. It is an error to call this more than + /// once. + fn commit(&mut self) -> Fallible<()>; +} + +/// A trait for objects able to act as backing storage for a DB. This API is optimized to be +/// easy to implement, with all of the semantic meaning of the data located in the DB +/// implementation, which is the sole consumer of this trait. +/// +/// Conceptually, task storage contains the following: +/// +/// - tasks: a set of tasks indexed by uuid +/// - base_version: the number of the last version sync'd from the server +/// - operations: all operations performed since base_version +/// +/// The `operations` are already reflected in `tasks`, so the following invariant holds: +/// > Applying `operations` to the set of tasks at `base_version` gives a set of tasks identical +/// > to `tasks`. +/// +/// It is up to the caller (DB) to maintain this invariant. +pub trait TaskStorage { + /// Begin a transaction + fn txn<'a>(&'a mut self) -> Fallible>; } diff --git a/tests/sync_action_sequences.rs b/tests/sync_action_sequences.rs index 58ed9ba77..ddc8d328c 100644 --- a/tests/sync_action_sequences.rs +++ b/tests/sync_action_sequences.rs @@ -63,10 +63,6 @@ proptest! { } } - println!("{:?}", dbs[0]); - println!("{:?}", dbs[1]); - println!("{:?}", dbs[2]); - assert_eq!(dbs[0].sorted_tasks(), dbs[0].sorted_tasks()); assert_eq!(dbs[1].sorted_tasks(), dbs[2].sorted_tasks()); }