mirror of
https://github.com/GothenburgBitFactory/taskwarrior.git
synced 2025-06-26 10:54:26 +02:00
Apply snapshots automatically on empty taskdbs
This commit is contained in:
parent
636862f8c5
commit
c72cae648d
7 changed files with 86 additions and 18 deletions
|
@ -160,6 +160,10 @@ impl Server for LocalServer {
|
||||||
// the local server never requests a snapshot, so it should never get one
|
// the local server never requests a snapshot, so it should never get one
|
||||||
unreachable!()
|
unreachable!()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_snapshot(&mut self) -> anyhow::Result<Option<(VersionId, Snapshot)>> {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
|
@ -152,4 +152,25 @@ impl Server for RemoteServer {
|
||||||
.send_bytes(ciphertext.as_ref())
|
.send_bytes(ciphertext.as_ref())
|
||||||
.map(|_| ())?)
|
.map(|_| ())?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_snapshot(&mut self) -> anyhow::Result<Option<(VersionId, Snapshot)>> {
|
||||||
|
let url = format!("{}/v1/client/snapshot", self.origin);
|
||||||
|
match self
|
||||||
|
.agent
|
||||||
|
.get(&url)
|
||||||
|
.set("X-Client-Key", &self.client_key.to_string())
|
||||||
|
.call()
|
||||||
|
{
|
||||||
|
Ok(resp) => {
|
||||||
|
let version_id = get_uuid_header(&resp, "X-Version-Id")?;
|
||||||
|
let ciphertext = Ciphertext::from_resp(resp, SNAPSHOT_CONTENT_TYPE)?;
|
||||||
|
let snapshot = ciphertext
|
||||||
|
.open(&self.encryption_secret, version_id)?
|
||||||
|
.payload;
|
||||||
|
Ok(Some((version_id, snapshot)))
|
||||||
|
}
|
||||||
|
Err(ureq::Error::Status(status, _)) if status == 404 => Ok(None),
|
||||||
|
Err(err) => Err(err.into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,9 +12,8 @@ struct Version {
|
||||||
history_segment: HistorySegment,
|
history_segment: HistorySegment,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
|
|
||||||
/// TestServer implements the Server trait with a test implementation.
|
/// TestServer implements the Server trait with a test implementation.
|
||||||
|
#[derive(Clone)]
|
||||||
pub(crate) struct TestServer(Arc<Mutex<Inner>>);
|
pub(crate) struct TestServer(Arc<Mutex<Inner>>);
|
||||||
|
|
||||||
pub(crate) struct Inner {
|
pub(crate) struct Inner {
|
||||||
|
@ -35,6 +34,7 @@ impl TestServer {
|
||||||
snapshot: None,
|
snapshot: None,
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
|
// feel free to add any test utility functions here
|
||||||
|
|
||||||
/// Get a boxed Server implementation referring to this TestServer
|
/// Get a boxed Server implementation referring to this TestServer
|
||||||
pub(crate) fn server(&self) -> Box<dyn Server> {
|
pub(crate) fn server(&self) -> Box<dyn Server> {
|
||||||
|
@ -51,6 +51,12 @@ impl TestServer {
|
||||||
let inner = self.0.lock().unwrap();
|
let inner = self.0.lock().unwrap();
|
||||||
inner.snapshot.as_ref().cloned()
|
inner.snapshot.as_ref().cloned()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Delete a version from storage
|
||||||
|
pub(crate) fn delete_version(&mut self, parent_version_id: VersionId) {
|
||||||
|
let mut inner = self.0.lock().unwrap();
|
||||||
|
inner.versions.remove(&parent_version_id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Server for TestServer {
|
impl Server for TestServer {
|
||||||
|
@ -119,4 +125,9 @@ impl Server for TestServer {
|
||||||
inner.snapshot = Some((version_id, snapshot));
|
inner.snapshot = Some((version_id, snapshot));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_snapshot(&mut self) -> anyhow::Result<Option<(VersionId, Snapshot)>> {
|
||||||
|
let inner = self.0.lock().unwrap();
|
||||||
|
Ok(inner.snapshot.clone())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,4 +65,6 @@ pub trait Server {
|
||||||
|
|
||||||
/// Add a snapshot on the server
|
/// Add a snapshot on the server
|
||||||
fn add_snapshot(&mut self, version_id: VersionId, snapshot: Snapshot) -> anyhow::Result<()>;
|
fn add_snapshot(&mut self, version_id: VersionId, snapshot: Snapshot) -> anyhow::Result<()>;
|
||||||
|
|
||||||
|
fn get_snapshot(&mut self) -> anyhow::Result<Option<(VersionId, Snapshot)>>;
|
||||||
}
|
}
|
||||||
|
|
|
@ -105,6 +105,16 @@ pub trait StorageTxn {
|
||||||
/// Note that this is the only way items are removed from the set.
|
/// Note that this is the only way items are removed from the set.
|
||||||
fn clear_working_set(&mut self) -> Result<()>;
|
fn clear_working_set(&mut self) -> Result<()>;
|
||||||
|
|
||||||
|
/// Check whether this storage is entirely empty
|
||||||
|
fn is_empty(&mut self) -> Result<bool> {
|
||||||
|
let mut empty = true;
|
||||||
|
empty = empty && self.all_tasks()?.is_empty();
|
||||||
|
empty = empty && self.get_working_set()? == vec![None];
|
||||||
|
empty = empty && self.base_version()? == Uuid::nil();
|
||||||
|
empty = empty && self.operations()?.is_empty();
|
||||||
|
Ok(empty)
|
||||||
|
}
|
||||||
|
|
||||||
/// Commit any changes made in the transaction. It is an error to call this more than
|
/// Commit any changes made in the transaction. It is an error to call this more than
|
||||||
/// once.
|
/// once.
|
||||||
fn commit(&mut self) -> Result<()>;
|
fn commit(&mut self) -> Result<()>;
|
||||||
|
|
|
@ -70,7 +70,6 @@ impl SnapshotTasks {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
|
||||||
/// Generate a snapshot (compressed, unencrypted) for the current state of the taskdb in the given
|
/// Generate a snapshot (compressed, unencrypted) for the current state of the taskdb in the given
|
||||||
/// storage.
|
/// storage.
|
||||||
pub(super) fn make_snapshot(txn: &mut dyn StorageTxn) -> anyhow::Result<Vec<u8>> {
|
pub(super) fn make_snapshot(txn: &mut dyn StorageTxn) -> anyhow::Result<Vec<u8>> {
|
||||||
|
@ -78,7 +77,6 @@ pub(super) fn make_snapshot(txn: &mut dyn StorageTxn) -> anyhow::Result<Vec<u8>>
|
||||||
all_tasks.encode()
|
all_tasks.encode()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
|
||||||
/// Apply the given snapshot (compressed, unencrypted) to the taskdb's storage.
|
/// Apply the given snapshot (compressed, unencrypted) to the taskdb's storage.
|
||||||
pub(super) fn apply_snapshot(
|
pub(super) fn apply_snapshot(
|
||||||
txn: &mut dyn StorageTxn,
|
txn: &mut dyn StorageTxn,
|
||||||
|
@ -87,14 +85,8 @@ pub(super) fn apply_snapshot(
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let all_tasks = SnapshotTasks::decode(snapshot)?;
|
let all_tasks = SnapshotTasks::decode(snapshot)?;
|
||||||
|
|
||||||
// first, verify that the taskdb truly is empty
|
// double-check emptiness
|
||||||
let mut empty = true;
|
if !txn.is_empty()? {
|
||||||
empty = empty && txn.all_tasks()?.is_empty();
|
|
||||||
empty = empty && txn.get_working_set()? == vec![None];
|
|
||||||
empty = empty && txn.base_version()? == Uuid::nil();
|
|
||||||
empty = empty && txn.operations()?.is_empty();
|
|
||||||
|
|
||||||
if !empty {
|
|
||||||
anyhow::bail!("Cannot apply snapshot to a non-empty task database");
|
anyhow::bail!("Cannot apply snapshot to a non-empty task database");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,15 @@ pub(super) fn sync(
|
||||||
txn: &mut dyn StorageTxn,
|
txn: &mut dyn StorageTxn,
|
||||||
avoid_snapshots: bool,
|
avoid_snapshots: bool,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
|
// if this taskdb is entirely empty, then start by getting and applying a snapshot
|
||||||
|
if txn.is_empty()? {
|
||||||
|
trace!("storage is empty; attempting to apply a snapshot");
|
||||||
|
if let Some((version, snap)) = server.get_snapshot()? {
|
||||||
|
snapshot::apply_snapshot(txn, version, snap.as_ref())?;
|
||||||
|
trace!("applied snapshot for version {}", version);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// retry synchronizing until the server accepts our version (this allows for races between
|
// retry synchronizing until the server accepts our version (this allows for races between
|
||||||
// replicas trying to sync to the same server). If the server insists on the same base
|
// replicas trying to sync to the same server). If the server insists on the same base
|
||||||
// version twice, then we have diverged.
|
// version twice, then we have diverged.
|
||||||
|
@ -293,24 +302,23 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_sync_adds_snapshot() -> anyhow::Result<()> {
|
fn test_sync_add_snapshot_start_with_snapshot() -> anyhow::Result<()> {
|
||||||
let test_server = TestServer::new();
|
let mut test_server = TestServer::new();
|
||||||
|
|
||||||
let mut server: Box<dyn Server> = test_server.server();
|
let mut server: Box<dyn Server> = test_server.server();
|
||||||
let mut db1 = newdb();
|
let mut db1 = newdb();
|
||||||
|
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
db1.apply(Operation::Create { uuid }).unwrap();
|
db1.apply(Operation::Create { uuid })?;
|
||||||
db1.apply(Operation::Update {
|
db1.apply(Operation::Update {
|
||||||
uuid,
|
uuid,
|
||||||
property: "title".into(),
|
property: "title".into(),
|
||||||
value: Some("my first task".into()),
|
value: Some("my first task".into()),
|
||||||
timestamp: Utc::now(),
|
timestamp: Utc::now(),
|
||||||
})
|
})?;
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
test_server.set_snapshot_urgency(SnapshotUrgency::High);
|
test_server.set_snapshot_urgency(SnapshotUrgency::High);
|
||||||
sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
|
sync(&mut server, db1.storage.txn()?.as_mut(), false)?;
|
||||||
|
|
||||||
// assert that a snapshot was added
|
// assert that a snapshot was added
|
||||||
let base_version = db1.storage.txn()?.base_version()?;
|
let base_version = db1.storage.txn()?.base_version()?;
|
||||||
|
@ -322,6 +330,26 @@ mod test {
|
||||||
let tasks = SnapshotTasks::decode(&s)?.into_inner();
|
let tasks = SnapshotTasks::decode(&s)?.into_inner();
|
||||||
assert_eq!(tasks[0].0, uuid);
|
assert_eq!(tasks[0].0, uuid);
|
||||||
|
|
||||||
|
// update the taskdb and sync again
|
||||||
|
db1.apply(Operation::Update {
|
||||||
|
uuid,
|
||||||
|
property: "title".into(),
|
||||||
|
value: Some("my first task, updated".into()),
|
||||||
|
timestamp: Utc::now(),
|
||||||
|
})?;
|
||||||
|
sync(&mut server, db1.storage.txn()?.as_mut(), false)?;
|
||||||
|
|
||||||
|
// delete the first version, so that db2 *must* initialize from
|
||||||
|
// the snapshot
|
||||||
|
test_server.delete_version(Uuid::nil());
|
||||||
|
|
||||||
|
// sync to a new DB and check that we got the expected results
|
||||||
|
let mut db2 = newdb();
|
||||||
|
sync(&mut server, db2.storage.txn()?.as_mut(), false)?;
|
||||||
|
|
||||||
|
let task = db2.get_task(uuid)?.unwrap();
|
||||||
|
assert_eq!(task.get("title").unwrap(), "my first task, updated");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue