From c72cae648df5dffcabf07e6916dcec14ffdc2cea Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Tue, 19 Oct 2021 22:01:37 -0400 Subject: [PATCH 1/2] Apply snapshots automatically on empty taskdbs --- taskchampion/src/server/local.rs | 4 +++ taskchampion/src/server/remote/mod.rs | 21 ++++++++++++++ taskchampion/src/server/test.rs | 15 ++++++++-- taskchampion/src/server/types.rs | 2 ++ taskchampion/src/storage/mod.rs | 10 +++++++ taskchampion/src/taskdb/snapshot.rs | 12 ++------ taskchampion/src/taskdb/sync.rs | 40 +++++++++++++++++++++++---- 7 files changed, 86 insertions(+), 18 deletions(-) diff --git a/taskchampion/src/server/local.rs b/taskchampion/src/server/local.rs index b8c0198a2..37cb06614 100644 --- a/taskchampion/src/server/local.rs +++ b/taskchampion/src/server/local.rs @@ -160,6 +160,10 @@ impl Server for LocalServer { // the local server never requests a snapshot, so it should never get one unreachable!() } + + fn get_snapshot(&mut self) -> anyhow::Result> { + Ok(None) + } } #[cfg(test)] diff --git a/taskchampion/src/server/remote/mod.rs b/taskchampion/src/server/remote/mod.rs index 139f5dc9f..c7d3362e5 100644 --- a/taskchampion/src/server/remote/mod.rs +++ b/taskchampion/src/server/remote/mod.rs @@ -152,4 +152,25 @@ impl Server for RemoteServer { .send_bytes(ciphertext.as_ref()) .map(|_| ())?) } + + fn get_snapshot(&mut self) -> anyhow::Result> { + let url = format!("{}/v1/client/snapshot", self.origin); + match self + .agent + .get(&url) + .set("X-Client-Key", &self.client_key.to_string()) + .call() + { + Ok(resp) => { + let version_id = get_uuid_header(&resp, "X-Version-Id")?; + let ciphertext = Ciphertext::from_resp(resp, SNAPSHOT_CONTENT_TYPE)?; + let snapshot = ciphertext + .open(&self.encryption_secret, version_id)? + .payload; + Ok(Some((version_id, snapshot))) + } + Err(ureq::Error::Status(status, _)) if status == 404 => Ok(None), + Err(err) => Err(err.into()), + } + } } diff --git a/taskchampion/src/server/test.rs b/taskchampion/src/server/test.rs index 3901e2fb6..1fff611cc 100644 --- a/taskchampion/src/server/test.rs +++ b/taskchampion/src/server/test.rs @@ -12,9 +12,8 @@ struct Version { history_segment: HistorySegment, } -#[derive(Clone)] - /// TestServer implements the Server trait with a test implementation. +#[derive(Clone)] pub(crate) struct TestServer(Arc>); pub(crate) struct Inner { @@ -35,6 +34,7 @@ impl TestServer { snapshot: None, }))) } + // feel free to add any test utility functions here /// Get a boxed Server implementation referring to this TestServer pub(crate) fn server(&self) -> Box { @@ -51,6 +51,12 @@ impl TestServer { let inner = self.0.lock().unwrap(); inner.snapshot.as_ref().cloned() } + + /// Delete a version from storage + pub(crate) fn delete_version(&mut self, parent_version_id: VersionId) { + let mut inner = self.0.lock().unwrap(); + inner.versions.remove(&parent_version_id); + } } impl Server for TestServer { @@ -119,4 +125,9 @@ impl Server for TestServer { inner.snapshot = Some((version_id, snapshot)); Ok(()) } + + fn get_snapshot(&mut self) -> anyhow::Result> { + let inner = self.0.lock().unwrap(); + Ok(inner.snapshot.clone()) + } } diff --git a/taskchampion/src/server/types.rs b/taskchampion/src/server/types.rs index 3a1178c41..fada6c04a 100644 --- a/taskchampion/src/server/types.rs +++ b/taskchampion/src/server/types.rs @@ -65,4 +65,6 @@ pub trait Server { /// Add a snapshot on the server fn add_snapshot(&mut self, version_id: VersionId, snapshot: Snapshot) -> anyhow::Result<()>; + + fn get_snapshot(&mut self) -> anyhow::Result>; } diff --git a/taskchampion/src/storage/mod.rs b/taskchampion/src/storage/mod.rs index 0b3e3da32..a16bb5a7e 100644 --- a/taskchampion/src/storage/mod.rs +++ b/taskchampion/src/storage/mod.rs @@ -105,6 +105,16 @@ pub trait StorageTxn { /// Note that this is the only way items are removed from the set. fn clear_working_set(&mut self) -> Result<()>; + /// Check whether this storage is entirely empty + fn is_empty(&mut self) -> Result { + let mut empty = true; + empty = empty && self.all_tasks()?.is_empty(); + empty = empty && self.get_working_set()? == vec![None]; + empty = empty && self.base_version()? == Uuid::nil(); + empty = empty && self.operations()?.is_empty(); + Ok(empty) + } + /// Commit any changes made in the transaction. It is an error to call this more than /// once. fn commit(&mut self) -> Result<()>; diff --git a/taskchampion/src/taskdb/snapshot.rs b/taskchampion/src/taskdb/snapshot.rs index e054612b3..33ab7e8df 100644 --- a/taskchampion/src/taskdb/snapshot.rs +++ b/taskchampion/src/taskdb/snapshot.rs @@ -70,7 +70,6 @@ impl SnapshotTasks { } } -#[allow(dead_code)] /// Generate a snapshot (compressed, unencrypted) for the current state of the taskdb in the given /// storage. pub(super) fn make_snapshot(txn: &mut dyn StorageTxn) -> anyhow::Result> { @@ -78,7 +77,6 @@ pub(super) fn make_snapshot(txn: &mut dyn StorageTxn) -> anyhow::Result> all_tasks.encode() } -#[allow(dead_code)] /// Apply the given snapshot (compressed, unencrypted) to the taskdb's storage. pub(super) fn apply_snapshot( txn: &mut dyn StorageTxn, @@ -87,14 +85,8 @@ pub(super) fn apply_snapshot( ) -> anyhow::Result<()> { let all_tasks = SnapshotTasks::decode(snapshot)?; - // first, verify that the taskdb truly is empty - let mut empty = true; - empty = empty && txn.all_tasks()?.is_empty(); - empty = empty && txn.get_working_set()? == vec![None]; - empty = empty && txn.base_version()? == Uuid::nil(); - empty = empty && txn.operations()?.is_empty(); - - if !empty { + // double-check emptiness + if !txn.is_empty()? { anyhow::bail!("Cannot apply snapshot to a non-empty task database"); } diff --git a/taskchampion/src/taskdb/sync.rs b/taskchampion/src/taskdb/sync.rs index 2cd8c2717..e77a5db66 100644 --- a/taskchampion/src/taskdb/sync.rs +++ b/taskchampion/src/taskdb/sync.rs @@ -16,6 +16,15 @@ pub(super) fn sync( txn: &mut dyn StorageTxn, avoid_snapshots: bool, ) -> anyhow::Result<()> { + // if this taskdb is entirely empty, then start by getting and applying a snapshot + if txn.is_empty()? { + trace!("storage is empty; attempting to apply a snapshot"); + if let Some((version, snap)) = server.get_snapshot()? { + snapshot::apply_snapshot(txn, version, snap.as_ref())?; + trace!("applied snapshot for version {}", version); + } + } + // retry synchronizing until the server accepts our version (this allows for races between // replicas trying to sync to the same server). If the server insists on the same base // version twice, then we have diverged. @@ -293,24 +302,23 @@ mod test { } #[test] - fn test_sync_adds_snapshot() -> anyhow::Result<()> { - let test_server = TestServer::new(); + fn test_sync_add_snapshot_start_with_snapshot() -> anyhow::Result<()> { + let mut test_server = TestServer::new(); let mut server: Box = test_server.server(); let mut db1 = newdb(); let uuid = Uuid::new_v4(); - db1.apply(Operation::Create { uuid }).unwrap(); + db1.apply(Operation::Create { uuid })?; db1.apply(Operation::Update { uuid, property: "title".into(), value: Some("my first task".into()), timestamp: Utc::now(), - }) - .unwrap(); + })?; test_server.set_snapshot_urgency(SnapshotUrgency::High); - sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap(); + sync(&mut server, db1.storage.txn()?.as_mut(), false)?; // assert that a snapshot was added let base_version = db1.storage.txn()?.base_version()?; @@ -322,6 +330,26 @@ mod test { let tasks = SnapshotTasks::decode(&s)?.into_inner(); assert_eq!(tasks[0].0, uuid); + // update the taskdb and sync again + db1.apply(Operation::Update { + uuid, + property: "title".into(), + value: Some("my first task, updated".into()), + timestamp: Utc::now(), + })?; + sync(&mut server, db1.storage.txn()?.as_mut(), false)?; + + // delete the first version, so that db2 *must* initialize from + // the snapshot + test_server.delete_version(Uuid::nil()); + + // sync to a new DB and check that we got the expected results + let mut db2 = newdb(); + sync(&mut server, db2.storage.txn()?.as_mut(), false)?; + + let task = db2.get_task(uuid)?.unwrap(); + assert_eq!(task.get("title").unwrap(), "my first task, updated"); + Ok(()) } From ec35d4fa20710a97e228c8397df48899ab6b0813 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Wed, 20 Oct 2021 21:15:05 -0400 Subject: [PATCH 2/2] use a distinct error for out-of-sync replica --- cli/src/invocation/cmd/sync.rs | 31 +++++++++++++++++++++++++++---- taskchampion/src/errors.rs | 6 ++++++ taskchampion/src/replica.rs | 2 +- taskchampion/src/taskdb/sync.rs | 3 ++- 4 files changed, 36 insertions(+), 6 deletions(-) diff --git a/cli/src/invocation/cmd/sync.rs b/cli/src/invocation/cmd/sync.rs index a042a5bf4..7a3708421 100644 --- a/cli/src/invocation/cmd/sync.rs +++ b/cli/src/invocation/cmd/sync.rs @@ -1,5 +1,5 @@ use crate::settings::Settings; -use taskchampion::{server::Server, Replica}; +use taskchampion::{server::Server, Error as TCError, Replica}; use termcolor::WriteColor; pub(crate) fn execute( @@ -8,9 +8,32 @@ pub(crate) fn execute( settings: &Settings, server: &mut Box, ) -> Result<(), crate::Error> { - replica.sync(server, settings.avoid_snapshots)?; - writeln!(w, "sync complete.")?; - Ok(()) + match replica.sync(server, settings.avoid_snapshots) { + Ok(()) => { + writeln!(w, "sync complete.")?; + Ok(()) + } + Err(e) => match e.downcast() { + Ok(TCError::OutOfSync) => { + writeln!(w, "This replica cannot be synchronized with the server.")?; + writeln!( + w, + "It may be too old, or some other failure may have occurred." + )?; + writeln!( + w, + "To start fresh, remove the local task database and run `ta sync` again." + )?; + writeln!( + w, + "Note that doing so will lose any un-synchronized local changes." + )?; + Ok(()) + } + Ok(e) => Err(e.into()), + Err(e) => Err(e.into()), + }, + } } #[cfg(test)] diff --git a/taskchampion/src/errors.rs b/taskchampion/src/errors.rs index 44bad9881..3209b6ea9 100644 --- a/taskchampion/src/errors.rs +++ b/taskchampion/src/errors.rs @@ -4,6 +4,12 @@ use thiserror::Error; #[non_exhaustive] /// Errors returned from taskchampion operations pub enum Error { + /// A task-database-related error #[error("Task Database Error: {0}")] Database(String), + /// An error specifically indicating that the local replica cannot + /// be synchronized with the sever, due to being out of date or some + /// other irrecoverable error. + #[error("Local replica is out of sync with the server")] + OutOfSync, } diff --git a/taskchampion/src/replica.rs b/taskchampion/src/replica.rs index cc6338ceb..7afbc90a3 100644 --- a/taskchampion/src/replica.rs +++ b/taskchampion/src/replica.rs @@ -140,7 +140,7 @@ impl Replica { ) -> anyhow::Result<()> { self.taskdb .sync(server, avoid_snapshots) - .context("Failed to synchronize")?; + .context("Failed to synchronize with server")?; self.rebuild_working_set(false) .context("Failed to rebuild working set after sync")?; Ok(()) diff --git a/taskchampion/src/taskdb/sync.rs b/taskchampion/src/taskdb/sync.rs index e77a5db66..7ce1d76ce 100644 --- a/taskchampion/src/taskdb/sync.rs +++ b/taskchampion/src/taskdb/sync.rs @@ -1,6 +1,7 @@ use super::{ops, snapshot}; use crate::server::{AddVersionResult, GetVersionResult, Server, SnapshotUrgency}; use crate::storage::{Operation, StorageTxn}; +use crate::Error; use log::{info, trace, warn}; use serde::{Deserialize, Serialize}; use std::str; @@ -97,7 +98,7 @@ pub(super) fn sync( ); if let Some(requested) = requested_parent_version_id { if parent_version_id == requested { - anyhow::bail!("Server's task history has diverged from this replica"); + return Err(Error::OutOfSync.into()); } } requested_parent_version_id = Some(parent_version_id);