From 79f07b57ade1153d078c9f903b47c44706bb9f71 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Mon, 11 Oct 2021 20:56:10 -0400 Subject: [PATCH 1/7] more taskdb refactoring --- taskchampion/src/taskdb/mod.rs | 394 +------------------------ taskchampion/src/taskdb/ops.rs | 196 ++++++++++++ taskchampion/src/taskdb/sync.rs | 131 ++++++++ taskchampion/src/taskdb/working_set.rs | 167 +++++++++++ 4 files changed, 499 insertions(+), 389 deletions(-) create mode 100644 taskchampion/src/taskdb/working_set.rs diff --git a/taskchampion/src/taskdb/mod.rs b/taskchampion/src/taskdb/mod.rs index 1b84f2c9b..850628719 100644 --- a/taskchampion/src/taskdb/mod.rs +++ b/taskchampion/src/taskdb/mod.rs @@ -1,10 +1,10 @@ use crate::server::Server; use crate::storage::{Operation, Storage, TaskMap}; -use std::collections::HashSet; use uuid::Uuid; mod ops; mod sync; +mod working_set; /// A TaskDb is the backend for a replica. It manages the storage, operations, synchronization, /// and so on, and all the invariants that come with it. It leaves the meaning of particular task @@ -74,57 +74,7 @@ impl TaskDb { where F: Fn(&TaskMap) -> bool, { - let mut txn = self.storage.txn()?; - - let mut new_ws = vec![None]; // index 0 is always None - let mut seen = HashSet::new(); - - // The goal here is for existing working-set items to be "compressed' down to index 1, so - // we begin by scanning the current working set and inserting any tasks that should still - // be in the set into new_ws, implicitly dropping any tasks that are no longer in the - // working set. - for elt in txn.get_working_set()?.drain(1..) { - if let Some(uuid) = elt { - if let Some(task) = txn.get_task(uuid)? 
{ - if in_working_set(&task) { - new_ws.push(Some(uuid)); - seen.insert(uuid); - continue; - } - } - } - - // if we are not renumbering, then insert a blank working-set entry here - if !renumber { - new_ws.push(None); - } - } - - // if renumbering, clear the working set and re-add - if renumber { - txn.clear_working_set()?; - for elt in new_ws.drain(1..new_ws.len()).flatten() { - txn.add_to_working_set(elt)?; - } - } else { - // ..otherwise, just clear the None items determined above from the working set - for (i, elt) in new_ws.iter().enumerate().skip(1) { - if elt.is_none() { - txn.set_working_set_item(i, None)?; - } - } - } - - // Now go hunting for tasks that should be in this list but are not, adding them at the - // end of the list, whether renumbering or not - for (uuid, task) in txn.all_tasks()? { - if !seen.contains(&uuid) && in_working_set(&task) { - txn.add_to_working_set(uuid)?; - } - } - - txn.commit()?; - Ok(()) + working_set::rebuild(self.storage.txn()?.as_mut(), in_working_set, renumber) } /// Add the given uuid to the working set and return its index; if it is already in the working @@ -190,11 +140,12 @@ mod tests { use chrono::Utc; use pretty_assertions::assert_eq; use proptest::prelude::*; - use std::collections::HashMap; use uuid::Uuid; #[test] - fn test_apply_create() { + fn test_apply() { + // this verifies that the operation is both applied and included in the list of + // operations; more detailed tests are in the `ops` module. 
let mut db = TaskDb::new_inmemory(); let uuid = Uuid::new_v4(); let op = Operation::Create { uuid }; @@ -204,345 +155,10 @@ mod tests { assert_eq!(db.operations(), vec![op]); } - #[test] - fn test_apply_create_exists() { - let mut db = TaskDb::new_inmemory(); - let uuid = Uuid::new_v4(); - let op = Operation::Create { uuid }; - db.apply(op.clone()).unwrap(); - assert_eq!( - db.apply(op.clone()).err().unwrap().to_string(), - format!("Task Database Error: Task {} already exists", uuid) - ); - - assert_eq!(db.sorted_tasks(), vec![(uuid, vec![])]); - assert_eq!(db.operations(), vec![op]); - } - - #[test] - fn test_apply_create_update() { - let mut db = TaskDb::new_inmemory(); - let uuid = Uuid::new_v4(); - let op1 = Operation::Create { uuid }; - db.apply(op1.clone()).unwrap(); - let op2 = Operation::Update { - uuid, - property: String::from("title"), - value: Some("my task".into()), - timestamp: Utc::now(), - }; - db.apply(op2.clone()).unwrap(); - - assert_eq!( - db.sorted_tasks(), - vec![(uuid, vec![("title".into(), "my task".into())])] - ); - assert_eq!(db.operations(), vec![op1, op2]); - } - - #[test] - fn test_apply_create_update_delete_prop() { - let mut db = TaskDb::new_inmemory(); - let uuid = Uuid::new_v4(); - let op1 = Operation::Create { uuid }; - db.apply(op1.clone()).unwrap(); - - let op2 = Operation::Update { - uuid, - property: String::from("title"), - value: Some("my task".into()), - timestamp: Utc::now(), - }; - db.apply(op2.clone()).unwrap(); - - let op3 = Operation::Update { - uuid, - property: String::from("priority"), - value: Some("H".into()), - timestamp: Utc::now(), - }; - db.apply(op3.clone()).unwrap(); - - let op4 = Operation::Update { - uuid, - property: String::from("title"), - value: None, - timestamp: Utc::now(), - }; - db.apply(op4.clone()).unwrap(); - - let mut exp = HashMap::new(); - let mut task = HashMap::new(); - task.insert(String::from("priority"), String::from("H")); - exp.insert(uuid, task); - assert_eq!( - db.sorted_tasks(), - 
vec![(uuid, vec![("priority".into(), "H".into())])] - ); - assert_eq!(db.operations(), vec![op1, op2, op3, op4]); - } - - #[test] - fn test_apply_update_does_not_exist() { - let mut db = TaskDb::new_inmemory(); - let uuid = Uuid::new_v4(); - let op = Operation::Update { - uuid, - property: String::from("title"), - value: Some("my task".into()), - timestamp: Utc::now(), - }; - assert_eq!( - db.apply(op).err().unwrap().to_string(), - format!("Task Database Error: Task {} does not exist", uuid) - ); - - assert_eq!(db.sorted_tasks(), vec![]); - assert_eq!(db.operations(), vec![]); - } - - #[test] - fn test_apply_create_delete() { - let mut db = TaskDb::new_inmemory(); - let uuid = Uuid::new_v4(); - let op1 = Operation::Create { uuid }; - db.apply(op1.clone()).unwrap(); - - let op2 = Operation::Delete { uuid }; - db.apply(op2.clone()).unwrap(); - - assert_eq!(db.sorted_tasks(), vec![]); - assert_eq!(db.operations(), vec![op1, op2]); - } - - #[test] - fn test_apply_delete_not_present() { - let mut db = TaskDb::new_inmemory(); - let uuid = Uuid::new_v4(); - - let op1 = Operation::Delete { uuid }; - assert_eq!( - db.apply(op1).err().unwrap().to_string(), - format!("Task Database Error: Task {} does not exist", uuid) - ); - - assert_eq!(db.sorted_tasks(), vec![]); - assert_eq!(db.operations(), vec![]); - } - - #[test] - fn rebuild_working_set_renumber() -> anyhow::Result<()> { - rebuild_working_set(true) - } - - #[test] - fn rebuild_working_set_no_renumber() -> anyhow::Result<()> { - rebuild_working_set(false) - } - - fn rebuild_working_set(renumber: bool) -> anyhow::Result<()> { - let mut db = TaskDb::new_inmemory(); - let mut uuids = vec![]; - uuids.push(Uuid::new_v4()); - println!("uuids[0]: {:?} - pending, not in working set", uuids[0]); - uuids.push(Uuid::new_v4()); - println!("uuids[1]: {:?} - pending, in working set", uuids[1]); - uuids.push(Uuid::new_v4()); - println!("uuids[2]: {:?} - not pending, not in working set", uuids[2]); - uuids.push(Uuid::new_v4()); - 
println!("uuids[3]: {:?} - not pending, in working set", uuids[3]); - uuids.push(Uuid::new_v4()); - println!("uuids[4]: {:?} - pending, in working set", uuids[4]); - - // add everything to the TaskDb - for uuid in &uuids { - db.apply(Operation::Create { uuid: *uuid })?; - } - for i in &[0usize, 1, 4] { - db.apply(Operation::Update { - uuid: uuids[*i].clone(), - property: String::from("status"), - value: Some("pending".into()), - timestamp: Utc::now(), - })?; - } - - // set the existing working_set as we want it - { - let mut txn = db.storage.txn()?; - txn.clear_working_set()?; - - for i in &[1usize, 3, 4] { - txn.add_to_working_set(uuids[*i])?; - } - - txn.commit()?; - } - - assert_eq!( - db.working_set()?, - vec![ - None, - Some(uuids[1].clone()), - Some(uuids[3].clone()), - Some(uuids[4].clone()) - ] - ); - - db.rebuild_working_set( - |t| { - if let Some(status) = t.get("status") { - status == "pending" - } else { - false - } - }, - renumber, - )?; - - let exp = if renumber { - // uuids[1] and uuids[4] are already in the working set, so are compressed - // to the top, and then uuids[0] is added. - vec![ - None, - Some(uuids[1].clone()), - Some(uuids[4].clone()), - Some(uuids[0].clone()), - ] - } else { - // uuids[1] and uuids[4] are already in the working set, at indexes 1 and 3, - // and then uuids[0] is added. - vec![ - None, - Some(uuids[1].clone()), - None, - Some(uuids[4].clone()), - Some(uuids[0].clone()), - ] - }; - - assert_eq!(db.working_set()?, exp); - - Ok(()) - } - fn newdb() -> TaskDb { TaskDb::new(Box::new(InMemoryStorage::new())) } - #[test] - fn test_sync() { - let mut server: Box = Box::new(TestServer::new()); - - let mut db1 = newdb(); - db1.sync(&mut server).unwrap(); - - let mut db2 = newdb(); - db2.sync(&mut server).unwrap(); - - // make some changes in parallel to db1 and db2.. 
- let uuid1 = Uuid::new_v4(); - db1.apply(Operation::Create { uuid: uuid1 }).unwrap(); - db1.apply(Operation::Update { - uuid: uuid1, - property: "title".into(), - value: Some("my first task".into()), - timestamp: Utc::now(), - }) - .unwrap(); - - let uuid2 = Uuid::new_v4(); - db2.apply(Operation::Create { uuid: uuid2 }).unwrap(); - db2.apply(Operation::Update { - uuid: uuid2, - property: "title".into(), - value: Some("my second task".into()), - timestamp: Utc::now(), - }) - .unwrap(); - - // and synchronize those around - db1.sync(&mut server).unwrap(); - db2.sync(&mut server).unwrap(); - db1.sync(&mut server).unwrap(); - assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); - - // now make updates to the same task on both sides - db1.apply(Operation::Update { - uuid: uuid2, - property: "priority".into(), - value: Some("H".into()), - timestamp: Utc::now(), - }) - .unwrap(); - db2.apply(Operation::Update { - uuid: uuid2, - property: "project".into(), - value: Some("personal".into()), - timestamp: Utc::now(), - }) - .unwrap(); - - // and synchronize those around - db1.sync(&mut server).unwrap(); - db2.sync(&mut server).unwrap(); - db1.sync(&mut server).unwrap(); - assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); - } - - #[test] - fn test_sync_create_delete() { - let mut server: Box = Box::new(TestServer::new()); - - let mut db1 = newdb(); - db1.sync(&mut server).unwrap(); - - let mut db2 = newdb(); - db2.sync(&mut server).unwrap(); - - // create and update a task.. 
- let uuid = Uuid::new_v4(); - db1.apply(Operation::Create { uuid }).unwrap(); - db1.apply(Operation::Update { - uuid, - property: "title".into(), - value: Some("my first task".into()), - timestamp: Utc::now(), - }) - .unwrap(); - - // and synchronize those around - db1.sync(&mut server).unwrap(); - db2.sync(&mut server).unwrap(); - db1.sync(&mut server).unwrap(); - assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); - - // delete and re-create the task on db1 - db1.apply(Operation::Delete { uuid }).unwrap(); - db1.apply(Operation::Create { uuid }).unwrap(); - db1.apply(Operation::Update { - uuid, - property: "title".into(), - value: Some("my second task".into()), - timestamp: Utc::now(), - }) - .unwrap(); - - // and on db2, update a property of the task - db2.apply(Operation::Update { - uuid, - property: "project".into(), - value: Some("personal".into()), - timestamp: Utc::now(), - }) - .unwrap(); - - db1.sync(&mut server).unwrap(); - db2.sync(&mut server).unwrap(); - db1.sync(&mut server).unwrap(); - assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); - } - #[derive(Debug)] enum Action { Op(Operation), diff --git a/taskchampion/src/taskdb/ops.rs b/taskchampion/src/taskdb/ops.rs index 8bfd003e0..7e23d04ce 100644 --- a/taskchampion/src/taskdb/ops.rs +++ b/taskchampion/src/taskdb/ops.rs @@ -35,3 +35,199 @@ pub(super) fn apply_op(txn: &mut dyn StorageTxn, op: &Operation) -> anyhow::Resu Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::taskdb::TaskDb; + use chrono::Utc; + use pretty_assertions::assert_eq; + use std::collections::HashMap; + use uuid::Uuid; + + #[test] + fn test_apply_create() -> anyhow::Result<()> { + let mut db = TaskDb::new_inmemory(); + let uuid = Uuid::new_v4(); + let op = Operation::Create { uuid }; + + { + let mut txn = db.storage.txn()?; + apply_op(txn.as_mut(), &op)?; + txn.commit()?; + } + + assert_eq!(db.sorted_tasks(), vec![(uuid, vec![]),]); + Ok(()) + } + + #[test] + fn test_apply_create_exists() -> anyhow::Result<()> { + 
let mut db = TaskDb::new_inmemory(); + let uuid = Uuid::new_v4(); + let op = Operation::Create { uuid }; + { + let mut txn = db.storage.txn()?; + apply_op(txn.as_mut(), &op)?; + assert_eq!( + apply_op(txn.as_mut(), &op).err().unwrap().to_string(), + format!("Task Database Error: Task {} already exists", uuid) + ); + txn.commit()?; + } + + // first op was applied + assert_eq!(db.sorted_tasks(), vec![(uuid, vec![])]); + + Ok(()) + } + + #[test] + fn test_apply_create_update() -> anyhow::Result<()> { + let mut db = TaskDb::new_inmemory(); + let uuid = Uuid::new_v4(); + let op1 = Operation::Create { uuid }; + + { + let mut txn = db.storage.txn()?; + apply_op(txn.as_mut(), &op1)?; + txn.commit()?; + } + + let op2 = Operation::Update { + uuid, + property: String::from("title"), + value: Some("my task".into()), + timestamp: Utc::now(), + }; + { + let mut txn = db.storage.txn()?; + apply_op(txn.as_mut(), &op2)?; + txn.commit()?; + } + + assert_eq!( + db.sorted_tasks(), + vec![(uuid, vec![("title".into(), "my task".into())])] + ); + + Ok(()) + } + + #[test] + fn test_apply_create_update_delete_prop() -> anyhow::Result<()> { + let mut db = TaskDb::new_inmemory(); + let uuid = Uuid::new_v4(); + let op1 = Operation::Create { uuid }; + { + let mut txn = db.storage.txn()?; + apply_op(txn.as_mut(), &op1)?; + txn.commit()?; + } + + let op2 = Operation::Update { + uuid, + property: String::from("title"), + value: Some("my task".into()), + timestamp: Utc::now(), + }; + { + let mut txn = db.storage.txn()?; + apply_op(txn.as_mut(), &op2)?; + txn.commit()?; + } + + let op3 = Operation::Update { + uuid, + property: String::from("priority"), + value: Some("H".into()), + timestamp: Utc::now(), + }; + { + let mut txn = db.storage.txn()?; + apply_op(txn.as_mut(), &op3)?; + txn.commit()?; + } + + let op4 = Operation::Update { + uuid, + property: String::from("title"), + value: None, + timestamp: Utc::now(), + }; + { + let mut txn = db.storage.txn()?; + apply_op(txn.as_mut(), &op4)?; + 
txn.commit()?; + } + + let mut exp = HashMap::new(); + let mut task = HashMap::new(); + task.insert(String::from("priority"), String::from("H")); + exp.insert(uuid, task); + assert_eq!( + db.sorted_tasks(), + vec![(uuid, vec![("priority".into(), "H".into())])] + ); + + Ok(()) + } + + #[test] + fn test_apply_update_does_not_exist() -> anyhow::Result<()> { + let mut db = TaskDb::new_inmemory(); + let uuid = Uuid::new_v4(); + let op = Operation::Update { + uuid, + property: String::from("title"), + value: Some("my task".into()), + timestamp: Utc::now(), + }; + { + let mut txn = db.storage.txn()?; + assert_eq!( + apply_op(txn.as_mut(), &op).err().unwrap().to_string(), + format!("Task Database Error: Task {} does not exist", uuid) + ); + txn.commit()?; + } + + Ok(()) + } + + #[test] + fn test_apply_create_delete() -> anyhow::Result<()> { + let mut db = TaskDb::new_inmemory(); + let uuid = Uuid::new_v4(); + let op1 = Operation::Create { uuid }; + let op2 = Operation::Delete { uuid }; + + { + let mut txn = db.storage.txn()?; + apply_op(txn.as_mut(), &op1)?; + apply_op(txn.as_mut(), &op2)?; + txn.commit()?; + } + + assert_eq!(db.sorted_tasks(), vec![]); + + Ok(()) + } + + #[test] + fn test_apply_delete_not_present() -> anyhow::Result<()> { + let mut db = TaskDb::new_inmemory(); + let uuid = Uuid::new_v4(); + let op = Operation::Delete { uuid }; + { + let mut txn = db.storage.txn()?; + assert_eq!( + apply_op(txn.as_mut(), &op).err().unwrap().to_string(), + format!("Task Database Error: Task {} does not exist", uuid) + ); + txn.commit()?; + } + + Ok(()) + } +} diff --git a/taskchampion/src/taskdb/sync.rs b/taskchampion/src/taskdb/sync.rs index af076056e..d8c145857 100644 --- a/taskchampion/src/taskdb/sync.rs +++ b/taskchampion/src/taskdb/sync.rs @@ -143,3 +143,134 @@ fn apply_version(txn: &mut dyn StorageTxn, mut version: Version) -> anyhow::Resu txn.set_operations(local_operations)?; Ok(()) } + +#[cfg(test)] +mod test { + use super::*; + use crate::server::test::TestServer; 
+ use crate::storage::{InMemoryStorage, Operation}; + use crate::taskdb::TaskDb; + use chrono::Utc; + use uuid::Uuid; + + fn newdb() -> TaskDb { + TaskDb::new(Box::new(InMemoryStorage::new())) + } + + #[test] + fn test_sync() -> anyhow::Result<()> { + let mut server: Box = Box::new(TestServer::new()); + + let mut db1 = newdb(); + sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); + + let mut db2 = newdb(); + sync(&mut server, db2.storage.txn()?.as_mut()).unwrap(); + + // make some changes in parallel to db1 and db2.. + let uuid1 = Uuid::new_v4(); + db1.apply(Operation::Create { uuid: uuid1 }).unwrap(); + db1.apply(Operation::Update { + uuid: uuid1, + property: "title".into(), + value: Some("my first task".into()), + timestamp: Utc::now(), + }) + .unwrap(); + + let uuid2 = Uuid::new_v4(); + db2.apply(Operation::Create { uuid: uuid2 }).unwrap(); + db2.apply(Operation::Update { + uuid: uuid2, + property: "title".into(), + value: Some("my second task".into()), + timestamp: Utc::now(), + }) + .unwrap(); + + // and synchronize those around + sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); + sync(&mut server, db2.storage.txn()?.as_mut()).unwrap(); + sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); + assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); + + // now make updates to the same task on both sides + db1.apply(Operation::Update { + uuid: uuid2, + property: "priority".into(), + value: Some("H".into()), + timestamp: Utc::now(), + }) + .unwrap(); + db2.apply(Operation::Update { + uuid: uuid2, + property: "project".into(), + value: Some("personal".into()), + timestamp: Utc::now(), + }) + .unwrap(); + + // and synchronize those around + sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); + sync(&mut server, db2.storage.txn()?.as_mut()).unwrap(); + sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); + assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); + + Ok(()) + } + + #[test] + fn test_sync_create_delete() -> anyhow::Result<()> { + let 
mut server: Box = Box::new(TestServer::new()); + + let mut db1 = newdb(); + sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); + + let mut db2 = newdb(); + sync(&mut server, db2.storage.txn()?.as_mut()).unwrap(); + + // create and update a task.. + let uuid = Uuid::new_v4(); + db1.apply(Operation::Create { uuid }).unwrap(); + db1.apply(Operation::Update { + uuid, + property: "title".into(), + value: Some("my first task".into()), + timestamp: Utc::now(), + }) + .unwrap(); + + // and synchronize those around + sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); + sync(&mut server, db2.storage.txn()?.as_mut()).unwrap(); + sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); + assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); + + // delete and re-create the task on db1 + db1.apply(Operation::Delete { uuid }).unwrap(); + db1.apply(Operation::Create { uuid }).unwrap(); + db1.apply(Operation::Update { + uuid, + property: "title".into(), + value: Some("my second task".into()), + timestamp: Utc::now(), + }) + .unwrap(); + + // and on db2, update a property of the task + db2.apply(Operation::Update { + uuid, + property: "project".into(), + value: Some("personal".into()), + timestamp: Utc::now(), + }) + .unwrap(); + + sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); + sync(&mut server, db2.storage.txn()?.as_mut()).unwrap(); + sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); + assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); + + Ok(()) + } +} diff --git a/taskchampion/src/taskdb/working_set.rs b/taskchampion/src/taskdb/working_set.rs new file mode 100644 index 000000000..d5e0774b0 --- /dev/null +++ b/taskchampion/src/taskdb/working_set.rs @@ -0,0 +1,167 @@ +use crate::storage::{StorageTxn, TaskMap}; +use std::collections::HashSet; + +/// Rebuild the working set using a function to identify tasks that should be in the set. 
This +/// renumbers the existing working-set tasks to eliminate gaps, and also adds any tasks that +/// are not already in the working set but should be. The rebuild occurs in a single +/// trasnsaction against the storage backend. +pub fn rebuild(txn: &mut dyn StorageTxn, in_working_set: F, renumber: bool) -> anyhow::Result<()> +where + F: Fn(&TaskMap) -> bool, +{ + let mut new_ws = vec![None]; // index 0 is always None + let mut seen = HashSet::new(); + + // The goal here is for existing working-set items to be "compressed' down to index 1, so + // we begin by scanning the current working set and inserting any tasks that should still + // be in the set into new_ws, implicitly dropping any tasks that are no longer in the + // working set. + for elt in txn.get_working_set()?.drain(1..) { + if let Some(uuid) = elt { + if let Some(task) = txn.get_task(uuid)? { + if in_working_set(&task) { + new_ws.push(Some(uuid)); + seen.insert(uuid); + continue; + } + } + } + + // if we are not renumbering, then insert a blank working-set entry here + if !renumber { + new_ws.push(None); + } + } + + // if renumbering, clear the working set and re-add + if renumber { + txn.clear_working_set()?; + for elt in new_ws.drain(1..new_ws.len()).flatten() { + txn.add_to_working_set(elt)?; + } + } else { + // ..otherwise, just clear the None items determined above from the working set + for (i, elt) in new_ws.iter().enumerate().skip(1) { + if elt.is_none() { + txn.set_working_set_item(i, None)?; + } + } + } + + // Now go hunting for tasks that should be in this list but are not, adding them at the + // end of the list, whether renumbering or not + for (uuid, task) in txn.all_tasks()? 
{ + if !seen.contains(&uuid) && in_working_set(&task) { + txn.add_to_working_set(uuid)?; + } + } + + txn.commit()?; + Ok(()) +} + +#[cfg(test)] +mod test { + use super::*; + use crate::storage::Operation; + use crate::taskdb::TaskDb; + use chrono::Utc; + use uuid::Uuid; + + #[test] + fn rebuild_working_set_renumber() -> anyhow::Result<()> { + rebuild_working_set(true) + } + + #[test] + fn rebuild_working_set_no_renumber() -> anyhow::Result<()> { + rebuild_working_set(false) + } + + fn rebuild_working_set(renumber: bool) -> anyhow::Result<()> { + let mut db = TaskDb::new_inmemory(); + let mut uuids = vec![]; + uuids.push(Uuid::new_v4()); + println!("uuids[0]: {:?} - pending, not in working set", uuids[0]); + uuids.push(Uuid::new_v4()); + println!("uuids[1]: {:?} - pending, in working set", uuids[1]); + uuids.push(Uuid::new_v4()); + println!("uuids[2]: {:?} - not pending, not in working set", uuids[2]); + uuids.push(Uuid::new_v4()); + println!("uuids[3]: {:?} - not pending, in working set", uuids[3]); + uuids.push(Uuid::new_v4()); + println!("uuids[4]: {:?} - pending, in working set", uuids[4]); + + // add everything to the TaskDb + for uuid in &uuids { + db.apply(Operation::Create { uuid: *uuid })?; + } + for i in &[0usize, 1, 4] { + db.apply(Operation::Update { + uuid: uuids[*i].clone(), + property: String::from("status"), + value: Some("pending".into()), + timestamp: Utc::now(), + })?; + } + + // set the existing working_set as we want it + { + let mut txn = db.storage.txn()?; + txn.clear_working_set()?; + + for i in &[1usize, 3, 4] { + txn.add_to_working_set(uuids[*i])?; + } + + txn.commit()?; + } + + assert_eq!( + db.working_set()?, + vec![ + None, + Some(uuids[1].clone()), + Some(uuids[3].clone()), + Some(uuids[4].clone()) + ] + ); + + rebuild( + db.storage.txn()?.as_mut(), + |t| { + if let Some(status) = t.get("status") { + status == "pending" + } else { + false + } + }, + renumber, + )?; + + let exp = if renumber { + // uuids[1] and uuids[4] are already in 
the working set, so are compressed + // to the top, and then uuids[0] is added. + vec![ + None, + Some(uuids[1].clone()), + Some(uuids[4].clone()), + Some(uuids[0].clone()), + ] + } else { + // uuids[1] and uuids[4] are already in the working set, at indexes 1 and 3, + // and then uuids[0] is added. + vec![ + None, + Some(uuids[1].clone()), + None, + Some(uuids[4].clone()), + Some(uuids[0].clone()), + ] + }; + + assert_eq!(db.working_set()?, exp); + + Ok(()) + } +} From b8d892878cd4fa69fe3711a4c6da502f1db4ea6c Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Mon, 11 Oct 2021 09:01:37 -0400 Subject: [PATCH 2/7] document sync data formats --- docs/src/sync-protocol.md | 35 ++++++++++++++ taskchampion/src/storage/operation.rs | 66 +++++++++++++++++++++++++++ 2 files changed, 101 insertions(+) diff --git a/docs/src/sync-protocol.md b/docs/src/sync-protocol.md index 437f44b1e..1f111312c 100644 --- a/docs/src/sync-protocol.md +++ b/docs/src/sync-protocol.md @@ -38,6 +38,41 @@ This observation allows the server to discard older versions. The third invariant prevents the server from discarding versions if there is no snapshot. The fourth invariant prevents the server from discarding versions newer than the snapshot. +## Data Formats + +### Encryption + +TBD (#299) + +### Version + +The decrypted form of a version is a JSON array containing operations in the order they should be applied. +Each operation has the form `{TYPE: DATA}`, for example: + + * `{"Create":{"uuid":"56e0be07-c61f-494c-a54c-bdcfdd52d2a7"}}` + * `{"Delete":{"uuid":"56e0be07-c61f-494c-a54c-bdcfdd52d2a7"}}` + * `{"Update":{"uuid":"56e0be07-c61f-494c-a54c-bdcfdd52d2a7","property":"prop","value":"v","timestamp":"2021-10-11T12:47:07.188090948Z"}}` + * `{"Update":{"uuid":"56e0be07-c61f-494c-a54c-bdcfdd52d2a7","property":"prop","value":null,"timestamp":"2021-10-11T12:47:07.188090948Z"}}` (to delete a property) + +Timestamps are in RFC3339 format with a `Z` suffix. 
+ +### Snapshot + +The decrypted form of a snapshot is a JSON object mapping task IDs to task properties. +For example (pretty-printed for clarity): + +```json +{ + "56e0be07-c61f-494c-a54c-bdcfdd52d2a7": { + "description": "a task", + "priority": "H" + }, + "4b7ed904-f7b0-4293-8a10-ad452422c7b3": { + "description": "another task" + } +} +``` + ## Transactions ### AddVersion diff --git a/taskchampion/src/storage/operation.rs b/taskchampion/src/storage/operation.rs index 68f5fe7d0..ed9a5182d 100644 --- a/taskchampion/src/storage/operation.rs +++ b/taskchampion/src/storage/operation.rs @@ -274,6 +274,72 @@ mod test { ); } + #[test] + fn test_json_create() -> anyhow::Result<()> { + let uuid = Uuid::new_v4(); + let op = Create { uuid }; + assert_eq!( + serde_json::to_string(&op)?, + format!(r#"{{"Create":{{"uuid":"{}"}}}}"#, uuid), + ); + Ok(()) + } + + #[test] + fn test_json_delete() -> anyhow::Result<()> { + let uuid = Uuid::new_v4(); + let op = Delete { uuid }; + assert_eq!( + serde_json::to_string(&op)?, + format!(r#"{{"Delete":{{"uuid":"{}"}}}}"#, uuid), + ); + Ok(()) + } + + #[test] + fn test_json_update() -> anyhow::Result<()> { + let uuid = Uuid::new_v4(); + let timestamp = Utc::now(); + + let op = Update { + uuid, + property: "abc".into(), + value: Some("false".into()), + timestamp, + }; + + assert_eq!( + serde_json::to_string(&op)?, + format!( + r#"{{"Update":{{"uuid":"{}","property":"abc","value":"false","timestamp":"{:?}"}}}}"#, + uuid, timestamp, + ), + ); + Ok(()) + } + + #[test] + fn test_json_update_none() -> anyhow::Result<()> { + let uuid = Uuid::new_v4(); + let timestamp = Utc::now(); + + let op = Update { + uuid, + property: "abc".into(), + value: None, + timestamp, + }; + + assert_eq!( + serde_json::to_string(&op)?, + format!( + r#"{{"Update":{{"uuid":"{}","property":"abc","value":null,"timestamp":"{:?}"}}}}"#, + uuid, timestamp, + ), + ); + Ok(()) + } + fn uuid_strategy() -> impl Strategy { prop_oneof![ 
Just(Uuid::parse_str("83a2f9ef-f455-4195-b92e-a54c161eebfc").unwrap()), From bde19d7f07b6764128bfd6f5e0e1949a45f065fb Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Thu, 7 Oct 2021 19:35:06 -0400 Subject: [PATCH 3/7] Return SnapshotUrgency from AddVersion --- taskchampion/src/server/local.rs | 22 +++++++++++++-------- taskchampion/src/server/remote/mod.rs | 28 +++++++++++++++++++++++---- taskchampion/src/server/test.rs | 12 +++++++----- taskchampion/src/server/types.rs | 15 ++++++++++++-- taskchampion/src/taskdb/sync.rs | 3 ++- 5 files changed, 60 insertions(+), 20 deletions(-) diff --git a/taskchampion/src/server/local.rs b/taskchampion/src/server/local.rs index ccd2c8d3c..3c2632cda 100644 --- a/taskchampion/src/server/local.rs +++ b/taskchampion/src/server/local.rs @@ -1,5 +1,6 @@ use crate::server::{ - AddVersionResult, GetVersionResult, HistorySegment, Server, VersionId, NIL_VERSION_ID, + AddVersionResult, GetVersionResult, HistorySegment, Server, SnapshotUrgency, VersionId, + NIL_VERSION_ID, }; use crate::storage::sqlite::StoredUuid; use anyhow::Context; @@ -116,14 +117,17 @@ impl Server for LocalServer { &mut self, parent_version_id: VersionId, history_segment: HistorySegment, - ) -> anyhow::Result { + ) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> { // no client lookup // no signature validation // check the parent_version_id for linearity let latest_version_id = self.get_latest_version_id()?; if latest_version_id != NIL_VERSION_ID && parent_version_id != latest_version_id { - return Ok(AddVersionResult::ExpectedParentVersion(latest_version_id)); + return Ok(( + AddVersionResult::ExpectedParentVersion(latest_version_id), + SnapshotUrgency::None, + )); } // invent a new ID for this version @@ -136,7 +140,7 @@ impl Server for LocalServer { })?; self.set_latest_version_id(version_id)?; - Ok(AddVersionResult::Ok(version_id)) + Ok((AddVersionResult::Ok(version_id), SnapshotUrgency::None)) } /// Get a vector of all versions after 
`since_version` @@ -176,7 +180,7 @@ mod test { let tmp_dir = TempDir::new()?; let mut server = LocalServer::new(&tmp_dir.path())?; let history = b"1234".to_vec(); - match server.add_version(NIL_VERSION_ID, history.clone())? { + match server.add_version(NIL_VERSION_ID, history.clone())?.0 { AddVersionResult::ExpectedParentVersion(_) => { panic!("should have accepted the version") } @@ -204,7 +208,7 @@ mod test { let parent_version_id = Uuid::new_v4() as VersionId; // This is OK because the server has no latest_version_id yet - match server.add_version(parent_version_id, history.clone())? { + match server.add_version(parent_version_id, history.clone())?.0 { AddVersionResult::ExpectedParentVersion(_) => { panic!("should have accepted the version") } @@ -232,14 +236,16 @@ mod test { let parent_version_id = Uuid::new_v4() as VersionId; // add a version - if let AddVersionResult::ExpectedParentVersion(_) = + if let (AddVersionResult::ExpectedParentVersion(_), SnapshotUrgency::None) = server.add_version(parent_version_id, history.clone())? { panic!("should have accepted the version") } // then add another, not based on that one - if let AddVersionResult::Ok(_) = server.add_version(parent_version_id, history.clone())? { + if let (AddVersionResult::Ok(_), SnapshotUrgency::None) = + server.add_version(parent_version_id, history.clone())? 
+ { panic!("should not have accepted the version") } diff --git a/taskchampion/src/server/remote/mod.rs b/taskchampion/src/server/remote/mod.rs index f6fbb4b0a..9551a448a 100644 --- a/taskchampion/src/server/remote/mod.rs +++ b/taskchampion/src/server/remote/mod.rs @@ -1,4 +1,6 @@ -use crate::server::{AddVersionResult, GetVersionResult, HistorySegment, Server, VersionId}; +use crate::server::{ + AddVersionResult, GetVersionResult, HistorySegment, Server, SnapshotUrgency, VersionId, +}; use std::convert::TryInto; use std::time::Duration; use uuid::Uuid; @@ -43,12 +45,24 @@ fn get_uuid_header(resp: &ureq::Response, name: &str) -> anyhow::Result { Ok(value) } +/// Read the X-Snapshot-Request header and return a SnapshotUrgency +fn get_snapshot_urgency(resp: &ureq::Response) -> SnapshotUrgency { + match resp.header("X-Snapshot-Request") { + None => SnapshotUrgency::None, + Some(hdr) => match hdr { + "urgency=low" => SnapshotUrgency::Low, + "urgency=high" => SnapshotUrgency::High, + _ => SnapshotUrgency::None, + }, + } +} + impl Server for RemoteServer { fn add_version( &mut self, parent_version_id: VersionId, history_segment: HistorySegment, - ) -> anyhow::Result { + ) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> { let url = format!( "{}/v1/client/add-version/{}", self.origin, parent_version_id @@ -70,11 +84,17 @@ impl Server for RemoteServer { { Ok(resp) => { let version_id = get_uuid_header(&resp, "X-Version-Id")?; - Ok(AddVersionResult::Ok(version_id)) + Ok(( + AddVersionResult::Ok(version_id), + get_snapshot_urgency(&resp), + )) } Err(ureq::Error::Status(status, resp)) if status == 409 => { let parent_version_id = get_uuid_header(&resp, "X-Parent-Version-Id")?; - Ok(AddVersionResult::ExpectedParentVersion(parent_version_id)) + Ok(( + AddVersionResult::ExpectedParentVersion(parent_version_id), + SnapshotUrgency::None, + )) } Err(err) => Err(err.into()), } diff --git a/taskchampion/src/server/test.rs b/taskchampion/src/server/test.rs index 
18a7e62bd..aca9e1b94 100644 --- a/taskchampion/src/server/test.rs +++ b/taskchampion/src/server/test.rs @@ -1,5 +1,6 @@ use crate::server::{ - AddVersionResult, GetVersionResult, HistorySegment, Server, VersionId, NIL_VERSION_ID, + AddVersionResult, GetVersionResult, HistorySegment, Server, SnapshotUrgency, VersionId, + NIL_VERSION_ID, }; use std::collections::HashMap; use uuid::Uuid; @@ -33,15 +34,16 @@ impl Server for TestServer { &mut self, parent_version_id: VersionId, history_segment: HistorySegment, - ) -> anyhow::Result { + ) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> { // no client lookup // no signature validation // check the parent_version_id for linearity if self.latest_version_id != NIL_VERSION_ID { if parent_version_id != self.latest_version_id { - return Ok(AddVersionResult::ExpectedParentVersion( - self.latest_version_id, + return Ok(( + AddVersionResult::ExpectedParentVersion(self.latest_version_id), + SnapshotUrgency::None, )); } } @@ -59,7 +61,7 @@ impl Server for TestServer { ); self.latest_version_id = version_id; - Ok(AddVersionResult::Ok(version_id)) + Ok((AddVersionResult::Ok(version_id), SnapshotUrgency::None)) } /// Get a vector of all versions after `since_version` diff --git a/taskchampion/src/server/types.rs b/taskchampion/src/server/types.rs index 5995dfed5..35169c5cd 100644 --- a/taskchampion/src/server/types.rs +++ b/taskchampion/src/server/types.rs @@ -10,7 +10,7 @@ pub const NIL_VERSION_ID: VersionId = Uuid::nil(); /// data is pre-encoded, and from the protocol level appears as a sequence of bytes. pub type HistorySegment = Vec; -/// VersionAdd is the response type from [`crate::server::Server::add_version`]. +/// AddVersionResult is the response type from [`crate::server::Server::add_version`]. 
#[derive(Debug, PartialEq)] pub enum AddVersionResult { /// OK, version added with the given ID @@ -19,6 +19,17 @@ pub enum AddVersionResult { ExpectedParentVersion(VersionId), } +/// SnapshotUrgency indicates how much the server would like this replica to send a snapshot. +#[derive(PartialEq, Debug, Clone, Copy, Eq, PartialOrd, Ord)] +pub enum SnapshotUrgency { + /// Don't need a snapshot right now. + None, + /// A snapshot would be good, but can wait for other replicas to provide it. + Low, + /// A snapshot is needed right now. + High, +} + /// A version as downloaded from the server #[derive(Debug, PartialEq)] pub enum GetVersionResult { @@ -40,7 +51,7 @@ pub trait Server { &mut self, parent_version_id: VersionId, history_segment: HistorySegment, - ) -> anyhow::Result; + ) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)>; /// Get the version with the given parent VersionId fn get_child_version( diff --git a/taskchampion/src/taskdb/sync.rs b/taskchampion/src/taskdb/sync.rs index d8c145857..91dd40f24 100644 --- a/taskchampion/src/taskdb/sync.rs +++ b/taskchampion/src/taskdb/sync.rs @@ -57,7 +57,8 @@ pub(super) fn sync(server: &mut Box, txn: &mut dyn StorageTxn) -> an let new_version = Version { operations }; let history_segment = serde_json::to_string(&new_version).unwrap().into(); info!("sending new version to server"); - match server.add_version(base_version_id, history_segment)? { + let (res, _snapshot_urgency) = server.add_version(base_version_id, history_segment)?; + match res { AddVersionResult::Ok(new_version_id) => { info!("version {:?} received by server", new_version_id); txn.set_base_version(new_version_id)?; From 13a96efacbf2068b2ed019a393132950fe3762ab Mon Sep 17 00:00:00 2001 From: "Dustin J. 
Mitchell" Date: Thu, 7 Oct 2021 21:24:27 -0400 Subject: [PATCH 4/7] Add snapshot encoding / decoding --- Cargo.lock | 1 + taskchampion/Cargo.toml | 1 + taskchampion/src/taskdb/mod.rs | 1 + taskchampion/src/taskdb/snapshot.rs | 186 ++++++++++++++++++++++++++++ 4 files changed, 189 insertions(+) create mode 100644 taskchampion/src/taskdb/snapshot.rs diff --git a/Cargo.lock b/Cargo.lock index bb79505da..10f51edf9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2962,6 +2962,7 @@ version = "0.4.1" dependencies = [ "anyhow", "chrono", + "flate2", "log", "pretty_assertions", "proptest", diff --git a/taskchampion/Cargo.toml b/taskchampion/Cargo.toml index 168d4ba1f..c5da9898c 100644 --- a/taskchampion/Cargo.toml +++ b/taskchampion/Cargo.toml @@ -23,6 +23,7 @@ tindercrypt = { version = "^0.2.2", default-features = false } rusqlite = { version = "0.25", features = ["bundled"] } strum = "0.21" strum_macros = "0.21" +flate2 = "1" [dev-dependencies] proptest = "^1.0.0" diff --git a/taskchampion/src/taskdb/mod.rs b/taskchampion/src/taskdb/mod.rs index 850628719..0394b8c85 100644 --- a/taskchampion/src/taskdb/mod.rs +++ b/taskchampion/src/taskdb/mod.rs @@ -3,6 +3,7 @@ use crate::storage::{Operation, Storage, TaskMap}; use uuid::Uuid; mod ops; +mod snapshot; mod sync; mod working_set; diff --git a/taskchampion/src/taskdb/snapshot.rs b/taskchampion/src/taskdb/snapshot.rs new file mode 100644 index 000000000..e054612b3 --- /dev/null +++ b/taskchampion/src/taskdb/snapshot.rs @@ -0,0 +1,186 @@ +use crate::storage::{StorageTxn, TaskMap, VersionId}; +use flate2::{read::ZlibDecoder, write::ZlibEncoder, Compression}; +use serde::de::{Deserialize, Deserializer, MapAccess, Visitor}; +use serde::ser::{Serialize, SerializeMap, Serializer}; +use std::fmt; +use uuid::Uuid; + +/// A newtype to wrap the result of [`crate::storage::StorageTxn::all_tasks`] +pub(super) struct SnapshotTasks(Vec<(Uuid, TaskMap)>); + +impl Serialize for SnapshotTasks { + fn serialize(&self, serializer: S) -> Result + 
where + S: Serializer, + { + let mut map = serializer.serialize_map(Some(self.0.len()))?; + for (k, v) in &self.0 { + map.serialize_entry(k, v)?; + } + map.end() + } +} + +struct TaskDbVisitor; + +impl<'de> Visitor<'de> for TaskDbVisitor { + type Value = SnapshotTasks; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a map representing a task snapshot") + } + + fn visit_map(self, mut access: M) -> Result + where + M: MapAccess<'de>, + { + let mut map = SnapshotTasks(Vec::with_capacity(access.size_hint().unwrap_or(0))); + + while let Some((key, value)) = access.next_entry()? { + map.0.push((key, value)); + } + + Ok(map) + } +} + +impl<'de> Deserialize<'de> for SnapshotTasks { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + deserializer.deserialize_map(TaskDbVisitor) + } +} + +impl SnapshotTasks { + pub(super) fn encode(&self) -> anyhow::Result> { + let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default()); + serde_json::to_writer(&mut encoder, &self)?; + Ok(encoder.finish()?) + } + + pub(super) fn decode(snapshot: &[u8]) -> anyhow::Result { + let decoder = ZlibDecoder::new(snapshot); + Ok(serde_json::from_reader(decoder)?) + } + + pub(super) fn into_inner(self) -> Vec<(Uuid, TaskMap)> { + self.0 + } +} + +#[allow(dead_code)] +/// Generate a snapshot (compressed, unencrypted) for the current state of the taskdb in the given +/// storage. +pub(super) fn make_snapshot(txn: &mut dyn StorageTxn) -> anyhow::Result> { + let all_tasks = SnapshotTasks(txn.all_tasks()?); + all_tasks.encode() +} + +#[allow(dead_code)] +/// Apply the given snapshot (compressed, unencrypted) to the taskdb's storage. 
+pub(super) fn apply_snapshot( + txn: &mut dyn StorageTxn, + version: VersionId, + snapshot: &[u8], +) -> anyhow::Result<()> { + let all_tasks = SnapshotTasks::decode(snapshot)?; + + // first, verify that the taskdb truly is empty + let mut empty = true; + empty = empty && txn.all_tasks()?.is_empty(); + empty = empty && txn.get_working_set()? == vec![None]; + empty = empty && txn.base_version()? == Uuid::nil(); + empty = empty && txn.operations()?.is_empty(); + + if !empty { + anyhow::bail!("Cannot apply snapshot to a non-empty task database"); + } + + for (uuid, task) in all_tasks.into_inner().drain(..) { + txn.set_task(uuid, task)?; + } + txn.set_base_version(version)?; + + Ok(()) +} + +#[cfg(test)] +mod test { + use super::*; + use crate::storage::{InMemoryStorage, Storage, TaskMap}; + use pretty_assertions::assert_eq; + + #[test] + fn test_serialize_empty() -> anyhow::Result<()> { + let empty = SnapshotTasks(vec![]); + assert_eq!(serde_json::to_vec(&empty)?, b"{}".to_owned()); + Ok(()) + } + + #[test] + fn test_serialize_tasks() -> anyhow::Result<()> { + let u = Uuid::new_v4(); + let m: TaskMap = vec![("description".to_owned(), "my task".to_owned())] + .drain(..) + .collect(); + let all_tasks = SnapshotTasks(vec![(u, m)]); + assert_eq!( + serde_json::to_vec(&all_tasks)?, + format!("{{\"{}\":{{\"description\":\"my task\"}}}}", u).into_bytes(), + ); + Ok(()) + } + + #[test] + fn test_round_trip() -> anyhow::Result<()> { + let mut storage = InMemoryStorage::new(); + let version = Uuid::new_v4(); + + let task1 = ( + Uuid::new_v4(), + vec![("description".to_owned(), "one".to_owned())] + .drain(..) + .collect::(), + ); + let task2 = ( + Uuid::new_v4(), + vec![("description".to_owned(), "two".to_owned())] + .drain(..) + .collect::(), + ); + + { + let mut txn = storage.txn()?; + txn.set_task(task1.0, task1.1.clone())?; + txn.set_task(task2.0, task2.1.clone())?; + txn.commit()?; + } + + let snap = { + let mut txn = storage.txn()?; + make_snapshot(txn.as_mut())? 
+ }; + + // apply that snapshot to a fresh bit of fake + let mut storage = InMemoryStorage::new(); + { + let mut txn = storage.txn()?; + apply_snapshot(txn.as_mut(), version, &snap)?; + txn.commit()? + } + + { + let mut txn = storage.txn()?; + assert_eq!(txn.get_task(task1.0)?, Some(task1.1)); + assert_eq!(txn.get_task(task2.0)?, Some(task2.1)); + assert_eq!(txn.all_tasks()?.len(), 2); + assert_eq!(txn.base_version()?, version); + assert_eq!(txn.operations()?.len(), 0); + assert_eq!(txn.get_working_set()?.len(), 1); + } + + Ok(()) + } +} From b97f6dc4d53a31096fa3f2c8168c232dd7bb8cb1 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Sun, 10 Oct 2021 17:27:29 -0400 Subject: [PATCH 5/7] Send snapshots to server --- taskchampion/src/server/local.rs | 12 ++- taskchampion/src/server/remote/crypto.rs | 115 +++++++++++------------ taskchampion/src/server/remote/mod.rs | 49 +++++++--- taskchampion/src/server/test.rs | 74 +++++++++++---- taskchampion/src/server/types.rs | 7 ++ taskchampion/src/taskdb/sync.rs | 53 +++++++++-- 6 files changed, 206 insertions(+), 104 deletions(-) diff --git a/taskchampion/src/server/local.rs b/taskchampion/src/server/local.rs index 3c2632cda..b8c0198a2 100644 --- a/taskchampion/src/server/local.rs +++ b/taskchampion/src/server/local.rs @@ -1,6 +1,6 @@ use crate::server::{ - AddVersionResult, GetVersionResult, HistorySegment, Server, SnapshotUrgency, VersionId, - NIL_VERSION_ID, + AddVersionResult, GetVersionResult, HistorySegment, Server, Snapshot, SnapshotUrgency, + VersionId, NIL_VERSION_ID, }; use crate::storage::sqlite::StoredUuid; use anyhow::Context; @@ -111,8 +111,6 @@ impl Server for LocalServer { // TODO: better transaction isolation for add_version (gets and sets should be in the same // transaction) - /// Add a new version. If the given version number is incorrect, this responds with the - /// appropriate version and expects the caller to try again. 
fn add_version( &mut self, parent_version_id: VersionId, @@ -143,7 +141,6 @@ impl Server for LocalServer { Ok((AddVersionResult::Ok(version_id), SnapshotUrgency::None)) } - /// Get a vector of all versions after `since_version` fn get_child_version( &mut self, parent_version_id: VersionId, @@ -158,6 +155,11 @@ impl Server for LocalServer { Ok(GetVersionResult::NoSuchVersion) } } + + fn add_snapshot(&mut self, _version_id: VersionId, _snapshot: Snapshot) -> anyhow::Result<()> { + // the local server never requests a snapshot, so it should never get one + unreachable!() + } } #[cfg(test)] diff --git a/taskchampion/src/server/remote/crypto.rs b/taskchampion/src/server/remote/crypto.rs index 512846479..40103f422 100644 --- a/taskchampion/src/server/remote/crypto.rs +++ b/taskchampion/src/server/remote/crypto.rs @@ -1,5 +1,4 @@ use crate::server::HistorySegment; -use std::convert::TryFrom; use std::io::Read; use tindercrypt::cryptors::RingCryptor; use uuid::Uuid; @@ -18,45 +17,31 @@ impl AsRef<[u8]> for Secret { } } -/// A cleartext payload containing a history segment. -pub(super) struct HistoryCleartext { - pub(super) parent_version_id: Uuid, - pub(super) history_segment: HistorySegment, +/// A cleartext payload with an attached version_id. The version_id is used to +/// validate the context of the payload. 
+pub(super) struct Cleartext { + pub(super) version_id: Uuid, + pub(super) payload: HistorySegment, } -impl HistoryCleartext { +impl Cleartext { /// Seal the payload into its ciphertext - pub(super) fn seal(self, secret: &Secret) -> anyhow::Result { - let cryptor = RingCryptor::new().with_aad(self.parent_version_id.as_bytes()); - let ciphertext = cryptor.seal_with_passphrase(secret.as_ref(), &self.history_segment)?; - Ok(HistoryCiphertext(ciphertext)) + pub(super) fn seal(self, secret: &Secret) -> anyhow::Result { + let cryptor = RingCryptor::new().with_aad(self.version_id.as_bytes()); + let ciphertext = cryptor.seal_with_passphrase(secret.as_ref(), &self.payload)?; + Ok(Ciphertext(ciphertext)) } } -/// An ecrypted payload containing a history segment -pub(super) struct HistoryCiphertext(pub(super) Vec); +/// An ecrypted payload +pub(super) struct Ciphertext(pub(super) Vec); -impl HistoryCiphertext { - pub(super) fn open( - self, - secret: &Secret, - parent_version_id: Uuid, - ) -> anyhow::Result { - let cryptor = RingCryptor::new().with_aad(parent_version_id.as_bytes()); - let plaintext = cryptor.open(secret.as_ref(), &self.0)?; - - Ok(HistoryCleartext { - parent_version_id, - history_segment: plaintext, - }) - } -} - -impl TryFrom for HistoryCiphertext { - type Error = anyhow::Error; - - fn try_from(resp: ureq::Response) -> Result { - if let Some("application/vnd.taskchampion.history-segment") = resp.header("Content-Type") { +impl Ciphertext { + pub(super) fn from_resp( + resp: ureq::Response, + content_type: &str, + ) -> Result { + if resp.header("Content-Type") == Some(content_type) { let mut reader = resp.into_reader(); let mut bytes = vec![]; reader.read_to_end(&mut bytes)?; @@ -67,9 +52,19 @@ impl TryFrom for HistoryCiphertext { )) } } + + pub(super) fn open(self, secret: &Secret, version_id: Uuid) -> anyhow::Result { + let cryptor = RingCryptor::new().with_aad(version_id.as_bytes()); + let plaintext = cryptor.open(secret.as_ref(), &self.0)?; + + 
Ok(Cleartext { + version_id, + payload: plaintext, + }) + } } -impl AsRef<[u8]> for HistoryCiphertext { +impl AsRef<[u8]> for Ciphertext { fn as_ref(&self) -> &[u8] { self.0.as_ref() } @@ -82,52 +77,50 @@ mod test { #[test] fn round_trip() { - let parent_version_id = Uuid::new_v4(); - let history_segment = b"HISTORY REPEATS ITSELF".to_vec(); + let version_id = Uuid::new_v4(); + let payload = b"HISTORY REPEATS ITSELF".to_vec(); let secret = Secret(b"SEKRIT".to_vec()); - let history_cleartext = HistoryCleartext { - parent_version_id, - history_segment: history_segment.clone(), + let cleartext = Cleartext { + version_id, + payload: payload.clone(), }; - let history_ciphertext = history_cleartext.seal(&secret).unwrap(); - let history_cleartext = history_ciphertext.open(&secret, parent_version_id).unwrap(); + let ciphertext = cleartext.seal(&secret).unwrap(); + let cleartext = ciphertext.open(&secret, version_id).unwrap(); - assert_eq!(history_cleartext.history_segment, history_segment); - assert_eq!(history_cleartext.parent_version_id, parent_version_id); + assert_eq!(cleartext.payload, payload); + assert_eq!(cleartext.version_id, version_id); } #[test] fn round_trip_bad_key() { - let parent_version_id = Uuid::new_v4(); - let history_segment = b"HISTORY REPEATS ITSELF".to_vec(); + let version_id = Uuid::new_v4(); + let payload = b"HISTORY REPEATS ITSELF".to_vec(); let secret = Secret(b"SEKRIT".to_vec()); - let history_cleartext = HistoryCleartext { - parent_version_id, - history_segment: history_segment.clone(), + let cleartext = Cleartext { + version_id, + payload: payload.clone(), }; - let history_ciphertext = history_cleartext.seal(&secret).unwrap(); + let ciphertext = cleartext.seal(&secret).unwrap(); let secret = Secret(b"BADSEKRIT".to_vec()); - assert!(history_ciphertext.open(&secret, parent_version_id).is_err()); + assert!(ciphertext.open(&secret, version_id).is_err()); } #[test] - fn round_trip_bad_pvid() { - let parent_version_id = Uuid::new_v4(); - let 
history_segment = b"HISTORY REPEATS ITSELF".to_vec(); + fn round_trip_bad_version() { + let version_id = Uuid::new_v4(); + let payload = b"HISTORY REPEATS ITSELF".to_vec(); let secret = Secret(b"SEKRIT".to_vec()); - let history_cleartext = HistoryCleartext { - parent_version_id, - history_segment: history_segment.clone(), + let cleartext = Cleartext { + version_id, + payload: payload.clone(), }; - let history_ciphertext = history_cleartext.seal(&secret).unwrap(); + let ciphertext = cleartext.seal(&secret).unwrap(); - let bad_parent_version_id = Uuid::new_v4(); - assert!(history_ciphertext - .open(&secret, bad_parent_version_id) - .is_err()); + let bad_version_id = Uuid::new_v4(); + assert!(ciphertext.open(&secret, bad_version_id).is_err()); } } diff --git a/taskchampion/src/server/remote/mod.rs b/taskchampion/src/server/remote/mod.rs index 9551a448a..139f5dc9f 100644 --- a/taskchampion/src/server/remote/mod.rs +++ b/taskchampion/src/server/remote/mod.rs @@ -1,12 +1,12 @@ use crate::server::{ - AddVersionResult, GetVersionResult, HistorySegment, Server, SnapshotUrgency, VersionId, + AddVersionResult, GetVersionResult, HistorySegment, Server, Snapshot, SnapshotUrgency, + VersionId, }; -use std::convert::TryInto; use std::time::Duration; use uuid::Uuid; mod crypto; -use crypto::{HistoryCiphertext, HistoryCleartext, Secret}; +use crypto::{Ciphertext, Cleartext, Secret}; pub struct RemoteServer { origin: String, @@ -15,6 +15,12 @@ pub struct RemoteServer { agent: ureq::Agent, } +/// The content-type for history segments (opaque blobs of bytes) +const HISTORY_SEGMENT_CONTENT_TYPE: &str = "application/vnd.taskchampion.history-segment"; + +/// The content-type for snapshots (opaque blobs of bytes) +const SNAPSHOT_CONTENT_TYPE: &str = "application/vnd.taskchampion.snapshot"; + /// A RemoeServer communicates with a remote server over HTTP (such as with /// taskchampion-sync-server). 
impl RemoteServer { @@ -67,20 +73,17 @@ impl Server for RemoteServer { "{}/v1/client/add-version/{}", self.origin, parent_version_id ); - let history_cleartext = HistoryCleartext { - parent_version_id, - history_segment, + let cleartext = Cleartext { + version_id: parent_version_id, + payload: history_segment, }; - let history_ciphertext = history_cleartext.seal(&self.encryption_secret)?; + let ciphertext = cleartext.seal(&self.encryption_secret)?; match self .agent .post(&url) - .set( - "Content-Type", - "application/vnd.taskchampion.history-segment", - ) + .set("Content-Type", HISTORY_SEGMENT_CONTENT_TYPE) .set("X-Client-Key", &self.client_key.to_string()) - .send_bytes(history_ciphertext.as_ref()) + .send_bytes(ciphertext.as_ref()) { Ok(resp) => { let version_id = get_uuid_header(&resp, "X-Version-Id")?; @@ -117,10 +120,10 @@ impl Server for RemoteServer { Ok(resp) => { let parent_version_id = get_uuid_header(&resp, "X-Parent-Version-Id")?; let version_id = get_uuid_header(&resp, "X-Version-Id")?; - let history_ciphertext: HistoryCiphertext = resp.try_into()?; - let history_segment = history_ciphertext + let ciphertext = Ciphertext::from_resp(resp, HISTORY_SEGMENT_CONTENT_TYPE)?; + let history_segment = ciphertext .open(&self.encryption_secret, parent_version_id)? - .history_segment; + .payload; Ok(GetVersionResult::Version { version_id, parent_version_id, @@ -133,4 +136,20 @@ impl Server for RemoteServer { Err(err) => Err(err.into()), } } + + fn add_snapshot(&mut self, version_id: VersionId, snapshot: Snapshot) -> anyhow::Result<()> { + let url = format!("{}/v1/client/add-snapshot/{}", self.origin, version_id); + let cleartext = Cleartext { + version_id, + payload: snapshot, + }; + let ciphertext = cleartext.seal(&self.encryption_secret)?; + Ok(self + .agent + .post(&url) + .set("Content-Type", SNAPSHOT_CONTENT_TYPE) + .set("X-Client-Key", &self.client_key.to_string()) + .send_bytes(ciphertext.as_ref()) + .map(|_| ())?) 
+ } } diff --git a/taskchampion/src/server/test.rs b/taskchampion/src/server/test.rs index aca9e1b94..3901e2fb6 100644 --- a/taskchampion/src/server/test.rs +++ b/taskchampion/src/server/test.rs @@ -1,8 +1,9 @@ use crate::server::{ - AddVersionResult, GetVersionResult, HistorySegment, Server, SnapshotUrgency, VersionId, - NIL_VERSION_ID, + AddVersionResult, GetVersionResult, HistorySegment, Server, Snapshot, SnapshotUrgency, + VersionId, NIL_VERSION_ID, }; use std::collections::HashMap; +use std::sync::{Arc, Mutex}; use uuid::Uuid; struct Version { @@ -11,19 +12,44 @@ struct Version { history_segment: HistorySegment, } -pub(crate) struct TestServer { +#[derive(Clone)] + +/// TestServer implements the Server trait with a test implementation. +pub(crate) struct TestServer(Arc>); + +pub(crate) struct Inner { latest_version_id: VersionId, // NOTE: indexed by parent_version_id! versions: HashMap, + snapshot_urgency: SnapshotUrgency, + snapshot: Option<(VersionId, Snapshot)>, } impl TestServer { /// A test server has no notion of clients, signatures, encryption, etc. 
- pub fn new() -> TestServer { - TestServer { + pub(crate) fn new() -> TestServer { + TestServer(Arc::new(Mutex::new(Inner { latest_version_id: NIL_VERSION_ID, versions: HashMap::new(), - } + snapshot_urgency: SnapshotUrgency::None, + snapshot: None, + }))) + } + + /// Get a boxed Server implementation referring to this TestServer + pub(crate) fn server(&self) -> Box { + Box::new(self.clone()) + } + + pub(crate) fn set_snapshot_urgency(&self, urgency: SnapshotUrgency) { + let mut inner = self.0.lock().unwrap(); + inner.snapshot_urgency = urgency; + } + + /// Get the latest snapshot added to this server + pub(crate) fn snapshot(&self) -> Option<(VersionId, Snapshot)> { + let inner = self.0.lock().unwrap(); + inner.snapshot.as_ref().cloned() } } @@ -35,23 +61,24 @@ impl Server for TestServer { parent_version_id: VersionId, history_segment: HistorySegment, ) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> { + let mut inner = self.0.lock().unwrap(); + // no client lookup // no signature validation // check the parent_version_id for linearity - if self.latest_version_id != NIL_VERSION_ID { - if parent_version_id != self.latest_version_id { - return Ok(( - AddVersionResult::ExpectedParentVersion(self.latest_version_id), - SnapshotUrgency::None, - )); - } + if inner.latest_version_id != NIL_VERSION_ID && parent_version_id != inner.latest_version_id + { + return Ok(( + AddVersionResult::ExpectedParentVersion(inner.latest_version_id), + SnapshotUrgency::None, + )); } // invent a new ID for this version let version_id = Uuid::new_v4(); - self.versions.insert( + inner.versions.insert( parent_version_id, Version { version_id, @@ -59,9 +86,12 @@ impl Server for TestServer { history_segment, }, ); - self.latest_version_id = version_id; + inner.latest_version_id = version_id; - Ok((AddVersionResult::Ok(version_id), SnapshotUrgency::None)) + // reply with the configured urgency and reset it to None + let urgency = inner.snapshot_urgency; + inner.snapshot_urgency = 
SnapshotUrgency::None; + Ok((AddVersionResult::Ok(version_id), urgency)) } /// Get a vector of all versions after `since_version` @@ -69,7 +99,9 @@ impl Server for TestServer { &mut self, parent_version_id: VersionId, ) -> anyhow::Result { - if let Some(version) = self.versions.get(&parent_version_id) { + let inner = self.0.lock().unwrap(); + + if let Some(version) = inner.versions.get(&parent_version_id) { Ok(GetVersionResult::Version { version_id: version.version_id, parent_version_id: version.parent_version_id, @@ -79,4 +111,12 @@ impl Server for TestServer { Ok(GetVersionResult::NoSuchVersion) } } + + fn add_snapshot(&mut self, version_id: VersionId, snapshot: Snapshot) -> anyhow::Result<()> { + let mut inner = self.0.lock().unwrap(); + + // test implementation -- does not perform any validation + inner.snapshot = Some((version_id, snapshot)); + Ok(()) + } } diff --git a/taskchampion/src/server/types.rs b/taskchampion/src/server/types.rs index 35169c5cd..3a1178c41 100644 --- a/taskchampion/src/server/types.rs +++ b/taskchampion/src/server/types.rs @@ -10,6 +10,10 @@ pub const NIL_VERSION_ID: VersionId = Uuid::nil(); /// data is pre-encoded, and from the protocol level appears as a sequence of bytes. pub type HistorySegment = Vec; +/// A snapshot of the state of the task database. This is encoded by the taskdb implementation +/// and treated as a sequence of bytes by the server implementation. +pub type Snapshot = Vec; + /// AddVersionResult is the response type from [`crate::server::Server::add_version`]. 
#[derive(Debug, PartialEq)] pub enum AddVersionResult { @@ -58,4 +62,7 @@ pub trait Server { &mut self, parent_version_id: VersionId, ) -> anyhow::Result; + + /// Add a snapshot on the server + fn add_snapshot(&mut self, version_id: VersionId, snapshot: Snapshot) -> anyhow::Result<()>; } diff --git a/taskchampion/src/taskdb/sync.rs b/taskchampion/src/taskdb/sync.rs index 91dd40f24..c0762f79b 100644 --- a/taskchampion/src/taskdb/sync.rs +++ b/taskchampion/src/taskdb/sync.rs @@ -1,5 +1,5 @@ -use super::ops; -use crate::server::{AddVersionResult, GetVersionResult, Server}; +use super::{ops, snapshot}; +use crate::server::{AddVersionResult, GetVersionResult, Server, SnapshotUrgency}; use crate::storage::{Operation, StorageTxn}; use log::{info, trace, warn}; use serde::{Deserialize, Serialize}; @@ -57,12 +57,19 @@ pub(super) fn sync(server: &mut Box, txn: &mut dyn StorageTxn) -> an let new_version = Version { operations }; let history_segment = serde_json::to_string(&new_version).unwrap().into(); info!("sending new version to server"); - let (res, _snapshot_urgency) = server.add_version(base_version_id, history_segment)?; + let (res, snapshot_urgency) = server.add_version(base_version_id, history_segment)?; match res { AddVersionResult::Ok(new_version_id) => { info!("version {:?} received by server", new_version_id); txn.set_base_version(new_version_id)?; txn.set_operations(vec![])?; + + // TODO: configurable urgency levels + if snapshot_urgency != SnapshotUrgency::None { + let snapshot = snapshot::make_snapshot(txn)?; + server.add_snapshot(new_version_id, snapshot)?; + } + break; } AddVersionResult::ExpectedParentVersion(parent_version_id) => { @@ -150,8 +157,9 @@ mod test { use super::*; use crate::server::test::TestServer; use crate::storage::{InMemoryStorage, Operation}; - use crate::taskdb::TaskDb; + use crate::taskdb::{snapshot::SnapshotTasks, TaskDb}; use chrono::Utc; + use pretty_assertions::assert_eq; use uuid::Uuid; fn newdb() -> TaskDb { @@ -160,7 +168,7 @@ 
mod test { #[test] fn test_sync() -> anyhow::Result<()> { - let mut server: Box = Box::new(TestServer::new()); + let mut server: Box = TestServer::new().server(); let mut db1 = newdb(); sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); @@ -222,7 +230,7 @@ mod test { #[test] fn test_sync_create_delete() -> anyhow::Result<()> { - let mut server: Box = Box::new(TestServer::new()); + let mut server: Box = TestServer::new().server(); let mut db1 = newdb(); sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); @@ -274,4 +282,37 @@ mod test { Ok(()) } + + #[test] + fn test_sync_adds_snapshot() -> anyhow::Result<()> { + let test_server = TestServer::new(); + + let mut server: Box = test_server.server(); + let mut db1 = newdb(); + + let uuid = Uuid::new_v4(); + db1.apply(Operation::Create { uuid }).unwrap(); + db1.apply(Operation::Update { + uuid, + property: "title".into(), + value: Some("my first task".into()), + timestamp: Utc::now(), + }) + .unwrap(); + + test_server.set_snapshot_urgency(SnapshotUrgency::High); + sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); + + // assert that a snapshot was added + let base_version = db1.storage.txn()?.base_version()?; + let (v, s) = test_server + .snapshot() + .ok_or_else(|| anyhow::anyhow!("no snapshot"))?; + assert_eq!(v, base_version); + + let tasks = SnapshotTasks::decode(&s)?.into_inner(); + assert_eq!(tasks[0].0, uuid); + + Ok(()) + } } From ed3475d9eab0b2ee910d0058d34365a6502128c5 Mon Sep 17 00:00:00 2001 From: "Dustin J. 
Mitchell" Date: Mon, 11 Oct 2021 17:14:26 -0400 Subject: [PATCH 6/7] support avoiding snapshots --- cli/src/invocation/cmd/sync.rs | 3 +- replica-server-tests/tests/cross-sync.rs | 10 ++-- taskchampion/src/replica.rs | 17 +++++- taskchampion/src/taskdb/mod.rs | 17 ++++-- taskchampion/src/taskdb/sync.rs | 69 +++++++++++++++++------- 5 files changed, 85 insertions(+), 31 deletions(-) diff --git a/cli/src/invocation/cmd/sync.rs b/cli/src/invocation/cmd/sync.rs index b43b01362..f7c1151d2 100644 --- a/cli/src/invocation/cmd/sync.rs +++ b/cli/src/invocation/cmd/sync.rs @@ -6,7 +6,8 @@ pub(crate) fn execute( replica: &mut Replica, server: &mut Box, ) -> Result<(), crate::Error> { - replica.sync(server)?; + // TODO: configurable avoid_snapshots + replica.sync(server, false)?; writeln!(w, "sync complete.")?; Ok(()) } diff --git a/replica-server-tests/tests/cross-sync.rs b/replica-server-tests/tests/cross-sync.rs index 8ade694da..881a7481b 100644 --- a/replica-server-tests/tests/cross-sync.rs +++ b/replica-server-tests/tests/cross-sync.rs @@ -41,8 +41,8 @@ async fn cross_sync() -> anyhow::Result<()> { t1.start()?; let t1 = t1.into_immut(); - rep1.sync(&mut serv1)?; - rep2.sync(&mut serv2)?; + rep1.sync(&mut serv1, false)?; + rep2.sync(&mut serv2, false)?; // those tasks should exist on rep2 now let t12 = rep2 @@ -66,9 +66,9 @@ async fn cross_sync() -> anyhow::Result<()> { t12.set_status(Status::Completed)?; // sync those changes back and forth - rep1.sync(&mut serv1)?; // rep1 -> server - rep2.sync(&mut serv2)?; // server -> rep2, rep2 -> server - rep1.sync(&mut serv1)?; // server -> rep1 + rep1.sync(&mut serv1, false)?; // rep1 -> server + rep2.sync(&mut serv2, false)?; // server -> rep2, rep2 -> server + rep1.sync(&mut serv1, false)?; // server -> rep1 let t1 = rep1 .get_task(t1.get_uuid())? 
diff --git a/taskchampion/src/replica.rs b/taskchampion/src/replica.rs index 89ec5ab80..cc6338ceb 100644 --- a/taskchampion/src/replica.rs +++ b/taskchampion/src/replica.rs @@ -126,8 +126,21 @@ impl Replica { /// Synchronize this replica against the given server. The working set is rebuilt after /// this occurs, but without renumbering, so any newly-pending tasks should appear in /// the working set. - pub fn sync(&mut self, server: &mut Box) -> anyhow::Result<()> { - self.taskdb.sync(server).context("Failed to synchronize")?; + /// + /// If `avoid_snapshots` is true, the sync operations produces a snapshot only when the server + /// indicate it is urgent (snapshot urgency "high"). This allows time for other replicas to + /// create a snapshot before this one does. + /// + /// Set this to true on systems more constrained in CPU, memory, or bandwidth than a typical desktop + /// system + pub fn sync( + &mut self, + server: &mut Box, + avoid_snapshots: bool, + ) -> anyhow::Result<()> { + self.taskdb + .sync(server, avoid_snapshots) + .context("Failed to synchronize")?; self.rebuild_working_set(false) .context("Failed to rebuild working set after sync")?; Ok(()) diff --git a/taskchampion/src/taskdb/mod.rs b/taskchampion/src/taskdb/mod.rs index 0394b8c85..65546c462 100644 --- a/taskchampion/src/taskdb/mod.rs +++ b/taskchampion/src/taskdb/mod.rs @@ -96,9 +96,20 @@ impl TaskDb { } /// Sync to the given server, pulling remote changes and pushing local changes. - pub fn sync(&mut self, server: &mut Box) -> anyhow::Result<()> { + /// + /// If `avoid_snapshots` is true, the sync operations produces a snapshot only when the server + /// indicate it is urgent (snapshot urgency "high"). This allows time for other replicas to + /// create a snapshot before this one does. 
+ /// + /// Set this to true on systems more constrained in CPU, memory, or bandwidth than a typical desktop + /// system + pub fn sync( + &mut self, + server: &mut Box, + avoid_snapshots: bool, + ) -> anyhow::Result<()> { let mut txn = self.storage.txn()?; - sync::sync(server, txn.as_mut()) + sync::sync(server, txn.as_mut(), avoid_snapshots) } // functions for supporting tests @@ -212,7 +223,7 @@ mod tests { println!(" {:?} (ignored)", e); } }, - Action::Sync => db.sync(&mut server).unwrap(), + Action::Sync => db.sync(&mut server, false).unwrap(), } } diff --git a/taskchampion/src/taskdb/sync.rs b/taskchampion/src/taskdb/sync.rs index c0762f79b..2cd8c2717 100644 --- a/taskchampion/src/taskdb/sync.rs +++ b/taskchampion/src/taskdb/sync.rs @@ -11,7 +11,11 @@ struct Version { } /// Sync to the given server, pulling remote changes and pushing local changes. -pub(super) fn sync(server: &mut Box, txn: &mut dyn StorageTxn) -> anyhow::Result<()> { +pub(super) fn sync( + server: &mut Box, + txn: &mut dyn StorageTxn, + avoid_snapshots: bool, +) -> anyhow::Result<()> { // retry synchronizing until the server accepts our version (this allows for races between // replicas trying to sync to the same server). If the server insists on the same base // version twice, then we have diverged. 
@@ -64,8 +68,13 @@ pub(super) fn sync(server: &mut Box, txn: &mut dyn StorageTxn) -> an txn.set_base_version(new_version_id)?; txn.set_operations(vec![])?; - // TODO: configurable urgency levels - if snapshot_urgency != SnapshotUrgency::None { + // make a snapshot if the server indicates it is urgent enough + let base_urgency = if avoid_snapshots { + SnapshotUrgency::High + } else { + SnapshotUrgency::Low + }; + if snapshot_urgency >= base_urgency { let snapshot = snapshot::make_snapshot(txn)?; server.add_snapshot(new_version_id, snapshot)?; } @@ -171,10 +180,10 @@ mod test { let mut server: Box = TestServer::new().server(); let mut db1 = newdb(); - sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); + sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap(); let mut db2 = newdb(); - sync(&mut server, db2.storage.txn()?.as_mut()).unwrap(); + sync(&mut server, db2.storage.txn()?.as_mut(), false).unwrap(); // make some changes in parallel to db1 and db2.. let uuid1 = Uuid::new_v4(); @@ -198,9 +207,9 @@ mod test { .unwrap(); // and synchronize those around - sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); - sync(&mut server, db2.storage.txn()?.as_mut()).unwrap(); - sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); + sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap(); + sync(&mut server, db2.storage.txn()?.as_mut(), false).unwrap(); + sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap(); assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); // now make updates to the same task on both sides @@ -220,9 +229,9 @@ mod test { .unwrap(); // and synchronize those around - sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); - sync(&mut server, db2.storage.txn()?.as_mut()).unwrap(); - sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); + sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap(); + sync(&mut server, db2.storage.txn()?.as_mut(), false).unwrap(); + sync(&mut server, db1.storage.txn()?.as_mut(), 
false).unwrap(); assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); Ok(()) @@ -233,10 +242,10 @@ mod test { let mut server: Box = TestServer::new().server(); let mut db1 = newdb(); - sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); + sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap(); let mut db2 = newdb(); - sync(&mut server, db2.storage.txn()?.as_mut()).unwrap(); + sync(&mut server, db2.storage.txn()?.as_mut(), false).unwrap(); // create and update a task.. let uuid = Uuid::new_v4(); @@ -250,9 +259,9 @@ mod test { .unwrap(); // and synchronize those around - sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); - sync(&mut server, db2.storage.txn()?.as_mut()).unwrap(); - sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); + sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap(); + sync(&mut server, db2.storage.txn()?.as_mut(), false).unwrap(); + sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap(); assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); // delete and re-create the task on db1 @@ -275,9 +284,9 @@ mod test { }) .unwrap(); - sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); - sync(&mut server, db2.storage.txn()?.as_mut()).unwrap(); - sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); + sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap(); + sync(&mut server, db2.storage.txn()?.as_mut(), false).unwrap(); + sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap(); assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); Ok(()) @@ -301,7 +310,7 @@ mod test { .unwrap(); test_server.set_snapshot_urgency(SnapshotUrgency::High); - sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); + sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap(); // assert that a snapshot was added let base_version = db1.storage.txn()?.base_version()?; @@ -315,4 +324,24 @@ mod test { Ok(()) } + + #[test] + fn test_sync_avoids_snapshot() -> anyhow::Result<()> { + let test_server = TestServer::new(); + + 
let mut server: Box = test_server.server(); + let mut db1 = newdb(); + + let uuid = Uuid::new_v4(); + db1.apply(Operation::Create { uuid }).unwrap(); + + test_server.set_snapshot_urgency(SnapshotUrgency::Low); + sync(&mut server, db1.storage.txn()?.as_mut(), true).unwrap(); + + // assert that a snapshot was not added, because we indicated + // we wanted to avoid snapshots and it was only low urgency + assert_eq!(test_server.snapshot(), None); + + Ok(()) + } } From 333cb370914f09819565bf4de212c8362093c69d Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Mon, 11 Oct 2021 17:25:28 -0400 Subject: [PATCH 7/7] Support `avoid_snapshots` on cli --- .changelogs/2021-10-11-issue23-client.md | 2 ++ cli/src/invocation/cmd/sync.rs | 8 +++++--- cli/src/invocation/mod.rs | 2 +- cli/src/settings/settings.rs | 21 +++++++++++++++++++++ docs/src/config-file.md | 6 ++++++ 5 files changed, 35 insertions(+), 4 deletions(-) create mode 100644 .changelogs/2021-10-11-issue23-client.md diff --git a/.changelogs/2021-10-11-issue23-client.md b/.changelogs/2021-10-11-issue23-client.md new file mode 100644 index 000000000..91a6b0f9e --- /dev/null +++ b/.changelogs/2021-10-11-issue23-client.md @@ -0,0 +1,2 @@ +- The `avoid_snapshots` configuration value, if set, will cause the replica to + avoid creating snapshots unless required.
diff --git a/cli/src/invocation/cmd/sync.rs b/cli/src/invocation/cmd/sync.rs index f7c1151d2..a042a5bf4 100644 --- a/cli/src/invocation/cmd/sync.rs +++ b/cli/src/invocation/cmd/sync.rs @@ -1,13 +1,14 @@ +use crate::settings::Settings; use taskchampion::{server::Server, Replica}; use termcolor::WriteColor; pub(crate) fn execute( w: &mut W, replica: &mut Replica, + settings: &Settings, server: &mut Box, ) -> Result<(), crate::Error> { - // TODO: configurable avoid_snapshots - replica.sync(server, false)?; + replica.sync(server, settings.avoid_snapshots)?; writeln!(w, "sync complete.")?; Ok(()) } @@ -25,9 +26,10 @@ mod test { let mut replica = test_replica(); let server_dir = TempDir::new().unwrap(); let mut server = test_server(&server_dir); + let settings = Settings::default(); // Note that the details of the actual sync are tested thoroughly in the taskchampion crate - execute(&mut w, &mut replica, &mut server).unwrap(); + execute(&mut w, &mut replica, &settings, &mut server).unwrap(); assert_eq!(&w.into_string(), "sync complete.\n") } } diff --git a/cli/src/invocation/mod.rs b/cli/src/invocation/mod.rs index 640925a69..2f225f86a 100644 --- a/cli/src/invocation/mod.rs +++ b/cli/src/invocation/mod.rs @@ -87,7 +87,7 @@ pub(crate) fn invoke(command: Command, settings: Settings) -> Result<(), crate:: .. 
} => { let mut server = get_server(&settings)?; - return cmd::sync::execute(&mut w, &mut replica, &mut server); + return cmd::sync::execute(&mut w, &mut replica, &settings, &mut server); } // handled in the first match, but here to ensure this match is exhaustive diff --git a/cli/src/settings/settings.rs b/cli/src/settings/settings.rs index 7e94a637b..3dba86409 100644 --- a/cli/src/settings/settings.rs +++ b/cli/src/settings/settings.rs @@ -22,6 +22,7 @@ pub(crate) struct Settings { /// replica pub(crate) data_dir: PathBuf, + pub(crate) avoid_snapshots: bool, /// remote sync server pub(crate) server_client_key: Option, @@ -91,6 +92,7 @@ impl Settings { let table_keys = [ "data_dir", "modification_count_prompt", + "avoid_snapshots", "server_client_key", "server_origin", "encryption_secret", @@ -124,6 +126,20 @@ impl Settings { Ok(()) } + fn get_bool_cfg( + table: &Table, + name: &'static str, + setter: F, + ) -> Result<()> { + if let Some(v) = table.get(name) { + setter( + v.as_bool() + .ok_or_else(|| anyhow!(".{}: not a boolean value", name))?, + ); + } + Ok(()) + } + get_str_cfg(table, "data_dir", |v| { self.data_dir = v.into(); })?; @@ -132,6 +148,10 @@ impl Settings { self.modification_count_prompt = Some(v); })?; + get_bool_cfg(table, "avoid_snapshots", |v| { + self.avoid_snapshots = v; + })?; + get_str_cfg(table, "server_client_key", |v| { self.server_client_key = Some(v); })?; @@ -313,6 +333,7 @@ impl Default for Settings { filename: None, data_dir, modification_count_prompt: None, + avoid_snapshots: false, server_client_key: None, server_origin: None, encryption_secret: None, diff --git a/docs/src/config-file.md b/docs/src/config-file.md index 03639546f..73dc28b60 100644 --- a/docs/src/config-file.md +++ b/docs/src/config-file.md @@ -46,6 +46,12 @@ If using a remote server: * `server_client_key` - Client key to identify this replica to the sync server (a UUID) If not set, then sync is done to a local server. 
+## Snapshots + +* `avoid_snapshots` - If running on a CPU-, memory-, or bandwidth-constrained + device, set this to true. The effect is that this replica will wait longer + to produce a snapshot, in the hopes that other replicas will do so first. + ## Reports * `reports` - a mapping of each report's name to its definition.