Apply SyncOps, but keep a list of ReplicaOps

This changes a lot of function signatures, but basically:
 * TaskDB::apply now takes a SyncOp, not a ReplicaOp
 * Replica::update_task returns a TaskMap
This commit is contained in:
Dustin J. Mitchell 2021-12-19 20:54:48 +00:00
parent 0b29efab31
commit fee25fa742
8 changed files with 440 additions and 565 deletions

View file

@ -1,6 +1,5 @@
use crate::errors::Error;
use crate::server::Server;
use crate::storage::{ReplicaOp, Storage, TaskMap};
use crate::server::{Server, SyncOp};
use crate::storage::{Storage, TaskMap};
use crate::task::{Status, Task};
use crate::taskdb::TaskDb;
use crate::workingset::WorkingSet;
@ -51,12 +50,12 @@ impl Replica {
uuid: Uuid,
property: S1,
value: Option<S2>,
) -> anyhow::Result<()>
) -> anyhow::Result<TaskMap>
where
S1: Into<String>,
S2: Into<String>,
{
self.taskdb.apply(ReplicaOp::Update {
self.taskdb.apply(SyncOp::Update {
uuid,
property: property.into(),
value: value.map(|v| v.into()),
@ -100,9 +99,9 @@ impl Replica {
/// Create a new task. The task must not already exist.
pub fn new_task(&mut self, status: Status, description: String) -> anyhow::Result<Task> {
let uuid = Uuid::new_v4();
self.taskdb.apply(ReplicaOp::Create { uuid })?;
let taskmap = self.taskdb.apply(SyncOp::Create { uuid })?;
trace!("task {} created", uuid);
let mut task = Task::new(uuid, TaskMap::new()).into_mut(self);
let mut task = Task::new(uuid, taskmap).into_mut(self);
task.set_description(description)?;
task.set_status(status)?;
Ok(task.into_immut())
@ -113,12 +112,7 @@ impl Replica {
/// should only occur through expiration.
#[allow(dead_code)]
fn delete_task(&mut self, uuid: Uuid) -> anyhow::Result<()> {
// check that it already exists; this is a convenience check, as the task may already exist
// when this Create operation is finally sync'd with operations from other replicas
if self.taskdb.get_task(uuid)?.is_none() {
return Err(Error::Database(format!("Task {} does not exist", uuid)).into());
}
self.taskdb.apply(ReplicaOp::Delete { uuid })?;
self.taskdb.apply(SyncOp::Delete { uuid })?;
trace!("task {} deleted", uuid);
Ok(())
}

View file

@ -215,20 +215,20 @@ mod test {
// the transform function.
let mut db1 = TaskDb::new_inmemory();
if let Some(ref o) = setup {
db1.apply_sync_tmp(o.clone()).unwrap();
db1.apply(o.clone()).unwrap();
}
db1.apply_sync_tmp(o1).unwrap();
db1.apply(o1).unwrap();
if let Some(o) = o2p {
db1.apply_sync_tmp(o).unwrap();
db1.apply(o).unwrap();
}
let mut db2 = TaskDb::new_inmemory();
if let Some(ref o) = setup {
db2.apply_sync_tmp(o.clone()).unwrap();
db2.apply(o.clone()).unwrap();
}
db2.apply_sync_tmp(o2).unwrap();
db2.apply(o2).unwrap();
if let Some(o) = o1p {
db2.apply_sync_tmp(o).unwrap();
db2.apply(o).unwrap();
}
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
@ -380,39 +380,39 @@ mod test {
// Ensure that any expected tasks already exist
if let Update{ uuid, .. } = o1 {
let _ = db1.apply_sync_tmp(Create{uuid});
let _ = db2.apply_sync_tmp(Create{uuid});
let _ = db1.apply(Create{uuid});
let _ = db2.apply(Create{uuid});
}
if let Update{ uuid, .. } = o2 {
let _ = db1.apply_sync_tmp(Create{uuid});
let _ = db2.apply_sync_tmp(Create{uuid});
let _ = db1.apply(Create{uuid});
let _ = db2.apply(Create{uuid});
}
if let Delete{ uuid } = o1 {
let _ = db1.apply_sync_tmp(Create{uuid});
let _ = db2.apply_sync_tmp(Create{uuid});
let _ = db1.apply(Create{uuid});
let _ = db2.apply(Create{uuid});
}
if let Delete{ uuid } = o2 {
let _ = db1.apply_sync_tmp(Create{uuid});
let _ = db2.apply_sync_tmp(Create{uuid});
let _ = db1.apply(Create{uuid});
let _ = db2.apply(Create{uuid});
}
// if applying the initial operations fail, that indicates the operation was invalid
// in the base state, so consider the case successful.
if db1.apply_sync_tmp(o1).is_err() {
if db1.apply(o1).is_err() {
return Ok(());
}
if db2.apply_sync_tmp(o2).is_err() {
if db2.apply(o2).is_err() {
return Ok(());
}
if let Some(o) = o2p {
db1.apply_sync_tmp(o).map_err(|e| TestCaseError::Fail(format!("Applying to db1: {}", e).into()))?;
db1.apply(o).map_err(|e| TestCaseError::Fail(format!("Applying to db1: {}", e).into()))?;
}
if let Some(o) = o1p {
db2.apply_sync_tmp(o).map_err(|e| TestCaseError::Fail(format!("Applying to db2: {}", e).into()))?;
db2.apply(o).map_err(|e| TestCaseError::Fail(format!("Applying to db2: {}", e).into()))?;
}
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
}

View file

@ -126,163 +126,17 @@ impl ReplicaOp {
#[cfg(test)]
mod test {
use super::*;
use crate::storage::InMemoryStorage;
use crate::taskdb::TaskDb;
use chrono::{Duration, Utc};
use chrono::Utc;
use pretty_assertions::assert_eq;
use proptest::prelude::*;
// note that `tests/operation_transform_invariant.rs` tests the transform function quite
// thoroughly, so this testing is light.
fn test_transform(
setup: Option<ReplicaOp>,
o1: ReplicaOp,
o2: ReplicaOp,
exp1p: Option<ReplicaOp>,
exp2p: Option<ReplicaOp>,
) {
let (o1p, o2p) = ReplicaOp::transform(o1.clone(), o2.clone());
assert_eq!((&o1p, &o2p), (&exp1p, &exp2p));
// check that the two operation sequences have the same effect, enforcing the invariant of
// the transform function.
let mut db1 = TaskDb::new_inmemory();
if let Some(ref o) = setup {
db1.apply(o.clone()).unwrap();
}
db1.apply(o1).unwrap();
if let Some(o) = o2p {
db1.apply(o).unwrap();
}
let mut db2 = TaskDb::new_inmemory();
if let Some(ref o) = setup {
db2.apply(o.clone()).unwrap();
}
db2.apply(o2).unwrap();
if let Some(o) = o1p {
db2.apply(o).unwrap();
}
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
}
#[test]
fn test_unrelated_create() {
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
test_transform(
None,
Create { uuid: uuid1 },
Create { uuid: uuid2 },
Some(Create { uuid: uuid1 }),
Some(Create { uuid: uuid2 }),
);
}
#[test]
fn test_related_updates_different_props() {
let uuid = Uuid::new_v4();
let timestamp = Utc::now();
test_transform(
Some(Create { uuid }),
Update {
uuid,
property: "abc".into(),
value: Some("true".into()),
timestamp,
},
Update {
uuid,
property: "def".into(),
value: Some("false".into()),
timestamp,
},
Some(Update {
uuid,
property: "abc".into(),
value: Some("true".into()),
timestamp,
}),
Some(Update {
uuid,
property: "def".into(),
value: Some("false".into()),
timestamp,
}),
);
}
#[test]
fn test_related_updates_same_prop() {
let uuid = Uuid::new_v4();
let timestamp1 = Utc::now();
let timestamp2 = timestamp1 + Duration::seconds(10);
test_transform(
Some(Create { uuid }),
Update {
uuid,
property: "abc".into(),
value: Some("true".into()),
timestamp: timestamp1,
},
Update {
uuid,
property: "abc".into(),
value: Some("false".into()),
timestamp: timestamp2,
},
None,
Some(Update {
uuid,
property: "abc".into(),
value: Some("false".into()),
timestamp: timestamp2,
}),
);
}
#[test]
fn test_related_updates_same_prop_same_time() {
let uuid = Uuid::new_v4();
let timestamp = Utc::now();
test_transform(
Some(Create { uuid }),
Update {
uuid,
property: "abc".into(),
value: Some("true".into()),
timestamp,
},
Update {
uuid,
property: "abc".into(),
value: Some("false".into()),
timestamp,
},
Some(Update {
uuid,
property: "abc".into(),
value: Some("true".into()),
timestamp,
}),
None,
);
}
#[test]
fn test_json_create() -> anyhow::Result<()> {
let uuid = Uuid::new_v4();
let op = Create { uuid };
assert_eq!(
serde_json::to_string(&op)?,
format!(r#"{{"Create":{{"uuid":"{}"}}}}"#, uuid),
);
let json = serde_json::to_string(&op)?;
assert_eq!(json, format!(r#"{{"Create":{{"uuid":"{}"}}}}"#, uuid));
let deser: ReplicaOp = serde_json::from_str(&json)?;
assert_eq!(deser, op);
Ok(())
}
@ -290,10 +144,10 @@ mod test {
fn test_json_delete() -> anyhow::Result<()> {
let uuid = Uuid::new_v4();
let op = Delete { uuid };
assert_eq!(
serde_json::to_string(&op)?,
format!(r#"{{"Delete":{{"uuid":"{}"}}}}"#, uuid),
);
let json = serde_json::to_string(&op)?;
assert_eq!(json, format!(r#"{{"Delete":{{"uuid":"{}"}}}}"#, uuid));
let deser: ReplicaOp = serde_json::from_str(&json)?;
assert_eq!(deser, op);
Ok(())
}
@ -309,13 +163,16 @@ mod test {
timestamp,
};
let json = serde_json::to_string(&op)?;
assert_eq!(
serde_json::to_string(&op)?,
json,
format!(
r#"{{"Update":{{"uuid":"{}","property":"abc","value":"false","timestamp":"{:?}"}}}}"#,
uuid, timestamp,
),
)
);
let deser: ReplicaOp = serde_json::from_str(&json)?;
assert_eq!(deser, op);
Ok(())
}
@ -331,90 +188,16 @@ mod test {
timestamp,
};
let json = serde_json::to_string(&op)?;
assert_eq!(
serde_json::to_string(&op)?,
json,
format!(
r#"{{"Update":{{"uuid":"{}","property":"abc","value":null,"timestamp":"{:?}"}}}}"#,
uuid, timestamp,
),
)
);
let deser: ReplicaOp = serde_json::from_str(&json)?;
assert_eq!(deser, op);
Ok(())
}
fn uuid_strategy() -> impl Strategy<Value = Uuid> {
prop_oneof![
Just(Uuid::parse_str("83a2f9ef-f455-4195-b92e-a54c161eebfc").unwrap()),
Just(Uuid::parse_str("56e0be07-c61f-494c-a54c-bdcfdd52d2a7").unwrap()),
Just(Uuid::parse_str("4b7ed904-f7b0-4293-8a10-ad452422c7b3").unwrap()),
Just(Uuid::parse_str("9bdd0546-07c8-4e1f-a9bc-9d6299f4773b").unwrap()),
]
}
fn operation_strategy() -> impl Strategy<Value = ReplicaOp> {
prop_oneof![
uuid_strategy().prop_map(|uuid| ReplicaOp::Create { uuid }),
uuid_strategy().prop_map(|uuid| ReplicaOp::Delete { uuid }),
(uuid_strategy(), "(title|project|status)").prop_map(|(uuid, property)| {
ReplicaOp::Update {
uuid,
property,
value: Some("true".into()),
timestamp: Utc::now(),
}
}),
]
}
proptest! {
#![proptest_config(ProptestConfig {
cases: 1024, .. ProptestConfig::default()
})]
#[test]
// check that the two operation sequences have the same effect, enforcing the invariant of
// the transform function.
fn transform_invariant_holds(o1 in operation_strategy(), o2 in operation_strategy()) {
let (o1p, o2p) = ReplicaOp::transform(o1.clone(), o2.clone());
let mut db1 = TaskDb::new(Box::new(InMemoryStorage::new()));
let mut db2 = TaskDb::new(Box::new(InMemoryStorage::new()));
// Ensure that any expected tasks already exist
if let ReplicaOp::Update{ ref uuid, .. } = o1 {
let _ = db1.apply(ReplicaOp::Create{uuid: uuid.clone()});
let _ = db2.apply(ReplicaOp::Create{uuid: uuid.clone()});
}
if let ReplicaOp::Update{ ref uuid, .. } = o2 {
let _ = db1.apply(ReplicaOp::Create{uuid: uuid.clone()});
let _ = db2.apply(ReplicaOp::Create{uuid: uuid.clone()});
}
if let ReplicaOp::Delete{ ref uuid } = o1 {
let _ = db1.apply(ReplicaOp::Create{uuid: uuid.clone()});
let _ = db2.apply(ReplicaOp::Create{uuid: uuid.clone()});
}
if let ReplicaOp::Delete{ ref uuid } = o2 {
let _ = db1.apply(ReplicaOp::Create{uuid: uuid.clone()});
let _ = db2.apply(ReplicaOp::Create{uuid: uuid.clone()});
}
// if applying the initial operations fail, that indicates the operation was invalid
// in the base state, so consider the case successful.
if let Err(_) = db1.apply(o1) {
return Ok(());
}
if let Err(_) = db2.apply(o2) {
return Ok(());
}
if let Some(o) = o2p {
db1.apply(o).map_err(|e| TestCaseError::Fail(format!("Applying to db1: {}", e).into()))?;
}
if let Some(o) = o1p {
db2.apply(o).map_err(|e| TestCaseError::Fail(format!("Applying to db2: {}", e).into()))?;
}
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
}
}
}

View file

@ -0,0 +1,313 @@
use crate::errors::Error;
use crate::server::SyncOp;
use crate::storage::{ReplicaOp, StorageTxn, TaskMap};
/// Apply the given SyncOp to the replica, updating both the task data and adding a
/// ReplicaOp to the list of operations. Returns the TaskMap of the task after the
/// operation has been applied (or an empty TaskMap for Delete).
pub(super) fn apply(txn: &mut dyn StorageTxn, op: SyncOp) -> anyhow::Result<TaskMap> {
match op {
SyncOp::Create { uuid } => {
let created = txn.create_task(uuid)?;
if created {
txn.add_operation(ReplicaOp::Create { uuid })?;
txn.commit()?;
Ok(TaskMap::new())
} else {
// TODO: differentiate error types here?
Err(Error::Database(format!("Task {} already exists", uuid)).into())
}
}
SyncOp::Delete { uuid } => {
let task = txn.get_task(uuid)?;
// (we'll need _task in the next commit)
if let Some(_task) = task {
txn.delete_task(uuid)?;
txn.add_operation(ReplicaOp::Delete { uuid })?;
txn.commit()?;
Ok(TaskMap::new())
} else {
Err(Error::Database(format!("Task {} does not exist", uuid)).into())
}
}
SyncOp::Update {
uuid,
property,
value,
timestamp,
} => {
let task = txn.get_task(uuid)?;
if let Some(mut task) = task {
if let Some(ref v) = value {
task.insert(property.clone(), v.clone());
} else {
task.remove(&property);
}
txn.set_task(uuid, task.clone())?;
txn.add_operation(ReplicaOp::Update {
uuid,
property,
value,
timestamp,
})?;
txn.commit()?;
Ok(task)
} else {
Err(Error::Database(format!("Task {} does not exist", uuid)).into())
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::taskdb::TaskDb;
use chrono::Utc;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use uuid::Uuid;
#[test]
fn test_apply_create() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = SyncOp::Create { uuid };
{
let mut txn = db.storage.txn()?;
let taskmap = apply(txn.as_mut(), op)?;
assert_eq!(taskmap.len(), 0);
txn.commit()?;
}
assert_eq!(db.sorted_tasks(), vec![(uuid, vec![]),]);
assert_eq!(db.operations(), vec![ReplicaOp::Create { uuid }]);
Ok(())
}
#[test]
fn test_apply_create_exists() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = SyncOp::Create { uuid };
{
let mut txn = db.storage.txn()?;
let taskmap = apply(txn.as_mut(), op.clone())?;
assert_eq!(taskmap.len(), 0);
assert_eq!(
apply(txn.as_mut(), op).err().unwrap().to_string(),
format!("Task Database Error: Task {} already exists", uuid)
);
txn.commit()?;
}
// first op was applied
assert_eq!(db.sorted_tasks(), vec![(uuid, vec![])]);
assert_eq!(db.operations(), vec![ReplicaOp::Create { uuid }]);
Ok(())
}
#[test]
fn test_apply_create_update() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let now = Utc::now();
let op1 = SyncOp::Create { uuid };
{
let mut txn = db.storage.txn()?;
let taskmap = apply(txn.as_mut(), op1)?;
assert_eq!(taskmap.len(), 0);
txn.commit()?;
}
let op2 = SyncOp::Update {
uuid,
property: String::from("title"),
value: Some("my task".into()),
timestamp: now,
};
{
let mut txn = db.storage.txn()?;
let mut taskmap = apply(txn.as_mut(), op2)?;
assert_eq!(
taskmap.drain().collect::<Vec<(_, _)>>(),
vec![("title".into(), "my task".into())]
);
txn.commit()?;
}
assert_eq!(
db.sorted_tasks(),
vec![(uuid, vec![("title".into(), "my task".into())])]
);
assert_eq!(
db.operations(),
vec![
ReplicaOp::Create { uuid },
ReplicaOp::Update {
uuid,
property: "title".into(),
value: Some("my task".into()),
timestamp: now
}
]
);
Ok(())
}
#[test]
fn test_apply_create_update_delete_prop() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let now = Utc::now();
let op1 = SyncOp::Create { uuid };
{
let mut txn = db.storage.txn()?;
let taskmap = apply(txn.as_mut(), op1)?;
assert_eq!(taskmap.len(), 0);
txn.commit()?;
}
let op2 = SyncOp::Update {
uuid,
property: String::from("title"),
value: Some("my task".into()),
timestamp: now,
};
{
let mut txn = db.storage.txn()?;
let taskmap = apply(txn.as_mut(), op2)?;
assert_eq!(taskmap.get("title"), Some(&"my task".to_owned()));
txn.commit()?;
}
let op3 = SyncOp::Update {
uuid,
property: String::from("priority"),
value: Some("H".into()),
timestamp: now,
};
{
let mut txn = db.storage.txn()?;
let taskmap = apply(txn.as_mut(), op3)?;
assert_eq!(taskmap.get("priority"), Some(&"H".to_owned()));
txn.commit()?;
}
let op4 = SyncOp::Update {
uuid,
property: String::from("title"),
value: None,
timestamp: now,
};
{
let mut txn = db.storage.txn()?;
let taskmap = apply(txn.as_mut(), op4)?;
assert_eq!(taskmap.get("title"), None);
assert_eq!(taskmap.get("priority"), Some(&"H".to_owned()));
txn.commit()?;
}
let mut exp = HashMap::new();
let mut task = HashMap::new();
task.insert(String::from("priority"), String::from("H"));
exp.insert(uuid, task);
assert_eq!(
db.sorted_tasks(),
vec![(uuid, vec![("priority".into(), "H".into())])]
);
assert_eq!(
db.operations(),
vec![
ReplicaOp::Create { uuid },
ReplicaOp::Update {
uuid,
property: "title".into(),
value: Some("my task".into()),
timestamp: now,
},
ReplicaOp::Update {
uuid,
property: "priority".into(),
value: Some("H".into()),
timestamp: now,
},
ReplicaOp::Update {
uuid,
property: "title".into(),
value: None,
timestamp: now,
}
]
);
Ok(())
}
#[test]
fn test_apply_update_does_not_exist() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = SyncOp::Update {
uuid,
property: String::from("title"),
value: Some("my task".into()),
timestamp: Utc::now(),
};
{
let mut txn = db.storage.txn()?;
assert_eq!(
apply(txn.as_mut(), op).err().unwrap().to_string(),
format!("Task Database Error: Task {} does not exist", uuid)
);
txn.commit()?;
}
Ok(())
}
#[test]
fn test_apply_create_delete() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op1 = SyncOp::Create { uuid };
let op2 = SyncOp::Delete { uuid };
{
let mut txn = db.storage.txn()?;
let taskmap = apply(txn.as_mut(), op1)?;
assert_eq!(taskmap.len(), 0);
let taskmap = apply(txn.as_mut(), op2)?;
assert_eq!(taskmap.len(), 0);
txn.commit()?;
}
assert_eq!(db.sorted_tasks(), vec![]);
assert_eq!(
db.operations(),
vec![ReplicaOp::Create { uuid }, ReplicaOp::Delete { uuid },]
);
Ok(())
}
#[test]
fn test_apply_delete_not_present() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = SyncOp::Delete { uuid };
{
let mut txn = db.storage.txn()?;
assert_eq!(
apply(txn.as_mut(), op).err().unwrap().to_string(),
format!("Task Database Error: Task {} does not exist", uuid)
);
txn.commit()?;
}
Ok(())
}
}

View file

@ -1,8 +1,11 @@
use crate::server::{Server, SyncOp};
use crate::storage::{ReplicaOp, Storage, TaskMap};
use crate::storage::{Storage, TaskMap};
use uuid::Uuid;
mod ops;
#[cfg(test)]
use crate::storage::ReplicaOp;
mod apply;
mod snapshot;
mod sync;
mod working_set;
@ -28,39 +31,16 @@ impl TaskDb {
TaskDb::new(Box::new(InMemoryStorage::new()))
}
/// Apply an operation to the TaskDb. Aside from synchronization operations, this is the only way
/// to modify the TaskDb. In cases where an operation does not make sense, this function will do
/// nothing and return an error (but leave the TaskDb in a consistent state).
pub fn apply(&mut self, op: ReplicaOp) -> anyhow::Result<()> {
// TODO: differentiate error types here?
/// Apply an operation to the TaskDb. This will update the set of tasks and add a ReplicaOp to
/// the set of operations in the TaskDb, and return the TaskMap containing the resulting task's
/// properties (or an empty TaskMap for deletion).
///
/// Aside from synchronization operations, this is the only way to modify the TaskDb. In cases
/// where an operation does not make sense, this function will do nothing and return an error
/// (but leave the TaskDb in a consistent state).
pub fn apply(&mut self, op: SyncOp) -> anyhow::Result<TaskMap> {
let mut txn = self.storage.txn()?;
if let err @ Err(_) = ops::apply_op(txn.as_mut(), &op) {
return err;
}
txn.add_operation(op)?;
txn.commit()?;
Ok(())
}
// temporary
pub fn apply_sync_tmp(&mut self, op: SyncOp) -> anyhow::Result<()> {
// create an op from SyncOp
let op = match op {
SyncOp::Create { uuid } => ReplicaOp::Create { uuid },
SyncOp::Delete { uuid } => ReplicaOp::Delete { uuid },
SyncOp::Update {
uuid,
property,
value,
timestamp,
} => ReplicaOp::Update {
uuid,
property,
value,
timestamp,
},
};
self.apply(op)
apply::apply(txn.as_mut(), op)
}
/// Get all tasks.
@ -172,7 +152,7 @@ impl TaskDb {
mod tests {
use super::*;
use crate::server::test::TestServer;
use crate::storage::InMemoryStorage;
use crate::storage::{InMemoryStorage, ReplicaOp};
use chrono::Utc;
use pretty_assertions::assert_eq;
use proptest::prelude::*;
@ -181,14 +161,14 @@ mod tests {
#[test]
fn test_apply() {
// this verifies that the operation is both applied and included in the list of
// operations; more detailed tests are in the `ops` module.
// operations; more detailed tests are in the `apply` module.
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = ReplicaOp::Create { uuid };
let op = SyncOp::Create { uuid };
db.apply(op.clone()).unwrap();
assert_eq!(db.sorted_tasks(), vec![(uuid, vec![]),]);
assert_eq!(db.operations(), vec![op]);
assert_eq!(db.operations(), vec![ReplicaOp::Create { uuid }]);
}
fn newdb() -> TaskDb {
@ -197,7 +177,7 @@ mod tests {
#[derive(Debug)]
enum Action {
Op(ReplicaOp),
Op(SyncOp),
Sync,
}
@ -209,14 +189,14 @@ mod tests {
.chunks(2)
.map(|action_on| {
let action = match action_on[0] {
b'C' => Action::Op(ReplicaOp::Create { uuid }),
b'U' => Action::Op(ReplicaOp::Update {
b'C' => Action::Op(SyncOp::Create { uuid }),
b'U' => Action::Op(SyncOp::Update {
uuid,
property: "title".into(),
value: Some("foo".into()),
timestamp: Utc::now(),
}),
b'D' => Action::Op(ReplicaOp::Delete { uuid }),
b'D' => Action::Op(SyncOp::Delete { uuid }),
b'S' => Action::Sync,
_ => unreachable!(),
};

View file

@ -1,233 +0,0 @@
use crate::errors::Error;
use crate::storage::{ReplicaOp, StorageTxn};
pub(super) fn apply_op(txn: &mut dyn StorageTxn, op: &ReplicaOp) -> anyhow::Result<()> {
match op {
ReplicaOp::Create { uuid } => {
// insert if the task does not already exist
if !txn.create_task(*uuid)? {
return Err(Error::Database(format!("Task {} already exists", uuid)).into());
}
}
ReplicaOp::Delete { ref uuid } => {
if !txn.delete_task(*uuid)? {
return Err(Error::Database(format!("Task {} does not exist", uuid)).into());
}
}
ReplicaOp::Update {
ref uuid,
ref property,
ref value,
timestamp: _,
} => {
// update if this task exists, otherwise ignore
if let Some(mut task) = txn.get_task(*uuid)? {
match value {
Some(ref val) => task.insert(property.to_string(), val.clone()),
None => task.remove(property),
};
txn.set_task(*uuid, task)?;
} else {
return Err(Error::Database(format!("Task {} does not exist", uuid)).into());
}
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::taskdb::TaskDb;
use chrono::Utc;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use uuid::Uuid;
#[test]
fn test_apply_create() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = ReplicaOp::Create { uuid };
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op)?;
txn.commit()?;
}
assert_eq!(db.sorted_tasks(), vec![(uuid, vec![]),]);
Ok(())
}
#[test]
fn test_apply_create_exists() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = ReplicaOp::Create { uuid };
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op)?;
assert_eq!(
apply_op(txn.as_mut(), &op).err().unwrap().to_string(),
format!("Task Database Error: Task {} already exists", uuid)
);
txn.commit()?;
}
// first op was applied
assert_eq!(db.sorted_tasks(), vec![(uuid, vec![])]);
Ok(())
}
#[test]
fn test_apply_create_update() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op1 = ReplicaOp::Create { uuid };
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op1)?;
txn.commit()?;
}
let op2 = ReplicaOp::Update {
uuid,
property: String::from("title"),
value: Some("my task".into()),
timestamp: Utc::now(),
};
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op2)?;
txn.commit()?;
}
assert_eq!(
db.sorted_tasks(),
vec![(uuid, vec![("title".into(), "my task".into())])]
);
Ok(())
}
#[test]
fn test_apply_create_update_delete_prop() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op1 = ReplicaOp::Create { uuid };
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op1)?;
txn.commit()?;
}
let op2 = ReplicaOp::Update {
uuid,
property: String::from("title"),
value: Some("my task".into()),
timestamp: Utc::now(),
};
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op2)?;
txn.commit()?;
}
let op3 = ReplicaOp::Update {
uuid,
property: String::from("priority"),
value: Some("H".into()),
timestamp: Utc::now(),
};
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op3)?;
txn.commit()?;
}
let op4 = ReplicaOp::Update {
uuid,
property: String::from("title"),
value: None,
timestamp: Utc::now(),
};
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op4)?;
txn.commit()?;
}
let mut exp = HashMap::new();
let mut task = HashMap::new();
task.insert(String::from("priority"), String::from("H"));
exp.insert(uuid, task);
assert_eq!(
db.sorted_tasks(),
vec![(uuid, vec![("priority".into(), "H".into())])]
);
Ok(())
}
#[test]
fn test_apply_update_does_not_exist() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = ReplicaOp::Update {
uuid,
property: String::from("title"),
value: Some("my task".into()),
timestamp: Utc::now(),
};
{
let mut txn = db.storage.txn()?;
assert_eq!(
apply_op(txn.as_mut(), &op).err().unwrap().to_string(),
format!("Task Database Error: Task {} does not exist", uuid)
);
txn.commit()?;
}
Ok(())
}
#[test]
fn test_apply_create_delete() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op1 = ReplicaOp::Create { uuid };
let op2 = ReplicaOp::Delete { uuid };
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op1)?;
apply_op(txn.as_mut(), &op2)?;
txn.commit()?;
}
assert_eq!(db.sorted_tasks(), vec![]);
Ok(())
}
#[test]
fn test_apply_delete_not_present() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = ReplicaOp::Delete { uuid };
{
let mut txn = db.storage.txn()?;
assert_eq!(
apply_op(txn.as_mut(), &op).err().unwrap().to_string(),
format!("Task Database Error: Task {} does not exist", uuid)
);
txn.commit()?;
}
Ok(())
}
}

View file

@ -1,4 +1,4 @@
use super::{ops, snapshot};
use super::snapshot;
use crate::server::{AddVersionResult, GetVersionResult, Server, SnapshotUrgency};
use crate::storage::{ReplicaOp, StorageTxn};
use crate::Error;
@ -11,6 +11,44 @@ struct Version {
operations: Vec<ReplicaOp>,
}
/// Apply an op to the TaskDb's set of tasks (without recording it in the list of operations)
pub(super) fn apply_op(txn: &mut dyn StorageTxn, op: &ReplicaOp) -> anyhow::Result<()> {
// TODO: it'd be nice if this was integrated into apply() somehow, but that clones TaskMaps
// unnecessariliy
match op {
ReplicaOp::Create { uuid } => {
// insert if the task does not already exist
if !txn.create_task(*uuid)? {
return Err(Error::Database(format!("Task {} already exists", uuid)).into());
}
}
ReplicaOp::Delete { ref uuid } => {
if !txn.delete_task(*uuid)? {
return Err(Error::Database(format!("Task {} does not exist", uuid)).into());
}
}
ReplicaOp::Update {
ref uuid,
ref property,
ref value,
timestamp: _,
} => {
// update if this task exists, otherwise ignore
if let Some(mut task) = txn.get_task(*uuid)? {
match value {
Some(ref val) => task.insert(property.to_string(), val.clone()),
None => task.remove(property),
};
txn.set_task(*uuid, task)?;
} else {
return Err(Error::Database(format!("Task {} does not exist", uuid)).into());
}
}
}
Ok(())
}
/// Sync to the given server, pulling remote changes and pushing local changes.
pub(super) fn sync(
server: &mut Box<dyn Server>,
@ -161,7 +199,7 @@ fn apply_version(txn: &mut dyn StorageTxn, mut version: Version) -> anyhow::Resu
}
}
if let Some(o) = svr_op {
if let Err(e) = ops::apply_op(txn, &o) {
if let Err(e) = apply_op(txn, &o) {
warn!("Invalid operation when syncing: {} (ignored)", e);
}
}
@ -174,8 +212,8 @@ fn apply_version(txn: &mut dyn StorageTxn, mut version: Version) -> anyhow::Resu
#[cfg(test)]
mod test {
use super::*;
use crate::server::test::TestServer;
use crate::storage::{InMemoryStorage, ReplicaOp};
use crate::server::{test::TestServer, SyncOp};
use crate::storage::InMemoryStorage;
use crate::taskdb::{snapshot::SnapshotTasks, TaskDb};
use chrono::Utc;
use pretty_assertions::assert_eq;
@ -197,8 +235,8 @@ mod test {
// make some changes in parallel to db1 and db2..
let uuid1 = Uuid::new_v4();
db1.apply(ReplicaOp::Create { uuid: uuid1 }).unwrap();
db1.apply(ReplicaOp::Update {
db1.apply(SyncOp::Create { uuid: uuid1 }).unwrap();
db1.apply(SyncOp::Update {
uuid: uuid1,
property: "title".into(),
value: Some("my first task".into()),
@ -207,8 +245,8 @@ mod test {
.unwrap();
let uuid2 = Uuid::new_v4();
db2.apply(ReplicaOp::Create { uuid: uuid2 }).unwrap();
db2.apply(ReplicaOp::Update {
db2.apply(SyncOp::Create { uuid: uuid2 }).unwrap();
db2.apply(SyncOp::Update {
uuid: uuid2,
property: "title".into(),
value: Some("my second task".into()),
@ -223,14 +261,14 @@ mod test {
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
// now make updates to the same task on both sides
db1.apply(ReplicaOp::Update {
db1.apply(SyncOp::Update {
uuid: uuid2,
property: "priority".into(),
value: Some("H".into()),
timestamp: Utc::now(),
})
.unwrap();
db2.apply(ReplicaOp::Update {
db2.apply(SyncOp::Update {
uuid: uuid2,
property: "project".into(),
value: Some("personal".into()),
@ -259,8 +297,8 @@ mod test {
// create and update a task..
let uuid = Uuid::new_v4();
db1.apply(ReplicaOp::Create { uuid }).unwrap();
db1.apply(ReplicaOp::Update {
db1.apply(SyncOp::Create { uuid }).unwrap();
db1.apply(SyncOp::Update {
uuid,
property: "title".into(),
value: Some("my first task".into()),
@ -275,9 +313,9 @@ mod test {
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
// delete and re-create the task on db1
db1.apply(ReplicaOp::Delete { uuid }).unwrap();
db1.apply(ReplicaOp::Create { uuid }).unwrap();
db1.apply(ReplicaOp::Update {
db1.apply(SyncOp::Delete { uuid }).unwrap();
db1.apply(SyncOp::Create { uuid }).unwrap();
db1.apply(SyncOp::Update {
uuid,
property: "title".into(),
value: Some("my second task".into()),
@ -286,7 +324,7 @@ mod test {
.unwrap();
// and on db2, update a property of the task
db2.apply(ReplicaOp::Update {
db2.apply(SyncOp::Update {
uuid,
property: "project".into(),
value: Some("personal".into()),
@ -310,8 +348,8 @@ mod test {
let mut db1 = newdb();
let uuid = Uuid::new_v4();
db1.apply(ReplicaOp::Create { uuid })?;
db1.apply(ReplicaOp::Update {
db1.apply(SyncOp::Create { uuid })?;
db1.apply(SyncOp::Update {
uuid,
property: "title".into(),
value: Some("my first task".into()),
@ -332,7 +370,7 @@ mod test {
assert_eq!(tasks[0].0, uuid);
// update the taskdb and sync again
db1.apply(ReplicaOp::Update {
db1.apply(SyncOp::Update {
uuid,
property: "title".into(),
value: Some("my first task, updated".into()),
@ -362,7 +400,7 @@ mod test {
let mut db1 = newdb();
let uuid = Uuid::new_v4();
db1.apply(ReplicaOp::Create { uuid }).unwrap();
db1.apply(SyncOp::Create { uuid }).unwrap();
test_server.set_snapshot_urgency(SnapshotUrgency::Low);
sync(&mut server, db1.storage.txn()?.as_mut(), true).unwrap();

View file

@ -63,7 +63,7 @@ where
#[cfg(test)]
mod test {
use super::*;
use crate::storage::ReplicaOp;
use crate::server::SyncOp;
use crate::taskdb::TaskDb;
use chrono::Utc;
use uuid::Uuid;
@ -94,10 +94,10 @@ mod test {
// add everything to the TaskDb
for uuid in &uuids {
db.apply(ReplicaOp::Create { uuid: *uuid })?;
db.apply(SyncOp::Create { uuid: *uuid })?;
}
for i in &[0usize, 1, 4] {
db.apply(ReplicaOp::Update {
db.apply(SyncOp::Update {
uuid: uuids[*i].clone(),
property: String::from("status"),
value: Some("pending".into()),