mirror of
https://github.com/GothenburgBitFactory/taskwarrior.git
synced 2025-08-20 04:13:07 +02:00
parent
755100fd8a
commit
c061d926bb
21 changed files with 264 additions and 254 deletions
|
@ -1,9 +1,13 @@
|
||||||
|
use std::io;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
#[derive(Debug, Error, Eq, PartialEq, Clone)]
|
#[derive(Debug, Error)]
|
||||||
#[non_exhaustive]
|
#[non_exhaustive]
|
||||||
/// Errors returned from taskchampion operations
|
/// Errors returned from taskchampion operations
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
|
/// A crypto-related error
|
||||||
|
#[error("Crypto Error: {0}")]
|
||||||
|
Crypto(String),
|
||||||
/// A task-database-related error
|
/// A task-database-related error
|
||||||
#[error("Task Database Error: {0}")]
|
#[error("Task Database Error: {0}")]
|
||||||
Database(String),
|
Database(String),
|
||||||
|
@ -12,4 +16,23 @@ pub enum Error {
|
||||||
/// other irrecoverable error.
|
/// other irrecoverable error.
|
||||||
#[error("Local replica is out of sync with the server")]
|
#[error("Local replica is out of sync with the server")]
|
||||||
OutOfSync,
|
OutOfSync,
|
||||||
|
/// A usage error
|
||||||
|
#[error("User Error: {0}")]
|
||||||
|
UserError(String),
|
||||||
|
|
||||||
|
/// Error conversions.
|
||||||
|
#[error(transparent)]
|
||||||
|
Http(#[from] ureq::Error),
|
||||||
|
#[error(transparent)]
|
||||||
|
Io(#[from] io::Error),
|
||||||
|
#[error(transparent)]
|
||||||
|
Json(#[from] serde_json::Error),
|
||||||
|
#[error(transparent)]
|
||||||
|
Other(#[from] anyhow::Error),
|
||||||
|
#[error("Third Party Sqlite Error")]
|
||||||
|
Rusqlite(#[from] rusqlite::Error),
|
||||||
|
#[error(transparent)]
|
||||||
|
Sqlite(#[from] crate::storage::sqlite::SqliteError),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub type Result<T> = core::result::Result<T, Error>;
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
use crate::depmap::DependencyMap;
|
use crate::depmap::DependencyMap;
|
||||||
|
use crate::errors::Result;
|
||||||
use crate::server::{Server, SyncOp};
|
use crate::server::{Server, SyncOp};
|
||||||
use crate::storage::{Storage, TaskMap};
|
use crate::storage::{Storage, TaskMap};
|
||||||
use crate::task::{Status, Task};
|
use crate::task::{Status, Task};
|
||||||
|
@ -63,7 +64,7 @@ impl Replica {
|
||||||
uuid: Uuid,
|
uuid: Uuid,
|
||||||
property: S1,
|
property: S1,
|
||||||
value: Option<S2>,
|
value: Option<S2>,
|
||||||
) -> anyhow::Result<TaskMap>
|
) -> Result<TaskMap>
|
||||||
where
|
where
|
||||||
S1: Into<String>,
|
S1: Into<String>,
|
||||||
S2: Into<String>,
|
S2: Into<String>,
|
||||||
|
@ -78,12 +79,12 @@ impl Replica {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add the given uuid to the working set, returning its index.
|
/// Add the given uuid to the working set, returning its index.
|
||||||
pub(crate) fn add_to_working_set(&mut self, uuid: Uuid) -> anyhow::Result<usize> {
|
pub(crate) fn add_to_working_set(&mut self, uuid: Uuid) -> Result<usize> {
|
||||||
self.taskdb.add_to_working_set(uuid)
|
self.taskdb.add_to_working_set(uuid)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get all tasks represented as a map keyed by UUID
|
/// Get all tasks represented as a map keyed by UUID
|
||||||
pub fn all_tasks(&mut self) -> anyhow::Result<HashMap<Uuid, Task>> {
|
pub fn all_tasks(&mut self) -> Result<HashMap<Uuid, Task>> {
|
||||||
let depmap = self.dependency_map(false)?;
|
let depmap = self.dependency_map(false)?;
|
||||||
let mut res = HashMap::new();
|
let mut res = HashMap::new();
|
||||||
for (uuid, tm) in self.taskdb.all_tasks()?.drain(..) {
|
for (uuid, tm) in self.taskdb.all_tasks()?.drain(..) {
|
||||||
|
@ -93,13 +94,13 @@ impl Replica {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the UUIDs of all tasks
|
/// Get the UUIDs of all tasks
|
||||||
pub fn all_task_uuids(&mut self) -> anyhow::Result<Vec<Uuid>> {
|
pub fn all_task_uuids(&mut self) -> Result<Vec<Uuid>> {
|
||||||
self.taskdb.all_task_uuids()
|
self.taskdb.all_task_uuids()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the "working set" for this replica. This is a snapshot of the current state,
|
/// Get the "working set" for this replica. This is a snapshot of the current state,
|
||||||
/// and it is up to the caller to decide how long to store this value.
|
/// and it is up to the caller to decide how long to store this value.
|
||||||
pub fn working_set(&mut self) -> anyhow::Result<WorkingSet> {
|
pub fn working_set(&mut self) -> Result<WorkingSet> {
|
||||||
Ok(WorkingSet::new(self.taskdb.working_set()?))
|
Ok(WorkingSet::new(self.taskdb.working_set()?))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,7 +112,7 @@ impl Replica {
|
||||||
///
|
///
|
||||||
/// If `force` is true, then the result is re-calculated from the current state of the replica,
|
/// If `force` is true, then the result is re-calculated from the current state of the replica,
|
||||||
/// although previously-returned dependency maps are not updated.
|
/// although previously-returned dependency maps are not updated.
|
||||||
pub fn dependency_map(&mut self, force: bool) -> anyhow::Result<Rc<DependencyMap>> {
|
pub fn dependency_map(&mut self, force: bool) -> Result<Rc<DependencyMap>> {
|
||||||
if force || self.depmap.is_none() {
|
if force || self.depmap.is_none() {
|
||||||
let mut dm = DependencyMap::new();
|
let mut dm = DependencyMap::new();
|
||||||
let ws = self.working_set()?;
|
let ws = self.working_set()?;
|
||||||
|
@ -138,7 +139,7 @@ impl Replica {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get an existing task by its UUID
|
/// Get an existing task by its UUID
|
||||||
pub fn get_task(&mut self, uuid: Uuid) -> anyhow::Result<Option<Task>> {
|
pub fn get_task(&mut self, uuid: Uuid) -> Result<Option<Task>> {
|
||||||
let depmap = self.dependency_map(false)?;
|
let depmap = self.dependency_map(false)?;
|
||||||
Ok(self
|
Ok(self
|
||||||
.taskdb
|
.taskdb
|
||||||
|
@ -147,7 +148,7 @@ impl Replica {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new task.
|
/// Create a new task.
|
||||||
pub fn new_task(&mut self, status: Status, description: String) -> anyhow::Result<Task> {
|
pub fn new_task(&mut self, status: Status, description: String) -> Result<Task> {
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
self.add_undo_point(false)?;
|
self.add_undo_point(false)?;
|
||||||
let taskmap = self.taskdb.apply(SyncOp::Create { uuid })?;
|
let taskmap = self.taskdb.apply(SyncOp::Create { uuid })?;
|
||||||
|
@ -163,7 +164,7 @@ impl Replica {
|
||||||
/// Create a new, empty task with the given UUID. This is useful for importing tasks, but
|
/// Create a new, empty task with the given UUID. This is useful for importing tasks, but
|
||||||
/// otherwise should be avoided in favor of `new_task`. If the task already exists, this
|
/// otherwise should be avoided in favor of `new_task`. If the task already exists, this
|
||||||
/// does nothing and returns the existing task.
|
/// does nothing and returns the existing task.
|
||||||
pub fn import_task_with_uuid(&mut self, uuid: Uuid) -> anyhow::Result<Task> {
|
pub fn import_task_with_uuid(&mut self, uuid: Uuid) -> Result<Task> {
|
||||||
self.add_undo_point(false)?;
|
self.add_undo_point(false)?;
|
||||||
let taskmap = self.taskdb.apply(SyncOp::Create { uuid })?;
|
let taskmap = self.taskdb.apply(SyncOp::Create { uuid })?;
|
||||||
let depmap = self.dependency_map(false)?;
|
let depmap = self.dependency_map(false)?;
|
||||||
|
@ -173,7 +174,7 @@ impl Replica {
|
||||||
/// Delete a task. The task must exist. Note that this is different from setting status to
|
/// Delete a task. The task must exist. Note that this is different from setting status to
|
||||||
/// Deleted; this is the final purge of the task. This is not a public method as deletion
|
/// Deleted; this is the final purge of the task. This is not a public method as deletion
|
||||||
/// should only occur through expiration.
|
/// should only occur through expiration.
|
||||||
fn delete_task(&mut self, uuid: Uuid) -> anyhow::Result<()> {
|
fn delete_task(&mut self, uuid: Uuid) -> Result<()> {
|
||||||
self.add_undo_point(false)?;
|
self.add_undo_point(false)?;
|
||||||
self.taskdb.apply(SyncOp::Delete { uuid })?;
|
self.taskdb.apply(SyncOp::Delete { uuid })?;
|
||||||
trace!("task {} deleted", uuid);
|
trace!("task {} deleted", uuid);
|
||||||
|
@ -190,11 +191,7 @@ impl Replica {
|
||||||
///
|
///
|
||||||
/// Set this to true on systems more constrained in CPU, memory, or bandwidth than a typical desktop
|
/// Set this to true on systems more constrained in CPU, memory, or bandwidth than a typical desktop
|
||||||
/// system
|
/// system
|
||||||
pub fn sync(
|
pub fn sync(&mut self, server: &mut Box<dyn Server>, avoid_snapshots: bool) -> Result<()> {
|
||||||
&mut self,
|
|
||||||
server: &mut Box<dyn Server>,
|
|
||||||
avoid_snapshots: bool,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
self.taskdb
|
self.taskdb
|
||||||
.sync(server, avoid_snapshots)
|
.sync(server, avoid_snapshots)
|
||||||
.context("Failed to synchronize with server")?;
|
.context("Failed to synchronize with server")?;
|
||||||
|
@ -205,7 +202,7 @@ impl Replica {
|
||||||
|
|
||||||
/// Undo local operations until the most recent UndoPoint, returning false if there are no
|
/// Undo local operations until the most recent UndoPoint, returning false if there are no
|
||||||
/// local operations to undo.
|
/// local operations to undo.
|
||||||
pub fn undo(&mut self) -> anyhow::Result<bool> {
|
pub fn undo(&mut self) -> Result<bool> {
|
||||||
self.taskdb.undo()
|
self.taskdb.undo()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -213,7 +210,7 @@ impl Replica {
|
||||||
/// `renumber` is true, then existing tasks may be moved to new working-set indices; in any
|
/// `renumber` is true, then existing tasks may be moved to new working-set indices; in any
|
||||||
/// case, on completion all pending and recurring tasks are in the working set and all tasks
|
/// case, on completion all pending and recurring tasks are in the working set and all tasks
|
||||||
/// with other statuses are not.
|
/// with other statuses are not.
|
||||||
pub fn rebuild_working_set(&mut self, renumber: bool) -> anyhow::Result<()> {
|
pub fn rebuild_working_set(&mut self, renumber: bool) -> Result<()> {
|
||||||
let pending = String::from(Status::Pending.to_taskmap());
|
let pending = String::from(Status::Pending.to_taskmap());
|
||||||
let recurring = String::from(Status::Recurring.to_taskmap());
|
let recurring = String::from(Status::Recurring.to_taskmap());
|
||||||
self.taskdb.rebuild_working_set(
|
self.taskdb.rebuild_working_set(
|
||||||
|
@ -236,7 +233,7 @@ impl Replica {
|
||||||
///
|
///
|
||||||
/// Tasks are eligible for expiration when they have status Deleted and have not been modified
|
/// Tasks are eligible for expiration when they have status Deleted and have not been modified
|
||||||
/// for 180 days (about six months). Note that completed tasks are not eligible.
|
/// for 180 days (about six months). Note that completed tasks are not eligible.
|
||||||
pub fn expire_tasks(&mut self) -> anyhow::Result<()> {
|
pub fn expire_tasks(&mut self) -> Result<()> {
|
||||||
let six_mos_ago = Utc::now() - Duration::days(180);
|
let six_mos_ago = Utc::now() - Duration::days(180);
|
||||||
self.all_tasks()?
|
self.all_tasks()?
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -256,7 +253,7 @@ impl Replica {
|
||||||
/// automatically when a change is made. The `force` flag allows forcing a new UndoPoint
|
/// automatically when a change is made. The `force` flag allows forcing a new UndoPoint
|
||||||
/// even if one has already been created by this Replica, and may be useful when a Replica
|
/// even if one has already been created by this Replica, and may be useful when a Replica
|
||||||
/// instance is held for a long time and used to apply more than one user-visible change.
|
/// instance is held for a long time and used to apply more than one user-visible change.
|
||||||
pub fn add_undo_point(&mut self, force: bool) -> anyhow::Result<()> {
|
pub fn add_undo_point(&mut self, force: bool) -> Result<()> {
|
||||||
if force || !self.added_undo_point {
|
if force || !self.added_undo_point {
|
||||||
self.taskdb.add_undo_point()?;
|
self.taskdb.add_undo_point()?;
|
||||||
self.added_undo_point = true;
|
self.added_undo_point = true;
|
||||||
|
@ -265,12 +262,12 @@ impl Replica {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the number of operations local to this replica and not yet synchronized to the server.
|
/// Get the number of operations local to this replica and not yet synchronized to the server.
|
||||||
pub fn num_local_operations(&mut self) -> anyhow::Result<usize> {
|
pub fn num_local_operations(&mut self) -> Result<usize> {
|
||||||
self.taskdb.num_operations()
|
self.taskdb.num_operations()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the number of undo points available (number of times `undo` will succeed).
|
/// Get the number of undo points available (number of times `undo` will succeed).
|
||||||
pub fn num_undo_points(&mut self) -> anyhow::Result<usize> {
|
pub fn num_undo_points(&mut self) -> Result<usize> {
|
||||||
self.taskdb.num_undo_points()
|
self.taskdb.num_undo_points()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
use super::types::Server;
|
use super::types::Server;
|
||||||
use super::{LocalServer, RemoteServer};
|
use super::{LocalServer, RemoteServer};
|
||||||
|
use crate::errors::Result;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
@ -26,7 +27,7 @@ pub enum ServerConfig {
|
||||||
|
|
||||||
impl ServerConfig {
|
impl ServerConfig {
|
||||||
/// Get a server based on this configuration
|
/// Get a server based on this configuration
|
||||||
pub fn into_server(self) -> anyhow::Result<Box<dyn Server>> {
|
pub fn into_server(self) -> Result<Box<dyn Server>> {
|
||||||
Ok(match self {
|
Ok(match self {
|
||||||
ServerConfig::Local { server_dir } => Box::new(LocalServer::new(server_dir)?),
|
ServerConfig::Local { server_dir } => Box::new(LocalServer::new(server_dir)?),
|
||||||
ServerConfig::Remote {
|
ServerConfig::Remote {
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
/// This module implements the encryption specified in the sync-protocol
|
/// This module implements the encryption specified in the sync-protocol
|
||||||
/// document.
|
/// document.
|
||||||
|
use crate::errors::{Error, Result};
|
||||||
use ring::{aead, digest, pbkdf2, rand, rand::SecureRandom};
|
use ring::{aead, digest, pbkdf2, rand, rand::SecureRandom};
|
||||||
use std::io::Read;
|
use std::io::Read;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
@ -18,7 +19,7 @@ pub(super) struct Cryptor {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Cryptor {
|
impl Cryptor {
|
||||||
pub(super) fn new(client_key: Uuid, secret: &Secret) -> anyhow::Result<Self> {
|
pub(super) fn new(client_key: Uuid, secret: &Secret) -> Result<Self> {
|
||||||
Ok(Cryptor {
|
Ok(Cryptor {
|
||||||
key: Self::derive_key(client_key, secret)?,
|
key: Self::derive_key(client_key, secret)?,
|
||||||
rng: rand::SystemRandom::new(),
|
rng: rand::SystemRandom::new(),
|
||||||
|
@ -26,7 +27,7 @@ impl Cryptor {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Derive a key as specified for version 1. Note that this may take 10s of ms.
|
/// Derive a key as specified for version 1. Note that this may take 10s of ms.
|
||||||
fn derive_key(client_key: Uuid, secret: &Secret) -> anyhow::Result<aead::LessSafeKey> {
|
fn derive_key(client_key: Uuid, secret: &Secret) -> Result<aead::LessSafeKey> {
|
||||||
let salt = digest::digest(&digest::SHA256, client_key.as_bytes());
|
let salt = digest::digest(&digest::SHA256, client_key.as_bytes());
|
||||||
|
|
||||||
let mut key_bytes = vec![0u8; aead::CHACHA20_POLY1305.key_len()];
|
let mut key_bytes = vec![0u8; aead::CHACHA20_POLY1305.key_len()];
|
||||||
|
@ -44,7 +45,7 @@ impl Cryptor {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Encrypt the given payload.
|
/// Encrypt the given payload.
|
||||||
pub(super) fn seal(&self, payload: Unsealed) -> anyhow::Result<Sealed> {
|
pub(super) fn seal(&self, payload: Unsealed) -> Result<Sealed> {
|
||||||
let Unsealed {
|
let Unsealed {
|
||||||
version_id,
|
version_id,
|
||||||
mut payload,
|
mut payload,
|
||||||
|
@ -76,7 +77,7 @@ impl Cryptor {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Decrypt the given payload, verifying it was created for the given version_id
|
/// Decrypt the given payload, verifying it was created for the given version_id
|
||||||
pub(super) fn unseal(&self, payload: Sealed) -> anyhow::Result<Unsealed> {
|
pub(super) fn unseal(&self, payload: Sealed) -> Result<Unsealed> {
|
||||||
let Sealed {
|
let Sealed {
|
||||||
version_id,
|
version_id,
|
||||||
payload,
|
payload,
|
||||||
|
@ -133,14 +134,17 @@ struct Envelope<'a> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Envelope<'a> {
|
impl<'a> Envelope<'a> {
|
||||||
fn from_bytes(buf: &'a [u8]) -> anyhow::Result<Envelope<'a>> {
|
fn from_bytes(buf: &'a [u8]) -> Result<Envelope<'a>> {
|
||||||
if buf.len() <= 1 + aead::NONCE_LEN {
|
if buf.len() <= 1 + aead::NONCE_LEN {
|
||||||
anyhow::bail!("envelope is too small");
|
return Err(Error::Crypto(String::from("envelope is too small")));
|
||||||
}
|
}
|
||||||
|
|
||||||
let version = buf[0];
|
let version = buf[0];
|
||||||
if version != ENVELOPE_VERSION {
|
if version != ENVELOPE_VERSION {
|
||||||
anyhow::bail!("unrecognized encryption envelope version {}", version);
|
return Err(Error::Crypto(format!(
|
||||||
|
"unrecognized encryption envelope version {}",
|
||||||
|
version
|
||||||
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Envelope {
|
Ok(Envelope {
|
||||||
|
@ -177,7 +181,7 @@ impl Sealed {
|
||||||
resp: ureq::Response,
|
resp: ureq::Response,
|
||||||
version_id: Uuid,
|
version_id: Uuid,
|
||||||
content_type: &str,
|
content_type: &str,
|
||||||
) -> Result<Sealed, anyhow::Error> {
|
) -> Result<Sealed> {
|
||||||
if resp.header("Content-Type") == Some(content_type) {
|
if resp.header("Content-Type") == Some(content_type) {
|
||||||
let mut reader = resp.into_reader();
|
let mut reader = resp.into_reader();
|
||||||
let mut payload = vec![];
|
let mut payload = vec![];
|
||||||
|
@ -187,9 +191,9 @@ impl Sealed {
|
||||||
payload,
|
payload,
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
Err(anyhow::anyhow!(
|
Err(Error::Crypto(String::from(
|
||||||
"Response did not have expected content-type"
|
"Response did not have expected content-type",
|
||||||
))
|
)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use crate::errors::Result;
|
||||||
use crate::server::{
|
use crate::server::{
|
||||||
AddVersionResult, GetVersionResult, HistorySegment, Server, Snapshot, SnapshotUrgency,
|
AddVersionResult, GetVersionResult, HistorySegment, Server, Snapshot, SnapshotUrgency,
|
||||||
VersionId, NIL_VERSION_ID,
|
VersionId, NIL_VERSION_ID,
|
||||||
|
@ -22,13 +23,13 @@ pub struct LocalServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LocalServer {
|
impl LocalServer {
|
||||||
fn txn(&mut self) -> anyhow::Result<rusqlite::Transaction> {
|
fn txn(&mut self) -> Result<rusqlite::Transaction> {
|
||||||
let txn = self.con.transaction()?;
|
let txn = self.con.transaction()?;
|
||||||
Ok(txn)
|
Ok(txn)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A server which has no notion of clients, signatures, encryption, etc.
|
/// A server which has no notion of clients, signatures, encryption, etc.
|
||||||
pub fn new<P: AsRef<Path>>(directory: P) -> anyhow::Result<LocalServer> {
|
pub fn new<P: AsRef<Path>>(directory: P) -> Result<LocalServer> {
|
||||||
let db_file = directory
|
let db_file = directory
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.join("taskchampion-local-sync-server.sqlite3");
|
.join("taskchampion-local-sync-server.sqlite3");
|
||||||
|
@ -45,7 +46,7 @@ impl LocalServer {
|
||||||
Ok(LocalServer { con })
|
Ok(LocalServer { con })
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_latest_version_id(&mut self) -> anyhow::Result<VersionId> {
|
fn get_latest_version_id(&mut self) -> Result<VersionId> {
|
||||||
let t = self.txn()?;
|
let t = self.txn()?;
|
||||||
let result: Option<StoredUuid> = t
|
let result: Option<StoredUuid> = t
|
||||||
.query_row(
|
.query_row(
|
||||||
|
@ -57,7 +58,7 @@ impl LocalServer {
|
||||||
Ok(result.map(|x| x.0).unwrap_or(NIL_VERSION_ID))
|
Ok(result.map(|x| x.0).unwrap_or(NIL_VERSION_ID))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_latest_version_id(&mut self, version_id: VersionId) -> anyhow::Result<()> {
|
fn set_latest_version_id(&mut self, version_id: VersionId) -> Result<()> {
|
||||||
let t = self.txn()?;
|
let t = self.txn()?;
|
||||||
t.execute(
|
t.execute(
|
||||||
"INSERT OR REPLACE INTO data (key, value) VALUES ('latest_version_id', ?)",
|
"INSERT OR REPLACE INTO data (key, value) VALUES ('latest_version_id', ?)",
|
||||||
|
@ -71,7 +72,7 @@ impl LocalServer {
|
||||||
fn get_version_by_parent_version_id(
|
fn get_version_by_parent_version_id(
|
||||||
&mut self,
|
&mut self,
|
||||||
parent_version_id: VersionId,
|
parent_version_id: VersionId,
|
||||||
) -> anyhow::Result<Option<Version>> {
|
) -> Result<Option<Version>> {
|
||||||
let t = self.txn()?;
|
let t = self.txn()?;
|
||||||
let r = t.query_row(
|
let r = t.query_row(
|
||||||
"SELECT version_id, parent_version_id, data FROM versions WHERE parent_version_id = ?",
|
"SELECT version_id, parent_version_id, data FROM versions WHERE parent_version_id = ?",
|
||||||
|
@ -92,7 +93,7 @@ impl LocalServer {
|
||||||
Ok(r)
|
Ok(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add_version_by_parent_version_id(&mut self, version: Version) -> anyhow::Result<()> {
|
fn add_version_by_parent_version_id(&mut self, version: Version) -> Result<()> {
|
||||||
let t = self.txn()?;
|
let t = self.txn()?;
|
||||||
t.execute(
|
t.execute(
|
||||||
"INSERT INTO versions (version_id, parent_version_id, data) VALUES (?, ?, ?)",
|
"INSERT INTO versions (version_id, parent_version_id, data) VALUES (?, ?, ?)",
|
||||||
|
@ -115,7 +116,7 @@ impl Server for LocalServer {
|
||||||
&mut self,
|
&mut self,
|
||||||
parent_version_id: VersionId,
|
parent_version_id: VersionId,
|
||||||
history_segment: HistorySegment,
|
history_segment: HistorySegment,
|
||||||
) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> {
|
) -> Result<(AddVersionResult, SnapshotUrgency)> {
|
||||||
// no client lookup
|
// no client lookup
|
||||||
// no signature validation
|
// no signature validation
|
||||||
|
|
||||||
|
@ -141,10 +142,7 @@ impl Server for LocalServer {
|
||||||
Ok((AddVersionResult::Ok(version_id), SnapshotUrgency::None))
|
Ok((AddVersionResult::Ok(version_id), SnapshotUrgency::None))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_child_version(
|
fn get_child_version(&mut self, parent_version_id: VersionId) -> Result<GetVersionResult> {
|
||||||
&mut self,
|
|
||||||
parent_version_id: VersionId,
|
|
||||||
) -> anyhow::Result<GetVersionResult> {
|
|
||||||
if let Some(version) = self.get_version_by_parent_version_id(parent_version_id)? {
|
if let Some(version) = self.get_version_by_parent_version_id(parent_version_id)? {
|
||||||
Ok(GetVersionResult::Version {
|
Ok(GetVersionResult::Version {
|
||||||
version_id: version.version_id,
|
version_id: version.version_id,
|
||||||
|
@ -156,12 +154,12 @@ impl Server for LocalServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add_snapshot(&mut self, _version_id: VersionId, _snapshot: Snapshot) -> anyhow::Result<()> {
|
fn add_snapshot(&mut self, _version_id: VersionId, _snapshot: Snapshot) -> Result<()> {
|
||||||
// the local server never requests a snapshot, so it should never get one
|
// the local server never requests a snapshot, so it should never get one
|
||||||
unreachable!()
|
unreachable!()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_snapshot(&mut self) -> anyhow::Result<Option<(VersionId, Snapshot)>> {
|
fn get_snapshot(&mut self) -> Result<Option<(VersionId, Snapshot)>> {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -173,7 +171,7 @@ mod test {
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_empty() -> anyhow::Result<()> {
|
fn test_empty() -> Result<()> {
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new()?;
|
||||||
let mut server = LocalServer::new(tmp_dir.path())?;
|
let mut server = LocalServer::new(tmp_dir.path())?;
|
||||||
let child_version = server.get_child_version(NIL_VERSION_ID)?;
|
let child_version = server.get_child_version(NIL_VERSION_ID)?;
|
||||||
|
@ -182,7 +180,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_add_zero_base() -> anyhow::Result<()> {
|
fn test_add_zero_base() -> Result<()> {
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new()?;
|
||||||
let mut server = LocalServer::new(tmp_dir.path())?;
|
let mut server = LocalServer::new(tmp_dir.path())?;
|
||||||
let history = b"1234".to_vec();
|
let history = b"1234".to_vec();
|
||||||
|
@ -207,7 +205,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_add_nonzero_base() -> anyhow::Result<()> {
|
fn test_add_nonzero_base() -> Result<()> {
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new()?;
|
||||||
let mut server = LocalServer::new(tmp_dir.path())?;
|
let mut server = LocalServer::new(tmp_dir.path())?;
|
||||||
let history = b"1234".to_vec();
|
let history = b"1234".to_vec();
|
||||||
|
@ -235,7 +233,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_add_nonzero_base_forbidden() -> anyhow::Result<()> {
|
fn test_add_nonzero_base_forbidden() -> Result<()> {
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new()?;
|
||||||
let mut server = LocalServer::new(tmp_dir.path())?;
|
let mut server = LocalServer::new(tmp_dir.path())?;
|
||||||
let history = b"1234".to_vec();
|
let history = b"1234".to_vec();
|
||||||
|
|
|
@ -123,6 +123,7 @@ impl SyncOp {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::errors::Result;
|
||||||
use crate::storage::InMemoryStorage;
|
use crate::storage::InMemoryStorage;
|
||||||
use crate::taskdb::TaskDb;
|
use crate::taskdb::TaskDb;
|
||||||
use chrono::{Duration, Utc};
|
use chrono::{Duration, Utc};
|
||||||
|
@ -130,7 +131,7 @@ mod test {
|
||||||
use proptest::prelude::*;
|
use proptest::prelude::*;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_json_create() -> anyhow::Result<()> {
|
fn test_json_create() -> Result<()> {
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
let op = Create { uuid };
|
let op = Create { uuid };
|
||||||
let json = serde_json::to_string(&op)?;
|
let json = serde_json::to_string(&op)?;
|
||||||
|
@ -141,7 +142,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_json_delete() -> anyhow::Result<()> {
|
fn test_json_delete() -> Result<()> {
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
let op = Delete { uuid };
|
let op = Delete { uuid };
|
||||||
let json = serde_json::to_string(&op)?;
|
let json = serde_json::to_string(&op)?;
|
||||||
|
@ -152,7 +153,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_json_update() -> anyhow::Result<()> {
|
fn test_json_update() -> Result<()> {
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
let timestamp = Utc::now();
|
let timestamp = Utc::now();
|
||||||
|
|
||||||
|
@ -177,7 +178,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_json_update_none() -> anyhow::Result<()> {
|
fn test_json_update_none() -> Result<()> {
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
let timestamp = Utc::now();
|
let timestamp = Utc::now();
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use crate::errors::Result;
|
||||||
use crate::server::{
|
use crate::server::{
|
||||||
AddVersionResult, GetVersionResult, HistorySegment, Server, Snapshot, SnapshotUrgency,
|
AddVersionResult, GetVersionResult, HistorySegment, Server, Snapshot, SnapshotUrgency,
|
||||||
VersionId,
|
VersionId,
|
||||||
|
@ -31,7 +32,7 @@ impl RemoteServer {
|
||||||
origin: String,
|
origin: String,
|
||||||
client_key: Uuid,
|
client_key: Uuid,
|
||||||
encryption_secret: Vec<u8>,
|
encryption_secret: Vec<u8>,
|
||||||
) -> anyhow::Result<RemoteServer> {
|
) -> Result<RemoteServer> {
|
||||||
Ok(RemoteServer {
|
Ok(RemoteServer {
|
||||||
origin,
|
origin,
|
||||||
client_key,
|
client_key,
|
||||||
|
@ -45,7 +46,7 @@ impl RemoteServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Read a UUID-bearing header or fail trying
|
/// Read a UUID-bearing header or fail trying
|
||||||
fn get_uuid_header(resp: &ureq::Response, name: &str) -> anyhow::Result<Uuid> {
|
fn get_uuid_header(resp: &ureq::Response, name: &str) -> Result<Uuid> {
|
||||||
let value = resp
|
let value = resp
|
||||||
.header(name)
|
.header(name)
|
||||||
.ok_or_else(|| anyhow::anyhow!("Response does not have {} header", name))?;
|
.ok_or_else(|| anyhow::anyhow!("Response does not have {} header", name))?;
|
||||||
|
@ -71,7 +72,7 @@ impl Server for RemoteServer {
|
||||||
&mut self,
|
&mut self,
|
||||||
parent_version_id: VersionId,
|
parent_version_id: VersionId,
|
||||||
history_segment: HistorySegment,
|
history_segment: HistorySegment,
|
||||||
) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> {
|
) -> Result<(AddVersionResult, SnapshotUrgency)> {
|
||||||
let url = format!(
|
let url = format!(
|
||||||
"{}/v1/client/add-version/{}",
|
"{}/v1/client/add-version/{}",
|
||||||
self.origin, parent_version_id
|
self.origin, parent_version_id
|
||||||
|
@ -106,10 +107,7 @@ impl Server for RemoteServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_child_version(
|
fn get_child_version(&mut self, parent_version_id: VersionId) -> Result<GetVersionResult> {
|
||||||
&mut self,
|
|
||||||
parent_version_id: VersionId,
|
|
||||||
) -> anyhow::Result<GetVersionResult> {
|
|
||||||
let url = format!(
|
let url = format!(
|
||||||
"{}/v1/client/get-child-version/{}",
|
"{}/v1/client/get-child-version/{}",
|
||||||
self.origin, parent_version_id
|
self.origin, parent_version_id
|
||||||
|
@ -139,7 +137,7 @@ impl Server for RemoteServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add_snapshot(&mut self, version_id: VersionId, snapshot: Snapshot) -> anyhow::Result<()> {
|
fn add_snapshot(&mut self, version_id: VersionId, snapshot: Snapshot) -> Result<()> {
|
||||||
let url = format!("{}/v1/client/add-snapshot/{}", self.origin, version_id);
|
let url = format!("{}/v1/client/add-snapshot/{}", self.origin, version_id);
|
||||||
let unsealed = Unsealed {
|
let unsealed = Unsealed {
|
||||||
version_id,
|
version_id,
|
||||||
|
@ -155,7 +153,7 @@ impl Server for RemoteServer {
|
||||||
.map(|_| ())?)
|
.map(|_| ())?)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_snapshot(&mut self) -> anyhow::Result<Option<(VersionId, Snapshot)>> {
|
fn get_snapshot(&mut self) -> Result<Option<(VersionId, Snapshot)>> {
|
||||||
let url = format!("{}/v1/client/snapshot", self.origin);
|
let url = format!("{}/v1/client/snapshot", self.origin);
|
||||||
match self
|
match self
|
||||||
.agent
|
.agent
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use crate::errors::Result;
|
||||||
use crate::server::{
|
use crate::server::{
|
||||||
AddVersionResult, GetVersionResult, HistorySegment, Server, Snapshot, SnapshotUrgency,
|
AddVersionResult, GetVersionResult, HistorySegment, Server, Snapshot, SnapshotUrgency,
|
||||||
VersionId, NIL_VERSION_ID,
|
VersionId, NIL_VERSION_ID,
|
||||||
|
@ -66,7 +67,7 @@ impl Server for TestServer {
|
||||||
&mut self,
|
&mut self,
|
||||||
parent_version_id: VersionId,
|
parent_version_id: VersionId,
|
||||||
history_segment: HistorySegment,
|
history_segment: HistorySegment,
|
||||||
) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> {
|
) -> Result<(AddVersionResult, SnapshotUrgency)> {
|
||||||
let mut inner = self.0.lock().unwrap();
|
let mut inner = self.0.lock().unwrap();
|
||||||
|
|
||||||
// no client lookup
|
// no client lookup
|
||||||
|
@ -101,10 +102,7 @@ impl Server for TestServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get a vector of all versions after `since_version`
|
/// Get a vector of all versions after `since_version`
|
||||||
fn get_child_version(
|
fn get_child_version(&mut self, parent_version_id: VersionId) -> Result<GetVersionResult> {
|
||||||
&mut self,
|
|
||||||
parent_version_id: VersionId,
|
|
||||||
) -> anyhow::Result<GetVersionResult> {
|
|
||||||
let inner = self.0.lock().unwrap();
|
let inner = self.0.lock().unwrap();
|
||||||
|
|
||||||
if let Some(version) = inner.versions.get(&parent_version_id) {
|
if let Some(version) = inner.versions.get(&parent_version_id) {
|
||||||
|
@ -118,7 +116,7 @@ impl Server for TestServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add_snapshot(&mut self, version_id: VersionId, snapshot: Snapshot) -> anyhow::Result<()> {
|
fn add_snapshot(&mut self, version_id: VersionId, snapshot: Snapshot) -> Result<()> {
|
||||||
let mut inner = self.0.lock().unwrap();
|
let mut inner = self.0.lock().unwrap();
|
||||||
|
|
||||||
// test implementation -- does not perform any validation
|
// test implementation -- does not perform any validation
|
||||||
|
@ -126,7 +124,7 @@ impl Server for TestServer {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_snapshot(&mut self) -> anyhow::Result<Option<(VersionId, Snapshot)>> {
|
fn get_snapshot(&mut self) -> Result<Option<(VersionId, Snapshot)>> {
|
||||||
let inner = self.0.lock().unwrap();
|
let inner = self.0.lock().unwrap();
|
||||||
Ok(inner.snapshot.clone())
|
Ok(inner.snapshot.clone())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use crate::errors::Result;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
/// Versions are referred to with sha2 hashes.
|
/// Versions are referred to with sha2 hashes.
|
||||||
|
@ -55,16 +56,13 @@ pub trait Server {
|
||||||
&mut self,
|
&mut self,
|
||||||
parent_version_id: VersionId,
|
parent_version_id: VersionId,
|
||||||
history_segment: HistorySegment,
|
history_segment: HistorySegment,
|
||||||
) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)>;
|
) -> Result<(AddVersionResult, SnapshotUrgency)>;
|
||||||
|
|
||||||
/// Get the version with the given parent VersionId
|
/// Get the version with the given parent VersionId
|
||||||
fn get_child_version(
|
fn get_child_version(&mut self, parent_version_id: VersionId) -> Result<GetVersionResult>;
|
||||||
&mut self,
|
|
||||||
parent_version_id: VersionId,
|
|
||||||
) -> anyhow::Result<GetVersionResult>;
|
|
||||||
|
|
||||||
/// Add a snapshot on the server
|
/// Add a snapshot on the server
|
||||||
fn add_snapshot(&mut self, version_id: VersionId, snapshot: Snapshot) -> anyhow::Result<()>;
|
fn add_snapshot(&mut self, version_id: VersionId, snapshot: Snapshot) -> Result<()>;
|
||||||
|
|
||||||
fn get_snapshot(&mut self) -> anyhow::Result<Option<(VersionId, Snapshot)>>;
|
fn get_snapshot(&mut self) -> Result<Option<(VersionId, Snapshot)>>;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
use super::{InMemoryStorage, SqliteStorage, Storage};
|
use super::{InMemoryStorage, SqliteStorage, Storage};
|
||||||
|
use crate::errors::Result;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
/// The configuration required for a replica's storage.
|
/// The configuration required for a replica's storage.
|
||||||
|
@ -16,7 +17,7 @@ pub enum StorageConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StorageConfig {
|
impl StorageConfig {
|
||||||
pub fn into_storage(self) -> anyhow::Result<Box<dyn Storage>> {
|
pub fn into_storage(self) -> Result<Box<dyn Storage>> {
|
||||||
Ok(match self {
|
Ok(match self {
|
||||||
StorageConfig::OnDisk {
|
StorageConfig::OnDisk {
|
||||||
taskdb_dir,
|
taskdb_dir,
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
#![allow(clippy::new_without_default)]
|
#![allow(clippy::new_without_default)]
|
||||||
|
|
||||||
|
use crate::errors::{Error, Result};
|
||||||
use crate::storage::{ReplicaOp, Storage, StorageTxn, TaskMap, VersionId, DEFAULT_BASE_VERSION};
|
use crate::storage::{ReplicaOp, Storage, StorageTxn, TaskMap, VersionId, DEFAULT_BASE_VERSION};
|
||||||
use std::collections::hash_map::Entry;
|
use std::collections::hash_map::Entry;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
@ -40,14 +41,14 @@ impl<'t> Txn<'t> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'t> StorageTxn for Txn<'t> {
|
impl<'t> StorageTxn for Txn<'t> {
|
||||||
fn get_task(&mut self, uuid: Uuid) -> anyhow::Result<Option<TaskMap>> {
|
fn get_task(&mut self, uuid: Uuid) -> Result<Option<TaskMap>> {
|
||||||
match self.data_ref().tasks.get(&uuid) {
|
match self.data_ref().tasks.get(&uuid) {
|
||||||
None => Ok(None),
|
None => Ok(None),
|
||||||
Some(t) => Ok(Some(t.clone())),
|
Some(t) => Ok(Some(t.clone())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn create_task(&mut self, uuid: Uuid) -> anyhow::Result<bool> {
|
fn create_task(&mut self, uuid: Uuid) -> Result<bool> {
|
||||||
if let ent @ Entry::Vacant(_) = self.mut_data_ref().tasks.entry(uuid) {
|
if let ent @ Entry::Vacant(_) = self.mut_data_ref().tasks.entry(uuid) {
|
||||||
ent.or_insert_with(TaskMap::new);
|
ent.or_insert_with(TaskMap::new);
|
||||||
Ok(true)
|
Ok(true)
|
||||||
|
@ -56,16 +57,16 @@ impl<'t> StorageTxn for Txn<'t> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_task(&mut self, uuid: Uuid, task: TaskMap) -> anyhow::Result<()> {
|
fn set_task(&mut self, uuid: Uuid, task: TaskMap) -> Result<()> {
|
||||||
self.mut_data_ref().tasks.insert(uuid, task);
|
self.mut_data_ref().tasks.insert(uuid, task);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn delete_task(&mut self, uuid: Uuid) -> anyhow::Result<bool> {
|
fn delete_task(&mut self, uuid: Uuid) -> Result<bool> {
|
||||||
Ok(self.mut_data_ref().tasks.remove(&uuid).is_some())
|
Ok(self.mut_data_ref().tasks.remove(&uuid).is_some())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn all_tasks<'a>(&mut self) -> anyhow::Result<Vec<(Uuid, TaskMap)>> {
|
fn all_tasks<'a>(&mut self) -> Result<Vec<(Uuid, TaskMap)>> {
|
||||||
Ok(self
|
Ok(self
|
||||||
.data_ref()
|
.data_ref()
|
||||||
.tasks
|
.tasks
|
||||||
|
@ -74,62 +75,65 @@ impl<'t> StorageTxn for Txn<'t> {
|
||||||
.collect())
|
.collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn all_task_uuids<'a>(&mut self) -> anyhow::Result<Vec<Uuid>> {
|
fn all_task_uuids<'a>(&mut self) -> Result<Vec<Uuid>> {
|
||||||
Ok(self.data_ref().tasks.keys().copied().collect())
|
Ok(self.data_ref().tasks.keys().copied().collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn base_version(&mut self) -> anyhow::Result<VersionId> {
|
fn base_version(&mut self) -> Result<VersionId> {
|
||||||
Ok(self.data_ref().base_version)
|
Ok(self.data_ref().base_version)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_base_version(&mut self, version: VersionId) -> anyhow::Result<()> {
|
fn set_base_version(&mut self, version: VersionId) -> Result<()> {
|
||||||
self.mut_data_ref().base_version = version;
|
self.mut_data_ref().base_version = version;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn operations(&mut self) -> anyhow::Result<Vec<ReplicaOp>> {
|
fn operations(&mut self) -> Result<Vec<ReplicaOp>> {
|
||||||
Ok(self.data_ref().operations.clone())
|
Ok(self.data_ref().operations.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn num_operations(&mut self) -> anyhow::Result<usize> {
|
fn num_operations(&mut self) -> Result<usize> {
|
||||||
Ok(self.data_ref().operations.len())
|
Ok(self.data_ref().operations.len())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add_operation(&mut self, op: ReplicaOp) -> anyhow::Result<()> {
|
fn add_operation(&mut self, op: ReplicaOp) -> Result<()> {
|
||||||
self.mut_data_ref().operations.push(op);
|
self.mut_data_ref().operations.push(op);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_operations(&mut self, ops: Vec<ReplicaOp>) -> anyhow::Result<()> {
|
fn set_operations(&mut self, ops: Vec<ReplicaOp>) -> Result<()> {
|
||||||
self.mut_data_ref().operations = ops;
|
self.mut_data_ref().operations = ops;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_working_set(&mut self) -> anyhow::Result<Vec<Option<Uuid>>> {
|
fn get_working_set(&mut self) -> Result<Vec<Option<Uuid>>> {
|
||||||
Ok(self.data_ref().working_set.clone())
|
Ok(self.data_ref().working_set.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add_to_working_set(&mut self, uuid: Uuid) -> anyhow::Result<usize> {
|
fn add_to_working_set(&mut self, uuid: Uuid) -> Result<usize> {
|
||||||
let working_set = &mut self.mut_data_ref().working_set;
|
let working_set = &mut self.mut_data_ref().working_set;
|
||||||
working_set.push(Some(uuid));
|
working_set.push(Some(uuid));
|
||||||
Ok(working_set.len())
|
Ok(working_set.len())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_working_set_item(&mut self, index: usize, uuid: Option<Uuid>) -> anyhow::Result<()> {
|
fn set_working_set_item(&mut self, index: usize, uuid: Option<Uuid>) -> Result<()> {
|
||||||
let working_set = &mut self.mut_data_ref().working_set;
|
let working_set = &mut self.mut_data_ref().working_set;
|
||||||
if index >= working_set.len() {
|
if index >= working_set.len() {
|
||||||
anyhow::bail!("Index {} is not in the working set", index);
|
return Err(Error::Database(format!(
|
||||||
|
"Index {} is not in the working set",
|
||||||
|
index
|
||||||
|
)));
|
||||||
}
|
}
|
||||||
working_set[index] = uuid;
|
working_set[index] = uuid;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn clear_working_set(&mut self) -> anyhow::Result<()> {
|
fn clear_working_set(&mut self) -> Result<()> {
|
||||||
self.mut_data_ref().working_set = vec![None];
|
self.mut_data_ref().working_set = vec![None];
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn commit(&mut self) -> anyhow::Result<()> {
|
fn commit(&mut self) -> Result<()> {
|
||||||
// copy the new_data back into storage to commit the transaction
|
// copy the new_data back into storage to commit the transaction
|
||||||
if let Some(data) = self.new_data.take() {
|
if let Some(data) = self.new_data.take() {
|
||||||
self.storage.data = data;
|
self.storage.data = data;
|
||||||
|
@ -159,7 +163,7 @@ impl InMemoryStorage {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Storage for InMemoryStorage {
|
impl Storage for InMemoryStorage {
|
||||||
fn txn<'a>(&'a mut self) -> anyhow::Result<Box<dyn StorageTxn + 'a>> {
|
fn txn<'a>(&'a mut self) -> Result<Box<dyn StorageTxn + 'a>> {
|
||||||
Ok(Box::new(Txn {
|
Ok(Box::new(Txn {
|
||||||
storage: self,
|
storage: self,
|
||||||
new_data: None,
|
new_data: None,
|
||||||
|
@ -176,7 +180,7 @@ mod test {
|
||||||
// elsewhere and not tested here)
|
// elsewhere and not tested here)
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn get_working_set_empty() -> anyhow::Result<()> {
|
fn get_working_set_empty() -> Result<()> {
|
||||||
let mut storage = InMemoryStorage::new();
|
let mut storage = InMemoryStorage::new();
|
||||||
|
|
||||||
{
|
{
|
||||||
|
@ -189,7 +193,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn add_to_working_set() -> anyhow::Result<()> {
|
fn add_to_working_set() -> Result<()> {
|
||||||
let mut storage = InMemoryStorage::new();
|
let mut storage = InMemoryStorage::new();
|
||||||
let uuid1 = Uuid::new_v4();
|
let uuid1 = Uuid::new_v4();
|
||||||
let uuid2 = Uuid::new_v4();
|
let uuid2 = Uuid::new_v4();
|
||||||
|
@ -211,7 +215,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn clear_working_set() -> anyhow::Result<()> {
|
fn clear_working_set() -> Result<()> {
|
||||||
let mut storage = InMemoryStorage::new();
|
let mut storage = InMemoryStorage::new();
|
||||||
let uuid1 = Uuid::new_v4();
|
let uuid1 = Uuid::new_v4();
|
||||||
let uuid2 = Uuid::new_v4();
|
let uuid2 = Uuid::new_v4();
|
||||||
|
|
|
@ -5,7 +5,7 @@ It defines a [trait](crate::storage::Storage) for storage implementations, and p
|
||||||
Typical uses of this crate do not interact directly with this module; [`StorageConfig`](crate::StorageConfig) is sufficient.
|
Typical uses of this crate do not interact directly with this module; [`StorageConfig`](crate::StorageConfig) is sufficient.
|
||||||
However, users who wish to implement their own storage backends can implement the traits defined here and pass the result to [`Replica`](crate::Replica).
|
However, users who wish to implement their own storage backends can implement the traits defined here and pass the result to [`Replica`](crate::Replica).
|
||||||
*/
|
*/
|
||||||
use anyhow::Result;
|
use crate::errors::Result;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
|
|
@ -103,6 +103,7 @@ impl ReplicaOp {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::errors::Result;
|
||||||
use crate::storage::taskmap_with;
|
use crate::storage::taskmap_with;
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use pretty_assertions::assert_eq;
|
use pretty_assertions::assert_eq;
|
||||||
|
@ -110,7 +111,7 @@ mod test {
|
||||||
use ReplicaOp::*;
|
use ReplicaOp::*;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_json_create() -> anyhow::Result<()> {
|
fn test_json_create() -> Result<()> {
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
let op = Create { uuid };
|
let op = Create { uuid };
|
||||||
let json = serde_json::to_string(&op)?;
|
let json = serde_json::to_string(&op)?;
|
||||||
|
@ -121,7 +122,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_json_delete() -> anyhow::Result<()> {
|
fn test_json_delete() -> Result<()> {
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
let old_task = vec![("foo".into(), "bar".into())].drain(..).collect();
|
let old_task = vec![("foo".into(), "bar".into())].drain(..).collect();
|
||||||
let op = Delete { uuid, old_task };
|
let op = Delete { uuid, old_task };
|
||||||
|
@ -139,7 +140,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_json_update() -> anyhow::Result<()> {
|
fn test_json_update() -> Result<()> {
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
let timestamp = Utc::now();
|
let timestamp = Utc::now();
|
||||||
|
|
||||||
|
@ -165,7 +166,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_json_update_none() -> anyhow::Result<()> {
|
fn test_json_update_none() -> Result<()> {
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
let timestamp = Utc::now();
|
let timestamp = Utc::now();
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use crate::errors::Result;
|
||||||
use crate::storage::{ReplicaOp, Storage, StorageTxn, TaskMap, VersionId, DEFAULT_BASE_VERSION};
|
use crate::storage::{ReplicaOp, Storage, StorageTxn, TaskMap, VersionId, DEFAULT_BASE_VERSION};
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use rusqlite::types::{FromSql, ToSql};
|
use rusqlite::types::{FromSql, ToSql};
|
||||||
|
@ -6,7 +7,7 @@ use std::path::Path;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
enum SqliteError {
|
pub enum SqliteError {
|
||||||
#[error("SQLite transaction already committted")]
|
#[error("SQLite transaction already committted")]
|
||||||
TransactionAlreadyCommitted,
|
TransactionAlreadyCommitted,
|
||||||
}
|
}
|
||||||
|
@ -76,10 +77,7 @@ pub struct SqliteStorage {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SqliteStorage {
|
impl SqliteStorage {
|
||||||
pub fn new<P: AsRef<Path>>(
|
pub fn new<P: AsRef<Path>>(directory: P, create_if_missing: bool) -> Result<SqliteStorage> {
|
||||||
directory: P,
|
|
||||||
create_if_missing: bool,
|
|
||||||
) -> anyhow::Result<SqliteStorage> {
|
|
||||||
if create_if_missing {
|
if create_if_missing {
|
||||||
// Ensure parent folder exists
|
// Ensure parent folder exists
|
||||||
std::fs::create_dir_all(&directory)?;
|
std::fs::create_dir_all(&directory)?;
|
||||||
|
@ -115,13 +113,13 @@ struct Txn<'t> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'t> Txn<'t> {
|
impl<'t> Txn<'t> {
|
||||||
fn get_txn(&self) -> Result<&rusqlite::Transaction<'t>, SqliteError> {
|
fn get_txn(&self) -> core::result::Result<&rusqlite::Transaction<'t>, SqliteError> {
|
||||||
self.txn
|
self.txn
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.ok_or(SqliteError::TransactionAlreadyCommitted)
|
.ok_or(SqliteError::TransactionAlreadyCommitted)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_next_working_set_number(&self) -> anyhow::Result<usize> {
|
fn get_next_working_set_number(&self) -> Result<usize> {
|
||||||
let t = self.get_txn()?;
|
let t = self.get_txn()?;
|
||||||
let next_id: Option<usize> = t
|
let next_id: Option<usize> = t
|
||||||
.query_row(
|
.query_row(
|
||||||
|
@ -137,14 +135,14 @@ impl<'t> Txn<'t> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Storage for SqliteStorage {
|
impl Storage for SqliteStorage {
|
||||||
fn txn<'a>(&'a mut self) -> anyhow::Result<Box<dyn StorageTxn + 'a>> {
|
fn txn<'a>(&'a mut self) -> Result<Box<dyn StorageTxn + 'a>> {
|
||||||
let txn = self.con.transaction()?;
|
let txn = self.con.transaction()?;
|
||||||
Ok(Box::new(Txn { txn: Some(txn) }))
|
Ok(Box::new(Txn { txn: Some(txn) }))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'t> StorageTxn for Txn<'t> {
|
impl<'t> StorageTxn for Txn<'t> {
|
||||||
fn get_task(&mut self, uuid: Uuid) -> anyhow::Result<Option<TaskMap>> {
|
fn get_task(&mut self, uuid: Uuid) -> Result<Option<TaskMap>> {
|
||||||
let t = self.get_txn()?;
|
let t = self.get_txn()?;
|
||||||
let result: Option<StoredTaskMap> = t
|
let result: Option<StoredTaskMap> = t
|
||||||
.query_row(
|
.query_row(
|
||||||
|
@ -158,7 +156,7 @@ impl<'t> StorageTxn for Txn<'t> {
|
||||||
Ok(result.map(|t| t.0))
|
Ok(result.map(|t| t.0))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn create_task(&mut self, uuid: Uuid) -> anyhow::Result<bool> {
|
fn create_task(&mut self, uuid: Uuid) -> Result<bool> {
|
||||||
let t = self.get_txn()?;
|
let t = self.get_txn()?;
|
||||||
let count: usize = t.query_row(
|
let count: usize = t.query_row(
|
||||||
"SELECT count(uuid) FROM tasks WHERE uuid = ?",
|
"SELECT count(uuid) FROM tasks WHERE uuid = ?",
|
||||||
|
@ -178,7 +176,7 @@ impl<'t> StorageTxn for Txn<'t> {
|
||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_task(&mut self, uuid: Uuid, task: TaskMap) -> anyhow::Result<()> {
|
fn set_task(&mut self, uuid: Uuid, task: TaskMap) -> Result<()> {
|
||||||
let t = self.get_txn()?;
|
let t = self.get_txn()?;
|
||||||
t.execute(
|
t.execute(
|
||||||
"INSERT OR REPLACE INTO tasks (uuid, data) VALUES (?, ?)",
|
"INSERT OR REPLACE INTO tasks (uuid, data) VALUES (?, ?)",
|
||||||
|
@ -188,7 +186,7 @@ impl<'t> StorageTxn for Txn<'t> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn delete_task(&mut self, uuid: Uuid) -> anyhow::Result<bool> {
|
fn delete_task(&mut self, uuid: Uuid) -> Result<bool> {
|
||||||
let t = self.get_txn()?;
|
let t = self.get_txn()?;
|
||||||
let changed = t
|
let changed = t
|
||||||
.execute("DELETE FROM tasks WHERE uuid = ?", [&StoredUuid(uuid)])
|
.execute("DELETE FROM tasks WHERE uuid = ?", [&StoredUuid(uuid)])
|
||||||
|
@ -196,7 +194,7 @@ impl<'t> StorageTxn for Txn<'t> {
|
||||||
Ok(changed > 0)
|
Ok(changed > 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn all_tasks(&mut self) -> anyhow::Result<Vec<(Uuid, TaskMap)>> {
|
fn all_tasks(&mut self) -> Result<Vec<(Uuid, TaskMap)>> {
|
||||||
let t = self.get_txn()?;
|
let t = self.get_txn()?;
|
||||||
|
|
||||||
let mut q = t.prepare("SELECT uuid, data FROM tasks")?;
|
let mut q = t.prepare("SELECT uuid, data FROM tasks")?;
|
||||||
|
@ -213,7 +211,7 @@ impl<'t> StorageTxn for Txn<'t> {
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn all_task_uuids(&mut self) -> anyhow::Result<Vec<Uuid>> {
|
fn all_task_uuids(&mut self) -> Result<Vec<Uuid>> {
|
||||||
let t = self.get_txn()?;
|
let t = self.get_txn()?;
|
||||||
|
|
||||||
let mut q = t.prepare("SELECT uuid FROM tasks")?;
|
let mut q = t.prepare("SELECT uuid FROM tasks")?;
|
||||||
|
@ -229,7 +227,7 @@ impl<'t> StorageTxn for Txn<'t> {
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn base_version(&mut self) -> anyhow::Result<VersionId> {
|
fn base_version(&mut self) -> Result<VersionId> {
|
||||||
let t = self.get_txn()?;
|
let t = self.get_txn()?;
|
||||||
|
|
||||||
let version: Option<StoredUuid> = t
|
let version: Option<StoredUuid> = t
|
||||||
|
@ -242,7 +240,7 @@ impl<'t> StorageTxn for Txn<'t> {
|
||||||
Ok(version.map(|u| u.0).unwrap_or(DEFAULT_BASE_VERSION))
|
Ok(version.map(|u| u.0).unwrap_or(DEFAULT_BASE_VERSION))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_base_version(&mut self, version: VersionId) -> anyhow::Result<()> {
|
fn set_base_version(&mut self, version: VersionId) -> Result<()> {
|
||||||
let t = self.get_txn()?;
|
let t = self.get_txn()?;
|
||||||
t.execute(
|
t.execute(
|
||||||
"INSERT OR REPLACE INTO sync_meta (key, value) VALUES (?, ?)",
|
"INSERT OR REPLACE INTO sync_meta (key, value) VALUES (?, ?)",
|
||||||
|
@ -252,7 +250,7 @@ impl<'t> StorageTxn for Txn<'t> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn operations(&mut self) -> anyhow::Result<Vec<ReplicaOp>> {
|
fn operations(&mut self) -> Result<Vec<ReplicaOp>> {
|
||||||
let t = self.get_txn()?;
|
let t = self.get_txn()?;
|
||||||
|
|
||||||
let mut q = t.prepare("SELECT data FROM operations ORDER BY id ASC")?;
|
let mut q = t.prepare("SELECT data FROM operations ORDER BY id ASC")?;
|
||||||
|
@ -268,13 +266,13 @@ impl<'t> StorageTxn for Txn<'t> {
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn num_operations(&mut self) -> anyhow::Result<usize> {
|
fn num_operations(&mut self) -> Result<usize> {
|
||||||
let t = self.get_txn()?;
|
let t = self.get_txn()?;
|
||||||
let count: usize = t.query_row("SELECT count(*) FROM operations", [], |x| x.get(0))?;
|
let count: usize = t.query_row("SELECT count(*) FROM operations", [], |x| x.get(0))?;
|
||||||
Ok(count)
|
Ok(count)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add_operation(&mut self, op: ReplicaOp) -> anyhow::Result<()> {
|
fn add_operation(&mut self, op: ReplicaOp) -> Result<()> {
|
||||||
let t = self.get_txn()?;
|
let t = self.get_txn()?;
|
||||||
|
|
||||||
t.execute("INSERT INTO operations (data) VALUES (?)", params![&op])
|
t.execute("INSERT INTO operations (data) VALUES (?)", params![&op])
|
||||||
|
@ -282,7 +280,7 @@ impl<'t> StorageTxn for Txn<'t> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_operations(&mut self, ops: Vec<ReplicaOp>) -> anyhow::Result<()> {
|
fn set_operations(&mut self, ops: Vec<ReplicaOp>) -> Result<()> {
|
||||||
let t = self.get_txn()?;
|
let t = self.get_txn()?;
|
||||||
t.execute("DELETE FROM operations", [])
|
t.execute("DELETE FROM operations", [])
|
||||||
.context("Clear all existing operations")?;
|
.context("Clear all existing operations")?;
|
||||||
|
@ -295,7 +293,7 @@ impl<'t> StorageTxn for Txn<'t> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_working_set(&mut self) -> anyhow::Result<Vec<Option<Uuid>>> {
|
fn get_working_set(&mut self) -> Result<Vec<Option<Uuid>>> {
|
||||||
let t = self.get_txn()?;
|
let t = self.get_txn()?;
|
||||||
|
|
||||||
let mut q = t.prepare("SELECT id, uuid FROM working_set ORDER BY id ASC")?;
|
let mut q = t.prepare("SELECT id, uuid FROM working_set ORDER BY id ASC")?;
|
||||||
|
@ -307,7 +305,7 @@ impl<'t> StorageTxn for Txn<'t> {
|
||||||
})
|
})
|
||||||
.context("Get working set query")?;
|
.context("Get working set query")?;
|
||||||
|
|
||||||
let rows: Vec<Result<(usize, Uuid), _>> = rows.collect();
|
let rows: Vec<core::result::Result<(usize, Uuid), _>> = rows.collect();
|
||||||
let mut res = Vec::with_capacity(rows.len());
|
let mut res = Vec::with_capacity(rows.len());
|
||||||
for _ in 0..self
|
for _ in 0..self
|
||||||
.get_next_working_set_number()
|
.get_next_working_set_number()
|
||||||
|
@ -323,7 +321,7 @@ impl<'t> StorageTxn for Txn<'t> {
|
||||||
Ok(res)
|
Ok(res)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add_to_working_set(&mut self, uuid: Uuid) -> anyhow::Result<usize> {
|
fn add_to_working_set(&mut self, uuid: Uuid) -> Result<usize> {
|
||||||
let t = self.get_txn()?;
|
let t = self.get_txn()?;
|
||||||
|
|
||||||
let next_working_id = self.get_next_working_set_number()?;
|
let next_working_id = self.get_next_working_set_number()?;
|
||||||
|
@ -337,7 +335,7 @@ impl<'t> StorageTxn for Txn<'t> {
|
||||||
Ok(next_working_id)
|
Ok(next_working_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_working_set_item(&mut self, index: usize, uuid: Option<Uuid>) -> anyhow::Result<()> {
|
fn set_working_set_item(&mut self, index: usize, uuid: Option<Uuid>) -> Result<()> {
|
||||||
let t = self.get_txn()?;
|
let t = self.get_txn()?;
|
||||||
match uuid {
|
match uuid {
|
||||||
// Add or override item
|
// Add or override item
|
||||||
|
@ -352,14 +350,14 @@ impl<'t> StorageTxn for Txn<'t> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn clear_working_set(&mut self) -> anyhow::Result<()> {
|
fn clear_working_set(&mut self) -> Result<()> {
|
||||||
let t = self.get_txn()?;
|
let t = self.get_txn()?;
|
||||||
t.execute("DELETE FROM working_set", [])
|
t.execute("DELETE FROM working_set", [])
|
||||||
.context("Clear working set query")?;
|
.context("Clear working set query")?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn commit(&mut self) -> anyhow::Result<()> {
|
fn commit(&mut self) -> Result<()> {
|
||||||
let t = self
|
let t = self
|
||||||
.txn
|
.txn
|
||||||
.take()
|
.take()
|
||||||
|
@ -377,7 +375,7 @@ mod test {
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_empty_dir() -> anyhow::Result<()> {
|
fn test_empty_dir() -> Result<()> {
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new()?;
|
||||||
let non_existant = tmp_dir.path().join("subdir");
|
let non_existant = tmp_dir.path().join("subdir");
|
||||||
let mut storage = SqliteStorage::new(non_existant, true)?;
|
let mut storage = SqliteStorage::new(non_existant, true)?;
|
||||||
|
@ -396,7 +394,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn drop_transaction() -> anyhow::Result<()> {
|
fn drop_transaction() -> Result<()> {
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new()?;
|
||||||
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
||||||
let uuid1 = Uuid::new_v4();
|
let uuid1 = Uuid::new_v4();
|
||||||
|
@ -425,7 +423,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_create() -> anyhow::Result<()> {
|
fn test_create() -> Result<()> {
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new()?;
|
||||||
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
|
@ -443,7 +441,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_create_exists() -> anyhow::Result<()> {
|
fn test_create_exists() -> Result<()> {
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new()?;
|
||||||
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
|
@ -461,7 +459,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_get_missing() -> anyhow::Result<()> {
|
fn test_get_missing() -> Result<()> {
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new()?;
|
||||||
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
|
@ -474,7 +472,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_set_task() -> anyhow::Result<()> {
|
fn test_set_task() -> Result<()> {
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new()?;
|
||||||
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
|
@ -495,7 +493,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_delete_task_missing() -> anyhow::Result<()> {
|
fn test_delete_task_missing() -> Result<()> {
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new()?;
|
||||||
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
|
@ -507,7 +505,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_delete_task_exists() -> anyhow::Result<()> {
|
fn test_delete_task_exists() -> Result<()> {
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new()?;
|
||||||
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
|
@ -524,7 +522,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_all_tasks_empty() -> anyhow::Result<()> {
|
fn test_all_tasks_empty() -> Result<()> {
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new()?;
|
||||||
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
||||||
{
|
{
|
||||||
|
@ -536,7 +534,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_all_tasks_and_uuids() -> anyhow::Result<()> {
|
fn test_all_tasks_and_uuids() -> Result<()> {
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new()?;
|
||||||
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
||||||
let uuid1 = Uuid::new_v4();
|
let uuid1 = Uuid::new_v4();
|
||||||
|
@ -590,7 +588,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_base_version_default() -> anyhow::Result<()> {
|
fn test_base_version_default() -> Result<()> {
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new()?;
|
||||||
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
||||||
{
|
{
|
||||||
|
@ -601,7 +599,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_base_version_setting() -> anyhow::Result<()> {
|
fn test_base_version_setting() -> Result<()> {
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new()?;
|
||||||
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
||||||
let u = Uuid::new_v4();
|
let u = Uuid::new_v4();
|
||||||
|
@ -618,7 +616,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_operations() -> anyhow::Result<()> {
|
fn test_operations() -> Result<()> {
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new()?;
|
||||||
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
||||||
let uuid1 = Uuid::new_v4();
|
let uuid1 = Uuid::new_v4();
|
||||||
|
@ -703,7 +701,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn get_working_set_empty() -> anyhow::Result<()> {
|
fn get_working_set_empty() -> Result<()> {
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new()?;
|
||||||
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
||||||
|
|
||||||
|
@ -717,7 +715,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn add_to_working_set() -> anyhow::Result<()> {
|
fn add_to_working_set() -> Result<()> {
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new()?;
|
||||||
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
||||||
let uuid1 = Uuid::new_v4();
|
let uuid1 = Uuid::new_v4();
|
||||||
|
@ -740,7 +738,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn clear_working_set() -> anyhow::Result<()> {
|
fn clear_working_set() -> Result<()> {
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new()?;
|
||||||
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
||||||
let uuid1 = Uuid::new_v4();
|
let uuid1 = Uuid::new_v4();
|
||||||
|
@ -771,7 +769,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn set_working_set_item() -> anyhow::Result<()> {
|
fn set_working_set_item() -> Result<()> {
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new()?;
|
||||||
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
let mut storage = SqliteStorage::new(tmp_dir.path(), true)?;
|
||||||
let uuid1 = Uuid::new_v4();
|
let uuid1 = Uuid::new_v4();
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
use super::tag::{SyntheticTag, TagInner};
|
use super::tag::{SyntheticTag, TagInner};
|
||||||
use super::{Annotation, Status, Tag, Timestamp};
|
use super::{Annotation, Status, Tag, Timestamp};
|
||||||
use crate::depmap::DependencyMap;
|
use crate::depmap::DependencyMap;
|
||||||
|
use crate::errors::{Error, Result};
|
||||||
use crate::replica::Replica;
|
use crate::replica::Replica;
|
||||||
use crate::storage::TaskMap;
|
use crate::storage::TaskMap;
|
||||||
use chrono::prelude::*;
|
use chrono::prelude::*;
|
||||||
|
@ -326,7 +327,7 @@ impl<'r> TaskMut<'r> {
|
||||||
|
|
||||||
/// Set the task's status. This also adds the task to the working set if the
|
/// Set the task's status. This also adds the task to the working set if the
|
||||||
/// new status puts it in that set.
|
/// new status puts it in that set.
|
||||||
pub fn set_status(&mut self, status: Status) -> anyhow::Result<()> {
|
pub fn set_status(&mut self, status: Status) -> Result<()> {
|
||||||
match status {
|
match status {
|
||||||
Status::Pending | Status::Recurring => {
|
Status::Pending | Status::Recurring => {
|
||||||
// clear "end" when a task becomes "pending" or "recurring"
|
// clear "end" when a task becomes "pending" or "recurring"
|
||||||
|
@ -350,32 +351,28 @@ impl<'r> TaskMut<'r> {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_description(&mut self, description: String) -> anyhow::Result<()> {
|
pub fn set_description(&mut self, description: String) -> Result<()> {
|
||||||
self.set_string(Prop::Description.as_ref(), Some(description))
|
self.set_string(Prop::Description.as_ref(), Some(description))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_priority(&mut self, priority: String) -> anyhow::Result<()> {
|
pub fn set_priority(&mut self, priority: String) -> Result<()> {
|
||||||
self.set_string(Prop::Priority.as_ref(), Some(priority))
|
self.set_string(Prop::Priority.as_ref(), Some(priority))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_entry(&mut self, entry: Option<DateTime<Utc>>) -> anyhow::Result<()> {
|
pub fn set_entry(&mut self, entry: Option<DateTime<Utc>>) -> Result<()> {
|
||||||
self.set_timestamp(Prop::Entry.as_ref(), entry)
|
self.set_timestamp(Prop::Entry.as_ref(), entry)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_wait(&mut self, wait: Option<DateTime<Utc>>) -> anyhow::Result<()> {
|
pub fn set_wait(&mut self, wait: Option<DateTime<Utc>>) -> Result<()> {
|
||||||
self.set_timestamp(Prop::Wait.as_ref(), wait)
|
self.set_timestamp(Prop::Wait.as_ref(), wait)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_modified(&mut self, modified: DateTime<Utc>) -> anyhow::Result<()> {
|
pub fn set_modified(&mut self, modified: DateTime<Utc>) -> Result<()> {
|
||||||
self.set_timestamp(Prop::Modified.as_ref(), Some(modified))
|
self.set_timestamp(Prop::Modified.as_ref(), Some(modified))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set a tasks's property by name.
|
/// Set a tasks's property by name.
|
||||||
pub fn set_value<S: Into<String>>(
|
pub fn set_value<S: Into<String>>(&mut self, property: S, value: Option<String>) -> Result<()> {
|
||||||
&mut self,
|
|
||||||
property: S,
|
|
||||||
value: Option<String>,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let property = property.into();
|
let property = property.into();
|
||||||
|
|
||||||
if let Some(ref v) = value {
|
if let Some(ref v) = value {
|
||||||
|
@ -393,7 +390,7 @@ impl<'r> TaskMut<'r> {
|
||||||
|
|
||||||
/// Start the task by creating "start": "<timestamp>", if the task is not already
|
/// Start the task by creating "start": "<timestamp>", if the task is not already
|
||||||
/// active.
|
/// active.
|
||||||
pub fn start(&mut self) -> anyhow::Result<()> {
|
pub fn start(&mut self) -> Result<()> {
|
||||||
if self.is_active() {
|
if self.is_active() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
@ -401,12 +398,12 @@ impl<'r> TaskMut<'r> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stop the task by removing the `start` key
|
/// Stop the task by removing the `start` key
|
||||||
pub fn stop(&mut self) -> anyhow::Result<()> {
|
pub fn stop(&mut self) -> Result<()> {
|
||||||
self.set_timestamp(Prop::Start.as_ref(), None)
|
self.set_timestamp(Prop::Start.as_ref(), None)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mark this task as complete
|
/// Mark this task as complete
|
||||||
pub fn done(&mut self) -> anyhow::Result<()> {
|
pub fn done(&mut self) -> Result<()> {
|
||||||
self.set_status(Status::Completed)
|
self.set_status(Status::Completed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -414,29 +411,33 @@ impl<'r> TaskMut<'r> {
|
||||||
///
|
///
|
||||||
/// Note that this does not delete the task. It merely marks the task as
|
/// Note that this does not delete the task. It merely marks the task as
|
||||||
/// deleted.
|
/// deleted.
|
||||||
pub fn delete(&mut self) -> anyhow::Result<()> {
|
pub fn delete(&mut self) -> Result<()> {
|
||||||
self.set_status(Status::Deleted)
|
self.set_status(Status::Deleted)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add a tag to this task. Does nothing if the tag is already present.
|
/// Add a tag to this task. Does nothing if the tag is already present.
|
||||||
pub fn add_tag(&mut self, tag: &Tag) -> anyhow::Result<()> {
|
pub fn add_tag(&mut self, tag: &Tag) -> Result<()> {
|
||||||
if tag.is_synthetic() {
|
if tag.is_synthetic() {
|
||||||
anyhow::bail!("Synthetic tags cannot be modified");
|
return Err(Error::UserError(String::from(
|
||||||
|
"Synthetic tags cannot be modified",
|
||||||
|
)));
|
||||||
}
|
}
|
||||||
self.set_string(format!("tag_{}", tag), Some("".to_owned()))
|
self.set_string(format!("tag_{}", tag), Some("".to_owned()))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove a tag from this task. Does nothing if the tag is not present.
|
/// Remove a tag from this task. Does nothing if the tag is not present.
|
||||||
pub fn remove_tag(&mut self, tag: &Tag) -> anyhow::Result<()> {
|
pub fn remove_tag(&mut self, tag: &Tag) -> Result<()> {
|
||||||
if tag.is_synthetic() {
|
if tag.is_synthetic() {
|
||||||
anyhow::bail!("Synthetic tags cannot be modified");
|
return Err(Error::UserError(String::from(
|
||||||
|
"Synthetic tags cannot be modified",
|
||||||
|
)));
|
||||||
}
|
}
|
||||||
self.set_string(format!("tag_{}", tag), None)
|
self.set_string(format!("tag_{}", tag), None)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add a new annotation. Note that annotations with the same entry time
|
/// Add a new annotation. Note that annotations with the same entry time
|
||||||
/// will overwrite one another.
|
/// will overwrite one another.
|
||||||
pub fn add_annotation(&mut self, ann: Annotation) -> anyhow::Result<()> {
|
pub fn add_annotation(&mut self, ann: Annotation) -> Result<()> {
|
||||||
self.set_string(
|
self.set_string(
|
||||||
format!("annotation_{}", ann.entry.timestamp()),
|
format!("annotation_{}", ann.entry.timestamp()),
|
||||||
Some(ann.description),
|
Some(ann.description),
|
||||||
|
@ -444,7 +445,7 @@ impl<'r> TaskMut<'r> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove an annotation, based on its entry time.
|
/// Remove an annotation, based on its entry time.
|
||||||
pub fn remove_annotation(&mut self, entry: Timestamp) -> anyhow::Result<()> {
|
pub fn remove_annotation(&mut self, entry: Timestamp) -> Result<()> {
|
||||||
self.set_string(format!("annotation_{}", entry.timestamp()), None)
|
self.set_string(format!("annotation_{}", entry.timestamp()), None)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -455,18 +456,14 @@ impl<'r> TaskMut<'r> {
|
||||||
namespace: impl AsRef<str>,
|
namespace: impl AsRef<str>,
|
||||||
key: impl AsRef<str>,
|
key: impl AsRef<str>,
|
||||||
value: impl Into<String>,
|
value: impl Into<String>,
|
||||||
) -> anyhow::Result<()> {
|
) -> Result<()> {
|
||||||
let key = uda_tuple_to_string(namespace, key);
|
let key = uda_tuple_to_string(namespace, key);
|
||||||
self.set_legacy_uda(key, value)
|
self.set_legacy_uda(key, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove a user-defined attribute (UDA). This will fail if the key is defined by the data
|
/// Remove a user-defined attribute (UDA). This will fail if the key is defined by the data
|
||||||
/// model.
|
/// model.
|
||||||
pub fn remove_uda(
|
pub fn remove_uda(&mut self, namespace: impl AsRef<str>, key: impl AsRef<str>) -> Result<()> {
|
||||||
&mut self,
|
|
||||||
namespace: impl AsRef<str>,
|
|
||||||
key: impl AsRef<str>,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let key = uda_tuple_to_string(namespace, key);
|
let key = uda_tuple_to_string(namespace, key);
|
||||||
self.remove_legacy_uda(key)
|
self.remove_legacy_uda(key)
|
||||||
}
|
}
|
||||||
|
@ -476,44 +473,44 @@ impl<'r> TaskMut<'r> {
|
||||||
&mut self,
|
&mut self,
|
||||||
key: impl Into<String>,
|
key: impl Into<String>,
|
||||||
value: impl Into<String>,
|
value: impl Into<String>,
|
||||||
) -> anyhow::Result<()> {
|
) -> Result<()> {
|
||||||
let key = key.into();
|
let key = key.into();
|
||||||
if Task::is_known_key(&key) {
|
if Task::is_known_key(&key) {
|
||||||
anyhow::bail!(
|
return Err(Error::UserError(format!(
|
||||||
"Property name {} as special meaning in a task and cannot be used as a UDA",
|
"Property name {} as special meaning in a task and cannot be used as a UDA",
|
||||||
key
|
key
|
||||||
);
|
)));
|
||||||
}
|
}
|
||||||
self.set_string(key, Some(value.into()))
|
self.set_string(key, Some(value.into()))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove a user-defined attribute (UDA), where the key is a legacy key.
|
/// Remove a user-defined attribute (UDA), where the key is a legacy key.
|
||||||
pub fn remove_legacy_uda(&mut self, key: impl Into<String>) -> anyhow::Result<()> {
|
pub fn remove_legacy_uda(&mut self, key: impl Into<String>) -> Result<()> {
|
||||||
let key = key.into();
|
let key = key.into();
|
||||||
if Task::is_known_key(&key) {
|
if Task::is_known_key(&key) {
|
||||||
anyhow::bail!(
|
return Err(Error::UserError(format!(
|
||||||
"Property name {} as special meaning in a task and cannot be used as a UDA",
|
"Property name {} as special meaning in a task and cannot be used as a UDA",
|
||||||
key
|
key
|
||||||
);
|
)));
|
||||||
}
|
}
|
||||||
self.set_string(key, None)
|
self.set_string(key, None)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add a dependency.
|
/// Add a dependency.
|
||||||
pub fn add_dependency(&mut self, dep: Uuid) -> anyhow::Result<()> {
|
pub fn add_dependency(&mut self, dep: Uuid) -> Result<()> {
|
||||||
let key = format!("dep_{}", dep);
|
let key = format!("dep_{}", dep);
|
||||||
self.set_string(key, Some("".to_string()))
|
self.set_string(key, Some("".to_string()))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove a dependency.
|
/// Remove a dependency.
|
||||||
pub fn remove_dependency(&mut self, dep: Uuid) -> anyhow::Result<()> {
|
pub fn remove_dependency(&mut self, dep: Uuid) -> Result<()> {
|
||||||
let key = format!("dep_{}", dep);
|
let key = format!("dep_{}", dep);
|
||||||
self.set_string(key, None)
|
self.set_string(key, None)
|
||||||
}
|
}
|
||||||
|
|
||||||
// -- utility functions
|
// -- utility functions
|
||||||
|
|
||||||
fn update_modified(&mut self) -> anyhow::Result<()> {
|
fn update_modified(&mut self) -> Result<()> {
|
||||||
if !self.updated_modified {
|
if !self.updated_modified {
|
||||||
let now = format!("{}", Utc::now().timestamp());
|
let now = format!("{}", Utc::now().timestamp());
|
||||||
trace!("task {}: set property modified={:?}", self.task.uuid, now);
|
trace!("task {}: set property modified={:?}", self.task.uuid, now);
|
||||||
|
@ -525,11 +522,7 @@ impl<'r> TaskMut<'r> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_string<S: Into<String>>(
|
fn set_string<S: Into<String>>(&mut self, property: S, value: Option<String>) -> Result<()> {
|
||||||
&mut self,
|
|
||||||
property: S,
|
|
||||||
value: Option<String>,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let property = property.into();
|
let property = property.into();
|
||||||
// updated the modified timestamp unless we are setting it explicitly
|
// updated the modified timestamp unless we are setting it explicitly
|
||||||
if &property != "modified" {
|
if &property != "modified" {
|
||||||
|
@ -539,17 +532,13 @@ impl<'r> TaskMut<'r> {
|
||||||
self.set_value(property, value)
|
self.set_value(property, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_timestamp(
|
fn set_timestamp(&mut self, property: &str, value: Option<DateTime<Utc>>) -> Result<()> {
|
||||||
&mut self,
|
|
||||||
property: &str,
|
|
||||||
value: Option<DateTime<Utc>>,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
self.set_string(property, value.map(|v| v.timestamp().to_string()))
|
self.set_string(property, value.map(|v| v.timestamp().to_string()))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Used by tests to ensure that updates are properly written
|
/// Used by tests to ensure that updates are properly written
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
fn reload(&mut self) -> anyhow::Result<()> {
|
fn reload(&mut self) -> Result<()> {
|
||||||
let uuid = self.uuid;
|
let uuid = self.uuid;
|
||||||
let task = self.replica.get_task(uuid)?.unwrap();
|
let task = self.replica.get_task(uuid)?.unwrap();
|
||||||
self.task.taskmap = task.taskmap;
|
self.task.taskmap = task.taskmap;
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use crate::errors::Error;
|
use crate::errors::{Error, Result};
|
||||||
use crate::server::SyncOp;
|
use crate::server::SyncOp;
|
||||||
use crate::storage::{ReplicaOp, StorageTxn, TaskMap};
|
use crate::storage::{ReplicaOp, StorageTxn, TaskMap};
|
||||||
|
|
||||||
|
@ -6,7 +6,7 @@ use crate::storage::{ReplicaOp, StorageTxn, TaskMap};
|
||||||
/// ReplicaOp to the list of operations. Returns the TaskMap of the task after the
|
/// ReplicaOp to the list of operations. Returns the TaskMap of the task after the
|
||||||
/// operation has been applied (or an empty TaskMap for Delete). It is not an error
|
/// operation has been applied (or an empty TaskMap for Delete). It is not an error
|
||||||
/// to create an existing task, nor to delete a nonexistent task.
|
/// to create an existing task, nor to delete a nonexistent task.
|
||||||
pub(super) fn apply_and_record(txn: &mut dyn StorageTxn, op: SyncOp) -> anyhow::Result<TaskMap> {
|
pub(super) fn apply_and_record(txn: &mut dyn StorageTxn, op: SyncOp) -> Result<TaskMap> {
|
||||||
match op {
|
match op {
|
||||||
SyncOp::Create { uuid } => {
|
SyncOp::Create { uuid } => {
|
||||||
let created = txn.create_task(uuid)?;
|
let created = txn.create_task(uuid)?;
|
||||||
|
@ -59,14 +59,14 @@ pub(super) fn apply_and_record(txn: &mut dyn StorageTxn, op: SyncOp) -> anyhow::
|
||||||
txn.commit()?;
|
txn.commit()?;
|
||||||
Ok(task)
|
Ok(task)
|
||||||
} else {
|
} else {
|
||||||
Err(Error::Database(format!("Task {} does not exist", uuid)).into())
|
Err(Error::Database(format!("Task {} does not exist", uuid)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Apply an op to the TaskDb's set of tasks (without recording it in the list of operations)
|
/// Apply an op to the TaskDb's set of tasks (without recording it in the list of operations)
|
||||||
pub(super) fn apply_op(txn: &mut dyn StorageTxn, op: &SyncOp) -> anyhow::Result<()> {
|
pub(super) fn apply_op(txn: &mut dyn StorageTxn, op: &SyncOp) -> Result<()> {
|
||||||
// TODO: test
|
// TODO: test
|
||||||
// TODO: it'd be nice if this was integrated into apply() somehow, but that clones TaskMaps
|
// TODO: it'd be nice if this was integrated into apply() somehow, but that clones TaskMaps
|
||||||
// unnecessariliy
|
// unnecessariliy
|
||||||
|
@ -74,12 +74,12 @@ pub(super) fn apply_op(txn: &mut dyn StorageTxn, op: &SyncOp) -> anyhow::Result<
|
||||||
SyncOp::Create { uuid } => {
|
SyncOp::Create { uuid } => {
|
||||||
// insert if the task does not already exist
|
// insert if the task does not already exist
|
||||||
if !txn.create_task(*uuid)? {
|
if !txn.create_task(*uuid)? {
|
||||||
return Err(Error::Database(format!("Task {} already exists", uuid)).into());
|
return Err(Error::Database(format!("Task {} already exists", uuid)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
SyncOp::Delete { ref uuid } => {
|
SyncOp::Delete { ref uuid } => {
|
||||||
if !txn.delete_task(*uuid)? {
|
if !txn.delete_task(*uuid)? {
|
||||||
return Err(Error::Database(format!("Task {} does not exist", uuid)).into());
|
return Err(Error::Database(format!("Task {} does not exist", uuid)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
SyncOp::Update {
|
SyncOp::Update {
|
||||||
|
@ -96,7 +96,7 @@ pub(super) fn apply_op(txn: &mut dyn StorageTxn, op: &SyncOp) -> anyhow::Result<
|
||||||
};
|
};
|
||||||
txn.set_task(*uuid, task)?;
|
txn.set_task(*uuid, task)?;
|
||||||
} else {
|
} else {
|
||||||
return Err(Error::Database(format!("Task {} does not exist", uuid)).into());
|
return Err(Error::Database(format!("Task {} does not exist", uuid)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -115,7 +115,7 @@ mod tests {
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_apply_create() -> anyhow::Result<()> {
|
fn test_apply_create() -> Result<()> {
|
||||||
let mut db = TaskDb::new_inmemory();
|
let mut db = TaskDb::new_inmemory();
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
let op = SyncOp::Create { uuid };
|
let op = SyncOp::Create { uuid };
|
||||||
|
@ -133,7 +133,7 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_apply_create_exists() -> anyhow::Result<()> {
|
fn test_apply_create_exists() -> Result<()> {
|
||||||
let mut db = TaskDb::new_inmemory();
|
let mut db = TaskDb::new_inmemory();
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
{
|
{
|
||||||
|
@ -167,7 +167,7 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_apply_create_update() -> anyhow::Result<()> {
|
fn test_apply_create_update() -> Result<()> {
|
||||||
let mut db = TaskDb::new_inmemory();
|
let mut db = TaskDb::new_inmemory();
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
let now = Utc::now();
|
let now = Utc::now();
|
||||||
|
@ -218,7 +218,7 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_apply_create_update_delete_prop() -> anyhow::Result<()> {
|
fn test_apply_create_update_delete_prop() -> Result<()> {
|
||||||
let mut db = TaskDb::new_inmemory();
|
let mut db = TaskDb::new_inmemory();
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
let now = Utc::now();
|
let now = Utc::now();
|
||||||
|
@ -310,7 +310,7 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_apply_update_does_not_exist() -> anyhow::Result<()> {
|
fn test_apply_update_does_not_exist() -> Result<()> {
|
||||||
let mut db = TaskDb::new_inmemory();
|
let mut db = TaskDb::new_inmemory();
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
let op = SyncOp::Update {
|
let op = SyncOp::Update {
|
||||||
|
@ -335,7 +335,7 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_apply_create_delete() -> anyhow::Result<()> {
|
fn test_apply_create_delete() -> Result<()> {
|
||||||
let mut db = TaskDb::new_inmemory();
|
let mut db = TaskDb::new_inmemory();
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
let now = Utc::now();
|
let now = Utc::now();
|
||||||
|
@ -390,7 +390,7 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_apply_delete_not_present() -> anyhow::Result<()> {
|
fn test_apply_delete_not_present() -> Result<()> {
|
||||||
let mut db = TaskDb::new_inmemory();
|
let mut db = TaskDb::new_inmemory();
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
let op = SyncOp::Delete { uuid };
|
let op = SyncOp::Delete { uuid };
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use crate::errors::Result;
|
||||||
use crate::server::{Server, SyncOp};
|
use crate::server::{Server, SyncOp};
|
||||||
use crate::storage::{ReplicaOp, Storage, TaskMap};
|
use crate::storage::{ReplicaOp, Storage, TaskMap};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
@ -36,38 +37,38 @@ impl TaskDb {
|
||||||
/// Aside from synchronization operations, this is the only way to modify the TaskDb. In cases
|
/// Aside from synchronization operations, this is the only way to modify the TaskDb. In cases
|
||||||
/// where an operation does not make sense, this function will do nothing and return an error
|
/// where an operation does not make sense, this function will do nothing and return an error
|
||||||
/// (but leave the TaskDb in a consistent state).
|
/// (but leave the TaskDb in a consistent state).
|
||||||
pub fn apply(&mut self, op: SyncOp) -> anyhow::Result<TaskMap> {
|
pub fn apply(&mut self, op: SyncOp) -> Result<TaskMap> {
|
||||||
let mut txn = self.storage.txn()?;
|
let mut txn = self.storage.txn()?;
|
||||||
apply::apply_and_record(txn.as_mut(), op)
|
apply::apply_and_record(txn.as_mut(), op)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add an UndoPoint operation to the list of replica operations.
|
/// Add an UndoPoint operation to the list of replica operations.
|
||||||
pub fn add_undo_point(&mut self) -> anyhow::Result<()> {
|
pub fn add_undo_point(&mut self) -> Result<()> {
|
||||||
let mut txn = self.storage.txn()?;
|
let mut txn = self.storage.txn()?;
|
||||||
txn.add_operation(ReplicaOp::UndoPoint)?;
|
txn.add_operation(ReplicaOp::UndoPoint)?;
|
||||||
txn.commit()
|
txn.commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get all tasks.
|
/// Get all tasks.
|
||||||
pub fn all_tasks(&mut self) -> anyhow::Result<Vec<(Uuid, TaskMap)>> {
|
pub fn all_tasks(&mut self) -> Result<Vec<(Uuid, TaskMap)>> {
|
||||||
let mut txn = self.storage.txn()?;
|
let mut txn = self.storage.txn()?;
|
||||||
txn.all_tasks()
|
txn.all_tasks()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the UUIDs of all tasks
|
/// Get the UUIDs of all tasks
|
||||||
pub fn all_task_uuids(&mut self) -> anyhow::Result<Vec<Uuid>> {
|
pub fn all_task_uuids(&mut self) -> Result<Vec<Uuid>> {
|
||||||
let mut txn = self.storage.txn()?;
|
let mut txn = self.storage.txn()?;
|
||||||
txn.all_task_uuids()
|
txn.all_task_uuids()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the working set
|
/// Get the working set
|
||||||
pub fn working_set(&mut self) -> anyhow::Result<Vec<Option<Uuid>>> {
|
pub fn working_set(&mut self) -> Result<Vec<Option<Uuid>>> {
|
||||||
let mut txn = self.storage.txn()?;
|
let mut txn = self.storage.txn()?;
|
||||||
txn.get_working_set()
|
txn.get_working_set()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get a single task, by uuid.
|
/// Get a single task, by uuid.
|
||||||
pub fn get_task(&mut self, uuid: Uuid) -> anyhow::Result<Option<TaskMap>> {
|
pub fn get_task(&mut self, uuid: Uuid) -> Result<Option<TaskMap>> {
|
||||||
let mut txn = self.storage.txn()?;
|
let mut txn = self.storage.txn()?;
|
||||||
txn.get_task(uuid)
|
txn.get_task(uuid)
|
||||||
}
|
}
|
||||||
|
@ -76,11 +77,7 @@ impl TaskDb {
|
||||||
/// renumbers the existing working-set tasks to eliminate gaps, and also adds any tasks that
|
/// renumbers the existing working-set tasks to eliminate gaps, and also adds any tasks that
|
||||||
/// are not already in the working set but should be. The rebuild occurs in a single
|
/// are not already in the working set but should be. The rebuild occurs in a single
|
||||||
/// trasnsaction against the storage backend.
|
/// trasnsaction against the storage backend.
|
||||||
pub fn rebuild_working_set<F>(
|
pub fn rebuild_working_set<F>(&mut self, in_working_set: F, renumber: bool) -> Result<()>
|
||||||
&mut self,
|
|
||||||
in_working_set: F,
|
|
||||||
renumber: bool,
|
|
||||||
) -> anyhow::Result<()>
|
|
||||||
where
|
where
|
||||||
F: Fn(&TaskMap) -> bool,
|
F: Fn(&TaskMap) -> bool,
|
||||||
{
|
{
|
||||||
|
@ -89,7 +86,7 @@ impl TaskDb {
|
||||||
|
|
||||||
/// Add the given uuid to the working set and return its index; if it is already in the working
|
/// Add the given uuid to the working set and return its index; if it is already in the working
|
||||||
/// set, its index is returned. This does *not* renumber any existing tasks.
|
/// set, its index is returned. This does *not* renumber any existing tasks.
|
||||||
pub fn add_to_working_set(&mut self, uuid: Uuid) -> anyhow::Result<usize> {
|
pub fn add_to_working_set(&mut self, uuid: Uuid) -> Result<usize> {
|
||||||
let mut txn = self.storage.txn()?;
|
let mut txn = self.storage.txn()?;
|
||||||
// search for an existing entry for this task..
|
// search for an existing entry for this task..
|
||||||
for (i, elt) in txn.get_working_set()?.iter().enumerate() {
|
for (i, elt) in txn.get_working_set()?.iter().enumerate() {
|
||||||
|
@ -112,25 +109,21 @@ impl TaskDb {
|
||||||
///
|
///
|
||||||
/// Set this to true on systems more constrained in CPU, memory, or bandwidth than a typical desktop
|
/// Set this to true on systems more constrained in CPU, memory, or bandwidth than a typical desktop
|
||||||
/// system
|
/// system
|
||||||
pub fn sync(
|
pub fn sync(&mut self, server: &mut Box<dyn Server>, avoid_snapshots: bool) -> Result<()> {
|
||||||
&mut self,
|
|
||||||
server: &mut Box<dyn Server>,
|
|
||||||
avoid_snapshots: bool,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let mut txn = self.storage.txn()?;
|
let mut txn = self.storage.txn()?;
|
||||||
sync::sync(server, txn.as_mut(), avoid_snapshots)
|
sync::sync(server, txn.as_mut(), avoid_snapshots)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Undo local operations until the most recent UndoPoint, returning false if there are no
|
/// Undo local operations until the most recent UndoPoint, returning false if there are no
|
||||||
/// local operations to undo.
|
/// local operations to undo.
|
||||||
pub fn undo(&mut self) -> anyhow::Result<bool> {
|
pub fn undo(&mut self) -> Result<bool> {
|
||||||
let mut txn = self.storage.txn()?;
|
let mut txn = self.storage.txn()?;
|
||||||
undo::undo(txn.as_mut())
|
undo::undo(txn.as_mut())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the number of un-synchronized operations in storage, excluding undo
|
/// Get the number of un-synchronized operations in storage, excluding undo
|
||||||
/// operations.
|
/// operations.
|
||||||
pub fn num_operations(&mut self) -> anyhow::Result<usize> {
|
pub fn num_operations(&mut self) -> Result<usize> {
|
||||||
let mut txn = self.storage.txn().unwrap();
|
let mut txn = self.storage.txn().unwrap();
|
||||||
Ok(txn
|
Ok(txn
|
||||||
.operations()?
|
.operations()?
|
||||||
|
@ -140,7 +133,7 @@ impl TaskDb {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the number of (un-synchronized) undo points in storage.
|
/// Get the number of (un-synchronized) undo points in storage.
|
||||||
pub fn num_undo_points(&mut self) -> anyhow::Result<usize> {
|
pub fn num_undo_points(&mut self) -> Result<usize> {
|
||||||
let mut txn = self.storage.txn().unwrap();
|
let mut txn = self.storage.txn().unwrap();
|
||||||
Ok(txn
|
Ok(txn
|
||||||
.operations()?
|
.operations()?
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use crate::errors::{Error, Result};
|
||||||
use crate::storage::{StorageTxn, TaskMap, VersionId};
|
use crate::storage::{StorageTxn, TaskMap, VersionId};
|
||||||
use flate2::{read::ZlibDecoder, write::ZlibEncoder, Compression};
|
use flate2::{read::ZlibDecoder, write::ZlibEncoder, Compression};
|
||||||
use serde::de::{Deserialize, Deserializer, MapAccess, Visitor};
|
use serde::de::{Deserialize, Deserializer, MapAccess, Visitor};
|
||||||
|
@ -9,7 +10,7 @@ use uuid::Uuid;
|
||||||
pub(super) struct SnapshotTasks(Vec<(Uuid, TaskMap)>);
|
pub(super) struct SnapshotTasks(Vec<(Uuid, TaskMap)>);
|
||||||
|
|
||||||
impl Serialize for SnapshotTasks {
|
impl Serialize for SnapshotTasks {
|
||||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
fn serialize<'a, S>(&self, serializer: S) -> core::result::Result<S::Ok, S::Error>
|
||||||
where
|
where
|
||||||
S: Serializer,
|
S: Serializer,
|
||||||
{
|
{
|
||||||
|
@ -30,7 +31,7 @@ impl<'de> Visitor<'de> for TaskDbVisitor {
|
||||||
formatter.write_str("a map representing a task snapshot")
|
formatter.write_str("a map representing a task snapshot")
|
||||||
}
|
}
|
||||||
|
|
||||||
fn visit_map<M>(self, mut access: M) -> Result<Self::Value, M::Error>
|
fn visit_map<M>(self, mut access: M) -> core::result::Result<Self::Value, M::Error>
|
||||||
where
|
where
|
||||||
M: MapAccess<'de>,
|
M: MapAccess<'de>,
|
||||||
{
|
{
|
||||||
|
@ -45,7 +46,7 @@ impl<'de> Visitor<'de> for TaskDbVisitor {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'de> Deserialize<'de> for SnapshotTasks {
|
impl<'de> Deserialize<'de> for SnapshotTasks {
|
||||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
fn deserialize<D>(deserializer: D) -> core::result::Result<Self, D::Error>
|
||||||
where
|
where
|
||||||
D: Deserializer<'de>,
|
D: Deserializer<'de>,
|
||||||
{
|
{
|
||||||
|
@ -54,13 +55,13 @@ impl<'de> Deserialize<'de> for SnapshotTasks {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SnapshotTasks {
|
impl SnapshotTasks {
|
||||||
pub(super) fn encode(&self) -> anyhow::Result<Vec<u8>> {
|
pub(super) fn encode(&self) -> Result<Vec<u8>> {
|
||||||
let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
|
let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
|
||||||
serde_json::to_writer(&mut encoder, &self)?;
|
serde_json::to_writer(&mut encoder, &self)?;
|
||||||
Ok(encoder.finish()?)
|
Ok(encoder.finish()?)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn decode(snapshot: &[u8]) -> anyhow::Result<Self> {
|
pub(super) fn decode(snapshot: &[u8]) -> Result<Self> {
|
||||||
let decoder = ZlibDecoder::new(snapshot);
|
let decoder = ZlibDecoder::new(snapshot);
|
||||||
Ok(serde_json::from_reader(decoder)?)
|
Ok(serde_json::from_reader(decoder)?)
|
||||||
}
|
}
|
||||||
|
@ -72,7 +73,7 @@ impl SnapshotTasks {
|
||||||
|
|
||||||
/// Generate a snapshot (compressed, unencrypted) for the current state of the taskdb in the given
|
/// Generate a snapshot (compressed, unencrypted) for the current state of the taskdb in the given
|
||||||
/// storage.
|
/// storage.
|
||||||
pub(super) fn make_snapshot(txn: &mut dyn StorageTxn) -> anyhow::Result<Vec<u8>> {
|
pub(super) fn make_snapshot(txn: &mut dyn StorageTxn) -> Result<Vec<u8>> {
|
||||||
let all_tasks = SnapshotTasks(txn.all_tasks()?);
|
let all_tasks = SnapshotTasks(txn.all_tasks()?);
|
||||||
all_tasks.encode()
|
all_tasks.encode()
|
||||||
}
|
}
|
||||||
|
@ -82,12 +83,14 @@ pub(super) fn apply_snapshot(
|
||||||
txn: &mut dyn StorageTxn,
|
txn: &mut dyn StorageTxn,
|
||||||
version: VersionId,
|
version: VersionId,
|
||||||
snapshot: &[u8],
|
snapshot: &[u8],
|
||||||
) -> anyhow::Result<()> {
|
) -> Result<()> {
|
||||||
let all_tasks = SnapshotTasks::decode(snapshot)?;
|
let all_tasks = SnapshotTasks::decode(snapshot)?;
|
||||||
|
|
||||||
// double-check emptiness
|
// double-check emptiness
|
||||||
if !txn.is_empty()? {
|
if !txn.is_empty()? {
|
||||||
anyhow::bail!("Cannot apply snapshot to a non-empty task database");
|
return Err(Error::Database(String::from(
|
||||||
|
"Cannot apply snapshot to a non-empty task database",
|
||||||
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (uuid, task) in all_tasks.into_inner().drain(..) {
|
for (uuid, task) in all_tasks.into_inner().drain(..) {
|
||||||
|
@ -105,14 +108,14 @@ mod test {
|
||||||
use pretty_assertions::assert_eq;
|
use pretty_assertions::assert_eq;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_serialize_empty() -> anyhow::Result<()> {
|
fn test_serialize_empty() -> Result<()> {
|
||||||
let empty = SnapshotTasks(vec![]);
|
let empty = SnapshotTasks(vec![]);
|
||||||
assert_eq!(serde_json::to_vec(&empty)?, b"{}".to_owned());
|
assert_eq!(serde_json::to_vec(&empty)?, b"{}".to_owned());
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_serialize_tasks() -> anyhow::Result<()> {
|
fn test_serialize_tasks() -> Result<()> {
|
||||||
let u = Uuid::new_v4();
|
let u = Uuid::new_v4();
|
||||||
let m: TaskMap = vec![("description".to_owned(), "my task".to_owned())]
|
let m: TaskMap = vec![("description".to_owned(), "my task".to_owned())]
|
||||||
.drain(..)
|
.drain(..)
|
||||||
|
@ -126,7 +129,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_round_trip() -> anyhow::Result<()> {
|
fn test_round_trip() -> Result<()> {
|
||||||
let mut storage = InMemoryStorage::new();
|
let mut storage = InMemoryStorage::new();
|
||||||
let version = Uuid::new_v4();
|
let version = Uuid::new_v4();
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
use super::{apply, snapshot};
|
use super::{apply, snapshot};
|
||||||
|
use crate::errors::Result;
|
||||||
use crate::server::{AddVersionResult, GetVersionResult, Server, SnapshotUrgency, SyncOp};
|
use crate::server::{AddVersionResult, GetVersionResult, Server, SnapshotUrgency, SyncOp};
|
||||||
use crate::storage::StorageTxn;
|
use crate::storage::StorageTxn;
|
||||||
use crate::Error;
|
use crate::Error;
|
||||||
|
@ -16,7 +17,7 @@ pub(super) fn sync(
|
||||||
server: &mut Box<dyn Server>,
|
server: &mut Box<dyn Server>,
|
||||||
txn: &mut dyn StorageTxn,
|
txn: &mut dyn StorageTxn,
|
||||||
avoid_snapshots: bool,
|
avoid_snapshots: bool,
|
||||||
) -> anyhow::Result<()> {
|
) -> Result<()> {
|
||||||
// if this taskdb is entirely empty, then start by getting and applying a snapshot
|
// if this taskdb is entirely empty, then start by getting and applying a snapshot
|
||||||
if txn.is_empty()? {
|
if txn.is_empty()? {
|
||||||
trace!("storage is empty; attempting to apply a snapshot");
|
trace!("storage is empty; attempting to apply a snapshot");
|
||||||
|
@ -104,7 +105,7 @@ pub(super) fn sync(
|
||||||
);
|
);
|
||||||
if let Some(requested) = requested_parent_version_id {
|
if let Some(requested) = requested_parent_version_id {
|
||||||
if parent_version_id == requested {
|
if parent_version_id == requested {
|
||||||
return Err(Error::OutOfSync.into());
|
return Err(Error::OutOfSync);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
requested_parent_version_id = Some(parent_version_id);
|
requested_parent_version_id = Some(parent_version_id);
|
||||||
|
@ -121,7 +122,7 @@ fn apply_version(
|
||||||
txn: &mut dyn StorageTxn,
|
txn: &mut dyn StorageTxn,
|
||||||
local_ops: &mut Vec<SyncOp>,
|
local_ops: &mut Vec<SyncOp>,
|
||||||
mut version: Version,
|
mut version: Version,
|
||||||
) -> anyhow::Result<()> {
|
) -> Result<()> {
|
||||||
// The situation here is that the server has already applied all server operations, and we
|
// The situation here is that the server has already applied all server operations, and we
|
||||||
// have already applied all local operations, so states have diverged by several
|
// have already applied all local operations, so states have diverged by several
|
||||||
// operations. We need to figure out what operations to apply locally and on the server in
|
// operations. We need to figure out what operations to apply locally and on the server in
|
||||||
|
@ -195,7 +196,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_sync() -> anyhow::Result<()> {
|
fn test_sync() -> Result<()> {
|
||||||
let mut server: Box<dyn Server> = TestServer::new().server();
|
let mut server: Box<dyn Server> = TestServer::new().server();
|
||||||
|
|
||||||
let mut db1 = newdb();
|
let mut db1 = newdb();
|
||||||
|
@ -257,7 +258,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_sync_create_delete() -> anyhow::Result<()> {
|
fn test_sync_create_delete() -> Result<()> {
|
||||||
let mut server: Box<dyn Server> = TestServer::new().server();
|
let mut server: Box<dyn Server> = TestServer::new().server();
|
||||||
|
|
||||||
let mut db1 = newdb();
|
let mut db1 = newdb();
|
||||||
|
@ -312,7 +313,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_sync_add_snapshot_start_with_snapshot() -> anyhow::Result<()> {
|
fn test_sync_add_snapshot_start_with_snapshot() -> Result<()> {
|
||||||
let mut test_server = TestServer::new();
|
let mut test_server = TestServer::new();
|
||||||
|
|
||||||
let mut server: Box<dyn Server> = test_server.server();
|
let mut server: Box<dyn Server> = test_server.server();
|
||||||
|
@ -364,7 +365,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_sync_avoids_snapshot() -> anyhow::Result<()> {
|
fn test_sync_avoids_snapshot() -> Result<()> {
|
||||||
let test_server = TestServer::new();
|
let test_server = TestServer::new();
|
||||||
|
|
||||||
let mut server: Box<dyn Server> = test_server.server();
|
let mut server: Box<dyn Server> = test_server.server();
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
use super::apply;
|
use super::apply;
|
||||||
|
use crate::errors::Result;
|
||||||
use crate::storage::{ReplicaOp, StorageTxn};
|
use crate::storage::{ReplicaOp, StorageTxn};
|
||||||
use log::{debug, trace};
|
use log::{debug, trace};
|
||||||
|
|
||||||
/// Undo local operations until an UndoPoint.
|
/// Undo local operations until an UndoPoint.
|
||||||
pub(super) fn undo(txn: &mut dyn StorageTxn) -> anyhow::Result<bool> {
|
pub(super) fn undo(txn: &mut dyn StorageTxn) -> Result<bool> {
|
||||||
let mut applied = false;
|
let mut applied = false;
|
||||||
let mut popped = false;
|
let mut popped = false;
|
||||||
let mut local_ops = txn.operations()?;
|
let mut local_ops = txn.operations()?;
|
||||||
|
@ -40,7 +41,7 @@ mod tests {
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_apply_create() -> anyhow::Result<()> {
|
fn test_apply_create() -> Result<()> {
|
||||||
let mut db = TaskDb::new_inmemory();
|
let mut db = TaskDb::new_inmemory();
|
||||||
let uuid1 = Uuid::new_v4();
|
let uuid1 = Uuid::new_v4();
|
||||||
let uuid2 = Uuid::new_v4();
|
let uuid2 = Uuid::new_v4();
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use crate::errors::Result;
|
||||||
use crate::storage::{StorageTxn, TaskMap};
|
use crate::storage::{StorageTxn, TaskMap};
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
|
||||||
|
@ -5,7 +6,7 @@ use std::collections::HashSet;
|
||||||
/// renumbers the existing working-set tasks to eliminate gaps, and also adds any tasks that
|
/// renumbers the existing working-set tasks to eliminate gaps, and also adds any tasks that
|
||||||
/// are not already in the working set but should be. The rebuild occurs in a single
|
/// are not already in the working set but should be. The rebuild occurs in a single
|
||||||
/// trasnsaction against the storage backend.
|
/// trasnsaction against the storage backend.
|
||||||
pub fn rebuild<F>(txn: &mut dyn StorageTxn, in_working_set: F, renumber: bool) -> anyhow::Result<()>
|
pub fn rebuild<F>(txn: &mut dyn StorageTxn, in_working_set: F, renumber: bool) -> Result<()>
|
||||||
where
|
where
|
||||||
F: Fn(&TaskMap) -> bool,
|
F: Fn(&TaskMap) -> bool,
|
||||||
{
|
{
|
||||||
|
@ -69,16 +70,16 @@ mod test {
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn rebuild_working_set_renumber() -> anyhow::Result<()> {
|
fn rebuild_working_set_renumber() -> Result<()> {
|
||||||
rebuild_working_set(true)
|
rebuild_working_set(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn rebuild_working_set_no_renumber() -> anyhow::Result<()> {
|
fn rebuild_working_set_no_renumber() -> Result<()> {
|
||||||
rebuild_working_set(false)
|
rebuild_working_set(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn rebuild_working_set(renumber: bool) -> anyhow::Result<()> {
|
fn rebuild_working_set(renumber: bool) -> Result<()> {
|
||||||
let mut db = TaskDb::new_inmemory();
|
let mut db = TaskDb::new_inmemory();
|
||||||
let mut uuids = vec![];
|
let mut uuids = vec![];
|
||||||
uuids.push(Uuid::new_v4());
|
uuids.push(Uuid::new_v4());
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue