add working_set support to taskstorage

This commit is contained in:
Dustin J. Mitchell 2020-01-19 14:56:09 -05:00
parent ad08991292
commit 12980da5fd
3 changed files with 384 additions and 0 deletions

View file

@ -10,6 +10,7 @@ struct Data {
tasks: HashMap<Uuid, TaskMap>,
base_version: u64,
operations: Vec<Operation>,
working_set: Vec<Option<Uuid>>,
}
struct Txn<'t> {
@ -104,6 +105,31 @@ impl<'t> TaskStorageTxn for Txn<'t> {
Ok(())
}
fn get_working_set(&mut self) -> Fallible<Vec<Option<Uuid>>> {
Ok(self.data_ref().working_set.clone())
}
fn add_to_working_set(&mut self, uuid: Uuid) -> Fallible<u64> {
let working_set = &mut self.mut_data_ref().working_set;
working_set.push(Some(uuid));
Ok(working_set.len() as u64)
}
fn remove_from_working_set(&mut self, index: u64) -> Fallible<()> {
let index = index as usize;
let working_set = &mut self.mut_data_ref().working_set;
if index >= working_set.len() || working_set[index].is_none() {
return Err(format_err!("No task found with index {}", index));
}
working_set[index] = None;
Ok(())
}
fn clear_working_set(&mut self) -> Fallible<()> {
self.mut_data_ref().working_set = vec![None];
Ok(())
}
fn commit(&mut self) -> Fallible<()> {
// copy the new_data back into storage to commit the transaction
if let Some(data) = self.new_data.take() {
@ -125,6 +151,7 @@ impl InMemoryStorage {
tasks: HashMap::new(),
base_version: 0,
operations: vec![],
working_set: vec![None],
},
}
}
@ -138,3 +165,127 @@ impl TaskStorage for InMemoryStorage {
}))
}
}
#[cfg(test)]
mod test {
use super::*;
// (note: this module is heavily used in tests so most of its functionality is well-tested
// elsewhere and not tested here)
#[test]
fn get_working_set_empty() -> Fallible<()> {
let mut storage = InMemoryStorage::new();
{
let mut txn = storage.txn()?;
let ws = txn.get_working_set()?;
assert_eq!(ws, vec![None]);
}
Ok(())
}
#[test]
fn add_to_working_set() -> Fallible<()> {
let mut storage = InMemoryStorage::new();
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.add_to_working_set(uuid1.clone())?;
txn.add_to_working_set(uuid2.clone())?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let ws = txn.get_working_set()?;
assert_eq!(ws, vec![None, Some(uuid1), Some(uuid2)]);
}
Ok(())
}
#[test]
fn add_and_remove_from_working_set_holes() -> Fallible<()> {
let mut storage = InMemoryStorage::new();
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.add_to_working_set(uuid1.clone())?;
txn.add_to_working_set(uuid2.clone())?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
txn.remove_from_working_set(1)?;
txn.add_to_working_set(uuid1.clone())?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let ws = txn.get_working_set()?;
assert_eq!(ws, vec![None, None, Some(uuid2), Some(uuid1)]);
}
Ok(())
}
#[test]
fn remove_working_set_doesnt_exist() -> Fallible<()> {
let mut storage = InMemoryStorage::new();
let uuid1 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.add_to_working_set(uuid1.clone())?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let res = txn.remove_from_working_set(0);
assert!(res.is_err());
let res = txn.remove_from_working_set(2);
assert!(res.is_err());
}
Ok(())
}
#[test]
fn clear_working_set() -> Fallible<()> {
let mut storage = InMemoryStorage::new();
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.add_to_working_set(uuid1.clone())?;
txn.add_to_working_set(uuid2.clone())?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
txn.clear_working_set()?;
txn.add_to_working_set(uuid2.clone())?;
txn.add_to_working_set(uuid1.clone())?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let ws = txn.get_working_set()?;
assert_eq!(ws, vec![None, Some(uuid2), Some(uuid1)]);
}
Ok(())
}
}

View file

@ -51,10 +51,12 @@ pub struct KVStorage<'t> {
tasks_bucket: Bucket<'t, Key, ValueBuf<Msgpack<TaskMap>>>,
numbers_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<u64>>>,
operations_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<Operation>>>,
working_set_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<Uuid>>>,
}
const BASE_VERSION: u64 = 1;
const NEXT_OPERATION: u64 = 2;
const NEXT_WORKING_SET_INDEX: u64 = 3;
impl<'t> KVStorage<'t> {
pub fn new(directory: &Path) -> Fallible<KVStorage> {
@ -62,6 +64,7 @@ impl<'t> KVStorage<'t> {
config.bucket("tasks", None);
config.bucket("numbers", None);
config.bucket("operations", None);
config.bucket("working_set", None);
let store = Store::new(config)?;
// tasks are stored indexed by uuid
@ -75,11 +78,17 @@ impl<'t> KVStorage<'t> {
let operations_bucket =
store.int_bucket::<ValueBuf<Msgpack<Operation>>>(Some("operations"))?;
// this bucket contains operations, numbered consecutively; the NEXT_WORKING_SET_INDEX
// number gives the index of the next operation to insert
let working_set_bucket =
store.int_bucket::<ValueBuf<Msgpack<Uuid>>>(Some("working_set"))?;
Ok(KVStorage {
store,
tasks_bucket,
numbers_bucket,
operations_bucket,
working_set_bucket,
})
}
}
@ -118,6 +127,9 @@ impl<'t> Txn<'t> {
fn operations_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf<Msgpack<Operation>>> {
&self.storage.operations_bucket
}
fn working_set_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf<Msgpack<Uuid>>> {
&self.storage.working_set_bucket
}
}
impl<'t> TaskStorageTxn for Txn<'t> {
@ -271,6 +283,90 @@ impl<'t> TaskStorageTxn for Txn<'t> {
Ok(())
}
fn get_working_set(&mut self) -> Fallible<Vec<Option<Uuid>>> {
let working_set_bucket = self.working_set_bucket();
let numbers_bucket = self.numbers_bucket();
let kvtxn = self.kvtxn();
let next_index = match kvtxn.get(numbers_bucket, NEXT_WORKING_SET_INDEX.into()) {
Ok(buf) => buf.inner()?.to_serde(),
Err(Error::NotFound) => 1,
Err(e) => return Err(e.into()),
};
let mut res = Vec::with_capacity(next_index as usize);
for _ in 0..next_index {
res.push(None)
}
let curs = kvtxn.read_cursor(working_set_bucket)?;
for (i, u) in kvtxn.read_cursor(working_set_bucket)?.iter() {
let i: u64 = i.into();
res[i as usize] = Some(u.inner()?.to_serde());
}
Ok(res)
}
fn add_to_working_set(&mut self, uuid: Uuid) -> Fallible<u64> {
let working_set_bucket = self.working_set_bucket();
let numbers_bucket = self.numbers_bucket();
let kvtxn = self.kvtxn();
let next_index = match kvtxn.get(numbers_bucket, NEXT_WORKING_SET_INDEX.into()) {
Ok(buf) => buf.inner()?.to_serde(),
Err(Error::NotFound) => 1,
Err(e) => return Err(e.into()),
};
kvtxn.set(
working_set_bucket,
next_index.into(),
Msgpack::to_value_buf(uuid)?,
)?;
kvtxn.set(
numbers_bucket,
NEXT_WORKING_SET_INDEX.into(),
Msgpack::to_value_buf(next_index + 1)?,
)?;
Ok(next_index)
}
fn remove_from_working_set(&mut self, index: u64) -> Fallible<()> {
let working_set_bucket = self.working_set_bucket();
let numbers_bucket = self.numbers_bucket();
let kvtxn = self.kvtxn();
let next_index = match kvtxn.get(numbers_bucket, NEXT_WORKING_SET_INDEX.into()) {
Ok(buf) => buf.inner()?.to_serde(),
Err(Error::NotFound) => 1,
Err(e) => return Err(e.into()),
};
if index == 0 || index >= next_index {
return Err(format_err!("No task found with index {}", index));
}
match kvtxn.del(working_set_bucket, index.into()) {
Err(Error::NotFound) => Err(format_err!("No task found with index {}", index)),
Err(e) => Err(e.into()),
Ok(_) => Ok(()),
}
}
fn clear_working_set(&mut self) -> Fallible<()> {
let working_set_bucket = self.working_set_bucket();
let numbers_bucket = self.numbers_bucket();
let kvtxn = self.kvtxn();
kvtxn.clear_db(working_set_bucket)?;
kvtxn.set(
numbers_bucket,
NEXT_WORKING_SET_INDEX.into(),
Msgpack::to_value_buf(1)?,
)?;
Ok(())
}
fn commit(&mut self) -> Fallible<()> {
if let Some(kvtxn) = self.txn.take() {
kvtxn.commit()?;
@ -546,4 +642,125 @@ mod test {
}
Ok(())
}
#[test]
fn get_working_set_empty() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
{
let mut txn = storage.txn()?;
let ws = txn.get_working_set()?;
assert_eq!(ws, vec![None]);
}
Ok(())
}
#[test]
fn add_to_working_set() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.add_to_working_set(uuid1.clone())?;
txn.add_to_working_set(uuid2.clone())?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let ws = txn.get_working_set()?;
assert_eq!(ws, vec![None, Some(uuid1), Some(uuid2)]);
}
Ok(())
}
#[test]
fn add_and_remove_from_working_set_holes() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.add_to_working_set(uuid1.clone())?;
txn.add_to_working_set(uuid2.clone())?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
txn.remove_from_working_set(1)?;
txn.add_to_working_set(uuid1.clone())?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let ws = txn.get_working_set()?;
assert_eq!(ws, vec![None, None, Some(uuid2), Some(uuid1)]);
}
Ok(())
}
#[test]
fn remove_working_set_doesnt_exist() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid1 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.add_to_working_set(uuid1.clone())?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let res = txn.remove_from_working_set(0);
assert!(res.is_err());
let res = txn.remove_from_working_set(2);
assert!(res.is_err());
}
Ok(())
}
#[test]
fn clear_working_set() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.add_to_working_set(uuid1.clone())?;
txn.add_to_working_set(uuid2.clone())?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
txn.clear_working_set()?;
txn.add_to_working_set(uuid2.clone())?;
txn.add_to_working_set(uuid1.clone())?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let ws = txn.get_working_set()?;
assert_eq!(ws, vec![None, Some(uuid2), Some(uuid1)]);
}
Ok(())
}
}

View file

@ -64,6 +64,20 @@ pub trait TaskStorageTxn {
/// Replace the current list of operations with a new list.
fn set_operations(&mut self, ops: Vec<Operation>) -> Fallible<()>;
/// Get the entire working set, with each task UUID at its appropriate (1-based) index.
/// Element 0 is always None.
fn get_working_set(&mut self) -> Fallible<Vec<Option<Uuid>>>;
/// Add a task to the working set and return its (one-based) index. This index will be one greater
/// than the highest used index.
fn add_to_working_set(&mut self, uuid: Uuid) -> Fallible<u64>;
/// Remove a task from the working set. Other tasks' indexes are not affected.
fn remove_from_working_set(&mut self, index: u64) -> Fallible<()>;
/// Clear all tasks from the working set in preparation for a garbage-collection operation.
fn clear_working_set(&mut self) -> Fallible<()>;
/// Commit any changes made in the transaction. It is an error to call this more than
/// once.
fn commit(&mut self) -> Fallible<()>;
@ -78,6 +92,8 @@ pub trait TaskStorageTxn {
/// - tasks: a set of tasks indexed by uuid
/// - base_version: the number of the last version sync'd from the server
/// - operations: all operations performed since base_version
/// - working_set: a mapping from integer -> uuid, used to keep stable small-integer indexes
/// into the tasks. The replica maintains this list. It is not covered by operations.
///
/// The `operations` are already reflected in `tasks`, so the following invariant holds:
/// > Applying `operations` to the set of tasks at `base_version` gives a set of tasks identical