support undo operations

This commit is contained in:
Dustin J. Mitchell 2021-12-21 00:43:26 +00:00
parent 1647ba9144
commit 9d93928996
6 changed files with 280 additions and 56 deletions

View file

@ -145,6 +145,12 @@ impl Replica {
Ok(())
}
/// Undo local operations until the most recent UndoPoint, returning false if there are no
/// local operations to undo.
pub fn undo(&mut self) -> anyhow::Result<bool> {
self.taskdb.undo()
}
/// Rebuild this replica's working set, based on whether tasks are pending or not. If
/// `renumber` is true, then existing tasks may be moved to new working-set indices; in any
/// case, on completion all pending tasks are in the working set and all non- pending tasks are

View file

@ -58,11 +58,47 @@ impl ReplicaOp {
Self::UndoPoint => None,
}
}
/// Generate a sequence of SyncOp's to reverse the effects of this ReplicaOp.
pub fn reverse_ops(self) -> Vec<SyncOp> {
match self {
Self::Create { uuid } => vec![SyncOp::Delete { uuid }],
Self::Delete { uuid, mut old_task } => {
let mut ops = vec![SyncOp::Create { uuid }];
// We don't have the original update timestamp, but it doesn't
// matter because this SyncOp will just be applied and discarded.
let timestamp = Utc::now();
for (property, value) in old_task.drain() {
ops.push(SyncOp::Update {
uuid,
property,
value: Some(value),
timestamp,
});
}
ops
}
Self::Update {
uuid,
property,
old_value,
timestamp,
..
} => vec![SyncOp::Update {
uuid,
property,
value: old_value,
timestamp,
}],
Self::UndoPoint => vec![],
}
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::storage::taskmap_with;
use chrono::Utc;
use pretty_assertions::assert_eq;
@ -194,4 +230,54 @@ mod test {
fn test_into_sync_undo_point() {
assert_eq!(UndoPoint.into_sync(), None);
}
#[test]
fn test_reverse_create() {
let uuid = Uuid::new_v4();
assert_eq!(Create { uuid }.reverse_ops(), vec![SyncOp::Delete { uuid }]);
}
#[test]
fn test_reverse_delete() {
let uuid = Uuid::new_v4();
let reversed = Delete {
uuid,
old_task: taskmap_with(vec![("prop1".into(), "v1".into())]),
}
.reverse_ops();
assert_eq!(reversed.len(), 2);
assert_eq!(reversed[0], SyncOp::Create { uuid });
assert!(matches!(
&reversed[1],
SyncOp::Update { uuid: u, property: p, value: Some(v), ..}
if u == &uuid && p == "prop1" && v == "v1"
));
}
#[test]
fn test_reverse_update() {
let uuid = Uuid::new_v4();
let timestamp = Utc::now();
assert_eq!(
Update {
uuid,
property: "prop".into(),
old_value: Some("foo".into()),
value: Some("v".into()),
timestamp,
}
.reverse_ops(),
vec![SyncOp::Update {
uuid,
property: "prop".into(),
value: Some("foo".into()),
timestamp,
}]
);
}
#[test]
fn test_reverse_undo_point() {
assert_eq!(UndoPoint.reverse_ops(), vec![]);
}
}

View file

@ -5,7 +5,7 @@ use crate::storage::{ReplicaOp, StorageTxn, TaskMap};
/// Apply the given SyncOp to the replica, updating both the task data and adding a
/// ReplicaOp to the list of operations. Returns the TaskMap of the task after the
/// operation has been applied (or an empty TaskMap for Delete).
pub(super) fn apply(txn: &mut dyn StorageTxn, op: SyncOp) -> anyhow::Result<TaskMap> {
pub(super) fn apply_and_record(txn: &mut dyn StorageTxn, op: SyncOp) -> anyhow::Result<TaskMap> {
match op {
SyncOp::Create { uuid } => {
let created = txn.create_task(uuid)?;
@ -63,6 +63,45 @@ pub(super) fn apply(txn: &mut dyn StorageTxn, op: SyncOp) -> anyhow::Result<Task
}
}
/// Apply an op to the TaskDb's set of tasks (without recording it in the list of operations)
pub(super) fn apply_op(txn: &mut dyn StorageTxn, op: &SyncOp) -> anyhow::Result<()> {
// TODO: test
// TODO: it'd be nice if this was integrated into apply() somehow, but that clones TaskMaps
// unnecessariliy
match op {
SyncOp::Create { uuid } => {
// insert if the task does not already exist
if !txn.create_task(*uuid)? {
return Err(Error::Database(format!("Task {} already exists", uuid)).into());
}
}
SyncOp::Delete { ref uuid } => {
if !txn.delete_task(*uuid)? {
return Err(Error::Database(format!("Task {} does not exist", uuid)).into());
}
}
SyncOp::Update {
ref uuid,
ref property,
ref value,
timestamp: _,
} => {
// update if this task exists, otherwise ignore
if let Some(mut task) = txn.get_task(*uuid)? {
match value {
Some(ref val) => task.insert(property.to_string(), val.clone()),
None => task.remove(property),
};
txn.set_task(*uuid, task)?;
} else {
return Err(Error::Database(format!("Task {} does not exist", uuid)).into());
}
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
@ -80,7 +119,7 @@ mod tests {
{
let mut txn = db.storage.txn()?;
let taskmap = apply(txn.as_mut(), op)?;
let taskmap = apply_and_record(txn.as_mut(), op)?;
assert_eq!(taskmap.len(), 0);
txn.commit()?;
}
@ -97,10 +136,13 @@ mod tests {
let op = SyncOp::Create { uuid };
{
let mut txn = db.storage.txn()?;
let taskmap = apply(txn.as_mut(), op.clone())?;
let taskmap = apply_and_record(txn.as_mut(), op.clone())?;
assert_eq!(taskmap.len(), 0);
assert_eq!(
apply(txn.as_mut(), op).err().unwrap().to_string(),
apply_and_record(txn.as_mut(), op)
.err()
.unwrap()
.to_string(),
format!("Task Database Error: Task {} already exists", uuid)
);
txn.commit()?;
@ -121,7 +163,7 @@ mod tests {
{
let mut txn = db.storage.txn()?;
let taskmap = apply(txn.as_mut(), op1)?;
let taskmap = apply_and_record(txn.as_mut(), op1)?;
assert_eq!(taskmap.len(), 0);
txn.commit()?;
}
@ -134,7 +176,7 @@ mod tests {
};
{
let mut txn = db.storage.txn()?;
let mut taskmap = apply(txn.as_mut(), op2)?;
let mut taskmap = apply_and_record(txn.as_mut(), op2)?;
assert_eq!(
taskmap.drain().collect::<Vec<(_, _)>>(),
vec![("title".into(), "my task".into())]
@ -171,7 +213,7 @@ mod tests {
let op1 = SyncOp::Create { uuid };
{
let mut txn = db.storage.txn()?;
let taskmap = apply(txn.as_mut(), op1)?;
let taskmap = apply_and_record(txn.as_mut(), op1)?;
assert_eq!(taskmap.len(), 0);
txn.commit()?;
}
@ -184,7 +226,7 @@ mod tests {
};
{
let mut txn = db.storage.txn()?;
let taskmap = apply(txn.as_mut(), op2)?;
let taskmap = apply_and_record(txn.as_mut(), op2)?;
assert_eq!(taskmap.get("title"), Some(&"my task".to_owned()));
txn.commit()?;
}
@ -197,7 +239,7 @@ mod tests {
};
{
let mut txn = db.storage.txn()?;
let taskmap = apply(txn.as_mut(), op3)?;
let taskmap = apply_and_record(txn.as_mut(), op3)?;
assert_eq!(taskmap.get("priority"), Some(&"H".to_owned()));
txn.commit()?;
}
@ -210,7 +252,7 @@ mod tests {
};
{
let mut txn = db.storage.txn()?;
let taskmap = apply(txn.as_mut(), op4)?;
let taskmap = apply_and_record(txn.as_mut(), op4)?;
assert_eq!(taskmap.get("title"), None);
assert_eq!(taskmap.get("priority"), Some(&"H".to_owned()));
txn.commit()?;
@ -268,7 +310,10 @@ mod tests {
{
let mut txn = db.storage.txn()?;
assert_eq!(
apply(txn.as_mut(), op).err().unwrap().to_string(),
apply_and_record(txn.as_mut(), op)
.err()
.unwrap()
.to_string(),
format!("Task Database Error: Task {} does not exist", uuid)
);
txn.commit()?;
@ -286,7 +331,7 @@ mod tests {
let op1 = SyncOp::Create { uuid };
{
let mut txn = db.storage.txn()?;
let taskmap = apply(txn.as_mut(), op1)?;
let taskmap = apply_and_record(txn.as_mut(), op1)?;
assert_eq!(taskmap.len(), 0);
}
@ -298,7 +343,7 @@ mod tests {
};
{
let mut txn = db.storage.txn()?;
let taskmap = apply(txn.as_mut(), op2)?;
let taskmap = apply_and_record(txn.as_mut(), op2)?;
assert_eq!(taskmap.get("priority"), Some(&"H".to_owned()));
txn.commit()?;
}
@ -306,7 +351,7 @@ mod tests {
let op3 = SyncOp::Delete { uuid };
{
let mut txn = db.storage.txn()?;
let taskmap = apply(txn.as_mut(), op3)?;
let taskmap = apply_and_record(txn.as_mut(), op3)?;
assert_eq!(taskmap.len(), 0);
txn.commit()?;
}
@ -340,7 +385,10 @@ mod tests {
{
let mut txn = db.storage.txn()?;
assert_eq!(
apply(txn.as_mut(), op).err().unwrap().to_string(),
apply_and_record(txn.as_mut(), op)
.err()
.unwrap()
.to_string(),
format!("Task Database Error: Task {} does not exist", uuid)
);
txn.commit()?;

View file

@ -5,6 +5,7 @@ use uuid::Uuid;
mod apply;
mod snapshot;
mod sync;
mod undo;
mod working_set;
/// A TaskDb is the backend for a replica. It manages the storage, operations, synchronization,
@ -37,7 +38,7 @@ impl TaskDb {
/// (but leave the TaskDb in a consistent state).
pub fn apply(&mut self, op: SyncOp) -> anyhow::Result<TaskMap> {
let mut txn = self.storage.txn()?;
apply::apply(txn.as_mut(), op)
apply::apply_and_record(txn.as_mut(), op)
}
/// Add an UndoPoint operation to the list of replica operations.
@ -120,6 +121,13 @@ impl TaskDb {
sync::sync(server, txn.as_mut(), avoid_snapshots)
}
/// Undo local operations until the most recent UndoPoint, returning false if there are no
/// local operations to undo.
pub fn undo(&mut self) -> anyhow::Result<bool> {
let mut txn = self.storage.txn()?;
undo::undo(txn.as_mut())
}
// functions for supporting tests
#[cfg(test)]

View file

@ -1,4 +1,4 @@
use super::snapshot;
use super::{apply, snapshot};
use crate::server::{AddVersionResult, GetVersionResult, Server, SnapshotUrgency, SyncOp};
use crate::storage::StorageTxn;
use crate::Error;
@ -11,44 +11,6 @@ struct Version {
operations: Vec<SyncOp>,
}
/// Apply an op to the TaskDb's set of tasks (without recording it in the list of operations)
pub(super) fn apply_op(txn: &mut dyn StorageTxn, op: &SyncOp) -> anyhow::Result<()> {
// TODO: it'd be nice if this was integrated into apply() somehow, but that clones TaskMaps
// unnecessariliy
match op {
SyncOp::Create { uuid } => {
// insert if the task does not already exist
if !txn.create_task(*uuid)? {
return Err(Error::Database(format!("Task {} already exists", uuid)).into());
}
}
SyncOp::Delete { ref uuid } => {
if !txn.delete_task(*uuid)? {
return Err(Error::Database(format!("Task {} does not exist", uuid)).into());
}
}
SyncOp::Update {
ref uuid,
ref property,
ref value,
timestamp: _,
} => {
// update if this task exists, otherwise ignore
if let Some(mut task) = txn.get_task(*uuid)? {
match value {
Some(ref val) => task.insert(property.to_string(), val.clone()),
None => task.remove(property),
};
txn.set_task(*uuid, task)?;
} else {
return Err(Error::Database(format!("Task {} does not exist", uuid)).into());
}
}
}
Ok(())
}
/// Sync to the given server, pulling remote changes and pushing local changes.
pub(super) fn sync(
server: &mut Box<dyn Server>,
@ -209,7 +171,7 @@ fn apply_version(
}
}
if let Some(o) = svr_op {
if let Err(e) = apply_op(txn, &o) {
if let Err(e) = apply::apply_op(txn, &o) {
warn!("Invalid operation when syncing: {} (ignored)", e);
}
}

View file

@ -0,0 +1,114 @@
use super::apply;
use crate::storage::{ReplicaOp, StorageTxn};
/// Undo local operations until an UndoPoint.
pub(super) fn undo(txn: &mut dyn StorageTxn) -> anyhow::Result<bool> {
let mut applied = false;
let mut popped = false;
let mut local_ops = txn.operations()?;
while let Some(op) = local_ops.pop() {
popped = true;
if op == ReplicaOp::UndoPoint {
break;
}
let rev_ops = op.reverse_ops();
for op in rev_ops {
apply::apply_op(txn, &op)?;
applied = true;
}
}
if popped {
txn.set_operations(local_ops)?;
txn.commit()?;
}
Ok(applied)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::server::SyncOp;
use crate::taskdb::TaskDb;
use chrono::Utc;
use pretty_assertions::assert_eq;
use uuid::Uuid;
#[test]
fn test_apply_create() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
let timestamp = Utc::now();
// apply a few ops, capture the DB state, make an undo point, and then apply a few more
// ops.
db.apply(SyncOp::Create { uuid: uuid1 })?;
db.apply(SyncOp::Update {
uuid: uuid1,
property: "prop".into(),
value: Some("v1".into()),
timestamp,
})?;
db.apply(SyncOp::Create { uuid: uuid2 })?;
db.apply(SyncOp::Update {
uuid: uuid2,
property: "prop".into(),
value: Some("v2".into()),
timestamp,
})?;
db.apply(SyncOp::Update {
uuid: uuid2,
property: "prop2".into(),
value: Some("v3".into()),
timestamp,
})?;
let db_state = db.sorted_tasks();
db.add_undo_point()?;
db.apply(SyncOp::Delete { uuid: uuid1 })?;
db.apply(SyncOp::Update {
uuid: uuid2,
property: "prop".into(),
value: None,
timestamp,
})?;
db.apply(SyncOp::Update {
uuid: uuid2,
property: "prop2".into(),
value: Some("new-value".into()),
timestamp,
})?;
assert_eq!(db.operations().len(), 9);
{
let mut txn = db.storage.txn()?;
assert!(undo(txn.as_mut())?);
}
// undo took db back to the snapshot
assert_eq!(db.operations().len(), 5);
assert_eq!(db.sorted_tasks(), db_state);
{
let mut txn = db.storage.txn()?;
assert!(undo(txn.as_mut())?);
}
// empty db
assert_eq!(db.operations().len(), 0);
assert_eq!(db.sorted_tasks(), vec![]);
{
let mut txn = db.storage.txn()?;
// nothing left to undo, so undo() returns false
assert!(!undo(txn.as_mut())?);
}
Ok(())
}
}