Merge pull request #300 from djmitche/issue23-client

Client support for adding snapshots
This commit is contained in:
Dustin J. Mitchell 2021-10-13 16:44:50 -04:00 committed by GitHub
commit bd2189e589
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 1141 additions and 514 deletions

View file

@ -0,0 +1,2 @@
- The `avoid_snapshots` configuration value, if set, will cause the replica to
avoid creating snapshots unless required.

1
Cargo.lock generated
View file

@ -2962,6 +2962,7 @@ version = "0.4.1"
dependencies = [
"anyhow",
"chrono",
"flate2",
"log",
"pretty_assertions",
"proptest",

View file

@ -1,12 +1,14 @@
use crate::settings::Settings;
use taskchampion::{server::Server, Replica};
use termcolor::WriteColor;
pub(crate) fn execute<W: WriteColor>(
w: &mut W,
replica: &mut Replica,
settings: &Settings,
server: &mut Box<dyn Server>,
) -> Result<(), crate::Error> {
replica.sync(server)?;
replica.sync(server, settings.avoid_snapshots)?;
writeln!(w, "sync complete.")?;
Ok(())
}
@ -24,9 +26,10 @@ mod test {
let mut replica = test_replica();
let server_dir = TempDir::new().unwrap();
let mut server = test_server(&server_dir);
let settings = Settings::default();
// Note that the details of the actual sync are tested thoroughly in the taskchampion crate
execute(&mut w, &mut replica, &mut server).unwrap();
execute(&mut w, &mut replica, &settings, &mut server).unwrap();
assert_eq!(&w.into_string(), "sync complete.\n")
}
}

View file

@ -87,7 +87,7 @@ pub(crate) fn invoke(command: Command, settings: Settings) -> Result<(), crate::
..
} => {
let mut server = get_server(&settings)?;
return cmd::sync::execute(&mut w, &mut replica, &mut server);
return cmd::sync::execute(&mut w, &mut replica, &settings, &mut server);
}
// handled in the first match, but here to ensure this match is exhaustive

View file

@ -22,6 +22,7 @@ pub(crate) struct Settings {
/// replica
pub(crate) data_dir: PathBuf,
pub(crate) avoid_snapshots: bool,
/// remote sync server
pub(crate) server_client_key: Option<String>,
@ -91,6 +92,7 @@ impl Settings {
let table_keys = [
"data_dir",
"modification_count_prompt",
"avoid_snapshots",
"server_client_key",
"server_origin",
"encryption_secret",
@ -124,6 +126,20 @@ impl Settings {
Ok(())
}
fn get_bool_cfg<F: FnOnce(bool)>(
table: &Table,
name: &'static str,
setter: F,
) -> Result<()> {
if let Some(v) = table.get(name) {
setter(
v.as_bool()
.ok_or_else(|| anyhow!(".{}: not a boolean value", name))?,
);
}
Ok(())
}
get_str_cfg(table, "data_dir", |v| {
self.data_dir = v.into();
})?;
@ -132,6 +148,10 @@ impl Settings {
self.modification_count_prompt = Some(v);
})?;
get_bool_cfg(table, "avoid_snapshots", |v| {
self.avoid_snapshots = v;
})?;
get_str_cfg(table, "server_client_key", |v| {
self.server_client_key = Some(v);
})?;
@ -313,6 +333,7 @@ impl Default for Settings {
filename: None,
data_dir,
modification_count_prompt: None,
avoid_snapshots: false,
server_client_key: None,
server_origin: None,
encryption_secret: None,

View file

@ -46,6 +46,12 @@ If using a remote server:
* `server_client_key` - Client key to identify this replica to the sync server (a UUID)
If not set, then sync is done to a local server.
## Snapshots
* `avoid_snapshots` - If running on a CPU-, memory-, or bandwidth-constrained
device, set this to true. The effect is that this replica will wait longer
to produce a snapshot, in the hopes that other replicas will do so first.
## Reports
* `reports` - a mapping of each report's name to its definition.

View file

@ -38,6 +38,41 @@ This observation allows the server to discard older versions.
The third invariant prevents the server from discarding versions if there is no snapshot.
The fourth invariant prevents the server from discarding versions newer than the snapshot.
## Data Formats
### Encryption
TBD (#299)
### Version
The decrypted form of a version is a JSON array containing operations in the order they should be applied.
Each operation has the form `{TYPE: DATA}`, for example:
* `{"Create":{"uuid":"56e0be07-c61f-494c-a54c-bdcfdd52d2a7"}}`
* `{"Delete":{"uuid":"56e0be07-c61f-494c-a54c-bdcfdd52d2a7"}}`
* `{"Update":{"uuid":"56e0be07-c61f-494c-a54c-bdcfdd52d2a7","property":"prop","value":"v","timestamp":"2021-10-11T12:47:07.188090948Z"}}`
* `{"Update":{"uuid":"56e0be07-c61f-494c-a54c-bdcfdd52d2a7","property":"prop","value":null,"timestamp":"2021-10-11T12:47:07.188090948Z"}}` (to delete a property)
Timestamps are in RFC3339 format with a `Z` suffix.
### Snapshot
The decrypted form of a snapshot is a JSON object mapping task IDs to task properties.
For example (pretty-printed for clarity):
```json
{
"56e0be07-c61f-494c-a54c-bdcfdd52d2a7": {
"description": "a task",
"priority": "H"
},
"4b7ed904-f7b0-4293-8a10-ad452422c7b3": {
"description": "another task"
}
}
```
## Transactions
### AddVersion

View file

@ -41,8 +41,8 @@ async fn cross_sync() -> anyhow::Result<()> {
t1.start()?;
let t1 = t1.into_immut();
rep1.sync(&mut serv1)?;
rep2.sync(&mut serv2)?;
rep1.sync(&mut serv1, false)?;
rep2.sync(&mut serv2, false)?;
// those tasks should exist on rep2 now
let t12 = rep2
@ -66,9 +66,9 @@ async fn cross_sync() -> anyhow::Result<()> {
t12.set_status(Status::Completed)?;
// sync those changes back and forth
rep1.sync(&mut serv1)?; // rep1 -> server
rep2.sync(&mut serv2)?; // server -> rep2, rep2 -> server
rep1.sync(&mut serv1)?; // server -> rep1
rep1.sync(&mut serv1, false)?; // rep1 -> server
rep2.sync(&mut serv2, false)?; // server -> rep2, rep2 -> server
rep1.sync(&mut serv1, false)?; // server -> rep1
let t1 = rep1
.get_task(t1.get_uuid())?

View file

@ -23,6 +23,7 @@ tindercrypt = { version = "^0.2.2", default-features = false }
rusqlite = { version = "0.25", features = ["bundled"] }
strum = "0.21"
strum_macros = "0.21"
flate2 = "1"
[dev-dependencies]
proptest = "^1.0.0"

View file

@ -126,8 +126,21 @@ impl Replica {
/// Synchronize this replica against the given server. The working set is rebuilt after
/// this occurs, but without renumbering, so any newly-pending tasks should appear in
/// the working set.
pub fn sync(&mut self, server: &mut Box<dyn Server>) -> anyhow::Result<()> {
self.taskdb.sync(server).context("Failed to synchronize")?;
///
/// If `avoid_snapshots` is true, the sync operations produces a snapshot only when the server
/// indicate it is urgent (snapshot urgency "high"). This allows time for other replicas to
/// create a snapshot before this one does.
///
/// Set this to true on systems more constrained in CPU, memory, or bandwidth than a typical desktop
/// system
pub fn sync(
&mut self,
server: &mut Box<dyn Server>,
avoid_snapshots: bool,
) -> anyhow::Result<()> {
self.taskdb
.sync(server, avoid_snapshots)
.context("Failed to synchronize")?;
self.rebuild_working_set(false)
.context("Failed to rebuild working set after sync")?;
Ok(())

View file

@ -1,5 +1,6 @@
use crate::server::{
AddVersionResult, GetVersionResult, HistorySegment, Server, VersionId, NIL_VERSION_ID,
AddVersionResult, GetVersionResult, HistorySegment, Server, Snapshot, SnapshotUrgency,
VersionId, NIL_VERSION_ID,
};
use crate::storage::sqlite::StoredUuid;
use anyhow::Context;
@ -110,20 +111,21 @@ impl Server for LocalServer {
// TODO: better transaction isolation for add_version (gets and sets should be in the same
// transaction)
/// Add a new version. If the given version number is incorrect, this responds with the
/// appropriate version and expects the caller to try again.
fn add_version(
&mut self,
parent_version_id: VersionId,
history_segment: HistorySegment,
) -> anyhow::Result<AddVersionResult> {
) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> {
// no client lookup
// no signature validation
// check the parent_version_id for linearity
let latest_version_id = self.get_latest_version_id()?;
if latest_version_id != NIL_VERSION_ID && parent_version_id != latest_version_id {
return Ok(AddVersionResult::ExpectedParentVersion(latest_version_id));
return Ok((
AddVersionResult::ExpectedParentVersion(latest_version_id),
SnapshotUrgency::None,
));
}
// invent a new ID for this version
@ -136,10 +138,9 @@ impl Server for LocalServer {
})?;
self.set_latest_version_id(version_id)?;
Ok(AddVersionResult::Ok(version_id))
Ok((AddVersionResult::Ok(version_id), SnapshotUrgency::None))
}
/// Get a vector of all versions after `since_version`
fn get_child_version(
&mut self,
parent_version_id: VersionId,
@ -154,6 +155,11 @@ impl Server for LocalServer {
Ok(GetVersionResult::NoSuchVersion)
}
}
fn add_snapshot(&mut self, _version_id: VersionId, _snapshot: Snapshot) -> anyhow::Result<()> {
// the local server never requests a snapshot, so it should never get one
unreachable!()
}
}
#[cfg(test)]
@ -176,7 +182,7 @@ mod test {
let tmp_dir = TempDir::new()?;
let mut server = LocalServer::new(&tmp_dir.path())?;
let history = b"1234".to_vec();
match server.add_version(NIL_VERSION_ID, history.clone())? {
match server.add_version(NIL_VERSION_ID, history.clone())?.0 {
AddVersionResult::ExpectedParentVersion(_) => {
panic!("should have accepted the version")
}
@ -204,7 +210,7 @@ mod test {
let parent_version_id = Uuid::new_v4() as VersionId;
// This is OK because the server has no latest_version_id yet
match server.add_version(parent_version_id, history.clone())? {
match server.add_version(parent_version_id, history.clone())?.0 {
AddVersionResult::ExpectedParentVersion(_) => {
panic!("should have accepted the version")
}
@ -232,14 +238,16 @@ mod test {
let parent_version_id = Uuid::new_v4() as VersionId;
// add a version
if let AddVersionResult::ExpectedParentVersion(_) =
if let (AddVersionResult::ExpectedParentVersion(_), SnapshotUrgency::None) =
server.add_version(parent_version_id, history.clone())?
{
panic!("should have accepted the version")
}
// then add another, not based on that one
if let AddVersionResult::Ok(_) = server.add_version(parent_version_id, history.clone())? {
if let (AddVersionResult::Ok(_), SnapshotUrgency::None) =
server.add_version(parent_version_id, history.clone())?
{
panic!("should not have accepted the version")
}

View file

@ -1,5 +1,4 @@
use crate::server::HistorySegment;
use std::convert::TryFrom;
use std::io::Read;
use tindercrypt::cryptors::RingCryptor;
use uuid::Uuid;
@ -18,45 +17,31 @@ impl AsRef<[u8]> for Secret {
}
}
/// A cleartext payload containing a history segment.
pub(super) struct HistoryCleartext {
pub(super) parent_version_id: Uuid,
pub(super) history_segment: HistorySegment,
/// A cleartext payload with an attached version_id. The version_id is used to
/// validate the context of the payload.
pub(super) struct Cleartext {
pub(super) version_id: Uuid,
pub(super) payload: HistorySegment,
}
impl HistoryCleartext {
impl Cleartext {
/// Seal the payload into its ciphertext
pub(super) fn seal(self, secret: &Secret) -> anyhow::Result<HistoryCiphertext> {
let cryptor = RingCryptor::new().with_aad(self.parent_version_id.as_bytes());
let ciphertext = cryptor.seal_with_passphrase(secret.as_ref(), &self.history_segment)?;
Ok(HistoryCiphertext(ciphertext))
pub(super) fn seal(self, secret: &Secret) -> anyhow::Result<Ciphertext> {
let cryptor = RingCryptor::new().with_aad(self.version_id.as_bytes());
let ciphertext = cryptor.seal_with_passphrase(secret.as_ref(), &self.payload)?;
Ok(Ciphertext(ciphertext))
}
}
/// An ecrypted payload containing a history segment
pub(super) struct HistoryCiphertext(pub(super) Vec<u8>);
/// An ecrypted payload
pub(super) struct Ciphertext(pub(super) Vec<u8>);
impl HistoryCiphertext {
pub(super) fn open(
self,
secret: &Secret,
parent_version_id: Uuid,
) -> anyhow::Result<HistoryCleartext> {
let cryptor = RingCryptor::new().with_aad(parent_version_id.as_bytes());
let plaintext = cryptor.open(secret.as_ref(), &self.0)?;
Ok(HistoryCleartext {
parent_version_id,
history_segment: plaintext,
})
}
}
impl TryFrom<ureq::Response> for HistoryCiphertext {
type Error = anyhow::Error;
fn try_from(resp: ureq::Response) -> Result<HistoryCiphertext, anyhow::Error> {
if let Some("application/vnd.taskchampion.history-segment") = resp.header("Content-Type") {
impl Ciphertext {
pub(super) fn from_resp(
resp: ureq::Response,
content_type: &str,
) -> Result<Ciphertext, anyhow::Error> {
if resp.header("Content-Type") == Some(content_type) {
let mut reader = resp.into_reader();
let mut bytes = vec![];
reader.read_to_end(&mut bytes)?;
@ -67,9 +52,19 @@ impl TryFrom<ureq::Response> for HistoryCiphertext {
))
}
}
pub(super) fn open(self, secret: &Secret, version_id: Uuid) -> anyhow::Result<Cleartext> {
let cryptor = RingCryptor::new().with_aad(version_id.as_bytes());
let plaintext = cryptor.open(secret.as_ref(), &self.0)?;
Ok(Cleartext {
version_id,
payload: plaintext,
})
}
}
impl AsRef<[u8]> for HistoryCiphertext {
impl AsRef<[u8]> for Ciphertext {
fn as_ref(&self) -> &[u8] {
self.0.as_ref()
}
@ -82,52 +77,50 @@ mod test {
#[test]
fn round_trip() {
let parent_version_id = Uuid::new_v4();
let history_segment = b"HISTORY REPEATS ITSELF".to_vec();
let version_id = Uuid::new_v4();
let payload = b"HISTORY REPEATS ITSELF".to_vec();
let secret = Secret(b"SEKRIT".to_vec());
let history_cleartext = HistoryCleartext {
parent_version_id,
history_segment: history_segment.clone(),
let cleartext = Cleartext {
version_id,
payload: payload.clone(),
};
let history_ciphertext = history_cleartext.seal(&secret).unwrap();
let history_cleartext = history_ciphertext.open(&secret, parent_version_id).unwrap();
let ciphertext = cleartext.seal(&secret).unwrap();
let cleartext = ciphertext.open(&secret, version_id).unwrap();
assert_eq!(history_cleartext.history_segment, history_segment);
assert_eq!(history_cleartext.parent_version_id, parent_version_id);
assert_eq!(cleartext.payload, payload);
assert_eq!(cleartext.version_id, version_id);
}
#[test]
fn round_trip_bad_key() {
let parent_version_id = Uuid::new_v4();
let history_segment = b"HISTORY REPEATS ITSELF".to_vec();
let version_id = Uuid::new_v4();
let payload = b"HISTORY REPEATS ITSELF".to_vec();
let secret = Secret(b"SEKRIT".to_vec());
let history_cleartext = HistoryCleartext {
parent_version_id,
history_segment: history_segment.clone(),
let cleartext = Cleartext {
version_id,
payload: payload.clone(),
};
let history_ciphertext = history_cleartext.seal(&secret).unwrap();
let ciphertext = cleartext.seal(&secret).unwrap();
let secret = Secret(b"BADSEKRIT".to_vec());
assert!(history_ciphertext.open(&secret, parent_version_id).is_err());
assert!(ciphertext.open(&secret, version_id).is_err());
}
#[test]
fn round_trip_bad_pvid() {
let parent_version_id = Uuid::new_v4();
let history_segment = b"HISTORY REPEATS ITSELF".to_vec();
fn round_trip_bad_version() {
let version_id = Uuid::new_v4();
let payload = b"HISTORY REPEATS ITSELF".to_vec();
let secret = Secret(b"SEKRIT".to_vec());
let history_cleartext = HistoryCleartext {
parent_version_id,
history_segment: history_segment.clone(),
let cleartext = Cleartext {
version_id,
payload: payload.clone(),
};
let history_ciphertext = history_cleartext.seal(&secret).unwrap();
let ciphertext = cleartext.seal(&secret).unwrap();
let bad_parent_version_id = Uuid::new_v4();
assert!(history_ciphertext
.open(&secret, bad_parent_version_id)
.is_err());
let bad_version_id = Uuid::new_v4();
assert!(ciphertext.open(&secret, bad_version_id).is_err());
}
}

View file

@ -1,10 +1,12 @@
use crate::server::{AddVersionResult, GetVersionResult, HistorySegment, Server, VersionId};
use std::convert::TryInto;
use crate::server::{
AddVersionResult, GetVersionResult, HistorySegment, Server, Snapshot, SnapshotUrgency,
VersionId,
};
use std::time::Duration;
use uuid::Uuid;
mod crypto;
use crypto::{HistoryCiphertext, HistoryCleartext, Secret};
use crypto::{Ciphertext, Cleartext, Secret};
pub struct RemoteServer {
origin: String,
@ -13,6 +15,12 @@ pub struct RemoteServer {
agent: ureq::Agent,
}
/// The content-type for history segments (opaque blobs of bytes)
const HISTORY_SEGMENT_CONTENT_TYPE: &str = "application/vnd.taskchampion.history-segment";
/// The content-type for snapshots (opaque blobs of bytes)
const SNAPSHOT_CONTENT_TYPE: &str = "application/vnd.taskchampion.snapshot";
/// A RemoeServer communicates with a remote server over HTTP (such as with
/// taskchampion-sync-server).
impl RemoteServer {
@ -43,38 +51,53 @@ fn get_uuid_header(resp: &ureq::Response, name: &str) -> anyhow::Result<Uuid> {
Ok(value)
}
/// Read the X-Snapshot-Request header and return a SnapshotUrgency
fn get_snapshot_urgency(resp: &ureq::Response) -> SnapshotUrgency {
match resp.header("X-Snapshot-Request") {
None => SnapshotUrgency::None,
Some(hdr) => match hdr {
"urgency=low" => SnapshotUrgency::Low,
"urgency=high" => SnapshotUrgency::High,
_ => SnapshotUrgency::None,
},
}
}
impl Server for RemoteServer {
fn add_version(
&mut self,
parent_version_id: VersionId,
history_segment: HistorySegment,
) -> anyhow::Result<AddVersionResult> {
) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> {
let url = format!(
"{}/v1/client/add-version/{}",
self.origin, parent_version_id
);
let history_cleartext = HistoryCleartext {
parent_version_id,
history_segment,
let cleartext = Cleartext {
version_id: parent_version_id,
payload: history_segment,
};
let history_ciphertext = history_cleartext.seal(&self.encryption_secret)?;
let ciphertext = cleartext.seal(&self.encryption_secret)?;
match self
.agent
.post(&url)
.set(
"Content-Type",
"application/vnd.taskchampion.history-segment",
)
.set("Content-Type", HISTORY_SEGMENT_CONTENT_TYPE)
.set("X-Client-Key", &self.client_key.to_string())
.send_bytes(history_ciphertext.as_ref())
.send_bytes(ciphertext.as_ref())
{
Ok(resp) => {
let version_id = get_uuid_header(&resp, "X-Version-Id")?;
Ok(AddVersionResult::Ok(version_id))
Ok((
AddVersionResult::Ok(version_id),
get_snapshot_urgency(&resp),
))
}
Err(ureq::Error::Status(status, resp)) if status == 409 => {
let parent_version_id = get_uuid_header(&resp, "X-Parent-Version-Id")?;
Ok(AddVersionResult::ExpectedParentVersion(parent_version_id))
Ok((
AddVersionResult::ExpectedParentVersion(parent_version_id),
SnapshotUrgency::None,
))
}
Err(err) => Err(err.into()),
}
@ -97,10 +120,10 @@ impl Server for RemoteServer {
Ok(resp) => {
let parent_version_id = get_uuid_header(&resp, "X-Parent-Version-Id")?;
let version_id = get_uuid_header(&resp, "X-Version-Id")?;
let history_ciphertext: HistoryCiphertext = resp.try_into()?;
let history_segment = history_ciphertext
let ciphertext = Ciphertext::from_resp(resp, HISTORY_SEGMENT_CONTENT_TYPE)?;
let history_segment = ciphertext
.open(&self.encryption_secret, parent_version_id)?
.history_segment;
.payload;
Ok(GetVersionResult::Version {
version_id,
parent_version_id,
@ -113,4 +136,20 @@ impl Server for RemoteServer {
Err(err) => Err(err.into()),
}
}
fn add_snapshot(&mut self, version_id: VersionId, snapshot: Snapshot) -> anyhow::Result<()> {
let url = format!("{}/v1/client/add-snapshot/{}", self.origin, version_id);
let cleartext = Cleartext {
version_id,
payload: snapshot,
};
let ciphertext = cleartext.seal(&self.encryption_secret)?;
Ok(self
.agent
.post(&url)
.set("Content-Type", SNAPSHOT_CONTENT_TYPE)
.set("X-Client-Key", &self.client_key.to_string())
.send_bytes(ciphertext.as_ref())
.map(|_| ())?)
}
}

View file

@ -1,7 +1,9 @@
use crate::server::{
AddVersionResult, GetVersionResult, HistorySegment, Server, VersionId, NIL_VERSION_ID,
AddVersionResult, GetVersionResult, HistorySegment, Server, Snapshot, SnapshotUrgency,
VersionId, NIL_VERSION_ID,
};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use uuid::Uuid;
struct Version {
@ -10,19 +12,44 @@ struct Version {
history_segment: HistorySegment,
}
pub(crate) struct TestServer {
#[derive(Clone)]
/// TestServer implements the Server trait with a test implementation.
pub(crate) struct TestServer(Arc<Mutex<Inner>>);
pub(crate) struct Inner {
latest_version_id: VersionId,
// NOTE: indexed by parent_version_id!
versions: HashMap<VersionId, Version>,
snapshot_urgency: SnapshotUrgency,
snapshot: Option<(VersionId, Snapshot)>,
}
impl TestServer {
/// A test server has no notion of clients, signatures, encryption, etc.
pub fn new() -> TestServer {
TestServer {
pub(crate) fn new() -> TestServer {
TestServer(Arc::new(Mutex::new(Inner {
latest_version_id: NIL_VERSION_ID,
versions: HashMap::new(),
snapshot_urgency: SnapshotUrgency::None,
snapshot: None,
})))
}
/// Get a boxed Server implementation referring to this TestServer
pub(crate) fn server(&self) -> Box<dyn Server> {
Box::new(self.clone())
}
pub(crate) fn set_snapshot_urgency(&self, urgency: SnapshotUrgency) {
let mut inner = self.0.lock().unwrap();
inner.snapshot_urgency = urgency;
}
/// Get the latest snapshot added to this server
pub(crate) fn snapshot(&self) -> Option<(VersionId, Snapshot)> {
let inner = self.0.lock().unwrap();
inner.snapshot.as_ref().cloned()
}
}
@ -33,23 +60,25 @@ impl Server for TestServer {
&mut self,
parent_version_id: VersionId,
history_segment: HistorySegment,
) -> anyhow::Result<AddVersionResult> {
) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> {
let mut inner = self.0.lock().unwrap();
// no client lookup
// no signature validation
// check the parent_version_id for linearity
if self.latest_version_id != NIL_VERSION_ID {
if parent_version_id != self.latest_version_id {
return Ok(AddVersionResult::ExpectedParentVersion(
self.latest_version_id,
if inner.latest_version_id != NIL_VERSION_ID && parent_version_id != inner.latest_version_id
{
return Ok((
AddVersionResult::ExpectedParentVersion(inner.latest_version_id),
SnapshotUrgency::None,
));
}
}
// invent a new ID for this version
let version_id = Uuid::new_v4();
self.versions.insert(
inner.versions.insert(
parent_version_id,
Version {
version_id,
@ -57,9 +86,12 @@ impl Server for TestServer {
history_segment,
},
);
self.latest_version_id = version_id;
inner.latest_version_id = version_id;
Ok(AddVersionResult::Ok(version_id))
// reply with the configured urgency and reset it to None
let urgency = inner.snapshot_urgency;
inner.snapshot_urgency = SnapshotUrgency::None;
Ok((AddVersionResult::Ok(version_id), urgency))
}
/// Get a vector of all versions after `since_version`
@ -67,7 +99,9 @@ impl Server for TestServer {
&mut self,
parent_version_id: VersionId,
) -> anyhow::Result<GetVersionResult> {
if let Some(version) = self.versions.get(&parent_version_id) {
let inner = self.0.lock().unwrap();
if let Some(version) = inner.versions.get(&parent_version_id) {
Ok(GetVersionResult::Version {
version_id: version.version_id,
parent_version_id: version.parent_version_id,
@ -77,4 +111,12 @@ impl Server for TestServer {
Ok(GetVersionResult::NoSuchVersion)
}
}
fn add_snapshot(&mut self, version_id: VersionId, snapshot: Snapshot) -> anyhow::Result<()> {
let mut inner = self.0.lock().unwrap();
// test implementation -- does not perform any validation
inner.snapshot = Some((version_id, snapshot));
Ok(())
}
}

View file

@ -10,7 +10,11 @@ pub const NIL_VERSION_ID: VersionId = Uuid::nil();
/// data is pre-encoded, and from the protocol level appears as a sequence of bytes.
pub type HistorySegment = Vec<u8>;
/// VersionAdd is the response type from [`crate::server::Server::add_version`].
/// A snapshot of the state of the task database. This is encoded by the taskdb implementation
/// and treated as a sequence of bytes by the server implementation.
pub type Snapshot = Vec<u8>;
/// AddVersionResult is the response type from [`crate::server::Server::add_version`].
#[derive(Debug, PartialEq)]
pub enum AddVersionResult {
/// OK, version added with the given ID
@ -19,6 +23,17 @@ pub enum AddVersionResult {
ExpectedParentVersion(VersionId),
}
/// SnapshotUrgency indicates how much the server would like this replica to send a snapshot.
#[derive(PartialEq, Debug, Clone, Copy, Eq, PartialOrd, Ord)]
pub enum SnapshotUrgency {
/// Don't need a snapshot right now.
None,
/// A snapshot would be good, but can wait for other replicas to provide it.
Low,
/// A snapshot is needed right now.
High,
}
/// A version as downloaded from the server
#[derive(Debug, PartialEq)]
pub enum GetVersionResult {
@ -40,11 +55,14 @@ pub trait Server {
&mut self,
parent_version_id: VersionId,
history_segment: HistorySegment,
) -> anyhow::Result<AddVersionResult>;
) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)>;
/// Get the version with the given parent VersionId
fn get_child_version(
&mut self,
parent_version_id: VersionId,
) -> anyhow::Result<GetVersionResult>;
/// Add a snapshot on the server
fn add_snapshot(&mut self, version_id: VersionId, snapshot: Snapshot) -> anyhow::Result<()>;
}

View file

@ -274,6 +274,72 @@ mod test {
);
}
#[test]
fn test_json_create() -> anyhow::Result<()> {
let uuid = Uuid::new_v4();
let op = Create { uuid };
assert_eq!(
serde_json::to_string(&op)?,
format!(r#"{{"Create":{{"uuid":"{}"}}}}"#, uuid),
);
Ok(())
}
#[test]
fn test_json_delete() -> anyhow::Result<()> {
let uuid = Uuid::new_v4();
let op = Delete { uuid };
assert_eq!(
serde_json::to_string(&op)?,
format!(r#"{{"Delete":{{"uuid":"{}"}}}}"#, uuid),
);
Ok(())
}
#[test]
fn test_json_update() -> anyhow::Result<()> {
let uuid = Uuid::new_v4();
let timestamp = Utc::now();
let op = Update {
uuid,
property: "abc".into(),
value: Some("false".into()),
timestamp,
};
assert_eq!(
serde_json::to_string(&op)?,
format!(
r#"{{"Update":{{"uuid":"{}","property":"abc","value":"false","timestamp":"{:?}"}}}}"#,
uuid, timestamp,
),
);
Ok(())
}
#[test]
fn test_json_update_none() -> anyhow::Result<()> {
let uuid = Uuid::new_v4();
let timestamp = Utc::now();
let op = Update {
uuid,
property: "abc".into(),
value: None,
timestamp,
};
assert_eq!(
serde_json::to_string(&op)?,
format!(
r#"{{"Update":{{"uuid":"{}","property":"abc","value":null,"timestamp":"{:?}"}}}}"#,
uuid, timestamp,
),
);
Ok(())
}
fn uuid_strategy() -> impl Strategy<Value = Uuid> {
prop_oneof![
Just(Uuid::parse_str("83a2f9ef-f455-4195-b92e-a54c161eebfc").unwrap()),

View file

@ -1,10 +1,11 @@
use crate::server::Server;
use crate::storage::{Operation, Storage, TaskMap};
use std::collections::HashSet;
use uuid::Uuid;
mod ops;
mod snapshot;
mod sync;
mod working_set;
/// A TaskDb is the backend for a replica. It manages the storage, operations, synchronization,
/// and so on, and all the invariants that come with it. It leaves the meaning of particular task
@ -74,57 +75,7 @@ impl TaskDb {
where
F: Fn(&TaskMap) -> bool,
{
let mut txn = self.storage.txn()?;
let mut new_ws = vec![None]; // index 0 is always None
let mut seen = HashSet::new();
// The goal here is for existing working-set items to be "compressed' down to index 1, so
// we begin by scanning the current working set and inserting any tasks that should still
// be in the set into new_ws, implicitly dropping any tasks that are no longer in the
// working set.
for elt in txn.get_working_set()?.drain(1..) {
if let Some(uuid) = elt {
if let Some(task) = txn.get_task(uuid)? {
if in_working_set(&task) {
new_ws.push(Some(uuid));
seen.insert(uuid);
continue;
}
}
}
// if we are not renumbering, then insert a blank working-set entry here
if !renumber {
new_ws.push(None);
}
}
// if renumbering, clear the working set and re-add
if renumber {
txn.clear_working_set()?;
for elt in new_ws.drain(1..new_ws.len()).flatten() {
txn.add_to_working_set(elt)?;
}
} else {
// ..otherwise, just clear the None items determined above from the working set
for (i, elt) in new_ws.iter().enumerate().skip(1) {
if elt.is_none() {
txn.set_working_set_item(i, None)?;
}
}
}
// Now go hunting for tasks that should be in this list but are not, adding them at the
// end of the list, whether renumbering or not
for (uuid, task) in txn.all_tasks()? {
if !seen.contains(&uuid) && in_working_set(&task) {
txn.add_to_working_set(uuid)?;
}
}
txn.commit()?;
Ok(())
working_set::rebuild(self.storage.txn()?.as_mut(), in_working_set, renumber)
}
/// Add the given uuid to the working set and return its index; if it is already in the working
@ -145,9 +96,20 @@ impl TaskDb {
}
/// Sync to the given server, pulling remote changes and pushing local changes.
pub fn sync(&mut self, server: &mut Box<dyn Server>) -> anyhow::Result<()> {
///
/// If `avoid_snapshots` is true, the sync operations produces a snapshot only when the server
/// indicate it is urgent (snapshot urgency "high"). This allows time for other replicas to
/// create a snapshot before this one does.
///
/// Set this to true on systems more constrained in CPU, memory, or bandwidth than a typical desktop
/// system
pub fn sync(
&mut self,
server: &mut Box<dyn Server>,
avoid_snapshots: bool,
) -> anyhow::Result<()> {
let mut txn = self.storage.txn()?;
sync::sync(server, txn.as_mut())
sync::sync(server, txn.as_mut(), avoid_snapshots)
}
// functions for supporting tests
@ -190,11 +152,12 @@ mod tests {
use chrono::Utc;
use pretty_assertions::assert_eq;
use proptest::prelude::*;
use std::collections::HashMap;
use uuid::Uuid;
#[test]
fn test_apply_create() {
fn test_apply() {
// this verifies that the operation is both applied and included in the list of
// operations; more detailed tests are in the `ops` module.
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = Operation::Create { uuid };
@ -204,345 +167,10 @@ mod tests {
assert_eq!(db.operations(), vec![op]);
}
#[test]
fn test_apply_create_exists() {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = Operation::Create { uuid };
db.apply(op.clone()).unwrap();
assert_eq!(
db.apply(op.clone()).err().unwrap().to_string(),
format!("Task Database Error: Task {} already exists", uuid)
);
assert_eq!(db.sorted_tasks(), vec![(uuid, vec![])]);
assert_eq!(db.operations(), vec![op]);
}
#[test]
fn test_apply_create_update() {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op1 = Operation::Create { uuid };
db.apply(op1.clone()).unwrap();
let op2 = Operation::Update {
uuid,
property: String::from("title"),
value: Some("my task".into()),
timestamp: Utc::now(),
};
db.apply(op2.clone()).unwrap();
assert_eq!(
db.sorted_tasks(),
vec![(uuid, vec![("title".into(), "my task".into())])]
);
assert_eq!(db.operations(), vec![op1, op2]);
}
#[test]
fn test_apply_create_update_delete_prop() {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op1 = Operation::Create { uuid };
db.apply(op1.clone()).unwrap();
let op2 = Operation::Update {
uuid,
property: String::from("title"),
value: Some("my task".into()),
timestamp: Utc::now(),
};
db.apply(op2.clone()).unwrap();
let op3 = Operation::Update {
uuid,
property: String::from("priority"),
value: Some("H".into()),
timestamp: Utc::now(),
};
db.apply(op3.clone()).unwrap();
let op4 = Operation::Update {
uuid,
property: String::from("title"),
value: None,
timestamp: Utc::now(),
};
db.apply(op4.clone()).unwrap();
let mut exp = HashMap::new();
let mut task = HashMap::new();
task.insert(String::from("priority"), String::from("H"));
exp.insert(uuid, task);
assert_eq!(
db.sorted_tasks(),
vec![(uuid, vec![("priority".into(), "H".into())])]
);
assert_eq!(db.operations(), vec![op1, op2, op3, op4]);
}
#[test]
fn test_apply_update_does_not_exist() {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = Operation::Update {
uuid,
property: String::from("title"),
value: Some("my task".into()),
timestamp: Utc::now(),
};
assert_eq!(
db.apply(op).err().unwrap().to_string(),
format!("Task Database Error: Task {} does not exist", uuid)
);
assert_eq!(db.sorted_tasks(), vec![]);
assert_eq!(db.operations(), vec![]);
}
#[test]
fn test_apply_create_delete() {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op1 = Operation::Create { uuid };
db.apply(op1.clone()).unwrap();
let op2 = Operation::Delete { uuid };
db.apply(op2.clone()).unwrap();
assert_eq!(db.sorted_tasks(), vec![]);
assert_eq!(db.operations(), vec![op1, op2]);
}
#[test]
fn test_apply_delete_not_present() {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op1 = Operation::Delete { uuid };
assert_eq!(
db.apply(op1).err().unwrap().to_string(),
format!("Task Database Error: Task {} does not exist", uuid)
);
assert_eq!(db.sorted_tasks(), vec![]);
assert_eq!(db.operations(), vec![]);
}
#[test]
fn rebuild_working_set_renumber() -> anyhow::Result<()> {
rebuild_working_set(true)
}
#[test]
fn rebuild_working_set_no_renumber() -> anyhow::Result<()> {
rebuild_working_set(false)
}
fn rebuild_working_set(renumber: bool) -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let mut uuids = vec![];
uuids.push(Uuid::new_v4());
println!("uuids[0]: {:?} - pending, not in working set", uuids[0]);
uuids.push(Uuid::new_v4());
println!("uuids[1]: {:?} - pending, in working set", uuids[1]);
uuids.push(Uuid::new_v4());
println!("uuids[2]: {:?} - not pending, not in working set", uuids[2]);
uuids.push(Uuid::new_v4());
println!("uuids[3]: {:?} - not pending, in working set", uuids[3]);
uuids.push(Uuid::new_v4());
println!("uuids[4]: {:?} - pending, in working set", uuids[4]);
// add everything to the TaskDb
for uuid in &uuids {
db.apply(Operation::Create { uuid: *uuid })?;
}
for i in &[0usize, 1, 4] {
db.apply(Operation::Update {
uuid: uuids[*i].clone(),
property: String::from("status"),
value: Some("pending".into()),
timestamp: Utc::now(),
})?;
}
// set the existing working_set as we want it
{
let mut txn = db.storage.txn()?;
txn.clear_working_set()?;
for i in &[1usize, 3, 4] {
txn.add_to_working_set(uuids[*i])?;
}
txn.commit()?;
}
assert_eq!(
db.working_set()?,
vec![
None,
Some(uuids[1].clone()),
Some(uuids[3].clone()),
Some(uuids[4].clone())
]
);
db.rebuild_working_set(
|t| {
if let Some(status) = t.get("status") {
status == "pending"
} else {
false
}
},
renumber,
)?;
let exp = if renumber {
// uuids[1] and uuids[4] are already in the working set, so are compressed
// to the top, and then uuids[0] is added.
vec![
None,
Some(uuids[1].clone()),
Some(uuids[4].clone()),
Some(uuids[0].clone()),
]
} else {
// uuids[1] and uuids[4] are already in the working set, at indexes 1 and 3,
// and then uuids[0] is added.
vec![
None,
Some(uuids[1].clone()),
None,
Some(uuids[4].clone()),
Some(uuids[0].clone()),
]
};
assert_eq!(db.working_set()?, exp);
Ok(())
}
fn newdb() -> TaskDb {
TaskDb::new(Box::new(InMemoryStorage::new()))
}
#[test]
fn test_sync() {
let mut server: Box<dyn Server> = Box::new(TestServer::new());
let mut db1 = newdb();
db1.sync(&mut server).unwrap();
let mut db2 = newdb();
db2.sync(&mut server).unwrap();
// make some changes in parallel to db1 and db2..
let uuid1 = Uuid::new_v4();
db1.apply(Operation::Create { uuid: uuid1 }).unwrap();
db1.apply(Operation::Update {
uuid: uuid1,
property: "title".into(),
value: Some("my first task".into()),
timestamp: Utc::now(),
})
.unwrap();
let uuid2 = Uuid::new_v4();
db2.apply(Operation::Create { uuid: uuid2 }).unwrap();
db2.apply(Operation::Update {
uuid: uuid2,
property: "title".into(),
value: Some("my second task".into()),
timestamp: Utc::now(),
})
.unwrap();
// and synchronize those around
db1.sync(&mut server).unwrap();
db2.sync(&mut server).unwrap();
db1.sync(&mut server).unwrap();
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
// now make updates to the same task on both sides
db1.apply(Operation::Update {
uuid: uuid2,
property: "priority".into(),
value: Some("H".into()),
timestamp: Utc::now(),
})
.unwrap();
db2.apply(Operation::Update {
uuid: uuid2,
property: "project".into(),
value: Some("personal".into()),
timestamp: Utc::now(),
})
.unwrap();
// and synchronize those around
db1.sync(&mut server).unwrap();
db2.sync(&mut server).unwrap();
db1.sync(&mut server).unwrap();
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
}
#[test]
fn test_sync_create_delete() {
let mut server: Box<dyn Server> = Box::new(TestServer::new());
let mut db1 = newdb();
db1.sync(&mut server).unwrap();
let mut db2 = newdb();
db2.sync(&mut server).unwrap();
// create and update a task..
let uuid = Uuid::new_v4();
db1.apply(Operation::Create { uuid }).unwrap();
db1.apply(Operation::Update {
uuid,
property: "title".into(),
value: Some("my first task".into()),
timestamp: Utc::now(),
})
.unwrap();
// and synchronize those around
db1.sync(&mut server).unwrap();
db2.sync(&mut server).unwrap();
db1.sync(&mut server).unwrap();
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
// delete and re-create the task on db1
db1.apply(Operation::Delete { uuid }).unwrap();
db1.apply(Operation::Create { uuid }).unwrap();
db1.apply(Operation::Update {
uuid,
property: "title".into(),
value: Some("my second task".into()),
timestamp: Utc::now(),
})
.unwrap();
// and on db2, update a property of the task
db2.apply(Operation::Update {
uuid,
property: "project".into(),
value: Some("personal".into()),
timestamp: Utc::now(),
})
.unwrap();
db1.sync(&mut server).unwrap();
db2.sync(&mut server).unwrap();
db1.sync(&mut server).unwrap();
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
}
#[derive(Debug)]
enum Action {
Op(Operation),
@ -595,7 +223,7 @@ mod tests {
println!(" {:?} (ignored)", e);
}
},
Action::Sync => db.sync(&mut server).unwrap(),
Action::Sync => db.sync(&mut server, false).unwrap(),
}
}

View file

@ -35,3 +35,199 @@ pub(super) fn apply_op(txn: &mut dyn StorageTxn, op: &Operation) -> anyhow::Resu
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::taskdb::TaskDb;
use chrono::Utc;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use uuid::Uuid;
#[test]
fn test_apply_create() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = Operation::Create { uuid };
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op)?;
txn.commit()?;
}
assert_eq!(db.sorted_tasks(), vec![(uuid, vec![]),]);
Ok(())
}
#[test]
fn test_apply_create_exists() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = Operation::Create { uuid };
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op)?;
assert_eq!(
apply_op(txn.as_mut(), &op).err().unwrap().to_string(),
format!("Task Database Error: Task {} already exists", uuid)
);
txn.commit()?;
}
// first op was applied
assert_eq!(db.sorted_tasks(), vec![(uuid, vec![])]);
Ok(())
}
#[test]
fn test_apply_create_update() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op1 = Operation::Create { uuid };
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op1)?;
txn.commit()?;
}
let op2 = Operation::Update {
uuid,
property: String::from("title"),
value: Some("my task".into()),
timestamp: Utc::now(),
};
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op2)?;
txn.commit()?;
}
assert_eq!(
db.sorted_tasks(),
vec![(uuid, vec![("title".into(), "my task".into())])]
);
Ok(())
}
#[test]
fn test_apply_create_update_delete_prop() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op1 = Operation::Create { uuid };
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op1)?;
txn.commit()?;
}
let op2 = Operation::Update {
uuid,
property: String::from("title"),
value: Some("my task".into()),
timestamp: Utc::now(),
};
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op2)?;
txn.commit()?;
}
let op3 = Operation::Update {
uuid,
property: String::from("priority"),
value: Some("H".into()),
timestamp: Utc::now(),
};
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op3)?;
txn.commit()?;
}
let op4 = Operation::Update {
uuid,
property: String::from("title"),
value: None,
timestamp: Utc::now(),
};
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op4)?;
txn.commit()?;
}
let mut exp = HashMap::new();
let mut task = HashMap::new();
task.insert(String::from("priority"), String::from("H"));
exp.insert(uuid, task);
assert_eq!(
db.sorted_tasks(),
vec![(uuid, vec![("priority".into(), "H".into())])]
);
Ok(())
}
#[test]
fn test_apply_update_does_not_exist() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = Operation::Update {
uuid,
property: String::from("title"),
value: Some("my task".into()),
timestamp: Utc::now(),
};
{
let mut txn = db.storage.txn()?;
assert_eq!(
apply_op(txn.as_mut(), &op).err().unwrap().to_string(),
format!("Task Database Error: Task {} does not exist", uuid)
);
txn.commit()?;
}
Ok(())
}
#[test]
fn test_apply_create_delete() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op1 = Operation::Create { uuid };
let op2 = Operation::Delete { uuid };
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op1)?;
apply_op(txn.as_mut(), &op2)?;
txn.commit()?;
}
assert_eq!(db.sorted_tasks(), vec![]);
Ok(())
}
#[test]
fn test_apply_delete_not_present() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = Operation::Delete { uuid };
{
let mut txn = db.storage.txn()?;
assert_eq!(
apply_op(txn.as_mut(), &op).err().unwrap().to_string(),
format!("Task Database Error: Task {} does not exist", uuid)
);
txn.commit()?;
}
Ok(())
}
}

View file

@ -0,0 +1,186 @@
use crate::storage::{StorageTxn, TaskMap, VersionId};
use flate2::{read::ZlibDecoder, write::ZlibEncoder, Compression};
use serde::de::{Deserialize, Deserializer, MapAccess, Visitor};
use serde::ser::{Serialize, SerializeMap, Serializer};
use std::fmt;
use uuid::Uuid;
/// A newtype to wrap the result of [`crate::storage::StorageTxn::all_tasks`]
pub(super) struct SnapshotTasks(Vec<(Uuid, TaskMap)>);
impl Serialize for SnapshotTasks {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut map = serializer.serialize_map(Some(self.0.len()))?;
for (k, v) in &self.0 {
map.serialize_entry(k, v)?;
}
map.end()
}
}
struct TaskDbVisitor;
impl<'de> Visitor<'de> for TaskDbVisitor {
type Value = SnapshotTasks;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a map representing a task snapshot")
}
fn visit_map<M>(self, mut access: M) -> Result<Self::Value, M::Error>
where
M: MapAccess<'de>,
{
let mut map = SnapshotTasks(Vec::with_capacity(access.size_hint().unwrap_or(0)));
while let Some((key, value)) = access.next_entry()? {
map.0.push((key, value));
}
Ok(map)
}
}
impl<'de> Deserialize<'de> for SnapshotTasks {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_map(TaskDbVisitor)
}
}
impl SnapshotTasks {
pub(super) fn encode(&self) -> anyhow::Result<Vec<u8>> {
let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
serde_json::to_writer(&mut encoder, &self)?;
Ok(encoder.finish()?)
}
pub(super) fn decode(snapshot: &[u8]) -> anyhow::Result<Self> {
let decoder = ZlibDecoder::new(snapshot);
Ok(serde_json::from_reader(decoder)?)
}
pub(super) fn into_inner(self) -> Vec<(Uuid, TaskMap)> {
self.0
}
}
#[allow(dead_code)]
/// Generate a snapshot (compressed, unencrypted) for the current state of the taskdb in the given
/// storage.
pub(super) fn make_snapshot(txn: &mut dyn StorageTxn) -> anyhow::Result<Vec<u8>> {
let all_tasks = SnapshotTasks(txn.all_tasks()?);
all_tasks.encode()
}
#[allow(dead_code)]
/// Apply the given snapshot (compressed, unencrypted) to the taskdb's storage.
pub(super) fn apply_snapshot(
txn: &mut dyn StorageTxn,
version: VersionId,
snapshot: &[u8],
) -> anyhow::Result<()> {
let all_tasks = SnapshotTasks::decode(snapshot)?;
// first, verify that the taskdb truly is empty
let mut empty = true;
empty = empty && txn.all_tasks()?.is_empty();
empty = empty && txn.get_working_set()? == vec![None];
empty = empty && txn.base_version()? == Uuid::nil();
empty = empty && txn.operations()?.is_empty();
if !empty {
anyhow::bail!("Cannot apply snapshot to a non-empty task database");
}
for (uuid, task) in all_tasks.into_inner().drain(..) {
txn.set_task(uuid, task)?;
}
txn.set_base_version(version)?;
Ok(())
}
#[cfg(test)]
mod test {
use super::*;
use crate::storage::{InMemoryStorage, Storage, TaskMap};
use pretty_assertions::assert_eq;
#[test]
fn test_serialize_empty() -> anyhow::Result<()> {
let empty = SnapshotTasks(vec![]);
assert_eq!(serde_json::to_vec(&empty)?, b"{}".to_owned());
Ok(())
}
#[test]
fn test_serialize_tasks() -> anyhow::Result<()> {
let u = Uuid::new_v4();
let m: TaskMap = vec![("description".to_owned(), "my task".to_owned())]
.drain(..)
.collect();
let all_tasks = SnapshotTasks(vec![(u, m)]);
assert_eq!(
serde_json::to_vec(&all_tasks)?,
format!("{{\"{}\":{{\"description\":\"my task\"}}}}", u).into_bytes(),
);
Ok(())
}
#[test]
fn test_round_trip() -> anyhow::Result<()> {
let mut storage = InMemoryStorage::new();
let version = Uuid::new_v4();
let task1 = (
Uuid::new_v4(),
vec![("description".to_owned(), "one".to_owned())]
.drain(..)
.collect::<TaskMap>(),
);
let task2 = (
Uuid::new_v4(),
vec![("description".to_owned(), "two".to_owned())]
.drain(..)
.collect::<TaskMap>(),
);
{
let mut txn = storage.txn()?;
txn.set_task(task1.0, task1.1.clone())?;
txn.set_task(task2.0, task2.1.clone())?;
txn.commit()?;
}
let snap = {
let mut txn = storage.txn()?;
make_snapshot(txn.as_mut())?
};
// apply that snapshot to a fresh bit of fake
let mut storage = InMemoryStorage::new();
{
let mut txn = storage.txn()?;
apply_snapshot(txn.as_mut(), version, &snap)?;
txn.commit()?
}
{
let mut txn = storage.txn()?;
assert_eq!(txn.get_task(task1.0)?, Some(task1.1));
assert_eq!(txn.get_task(task2.0)?, Some(task2.1));
assert_eq!(txn.all_tasks()?.len(), 2);
assert_eq!(txn.base_version()?, version);
assert_eq!(txn.operations()?.len(), 0);
assert_eq!(txn.get_working_set()?.len(), 1);
}
Ok(())
}
}

View file

@ -1,5 +1,5 @@
use super::ops;
use crate::server::{AddVersionResult, GetVersionResult, Server};
use super::{ops, snapshot};
use crate::server::{AddVersionResult, GetVersionResult, Server, SnapshotUrgency};
use crate::storage::{Operation, StorageTxn};
use log::{info, trace, warn};
use serde::{Deserialize, Serialize};
@ -11,7 +11,11 @@ struct Version {
}
/// Sync to the given server, pulling remote changes and pushing local changes.
pub(super) fn sync(server: &mut Box<dyn Server>, txn: &mut dyn StorageTxn) -> anyhow::Result<()> {
pub(super) fn sync(
server: &mut Box<dyn Server>,
txn: &mut dyn StorageTxn,
avoid_snapshots: bool,
) -> anyhow::Result<()> {
// retry synchronizing until the server accepts our version (this allows for races between
// replicas trying to sync to the same server). If the server insists on the same base
// version twice, then we have diverged.
@ -57,11 +61,24 @@ pub(super) fn sync(server: &mut Box<dyn Server>, txn: &mut dyn StorageTxn) -> an
let new_version = Version { operations };
let history_segment = serde_json::to_string(&new_version).unwrap().into();
info!("sending new version to server");
match server.add_version(base_version_id, history_segment)? {
let (res, snapshot_urgency) = server.add_version(base_version_id, history_segment)?;
match res {
AddVersionResult::Ok(new_version_id) => {
info!("version {:?} received by server", new_version_id);
txn.set_base_version(new_version_id)?;
txn.set_operations(vec![])?;
// make a snapshot if the server indicates it is urgent enough
let base_urgency = if avoid_snapshots {
SnapshotUrgency::High
} else {
SnapshotUrgency::Low
};
if snapshot_urgency >= base_urgency {
let snapshot = snapshot::make_snapshot(txn)?;
server.add_snapshot(new_version_id, snapshot)?;
}
break;
}
AddVersionResult::ExpectedParentVersion(parent_version_id) => {
@ -143,3 +160,188 @@ fn apply_version(txn: &mut dyn StorageTxn, mut version: Version) -> anyhow::Resu
txn.set_operations(local_operations)?;
Ok(())
}
#[cfg(test)]
mod test {
use super::*;
use crate::server::test::TestServer;
use crate::storage::{InMemoryStorage, Operation};
use crate::taskdb::{snapshot::SnapshotTasks, TaskDb};
use chrono::Utc;
use pretty_assertions::assert_eq;
use uuid::Uuid;
fn newdb() -> TaskDb {
TaskDb::new(Box::new(InMemoryStorage::new()))
}
#[test]
fn test_sync() -> anyhow::Result<()> {
let mut server: Box<dyn Server> = TestServer::new().server();
let mut db1 = newdb();
sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
let mut db2 = newdb();
sync(&mut server, db2.storage.txn()?.as_mut(), false).unwrap();
// make some changes in parallel to db1 and db2..
let uuid1 = Uuid::new_v4();
db1.apply(Operation::Create { uuid: uuid1 }).unwrap();
db1.apply(Operation::Update {
uuid: uuid1,
property: "title".into(),
value: Some("my first task".into()),
timestamp: Utc::now(),
})
.unwrap();
let uuid2 = Uuid::new_v4();
db2.apply(Operation::Create { uuid: uuid2 }).unwrap();
db2.apply(Operation::Update {
uuid: uuid2,
property: "title".into(),
value: Some("my second task".into()),
timestamp: Utc::now(),
})
.unwrap();
// and synchronize those around
sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
sync(&mut server, db2.storage.txn()?.as_mut(), false).unwrap();
sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
// now make updates to the same task on both sides
db1.apply(Operation::Update {
uuid: uuid2,
property: "priority".into(),
value: Some("H".into()),
timestamp: Utc::now(),
})
.unwrap();
db2.apply(Operation::Update {
uuid: uuid2,
property: "project".into(),
value: Some("personal".into()),
timestamp: Utc::now(),
})
.unwrap();
// and synchronize those around
sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
sync(&mut server, db2.storage.txn()?.as_mut(), false).unwrap();
sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
Ok(())
}
#[test]
fn test_sync_create_delete() -> anyhow::Result<()> {
let mut server: Box<dyn Server> = TestServer::new().server();
let mut db1 = newdb();
sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
let mut db2 = newdb();
sync(&mut server, db2.storage.txn()?.as_mut(), false).unwrap();
// create and update a task..
let uuid = Uuid::new_v4();
db1.apply(Operation::Create { uuid }).unwrap();
db1.apply(Operation::Update {
uuid,
property: "title".into(),
value: Some("my first task".into()),
timestamp: Utc::now(),
})
.unwrap();
// and synchronize those around
sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
sync(&mut server, db2.storage.txn()?.as_mut(), false).unwrap();
sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
// delete and re-create the task on db1
db1.apply(Operation::Delete { uuid }).unwrap();
db1.apply(Operation::Create { uuid }).unwrap();
db1.apply(Operation::Update {
uuid,
property: "title".into(),
value: Some("my second task".into()),
timestamp: Utc::now(),
})
.unwrap();
// and on db2, update a property of the task
db2.apply(Operation::Update {
uuid,
property: "project".into(),
value: Some("personal".into()),
timestamp: Utc::now(),
})
.unwrap();
sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
sync(&mut server, db2.storage.txn()?.as_mut(), false).unwrap();
sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
Ok(())
}
#[test]
fn test_sync_adds_snapshot() -> anyhow::Result<()> {
let test_server = TestServer::new();
let mut server: Box<dyn Server> = test_server.server();
let mut db1 = newdb();
let uuid = Uuid::new_v4();
db1.apply(Operation::Create { uuid }).unwrap();
db1.apply(Operation::Update {
uuid,
property: "title".into(),
value: Some("my first task".into()),
timestamp: Utc::now(),
})
.unwrap();
test_server.set_snapshot_urgency(SnapshotUrgency::High);
sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
// assert that a snapshot was added
let base_version = db1.storage.txn()?.base_version()?;
let (v, s) = test_server
.snapshot()
.ok_or_else(|| anyhow::anyhow!("no snapshot"))?;
assert_eq!(v, base_version);
let tasks = SnapshotTasks::decode(&s)?.into_inner();
assert_eq!(tasks[0].0, uuid);
Ok(())
}
#[test]
fn test_sync_avoids_snapshot() -> anyhow::Result<()> {
let test_server = TestServer::new();
let mut server: Box<dyn Server> = test_server.server();
let mut db1 = newdb();
let uuid = Uuid::new_v4();
db1.apply(Operation::Create { uuid }).unwrap();
test_server.set_snapshot_urgency(SnapshotUrgency::Low);
sync(&mut server, db1.storage.txn()?.as_mut(), true).unwrap();
// assert that a snapshot was not added, because we indicated
// we wanted to avoid snapshots and it was only low urgency
assert_eq!(test_server.snapshot(), None);
Ok(())
}
}

View file

@ -0,0 +1,167 @@
use crate::storage::{StorageTxn, TaskMap};
use std::collections::HashSet;
/// Rebuild the working set using a function to identify tasks that should be in the set. This
/// renumbers the existing working-set tasks to eliminate gaps, and also adds any tasks that
/// are not already in the working set but should be. The rebuild occurs in a single
/// trasnsaction against the storage backend.
pub fn rebuild<F>(txn: &mut dyn StorageTxn, in_working_set: F, renumber: bool) -> anyhow::Result<()>
where
F: Fn(&TaskMap) -> bool,
{
let mut new_ws = vec![None]; // index 0 is always None
let mut seen = HashSet::new();
// The goal here is for existing working-set items to be "compressed' down to index 1, so
// we begin by scanning the current working set and inserting any tasks that should still
// be in the set into new_ws, implicitly dropping any tasks that are no longer in the
// working set.
for elt in txn.get_working_set()?.drain(1..) {
if let Some(uuid) = elt {
if let Some(task) = txn.get_task(uuid)? {
if in_working_set(&task) {
new_ws.push(Some(uuid));
seen.insert(uuid);
continue;
}
}
}
// if we are not renumbering, then insert a blank working-set entry here
if !renumber {
new_ws.push(None);
}
}
// if renumbering, clear the working set and re-add
if renumber {
txn.clear_working_set()?;
for elt in new_ws.drain(1..new_ws.len()).flatten() {
txn.add_to_working_set(elt)?;
}
} else {
// ..otherwise, just clear the None items determined above from the working set
for (i, elt) in new_ws.iter().enumerate().skip(1) {
if elt.is_none() {
txn.set_working_set_item(i, None)?;
}
}
}
// Now go hunting for tasks that should be in this list but are not, adding them at the
// end of the list, whether renumbering or not
for (uuid, task) in txn.all_tasks()? {
if !seen.contains(&uuid) && in_working_set(&task) {
txn.add_to_working_set(uuid)?;
}
}
txn.commit()?;
Ok(())
}
#[cfg(test)]
mod test {
use super::*;
use crate::storage::Operation;
use crate::taskdb::TaskDb;
use chrono::Utc;
use uuid::Uuid;
#[test]
fn rebuild_working_set_renumber() -> anyhow::Result<()> {
rebuild_working_set(true)
}
#[test]
fn rebuild_working_set_no_renumber() -> anyhow::Result<()> {
rebuild_working_set(false)
}
fn rebuild_working_set(renumber: bool) -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let mut uuids = vec![];
uuids.push(Uuid::new_v4());
println!("uuids[0]: {:?} - pending, not in working set", uuids[0]);
uuids.push(Uuid::new_v4());
println!("uuids[1]: {:?} - pending, in working set", uuids[1]);
uuids.push(Uuid::new_v4());
println!("uuids[2]: {:?} - not pending, not in working set", uuids[2]);
uuids.push(Uuid::new_v4());
println!("uuids[3]: {:?} - not pending, in working set", uuids[3]);
uuids.push(Uuid::new_v4());
println!("uuids[4]: {:?} - pending, in working set", uuids[4]);
// add everything to the TaskDb
for uuid in &uuids {
db.apply(Operation::Create { uuid: *uuid })?;
}
for i in &[0usize, 1, 4] {
db.apply(Operation::Update {
uuid: uuids[*i].clone(),
property: String::from("status"),
value: Some("pending".into()),
timestamp: Utc::now(),
})?;
}
// set the existing working_set as we want it
{
let mut txn = db.storage.txn()?;
txn.clear_working_set()?;
for i in &[1usize, 3, 4] {
txn.add_to_working_set(uuids[*i])?;
}
txn.commit()?;
}
assert_eq!(
db.working_set()?,
vec![
None,
Some(uuids[1].clone()),
Some(uuids[3].clone()),
Some(uuids[4].clone())
]
);
rebuild(
db.storage.txn()?.as_mut(),
|t| {
if let Some(status) = t.get("status") {
status == "pending"
} else {
false
}
},
renumber,
)?;
let exp = if renumber {
// uuids[1] and uuids[4] are already in the working set, so are compressed
// to the top, and then uuids[0] is added.
vec![
None,
Some(uuids[1].clone()),
Some(uuids[4].clone()),
Some(uuids[0].clone()),
]
} else {
// uuids[1] and uuids[4] are already in the working set, at indexes 1 and 3,
// and then uuids[0] is added.
vec![
None,
Some(uuids[1].clone()),
None,
Some(uuids[4].clone()),
Some(uuids[0].clone()),
]
};
assert_eq!(db.working_set()?, exp);
Ok(())
}
}