rename taskstorage to storage

Dustin J. Mitchell 2021-01-10 21:11:55 -05:00
parent a318ceebe2
commit 15ffc62279
10 changed files with 32 additions and 36 deletions

@@ -0,0 +1,238 @@
#![allow(clippy::new_without_default)]
use crate::storage::{Operation, Storage, StorageTxn, TaskMap, VersionId, DEFAULT_BASE_VERSION};
use failure::{bail, Fallible};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use uuid::Uuid;
#[derive(PartialEq, Debug, Clone)]
struct Data {
tasks: HashMap<Uuid, TaskMap>,
base_version: VersionId,
operations: Vec<Operation>,
working_set: Vec<Option<Uuid>>,
}
struct Txn<'t> {
storage: &'t mut InMemoryStorage,
new_data: Option<Data>,
}
impl<'t> Txn<'t> {
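// Get a mutable reference to this transaction's private copy of the data,
// cloning the storage's data on first mutation (copy-on-write) so that
// uncommitted changes are not visible outside the transaction.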
fn mut_data_ref(&mut self) -> &mut Data {
if self.new_data.is_none() {
self.new_data = Some(self.storage.data.clone());
}
if let Some(ref mut data) = self.new_data {
data
} else {
unreachable!();
}
}
fn data_ref(&mut self) -> &Data {
if let Some(ref data) = self.new_data {
data
} else {
&self.storage.data
}
}
}
impl<'t> StorageTxn for Txn<'t> {
fn get_task(&mut self, uuid: Uuid) -> Fallible<Option<TaskMap>> {
match self.data_ref().tasks.get(&uuid) {
None => Ok(None),
Some(t) => Ok(Some(t.clone())),
}
}
fn create_task(&mut self, uuid: Uuid) -> Fallible<bool> {
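// bind the whole entry while matching on Vacant, so the insert below only
// happens when the task does not already exist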
if let ent @ Entry::Vacant(_) = self.mut_data_ref().tasks.entry(uuid) {
ent.or_insert_with(TaskMap::new);
Ok(true)
} else {
Ok(false)
}
}
fn set_task(&mut self, uuid: Uuid, task: TaskMap) -> Fallible<()> {
self.mut_data_ref().tasks.insert(uuid, task);
Ok(())
}
fn delete_task(&mut self, uuid: Uuid) -> Fallible<bool> {
Ok(self.mut_data_ref().tasks.remove(&uuid).is_some())
}
fn all_tasks(&mut self) -> Fallible<Vec<(Uuid, TaskMap)>> {
Ok(self
.data_ref()
.tasks
.iter()
.map(|(u, t)| (*u, t.clone()))
.collect())
}
fn all_task_uuids(&mut self) -> Fallible<Vec<Uuid>> {
Ok(self.data_ref().tasks.keys().copied().collect())
}
fn base_version(&mut self) -> Fallible<VersionId> {
Ok(self.data_ref().base_version)
}
fn set_base_version(&mut self, version: VersionId) -> Fallible<()> {
self.mut_data_ref().base_version = version;
Ok(())
}
fn operations(&mut self) -> Fallible<Vec<Operation>> {
Ok(self.data_ref().operations.clone())
}
fn add_operation(&mut self, op: Operation) -> Fallible<()> {
self.mut_data_ref().operations.push(op);
Ok(())
}
fn set_operations(&mut self, ops: Vec<Operation>) -> Fallible<()> {
self.mut_data_ref().operations = ops;
Ok(())
}
fn get_working_set(&mut self) -> Fallible<Vec<Option<Uuid>>> {
Ok(self.data_ref().working_set.clone())
}
fn add_to_working_set(&mut self, uuid: Uuid) -> Fallible<usize> {
let working_set = &mut self.mut_data_ref().working_set;
working_set.push(Some(uuid));
Ok(working_set.len())
}
fn set_working_set_item(&mut self, index: usize, uuid: Option<Uuid>) -> Fallible<()> {
let working_set = &mut self.mut_data_ref().working_set;
if index >= working_set.len() {
bail!("Index {} is not in the working set", index);
}
working_set[index] = uuid;
Ok(())
}
fn clear_working_set(&mut self) -> Fallible<()> {
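// reset to just the reserved element 0; working-set indexes are 1-based, so an
// empty working set is represented as a single None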
self.mut_data_ref().working_set = vec![None];
Ok(())
}
fn commit(&mut self) -> Fallible<()> {
// copy the new_data back into storage to commit the transaction
if let Some(data) = self.new_data.take() {
self.storage.data = data;
}
Ok(())
}
}
/// InMemoryStorage is a simple in-memory task storage implementation. It is not useful for
/// production data, but is useful for testing purposes.
#[derive(PartialEq, Debug, Clone)]
pub struct InMemoryStorage {
data: Data,
}
impl InMemoryStorage {
pub fn new() -> InMemoryStorage {
InMemoryStorage {
data: Data {
tasks: HashMap::new(),
base_version: DEFAULT_BASE_VERSION,
operations: vec![],
working_set: vec![None],
},
}
}
}
impl Storage for InMemoryStorage {
fn txn<'a>(&'a mut self) -> Fallible<Box<dyn StorageTxn + 'a>> {
Ok(Box::new(Txn {
storage: self,
new_data: None,
}))
}
}
#[cfg(test)]
mod test {
use super::*;
// (note: this module is heavily used in tests so most of its functionality is well-tested
// elsewhere and not tested here)
#[test]
fn get_working_set_empty() -> Fallible<()> {
let mut storage = InMemoryStorage::new();
{
let mut txn = storage.txn()?;
let ws = txn.get_working_set()?;
assert_eq!(ws, vec![None]);
}
Ok(())
}
#[test]
fn add_to_working_set() -> Fallible<()> {
let mut storage = InMemoryStorage::new();
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.add_to_working_set(uuid1)?;
txn.add_to_working_set(uuid2)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let ws = txn.get_working_set()?;
assert_eq!(ws, vec![None, Some(uuid1), Some(uuid2)]);
}
Ok(())
}
#[test]
fn clear_working_set() -> Fallible<()> {
let mut storage = InMemoryStorage::new();
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.add_to_working_set(uuid1)?;
txn.add_to_working_set(uuid2)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
txn.clear_working_set()?;
txn.add_to_working_set(uuid2)?;
txn.add_to_working_set(uuid1)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let ws = txn.get_working_set()?;
assert_eq!(ws, vec![None, Some(uuid2), Some(uuid1)]);
}
Ok(())
}
}

@@ -0,0 +1,685 @@
use crate::storage::{Operation, Storage, StorageTxn, TaskMap, VersionId, DEFAULT_BASE_VERSION};
use crate::utils::Key;
use failure::{bail, Fallible};
use kv::msgpack::Msgpack;
use kv::{Bucket, Config, Error, Integer, Serde, Store, ValueBuf};
use std::path::Path;
use uuid::Uuid;
/// KVStorage is an on-disk storage backend which uses LMDB via the `kv` crate.
pub struct KVStorage<'t> {
store: Store,
tasks_bucket: Bucket<'t, Key, ValueBuf<Msgpack<TaskMap>>>,
numbers_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<u64>>>,
uuids_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<Uuid>>>,
operations_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<Operation>>>,
working_set_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<Uuid>>>,
}
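// Keys into the "numbers" and "uuids" buckets; each constant identifies a single
// stored value.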
const BASE_VERSION: u64 = 1;
const NEXT_OPERATION: u64 = 2;
const NEXT_WORKING_SET_INDEX: u64 = 3;
impl<'t> KVStorage<'t> {
pub fn new<P: AsRef<Path>>(directory: P) -> Fallible<KVStorage<'t>> {
let mut config = Config::default(directory);
config.bucket("tasks", None);
config.bucket("numbers", None);
config.bucket("uuids", None);
config.bucket("operations", None);
config.bucket("working_set", None);
let store = Store::new(config)?;
// tasks are stored indexed by uuid
let tasks_bucket = store.bucket::<Key, ValueBuf<Msgpack<TaskMap>>>(Some("tasks"))?;
// this bucket contains various u64s, indexed by constants above
let numbers_bucket = store.int_bucket::<ValueBuf<Msgpack<u64>>>(Some("numbers"))?;
// this bucket contains various Uuids, indexed by constants above
let uuids_bucket = store.int_bucket::<ValueBuf<Msgpack<Uuid>>>(Some("uuids"))?;
// this bucket contains operations, numbered consecutively; the NEXT_OPERATION number gives
// the index of the next operation to insert
let operations_bucket =
store.int_bucket::<ValueBuf<Msgpack<Operation>>>(Some("operations"))?;
// this bucket contains the working set, keyed by consecutive indexes starting at 1;
// the NEXT_WORKING_SET_INDEX number gives the index of the next entry to insert
let working_set_bucket =
store.int_bucket::<ValueBuf<Msgpack<Uuid>>>(Some("working_set"))?;
Ok(KVStorage {
store,
tasks_bucket,
numbers_bucket,
uuids_bucket,
operations_bucket,
working_set_bucket,
})
}
}
impl<'t> Storage for KVStorage<'t> {
fn txn<'a>(&'a mut self) -> Fallible<Box<dyn StorageTxn + 'a>> {
Ok(Box::new(Txn {
storage: self,
txn: Some(self.store.write_txn()?),
}))
}
}
struct Txn<'t> {
storage: &'t KVStorage<'t>,
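// the underlying kv transaction; this becomes None once the transaction has
// been committed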
txn: Option<kv::Txn<'t>>,
}
impl<'t> Txn<'t> {
// get the underlying kv Txn
fn kvtxn(&mut self) -> &mut kv::Txn<'t> {
if let Some(ref mut txn) = self.txn {
txn
} else {
panic!("cannot use transaction after commit");
}
}
// Access to buckets
fn tasks_bucket(&self) -> &'t Bucket<'t, Key, ValueBuf<Msgpack<TaskMap>>> {
&self.storage.tasks_bucket
}
fn numbers_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf<Msgpack<u64>>> {
&self.storage.numbers_bucket
}
fn uuids_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf<Msgpack<Uuid>>> {
&self.storage.uuids_bucket
}
fn operations_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf<Msgpack<Operation>>> {
&self.storage.operations_bucket
}
fn working_set_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf<Msgpack<Uuid>>> {
&self.storage.working_set_bucket
}
}
impl<'t> StorageTxn for Txn<'t> {
fn get_task(&mut self, uuid: Uuid) -> Fallible<Option<TaskMap>> {
let bucket = self.tasks_bucket();
let buf = match self.kvtxn().get(bucket, uuid.into()) {
Ok(buf) => buf,
Err(Error::NotFound) => return Ok(None),
Err(e) => return Err(e.into()),
};
let value = buf.inner()?.to_serde();
Ok(Some(value))
}
fn create_task(&mut self, uuid: Uuid) -> Fallible<bool> {
let bucket = self.tasks_bucket();
let kvtxn = self.kvtxn();
match kvtxn.get(bucket, uuid.into()) {
Err(Error::NotFound) => {
kvtxn.set(bucket, uuid.into(), Msgpack::to_value_buf(TaskMap::new())?)?;
Ok(true)
}
Err(e) => Err(e.into()),
Ok(_) => Ok(false),
}
}
fn set_task(&mut self, uuid: Uuid, task: TaskMap) -> Fallible<()> {
let bucket = self.tasks_bucket();
let kvtxn = self.kvtxn();
kvtxn.set(bucket, uuid.into(), Msgpack::to_value_buf(task)?)?;
Ok(())
}
fn delete_task(&mut self, uuid: Uuid) -> Fallible<bool> {
let bucket = self.tasks_bucket();
let kvtxn = self.kvtxn();
match kvtxn.del(bucket, uuid.into()) {
Err(Error::NotFound) => Ok(false),
Err(e) => Err(e.into()),
Ok(_) => Ok(true),
}
}
fn all_tasks(&mut self) -> Fallible<Vec<(Uuid, TaskMap)>> {
let bucket = self.tasks_bucket();
let kvtxn = self.kvtxn();
let all_tasks: Result<Vec<(Uuid, TaskMap)>, Error> = kvtxn
.read_cursor(bucket)?
.iter()
.map(|(k, v)| Ok((k.into(), v.inner()?.to_serde())))
.collect();
Ok(all_tasks?)
}
fn all_task_uuids(&mut self) -> Fallible<Vec<Uuid>> {
let bucket = self.tasks_bucket();
let kvtxn = self.kvtxn();
Ok(kvtxn
.read_cursor(bucket)?
.iter()
.map(|(k, _)| k.into())
.collect())
}
fn base_version(&mut self) -> Fallible<VersionId> {
let bucket = self.uuids_bucket();
let base_version = match self.kvtxn().get(bucket, BASE_VERSION.into()) {
Ok(buf) => buf,
Err(Error::NotFound) => return Ok(DEFAULT_BASE_VERSION),
Err(e) => return Err(e.into()),
}
.inner()?
.to_serde();
Ok(base_version)
}
fn set_base_version(&mut self, version: VersionId) -> Fallible<()> {
let uuids_bucket = self.uuids_bucket();
let kvtxn = self.kvtxn();
kvtxn.set(
uuids_bucket,
BASE_VERSION.into(),
Msgpack::to_value_buf(version)?,
)?;
Ok(())
}
fn operations(&mut self) -> Fallible<Vec<Operation>> {
let bucket = self.operations_bucket();
let kvtxn = self.kvtxn();
let all_ops: Result<Vec<(u64, Operation)>, Error> = kvtxn
.read_cursor(bucket)?
.iter()
.map(|(i, v)| Ok((i.into(), v.inner()?.to_serde())))
.collect();
let mut all_ops = all_ops?;
// sort by key..
all_ops.sort_by(|a, b| a.0.cmp(&b.0));
// and return the values..
Ok(all_ops.iter().map(|(_, v)| v.clone()).collect())
}
fn add_operation(&mut self, op: Operation) -> Fallible<()> {
let numbers_bucket = self.numbers_bucket();
let operations_bucket = self.operations_bucket();
let kvtxn = self.kvtxn();
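// operations are stored under a consecutive counter; NEXT_OPERATION holds the
// key to use for the next insertion (0 if nothing has been stored yet)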
let next_op = match kvtxn.get(numbers_bucket, NEXT_OPERATION.into()) {
Ok(buf) => buf.inner()?.to_serde(),
Err(Error::NotFound) => 0,
Err(e) => return Err(e.into()),
};
kvtxn.set(
operations_bucket,
next_op.into(),
Msgpack::to_value_buf(op)?,
)?;
kvtxn.set(
numbers_bucket,
NEXT_OPERATION.into(),
Msgpack::to_value_buf(next_op + 1)?,
)?;
Ok(())
}
fn set_operations(&mut self, ops: Vec<Operation>) -> Fallible<()> {
let numbers_bucket = self.numbers_bucket();
let operations_bucket = self.operations_bucket();
let kvtxn = self.kvtxn();
kvtxn.clear_db(operations_bucket)?;
let mut i = 0u64;
for op in ops {
kvtxn.set(operations_bucket, i.into(), Msgpack::to_value_buf(op)?)?;
i += 1;
}
kvtxn.set(
numbers_bucket,
NEXT_OPERATION.into(),
Msgpack::to_value_buf(i)?,
)?;
Ok(())
}
fn get_working_set(&mut self) -> Fallible<Vec<Option<Uuid>>> {
let working_set_bucket = self.working_set_bucket();
let numbers_bucket = self.numbers_bucket();
let kvtxn = self.kvtxn();
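// the index at which the next entry would be added; defaults to 1 because
// index 0 is reserved and always reads back as None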
let next_index = match kvtxn.get(numbers_bucket, NEXT_WORKING_SET_INDEX.into()) {
Ok(buf) => buf.inner()?.to_serde(),
Err(Error::NotFound) => 1,
Err(e) => return Err(e.into()),
};
let mut res = Vec::with_capacity(next_index as usize);
for _ in 0..next_index {
res.push(None)
}
for (i, u) in kvtxn.read_cursor(working_set_bucket)?.iter() {
let i: u64 = i.into();
res[i as usize] = Some(u.inner()?.to_serde());
}
Ok(res)
}
fn add_to_working_set(&mut self, uuid: Uuid) -> Fallible<usize> {
let working_set_bucket = self.working_set_bucket();
let numbers_bucket = self.numbers_bucket();
let kvtxn = self.kvtxn();
let next_index = match kvtxn.get(numbers_bucket, NEXT_WORKING_SET_INDEX.into()) {
Ok(buf) => buf.inner()?.to_serde(),
Err(Error::NotFound) => 1,
Err(e) => return Err(e.into()),
};
kvtxn.set(
working_set_bucket,
next_index.into(),
Msgpack::to_value_buf(uuid)?,
)?;
kvtxn.set(
numbers_bucket,
NEXT_WORKING_SET_INDEX.into(),
Msgpack::to_value_buf(next_index + 1)?,
)?;
Ok(next_index as usize)
}
fn set_working_set_item(&mut self, index: usize, uuid: Option<Uuid>) -> Fallible<()> {
let working_set_bucket = self.working_set_bucket();
let numbers_bucket = self.numbers_bucket();
let kvtxn = self.kvtxn();
let index = index as u64;
let next_index = match kvtxn.get(numbers_bucket, NEXT_WORKING_SET_INDEX.into()) {
Ok(buf) => buf.inner()?.to_serde(),
Err(Error::NotFound) => 1,
Err(e) => return Err(e.into()),
};
if index >= next_index {
bail!("Index {} is not in the working set", index);
}
if let Some(uuid) = uuid {
kvtxn.set(
working_set_bucket,
index.into(),
Msgpack::to_value_buf(uuid)?,
)?;
} else {
kvtxn.del(working_set_bucket, index.into())?;
}
Ok(())
}
fn clear_working_set(&mut self) -> Fallible<()> {
let working_set_bucket = self.working_set_bucket();
let numbers_bucket = self.numbers_bucket();
let kvtxn = self.kvtxn();
kvtxn.clear_db(working_set_bucket)?;
kvtxn.set(
numbers_bucket,
NEXT_WORKING_SET_INDEX.into(),
Msgpack::to_value_buf(1)?,
)?;
Ok(())
}
fn commit(&mut self) -> Fallible<()> {
if let Some(kvtxn) = self.txn.take() {
kvtxn.commit()?;
} else {
panic!("transaction already committed");
}
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::storage::taskmap_with;
use failure::Fallible;
use tempdir::TempDir;
#[test]
fn test_create() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid = Uuid::new_v4();
{
let mut txn = storage.txn()?;
assert!(txn.create_task(uuid)?);
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let task = txn.get_task(uuid)?;
assert_eq!(task, Some(taskmap_with(vec![])));
}
Ok(())
}
#[test]
fn test_create_exists() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid = Uuid::new_v4();
{
let mut txn = storage.txn()?;
assert!(txn.create_task(uuid)?);
txn.commit()?;
}
{
let mut txn = storage.txn()?;
assert!(!txn.create_task(uuid)?);
txn.commit()?;
}
Ok(())
}
#[test]
fn test_get_missing() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid = Uuid::new_v4();
{
let mut txn = storage.txn()?;
let task = txn.get_task(uuid)?;
assert_eq!(task, None);
}
Ok(())
}
#[test]
fn test_set_task() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.set_task(uuid, taskmap_with(vec![("k".to_string(), "v".to_string())]))?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let task = txn.get_task(uuid)?;
assert_eq!(
task,
Some(taskmap_with(vec![("k".to_string(), "v".to_string())]))
);
}
Ok(())
}
#[test]
fn test_delete_task_missing() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid = Uuid::new_v4();
{
let mut txn = storage.txn()?;
assert!(!txn.delete_task(uuid)?);
}
Ok(())
}
#[test]
fn test_delete_task_exists() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid = Uuid::new_v4();
{
let mut txn = storage.txn()?;
assert!(txn.create_task(uuid)?);
txn.commit()?;
}
{
let mut txn = storage.txn()?;
assert!(txn.delete_task(uuid)?);
}
Ok(())
}
#[test]
fn test_all_tasks_empty() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
{
let mut txn = storage.txn()?;
let tasks = txn.all_tasks()?;
assert_eq!(tasks, vec![]);
}
Ok(())
}
#[test]
fn test_all_tasks_and_uuids() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
assert!(txn.create_task(uuid1.clone())?);
txn.set_task(
uuid1.clone(),
taskmap_with(vec![("num".to_string(), "1".to_string())]),
)?;
assert!(txn.create_task(uuid2.clone())?);
txn.set_task(
uuid2.clone(),
taskmap_with(vec![("num".to_string(), "2".to_string())]),
)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let mut tasks = txn.all_tasks()?;
// order is nondeterministic, so sort by uuid
tasks.sort_by(|a, b| a.0.cmp(&b.0));
let mut exp = vec![
(
uuid1.clone(),
taskmap_with(vec![("num".to_string(), "1".to_string())]),
),
(
uuid2.clone(),
taskmap_with(vec![("num".to_string(), "2".to_string())]),
),
];
exp.sort_by(|a, b| a.0.cmp(&b.0));
assert_eq!(tasks, exp);
}
{
let mut txn = storage.txn()?;
let mut uuids = txn.all_task_uuids()?;
uuids.sort();
let mut exp = vec![uuid1.clone(), uuid2.clone()];
exp.sort();
assert_eq!(uuids, exp);
}
Ok(())
}
#[test]
fn test_base_version_default() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
{
let mut txn = storage.txn()?;
assert_eq!(txn.base_version()?, DEFAULT_BASE_VERSION);
}
Ok(())
}
#[test]
fn test_base_version_setting() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let u = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.set_base_version(u)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
assert_eq!(txn.base_version()?, u);
}
Ok(())
}
#[test]
fn test_operations() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
let uuid3 = Uuid::new_v4();
// create some operations
{
let mut txn = storage.txn()?;
txn.add_operation(Operation::Create { uuid: uuid1 })?;
txn.add_operation(Operation::Create { uuid: uuid2 })?;
txn.commit()?;
}
// read them back
{
let mut txn = storage.txn()?;
let ops = txn.operations()?;
assert_eq!(
ops,
vec![
Operation::Create { uuid: uuid1 },
Operation::Create { uuid: uuid2 },
]
);
}
// set them to a different bunch
{
let mut txn = storage.txn()?;
txn.set_operations(vec![
Operation::Delete { uuid: uuid2 },
Operation::Delete { uuid: uuid1 },
])?;
txn.commit()?;
}
// create some more operations (to test adding operations after clearing)
{
let mut txn = storage.txn()?;
txn.add_operation(Operation::Create { uuid: uuid3 })?;
txn.add_operation(Operation::Delete { uuid: uuid3 })?;
txn.commit()?;
}
// read them back
{
let mut txn = storage.txn()?;
let ops = txn.operations()?;
assert_eq!(
ops,
vec![
Operation::Delete { uuid: uuid2 },
Operation::Delete { uuid: uuid1 },
Operation::Create { uuid: uuid3 },
Operation::Delete { uuid: uuid3 },
]
);
}
Ok(())
}
#[test]
fn get_working_set_empty() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
{
let mut txn = storage.txn()?;
let ws = txn.get_working_set()?;
assert_eq!(ws, vec![None]);
}
Ok(())
}
#[test]
fn add_to_working_set() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.add_to_working_set(uuid1)?;
txn.add_to_working_set(uuid2)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let ws = txn.get_working_set()?;
assert_eq!(ws, vec![None, Some(uuid1), Some(uuid2)]);
}
Ok(())
}
#[test]
fn clear_working_set() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.add_to_working_set(uuid1)?;
txn.add_to_working_set(uuid2)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
txn.clear_working_set()?;
txn.add_to_working_set(uuid2)?;
txn.add_to_working_set(uuid1)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let ws = txn.get_working_set()?;
assert_eq!(ws, vec![None, Some(uuid2), Some(uuid1)]);
}
Ok(())
}
}

@@ -0,0 +1,109 @@
use failure::Fallible;
use std::collections::HashMap;
use uuid::Uuid;
mod inmemory;
mod kv;
mod operation;
pub use self::kv::KVStorage;
pub use inmemory::InMemoryStorage;
pub use operation::Operation;
/// An in-memory representation of a task as a simple hashmap
pub type TaskMap = HashMap<String, String>;
#[cfg(test)]
fn taskmap_with(mut properties: Vec<(String, String)>) -> TaskMap {
let mut rv = TaskMap::new();
for (p, v) in properties.drain(..) {
rv.insert(p, v);
}
rv
}
/// The type of VersionIds
pub use crate::server::VersionId;
/// The default for base_version.
pub(crate) const DEFAULT_BASE_VERSION: Uuid = crate::server::NO_VERSION_ID;
/// A Storage transaction, in which storage operations are performed.
///
/// # Concurrency
///
/// Serializable consistency must be maintained. Concurrent access is unusual
/// and some implementations may simply apply a mutex to limit access to
/// one transaction at a time.
///
/// # Committing and Aborting
///
/// A transaction is not visible to other readers until it is committed with
/// [`crate::storage::StorageTxn::commit`]. Transactions are aborted if they are dropped.
/// It is safe and performant to drop transactions that did not modify any data without committing.
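///
/// # Example
///
/// A minimal usage sketch (illustrative only, not compiled as a doc-test), assuming
/// `storage` is any [`Storage`] implementation:
///
/// ```ignore
/// let mut txn = storage.txn()?;
/// let uuid = Uuid::new_v4();
/// if txn.create_task(uuid)? {
///     txn.add_operation(Operation::Create { uuid })?;
/// }
/// txn.commit()?;
/// ```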
pub trait StorageTxn {
/// Get an (immutable) task, if it is in the storage
fn get_task(&mut self, uuid: Uuid) -> Fallible<Option<TaskMap>>;
/// Create an (empty) task, only if it does not already exist. Returns true if
/// the task was created (did not already exist).
fn create_task(&mut self, uuid: Uuid) -> Fallible<bool>;
/// Set a task, overwriting any existing task. If the task does not exist, this implicitly
/// creates it (use `get_task` to check first, if necessary).
fn set_task(&mut self, uuid: Uuid, task: TaskMap) -> Fallible<()>;
/// Delete a task, if it exists. Returns true if the task was deleted (already existed)
fn delete_task(&mut self, uuid: Uuid) -> Fallible<bool>;
/// Get the uuids and bodies of all tasks in the storage, in undefined order.
fn all_tasks(&mut self) -> Fallible<Vec<(Uuid, TaskMap)>>;
/// Get the uuids of all tasks in the storage, in undefined order.
fn all_task_uuids(&mut self) -> Fallible<Vec<Uuid>>;
/// Get the current base_version for this storage -- the last version synced from the server.
fn base_version(&mut self) -> Fallible<VersionId>;
/// Set the current base_version for this storage.
fn set_base_version(&mut self, version: VersionId) -> Fallible<()>;
/// Get the current set of outstanding operations (operations that have not been sync'd to the
/// server yet)
fn operations(&mut self) -> Fallible<Vec<Operation>>;
/// Add an operation to the end of the list of operations in the storage. Note that this
/// merely *stores* the operation; it is up to the TaskDB to apply it.
fn add_operation(&mut self, op: Operation) -> Fallible<()>;
/// Replace the current list of operations with a new list.
fn set_operations(&mut self, ops: Vec<Operation>) -> Fallible<()>;
/// Get the entire working set, with each task UUID at its appropriate (1-based) index.
/// Element 0 is always None.
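/// For example, a working set containing a single task `t` at index 1 is represented as `vec![None, Some(t)]`.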
fn get_working_set(&mut self) -> Fallible<Vec<Option<Uuid>>>;
/// Add a task to the working set and return its (one-based) index. This index will be one greater
/// than the highest used index.
fn add_to_working_set(&mut self, uuid: Uuid) -> Fallible<usize>;
/// Update the working set task at the given index. This cannot add a new item to the
/// working set.
fn set_working_set_item(&mut self, index: usize, uuid: Option<Uuid>) -> Fallible<()>;
/// Clear all tasks from the working set in preparation for a garbage-collection operation.
/// Note that this is the only way items are removed from the set.
fn clear_working_set(&mut self) -> Fallible<()>;
/// Commit any changes made in the transaction. It is an error to call this more than
/// once.
fn commit(&mut self) -> Fallible<()>;
}
/// A trait for objects able to act as task storage. Most of the interesting behavior is in the
/// [`crate::storage::StorageTxn`] trait.
pub trait Storage {
/// Begin a transaction
fn txn<'a>(&'a mut self) -> Fallible<Box<dyn StorageTxn + 'a>>;
}

@@ -0,0 +1,352 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// An Operation defines a single change to the task database
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum Operation {
/// Create a new task.
///
/// On application, if the task already exists, the operation does nothing.
Create { uuid: Uuid },
/// Delete an existing task.
///
/// On application, if the task does not exist, the operation does nothing.
Delete { uuid: Uuid },
/// Update an existing task, setting the given property to the given value. If the value is
/// None, then the corresponding property is deleted.
///
/// If the given task does not exist, the operation does nothing.
Update {
uuid: Uuid,
property: String,
value: Option<String>,
timestamp: DateTime<Utc>,
},
}
use Operation::*;
impl Operation {
// Transform takes two operations A and B that happened concurrently and produces two
// operations A' and B' such that `apply(apply(S, A), B') = apply(apply(S, B), A')`. This
// function is used to serialize operations in a process similar to a Git "rebase".
//
//          *
//         / \
//  op1   /   \   op2
//       /     \
//      *       *
//
// this function "completes the diamond":
//
//      *       *
//       \     /
//  op2'  \   /  op1'
//         \ /
//          *
//
// such that applying op2' after op1 has the same effect as applying op1' after op2. This
// allows two different systems which have already applied op1 and op2, respectively, and thus
// reached different states, to return to the same state by applying op2' and op1',
// respectively.
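//
// For example, if op1 and op2 set the same property of the same task to different
// values and op2 has the later timestamp, transform returns (None, Some(op2)): the
// replica that already applied op1 applies op2 to converge, while the replica that
// applied op2 needs no further change.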
pub fn transform(
operation1: Operation,
operation2: Operation,
) -> (Option<Operation>, Option<Operation>) {
match (&operation1, &operation2) {
// Two creations or deletions of the same uuid reach the same state, so there's no need
// for any further operations to bring the state together.
(&Create { uuid: uuid1 }, &Create { uuid: uuid2 }) if uuid1 == uuid2 => (None, None),
(&Delete { uuid: uuid1 }, &Delete { uuid: uuid2 }) if uuid1 == uuid2 => (None, None),
// Given a create and a delete of the same task, one of the operations is invalid: the
// create implies the task does not exist, but the delete implies it exists. Somewhat
// arbitrarily, we prefer the Create
(&Create { uuid: uuid1 }, &Delete { uuid: uuid2 }) if uuid1 == uuid2 => {
(Some(operation1), None)
}
(&Delete { uuid: uuid1 }, &Create { uuid: uuid2 }) if uuid1 == uuid2 => {
(None, Some(operation2))
}
// And again from an Update and a Create, prefer the Update
(&Update { uuid: uuid1, .. }, &Create { uuid: uuid2 }) if uuid1 == uuid2 => {
(Some(operation1), None)
}
(&Create { uuid: uuid1 }, &Update { uuid: uuid2, .. }) if uuid1 == uuid2 => {
(None, Some(operation2))
}
// Given a delete and an update, prefer the delete
(&Update { uuid: uuid1, .. }, &Delete { uuid: uuid2 }) if uuid1 == uuid2 => {
(None, Some(operation2))
}
(&Delete { uuid: uuid1 }, &Update { uuid: uuid2, .. }) if uuid1 == uuid2 => {
(Some(operation1), None)
}
// Two updates to the same property of the same task might conflict.
(
&Update {
uuid: ref uuid1,
property: ref property1,
value: ref value1,
timestamp: ref timestamp1,
},
&Update {
uuid: ref uuid2,
property: ref property2,
value: ref value2,
timestamp: ref timestamp2,
},
) if uuid1 == uuid2 && property1 == property2 => {
// if the value is the same, there's no conflict
if value1 == value2 {
(None, None)
} else if timestamp1 < timestamp2 {
// prefer the later modification
(None, Some(operation2))
} else {
// prefer the later modification or, if the timestamps are the same,
// just choose one of them
(Some(operation1), None)
}
}
// anything else is not a conflict of any sort, so return the operations unchanged
(_, _) => (Some(operation1), Some(operation2)),
}
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::storage::InMemoryStorage;
use crate::taskdb::TaskDB;
use chrono::{Duration, Utc};
use proptest::prelude::*;
// note that `tests/operation_transform_invariant.rs` tests the transform function quite
// thoroughly, so this testing is light.
fn test_transform(
setup: Option<Operation>,
o1: Operation,
o2: Operation,
exp1p: Option<Operation>,
exp2p: Option<Operation>,
) {
let (o1p, o2p) = Operation::transform(o1.clone(), o2.clone());
assert_eq!((&o1p, &o2p), (&exp1p, &exp2p));
// check that the two operation sequences have the same effect, enforcing the invariant of
// the transform function.
let mut db1 = TaskDB::new_inmemory();
if let Some(ref o) = setup {
db1.apply(o.clone()).unwrap();
}
db1.apply(o1).unwrap();
if let Some(o) = o2p {
db1.apply(o).unwrap();
}
let mut db2 = TaskDB::new_inmemory();
if let Some(ref o) = setup {
db2.apply(o.clone()).unwrap();
}
db2.apply(o2).unwrap();
if let Some(o) = o1p {
db2.apply(o).unwrap();
}
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
}
#[test]
fn test_unrelated_create() {
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
test_transform(
None,
Create { uuid: uuid1 },
Create { uuid: uuid2 },
Some(Create { uuid: uuid1 }),
Some(Create { uuid: uuid2 }),
);
}
#[test]
fn test_related_updates_different_props() {
let uuid = Uuid::new_v4();
let timestamp = Utc::now();
test_transform(
Some(Create { uuid }),
Update {
uuid,
property: "abc".into(),
value: Some("true".into()),
timestamp,
},
Update {
uuid,
property: "def".into(),
value: Some("false".into()),
timestamp,
},
Some(Update {
uuid,
property: "abc".into(),
value: Some("true".into()),
timestamp,
}),
Some(Update {
uuid,
property: "def".into(),
value: Some("false".into()),
timestamp,
}),
);
}
#[test]
fn test_related_updates_same_prop() {
let uuid = Uuid::new_v4();
let timestamp1 = Utc::now();
let timestamp2 = timestamp1 + Duration::seconds(10);
test_transform(
Some(Create { uuid }),
Update {
uuid,
property: "abc".into(),
value: Some("true".into()),
timestamp: timestamp1,
},
Update {
uuid,
property: "abc".into(),
value: Some("false".into()),
timestamp: timestamp2,
},
None,
Some(Update {
uuid,
property: "abc".into(),
value: Some("false".into()),
timestamp: timestamp2,
}),
);
}
#[test]
fn test_related_updates_same_prop_same_time() {
let uuid = Uuid::new_v4();
let timestamp = Utc::now();
test_transform(
Some(Create { uuid }),
Update {
uuid,
property: "abc".into(),
value: Some("true".into()),
timestamp,
},
Update {
uuid,
property: "abc".into(),
value: Some("false".into()),
timestamp,
},
Some(Update {
uuid,
property: "abc".into(),
value: Some("true".into()),
timestamp,
}),
None,
);
}
fn uuid_strategy() -> impl Strategy<Value = Uuid> {
prop_oneof![
Just(Uuid::parse_str("83a2f9ef-f455-4195-b92e-a54c161eebfc").unwrap()),
Just(Uuid::parse_str("56e0be07-c61f-494c-a54c-bdcfdd52d2a7").unwrap()),
Just(Uuid::parse_str("4b7ed904-f7b0-4293-8a10-ad452422c7b3").unwrap()),
Just(Uuid::parse_str("9bdd0546-07c8-4e1f-a9bc-9d6299f4773b").unwrap()),
]
}
fn operation_strategy() -> impl Strategy<Value = Operation> {
prop_oneof![
uuid_strategy().prop_map(|uuid| Operation::Create { uuid }),
uuid_strategy().prop_map(|uuid| Operation::Delete { uuid }),
(uuid_strategy(), "(title|project|status)").prop_map(|(uuid, property)| {
Operation::Update {
uuid,
property,
value: Some("true".into()),
timestamp: Utc::now(),
}
}),
]
}
proptest! {
#![proptest_config(ProptestConfig {
cases: 1024, .. ProptestConfig::default()
})]
#[test]
// check that the two operation sequences have the same effect, enforcing the invariant of
// the transform function.
fn transform_invariant_holds(o1 in operation_strategy(), o2 in operation_strategy()) {
let (o1p, o2p) = Operation::transform(o1.clone(), o2.clone());
let mut db1 = TaskDB::new(Box::new(InMemoryStorage::new()));
let mut db2 = TaskDB::new(Box::new(InMemoryStorage::new()));
// Ensure that any expected tasks already exist
if let Operation::Update{ ref uuid, .. } = o1 {
let _ = db1.apply(Operation::Create{uuid: uuid.clone()});
let _ = db2.apply(Operation::Create{uuid: uuid.clone()});
}
if let Operation::Update{ ref uuid, .. } = o2 {
let _ = db1.apply(Operation::Create{uuid: uuid.clone()});
let _ = db2.apply(Operation::Create{uuid: uuid.clone()});
}
if let Operation::Delete{ ref uuid } = o1 {
let _ = db1.apply(Operation::Create{uuid: uuid.clone()});
let _ = db2.apply(Operation::Create{uuid: uuid.clone()});
}
if let Operation::Delete{ ref uuid } = o2 {
let _ = db1.apply(Operation::Create{uuid: uuid.clone()});
let _ = db2.apply(Operation::Create{uuid: uuid.clone()});
}
// if applying either initial operation fails, that indicates the operation was invalid
// in the base state, so consider the case successful.
if let Err(_) = db1.apply(o1) {
return Ok(());
}
if let Err(_) = db2.apply(o2) {
return Ok(());
}
if let Some(o) = o2p {
db1.apply(o).map_err(|e| TestCaseError::Fail(format!("Applying to db1: {}", e).into()))?;
}
if let Some(o) = o1p {
db2.apply(o).map_err(|e| TestCaseError::Fail(format!("Applying to db2: {}", e).into()))?;
}
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
}
}
}