diff --git a/src/taskstorage/inmemory.rs b/src/taskstorage/inmemory.rs index ef654b0f3..e89d31bc7 100644 --- a/src/taskstorage/inmemory.rs +++ b/src/taskstorage/inmemory.rs @@ -10,6 +10,7 @@ struct Data { tasks: HashMap, base_version: u64, operations: Vec, + working_set: Vec>, } struct Txn<'t> { @@ -104,6 +105,31 @@ impl<'t> TaskStorageTxn for Txn<'t> { Ok(()) } + fn get_working_set(&mut self) -> Fallible>> { + Ok(self.data_ref().working_set.clone()) + } + + fn add_to_working_set(&mut self, uuid: Uuid) -> Fallible { + let working_set = &mut self.mut_data_ref().working_set; + working_set.push(Some(uuid)); + Ok(working_set.len() as u64) + } + + fn remove_from_working_set(&mut self, index: u64) -> Fallible<()> { + let index = index as usize; + let working_set = &mut self.mut_data_ref().working_set; + if index >= working_set.len() || working_set[index].is_none() { + return Err(format_err!("No task found with index {}", index)); + } + working_set[index] = None; + Ok(()) + } + + fn clear_working_set(&mut self) -> Fallible<()> { + self.mut_data_ref().working_set = vec![None]; + Ok(()) + } + fn commit(&mut self) -> Fallible<()> { // copy the new_data back into storage to commit the transaction if let Some(data) = self.new_data.take() { @@ -125,6 +151,7 @@ impl InMemoryStorage { tasks: HashMap::new(), base_version: 0, operations: vec![], + working_set: vec![None], }, } } @@ -138,3 +165,127 @@ impl TaskStorage for InMemoryStorage { })) } } + +#[cfg(test)] +mod test { + use super::*; + + // (note: this module is heavily used in tests so most of its functionality is well-tested + // elsewhere and not tested here) + + #[test] + fn get_working_set_empty() -> Fallible<()> { + let mut storage = InMemoryStorage::new(); + + { + let mut txn = storage.txn()?; + let ws = txn.get_working_set()?; + assert_eq!(ws, vec![None]); + } + + Ok(()) + } + + #[test] + fn add_to_working_set() -> Fallible<()> { + let mut storage = InMemoryStorage::new(); + let uuid1 = Uuid::new_v4(); + let uuid2 = Uuid::new_v4(); + + { + let mut txn = storage.txn()?; + txn.add_to_working_set(uuid1.clone())?; + txn.add_to_working_set(uuid2.clone())?; + txn.commit()?; + } + + { + let mut txn = storage.txn()?; + let ws = txn.get_working_set()?; + assert_eq!(ws, vec![None, Some(uuid1), Some(uuid2)]); + } + + Ok(()) + } + + #[test] + fn add_and_remove_from_working_set_holes() -> Fallible<()> { + let mut storage = InMemoryStorage::new(); + let uuid1 = Uuid::new_v4(); + let uuid2 = Uuid::new_v4(); + + { + let mut txn = storage.txn()?; + txn.add_to_working_set(uuid1.clone())?; + txn.add_to_working_set(uuid2.clone())?; + txn.commit()?; + } + + { + let mut txn = storage.txn()?; + txn.remove_from_working_set(1)?; + txn.add_to_working_set(uuid1.clone())?; + txn.commit()?; + } + + { + let mut txn = storage.txn()?; + let ws = txn.get_working_set()?; + assert_eq!(ws, vec![None, None, Some(uuid2), Some(uuid1)]); + } + + Ok(()) + } + + #[test] + fn remove_working_set_doesnt_exist() -> Fallible<()> { + let mut storage = InMemoryStorage::new(); + let uuid1 = Uuid::new_v4(); + + { + let mut txn = storage.txn()?; + txn.add_to_working_set(uuid1.clone())?; + txn.commit()?; + } + + { + let mut txn = storage.txn()?; + let res = txn.remove_from_working_set(0); + assert!(res.is_err()); + let res = txn.remove_from_working_set(2); + assert!(res.is_err()); + } + + Ok(()) + } + + #[test] + fn clear_working_set() -> Fallible<()> { + let mut storage = InMemoryStorage::new(); + let uuid1 = Uuid::new_v4(); + let uuid2 = Uuid::new_v4(); + + { + let mut txn = storage.txn()?; + txn.add_to_working_set(uuid1.clone())?; + txn.add_to_working_set(uuid2.clone())?; + txn.commit()?; + } + + { + let mut txn = storage.txn()?; + txn.clear_working_set()?; + txn.add_to_working_set(uuid2.clone())?; + txn.add_to_working_set(uuid1.clone())?; + txn.commit()?; + } + + { + let mut txn = storage.txn()?; + let ws = txn.get_working_set()?; + assert_eq!(ws, vec![None, Some(uuid2), Some(uuid1)]); + } + + Ok(()) + } +} diff --git a/src/taskstorage/kv.rs b/src/taskstorage/kv.rs index 20c4da301..c8a6d85d5 100644 --- a/src/taskstorage/kv.rs +++ b/src/taskstorage/kv.rs @@ -51,10 +51,12 @@ pub struct KVStorage<'t> { tasks_bucket: Bucket<'t, Key, ValueBuf>>, numbers_bucket: Bucket<'t, Integer, ValueBuf>>, operations_bucket: Bucket<'t, Integer, ValueBuf>>, + working_set_bucket: Bucket<'t, Integer, ValueBuf>>, } const BASE_VERSION: u64 = 1; const NEXT_OPERATION: u64 = 2; +const NEXT_WORKING_SET_INDEX: u64 = 3; impl<'t> KVStorage<'t> { pub fn new(directory: &Path) -> Fallible { @@ -62,6 +64,7 @@ impl<'t> KVStorage<'t> { config.bucket("tasks", None); config.bucket("numbers", None); config.bucket("operations", None); + config.bucket("working_set", None); let store = Store::new(config)?; // tasks are stored indexed by uuid @@ -75,11 +78,17 @@ impl<'t> KVStorage<'t> { let operations_bucket = store.int_bucket::>>(Some("operations"))?; + // this bucket contains operations, numbered consecutively; the NEXT_WORKING_SET_INDEX + // number gives the index of the next operation to insert + let working_set_bucket = + store.int_bucket::>>(Some("working_set"))?; + Ok(KVStorage { store, tasks_bucket, numbers_bucket, operations_bucket, + working_set_bucket, }) } } @@ -118,6 +127,9 @@ impl<'t> Txn<'t> { fn operations_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf>> { &self.storage.operations_bucket } + fn working_set_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf>> { + &self.storage.working_set_bucket + } } impl<'t> TaskStorageTxn for Txn<'t> { @@ -271,6 +283,90 @@ impl<'t> TaskStorageTxn for Txn<'t> { Ok(()) } + fn get_working_set(&mut self) -> Fallible>> { + let working_set_bucket = self.working_set_bucket(); + let numbers_bucket = self.numbers_bucket(); + let kvtxn = self.kvtxn(); + + let next_index = match kvtxn.get(numbers_bucket, NEXT_WORKING_SET_INDEX.into()) { + Ok(buf) => buf.inner()?.to_serde(), + Err(Error::NotFound) => 1, + Err(e) => return Err(e.into()), + }; + + let mut res = Vec::with_capacity(next_index as usize); + for _ in 0..next_index { + res.push(None) + } + + let curs = kvtxn.read_cursor(working_set_bucket)?; + for (i, u) in kvtxn.read_cursor(working_set_bucket)?.iter() { + let i: u64 = i.into(); + res[i as usize] = Some(u.inner()?.to_serde()); + } + Ok(res) + } + + fn add_to_working_set(&mut self, uuid: Uuid) -> Fallible { + let working_set_bucket = self.working_set_bucket(); + let numbers_bucket = self.numbers_bucket(); + let kvtxn = self.kvtxn(); + + let next_index = match kvtxn.get(numbers_bucket, NEXT_WORKING_SET_INDEX.into()) { + Ok(buf) => buf.inner()?.to_serde(), + Err(Error::NotFound) => 1, + Err(e) => return Err(e.into()), + }; + + kvtxn.set( + working_set_bucket, + next_index.into(), + Msgpack::to_value_buf(uuid)?, + )?; + kvtxn.set( + numbers_bucket, + NEXT_WORKING_SET_INDEX.into(), + Msgpack::to_value_buf(next_index + 1)?, + )?; + Ok(next_index) + } + + fn remove_from_working_set(&mut self, index: u64) -> Fallible<()> { + let working_set_bucket = self.working_set_bucket(); + let numbers_bucket = self.numbers_bucket(); + let kvtxn = self.kvtxn(); + + let next_index = match kvtxn.get(numbers_bucket, NEXT_WORKING_SET_INDEX.into()) { + Ok(buf) => buf.inner()?.to_serde(), + Err(Error::NotFound) => 1, + Err(e) => return Err(e.into()), + }; + if index == 0 || index >= next_index { + return Err(format_err!("No task found with index {}", index)); + } + + match kvtxn.del(working_set_bucket, index.into()) { + Err(Error::NotFound) => Err(format_err!("No task found with index {}", index)), + Err(e) => Err(e.into()), + Ok(_) => Ok(()), + } + } + + fn clear_working_set(&mut self) -> Fallible<()> { + let working_set_bucket = self.working_set_bucket(); + let numbers_bucket = self.numbers_bucket(); + let kvtxn = self.kvtxn(); + + kvtxn.clear_db(working_set_bucket)?; + kvtxn.set( + numbers_bucket, + NEXT_WORKING_SET_INDEX.into(), + Msgpack::to_value_buf(1)?, + )?; + + Ok(()) + } + fn commit(&mut self) -> Fallible<()> { if let Some(kvtxn) = self.txn.take() { kvtxn.commit()?; @@ -546,4 +642,125 @@ mod test { } Ok(()) } + + #[test] + fn get_working_set_empty() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut storage = KVStorage::new(&tmp_dir.path())?; + + { + let mut txn = storage.txn()?; + let ws = txn.get_working_set()?; + assert_eq!(ws, vec![None]); + } + + Ok(()) + } + + #[test] + fn add_to_working_set() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut storage = KVStorage::new(&tmp_dir.path())?; + let uuid1 = Uuid::new_v4(); + let uuid2 = Uuid::new_v4(); + + { + let mut txn = storage.txn()?; + txn.add_to_working_set(uuid1.clone())?; + txn.add_to_working_set(uuid2.clone())?; + txn.commit()?; + } + + { + let mut txn = storage.txn()?; + let ws = txn.get_working_set()?; + assert_eq!(ws, vec![None, Some(uuid1), Some(uuid2)]); + } + + Ok(()) + } + + #[test] + fn add_and_remove_from_working_set_holes() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut storage = KVStorage::new(&tmp_dir.path())?; + let uuid1 = Uuid::new_v4(); + let uuid2 = Uuid::new_v4(); + + { + let mut txn = storage.txn()?; + txn.add_to_working_set(uuid1.clone())?; + txn.add_to_working_set(uuid2.clone())?; + txn.commit()?; + } + + { + let mut txn = storage.txn()?; + txn.remove_from_working_set(1)?; + txn.add_to_working_set(uuid1.clone())?; + txn.commit()?; + } + + { + let mut txn = storage.txn()?; + let ws = txn.get_working_set()?; + assert_eq!(ws, vec![None, None, Some(uuid2), Some(uuid1)]); + } + + Ok(()) + } + + #[test] + fn remove_working_set_doesnt_exist() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut storage = KVStorage::new(&tmp_dir.path())?; + let uuid1 = Uuid::new_v4(); + + { + let mut txn = storage.txn()?; + txn.add_to_working_set(uuid1.clone())?; + txn.commit()?; + } + + { + let mut txn = storage.txn()?; + let res = txn.remove_from_working_set(0); + assert!(res.is_err()); + let res = txn.remove_from_working_set(2); + assert!(res.is_err()); + } + + Ok(()) + } + + #[test] + fn clear_working_set() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut storage = KVStorage::new(&tmp_dir.path())?; + let uuid1 = Uuid::new_v4(); + let uuid2 = Uuid::new_v4(); + + { + let mut txn = storage.txn()?; + txn.add_to_working_set(uuid1.clone())?; + txn.add_to_working_set(uuid2.clone())?; + txn.commit()?; + } + + { + let mut txn = storage.txn()?; + txn.clear_working_set()?; + txn.add_to_working_set(uuid2.clone())?; + txn.add_to_working_set(uuid1.clone())?; + txn.commit()?; + } + + { + let mut txn = storage.txn()?; + let ws = txn.get_working_set()?; + assert_eq!(ws, vec![None, Some(uuid2), Some(uuid1)]); + } + + Ok(()) + } } diff --git a/src/taskstorage/mod.rs b/src/taskstorage/mod.rs index 092707225..5c2d879bf 100644 --- a/src/taskstorage/mod.rs +++ b/src/taskstorage/mod.rs @@ -64,6 +64,20 @@ pub trait TaskStorageTxn { /// Replace the current list of operations with a new list. fn set_operations(&mut self, ops: Vec) -> Fallible<()>; + /// Get the entire working set, with each task UUID at its appropriate (1-based) index. + /// Element 0 is always None. + fn get_working_set(&mut self) -> Fallible>>; + + /// Add a task to the working set and return its (one-based) index. This index will be one greater + /// than the highest used index. + fn add_to_working_set(&mut self, uuid: Uuid) -> Fallible; + + /// Remove a task from the working set. Other tasks' indexes are not affected. + fn remove_from_working_set(&mut self, index: u64) -> Fallible<()>; + + /// Clear all tasks from the working set in preparation for a garbage-collection operation. + fn clear_working_set(&mut self) -> Fallible<()>; + /// Commit any changes made in the transaction. It is an error to call this more than /// once. fn commit(&mut self) -> Fallible<()>; @@ -78,6 +92,8 @@ pub trait TaskStorageTxn { /// - tasks: a set of tasks indexed by uuid /// - base_version: the number of the last version sync'd from the server /// - operations: all operations performed since base_version +/// - working_set: a mapping from integer -> uuid, used to keep stable small-integer indexes +/// into the tasks. The replica maintains this list. It is not covered by operations. /// /// The `operations` are already reflected in `tasks`, so the following invariant holds: /// > Applying `operations` to the set of tasks at `base_version` gives a set of tasks identical