implement a local sync server

This commit is contained in:
Dustin J. Mitchell 2020-11-25 19:13:32 -05:00
parent 8f7e2e2790
commit 3537db9bbe
9 changed files with 249 additions and 72 deletions

View file

@ -51,7 +51,9 @@ impl CommandInvocation {
))
}
pub(super) fn get_server(&self) -> impl server::Server {
server::LocalServer::new()
pub(super) fn get_server(&self) -> Fallible<impl server::Server> {
Ok(server::LocalServer::new(Path::new(
"/tmp/task-sync-server",
))?)
}
}

View file

@ -22,7 +22,7 @@ define_subcommand! {
subcommand_invocation! {
fn run(&self, command: &CommandInvocation) -> Fallible<()> {
let mut replica = command.get_replica();
let mut server = command.get_server();
let mut server = command.get_server()?;
replica.sync(&mut server)?;
Ok(())
}

View file

@ -29,6 +29,7 @@ pub mod server;
mod task;
mod taskdb;
pub mod taskstorage;
mod utils;
pub use replica::Replica;
pub use task::Priority;

View file

@ -1,33 +1,108 @@
use crate::server::{
AddVersionResult, GetVersionResult, HistorySegment, Server, VersionId, NO_VERSION_ID,
};
use crate::utils::Key;
use failure::Fallible;
use std::collections::HashMap;
use kv::msgpack::Msgpack;
use kv::{Bucket, Config, Error, Integer, Serde, Store, ValueBuf};
use serde::{Deserialize, Serialize};
use std::path::Path;
use uuid::Uuid;
#[derive(Serialize, Deserialize, Debug)]
struct Version {
version_id: VersionId,
parent_version_id: VersionId,
history_segment: HistorySegment,
}
pub struct LocalServer {
latest_version_id: VersionId,
pub struct LocalServer<'t> {
store: Store,
// NOTE: indexed by parent_version_id!
versions: HashMap<VersionId, Version>,
versions_bucket: Bucket<'t, Key, ValueBuf<Msgpack<Version>>>,
latest_version_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<Uuid>>>,
}
impl LocalServer {
impl<'t> LocalServer<'t> {
/// A test server has no notion of clients, signatures, encryption, etc.
pub fn new() -> LocalServer {
LocalServer {
latest_version_id: NO_VERSION_ID,
versions: HashMap::new(),
pub fn new(directory: &Path) -> Fallible<LocalServer> {
let mut config = Config::default(directory);
config.bucket("versions", None);
config.bucket("numbers", None);
config.bucket("latest_version", None);
config.bucket("operations", None);
config.bucket("working_set", None);
let store = Store::new(config)?;
// versions are stored indexed by VersionId (uuid)
let versions_bucket = store.bucket::<Key, ValueBuf<Msgpack<Version>>>(Some("versions"))?;
// this bucket contains the latest version at key 0
let latest_version_bucket =
store.int_bucket::<ValueBuf<Msgpack<Uuid>>>(Some("latest_version"))?;
Ok(LocalServer {
store,
versions_bucket,
latest_version_bucket,
})
}
fn get_latest_version_id(&mut self) -> Fallible<VersionId> {
let txn = self.store.read_txn()?;
let base_version = match txn.get(&self.latest_version_bucket, 0.into()) {
Ok(buf) => buf,
Err(Error::NotFound) => return Ok(NO_VERSION_ID),
Err(e) => return Err(e.into()),
}
.inner()?
.to_serde();
Ok(base_version as VersionId)
}
fn set_latest_version_id(&mut self, version_id: VersionId) -> Fallible<()> {
let mut txn = self.store.write_txn()?;
txn.set(
&self.latest_version_bucket,
0.into(),
Msgpack::to_value_buf(version_id as Uuid)?,
)?;
txn.commit()?;
Ok(())
}
fn get_version_by_parent_version_id(
&mut self,
parent_version_id: VersionId,
) -> Fallible<Option<Version>> {
let txn = self.store.read_txn()?;
let version = match txn.get(&self.versions_bucket, parent_version_id.into()) {
Ok(buf) => buf,
Err(Error::NotFound) => return Ok(None),
Err(e) => return Err(e.into()),
}
.inner()?
.to_serde();
Ok(Some(version))
}
fn add_version_by_parent_version_id(&mut self, version: Version) -> Fallible<()> {
let mut txn = self.store.write_txn()?;
txn.set(
&self.versions_bucket,
version.parent_version_id.into(),
Msgpack::to_value_buf(version)?,
)?;
txn.commit()?;
Ok(())
}
}
impl Server for LocalServer {
impl<'t> Server for LocalServer<'t> {
// TODO: better transaction isolation for add_version (gets and sets should be in the same
// transaction)
/// Add a new version. If the given version number is incorrect, this responds with the
/// appropriate version and expects the caller to try again.
fn add_version(
@ -39,33 +114,27 @@ impl Server for LocalServer {
// no signature validation
// check the parent_version_id for linearity
if self.latest_version_id != NO_VERSION_ID {
if parent_version_id != self.latest_version_id {
return Ok(AddVersionResult::ExpectedParentVersion(
self.latest_version_id,
));
}
let latest_version_id = self.get_latest_version_id()?;
if latest_version_id != NO_VERSION_ID && parent_version_id != latest_version_id {
return Ok(AddVersionResult::ExpectedParentVersion(latest_version_id));
}
// invent a new ID for this version
let version_id = Uuid::new_v4();
self.versions.insert(
self.add_version_by_parent_version_id(Version {
version_id,
parent_version_id,
Version {
version_id,
parent_version_id,
history_segment,
},
);
self.latest_version_id = version_id;
history_segment,
})?;
self.set_latest_version_id(version_id)?;
Ok(AddVersionResult::Ok(version_id))
}
/// Get a vector of all versions after `since_version`
fn get_child_version(&self, parent_version_id: VersionId) -> Fallible<GetVersionResult> {
if let Some(version) = self.versions.get(&parent_version_id) {
fn get_child_version(&mut self, parent_version_id: VersionId) -> Fallible<GetVersionResult> {
if let Some(version) = self.get_version_by_parent_version_id(parent_version_id)? {
Ok(GetVersionResult::Version {
version_id: version.version_id,
parent_version_id: version.parent_version_id,
@ -77,3 +146,93 @@ impl Server for LocalServer {
}
}
#[cfg(test)]
mod test {
use super::*;
use failure::Fallible;
use tempdir::TempDir;
#[test]
fn test_empty() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut server = LocalServer::new(&tmp_dir.path())?;
let child_version = server.get_child_version(NO_VERSION_ID)?;
assert_eq!(child_version, GetVersionResult::NoSuchVersion);
Ok(())
}
#[test]
fn test_add_zero_base() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut server = LocalServer::new(&tmp_dir.path())?;
let history = b"1234".to_vec();
match server.add_version(NO_VERSION_ID, history.clone())? {
AddVersionResult::ExpectedParentVersion(_) => {
panic!("should have accepted the version")
}
AddVersionResult::Ok(version_id) => {
let new_version = server.get_child_version(NO_VERSION_ID)?;
assert_eq!(
new_version,
GetVersionResult::Version {
version_id,
parent_version_id: NO_VERSION_ID,
history_segment: history,
}
);
}
}
Ok(())
}
#[test]
fn test_add_nonzero_base() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut server = LocalServer::new(&tmp_dir.path())?;
let history = b"1234".to_vec();
let parent_version_id = Uuid::new_v4() as VersionId;
// This is OK because the server has no latest_version_id yet
match server.add_version(parent_version_id, history.clone())? {
AddVersionResult::ExpectedParentVersion(_) => {
panic!("should have accepted the version")
}
AddVersionResult::Ok(version_id) => {
let new_version = server.get_child_version(parent_version_id)?;
assert_eq!(
new_version,
GetVersionResult::Version {
version_id,
parent_version_id,
history_segment: history,
}
);
}
}
Ok(())
}
#[test]
fn test_add_nonzero_base_forbidden() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut server = LocalServer::new(&tmp_dir.path())?;
let history = b"1234".to_vec();
let parent_version_id = Uuid::new_v4() as VersionId;
// add a version
if let AddVersionResult::ExpectedParentVersion(_) =
server.add_version(parent_version_id, history.clone())?
{
panic!("should have accepted the version")
}
// then add another, not based on that one
if let AddVersionResult::Ok(_) = server.add_version(parent_version_id, history.clone())? {
panic!("should not have accepted the version")
}
Ok(())
}
}

View file

@ -64,7 +64,7 @@ impl Server for TestServer {
}
/// Get a vector of all versions after `since_version`
fn get_child_version(&self, parent_version_id: VersionId) -> Fallible<GetVersionResult> {
fn get_child_version(&mut self, parent_version_id: VersionId) -> Fallible<GetVersionResult> {
if let Some(version) = self.versions.get(&parent_version_id) {
Ok(GetVersionResult::Version {
version_id: version.version_id,

View file

@ -12,6 +12,7 @@ pub const NO_VERSION_ID: VersionId = Uuid::nil();
pub type HistorySegment = Vec<u8>;
/// VersionAdd is the response type from [`crate:server::Server::add_version`].
#[derive(Debug, PartialEq)]
pub enum AddVersionResult {
/// OK, version added with the given ID
Ok(VersionId),
@ -20,6 +21,7 @@ pub enum AddVersionResult {
}
/// A version as downloaded from the server
#[derive(Debug, PartialEq)]
pub enum GetVersionResult {
/// No such version exists
NoSuchVersion,
@ -42,5 +44,5 @@ pub trait Server {
) -> Fallible<AddVersionResult>;
/// Get the version with the given parent VersionId
fn get_child_version(&self, parent_version_id: VersionId) -> Fallible<GetVersionResult>;
fn get_child_version(&mut self, parent_version_id: VersionId) -> Fallible<GetVersionResult>;
}

View file

@ -1,7 +1,7 @@
use crate::errors::Error;
use crate::server::{AddVersionResult, GetVersionResult, Server};
use crate::taskstorage::{Operation, TaskMap, TaskStorage, TaskStorageTxn};
use failure::Fallible;
use failure::{format_err, Fallible};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::str;
@ -168,7 +168,9 @@ impl TaskDB {
let mut txn = self.storage.txn()?;
// retry synchronizing until the server accepts our version (this allows for races between
// replicas trying to sync to the same server)
// replicas trying to sync to the same server). If the server insists on the same base
// version twice, then we have diverged.
let mut requested_parent_version_id = None;
loop {
let mut base_version_id = txn.base_version()?;
@ -189,6 +191,7 @@ impl TaskDB {
txn.set_base_version(version_id)?;
base_version_id = version_id;
} else {
println!("no child versions of {:?}", base_version_id);
// at the moment, no more child versions, so we can try adding our own
break;
}
@ -196,6 +199,7 @@ impl TaskDB {
let operations: Vec<Operation> = txn.operations()?.to_vec();
if operations.is_empty() {
println!("no changes to push to server");
// nothing to sync back to the server..
break;
}
@ -216,7 +220,14 @@ impl TaskDB {
"new version rejected; must be based on {:?}",
parent_version_id
);
// ..continue the outer loop
if let Some(requested) = requested_parent_version_id {
if parent_version_id == requested {
return Err(format_err!(
"Server's task history has diverged from this replica"
));
}
}
requested_parent_version_id = Some(parent_version_id);
}
}
}

View file

@ -1,50 +1,13 @@
use crate::taskstorage::{
Operation, TaskMap, TaskStorage, TaskStorageTxn, VersionId, DEFAULT_BASE_VERSION,
};
use crate::utils::Key;
use failure::Fallible;
use kv::msgpack::Msgpack;
use kv::{Bucket, Config, Error, Integer, Serde, Store, ValueBuf};
use std::convert::TryInto;
use std::path::Path;
use uuid::Uuid;
/// A representation of a UUID as a key. This is just a newtype wrapping the 128-bit packed form
/// of a UUID.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Key(uuid::Bytes);
impl From<&[u8]> for Key {
fn from(bytes: &[u8]) -> Key {
Key(bytes.try_into().unwrap())
}
}
impl From<&Uuid> for Key {
fn from(uuid: &Uuid) -> Key {
let key = Key(*uuid.as_bytes());
key
}
}
impl From<Uuid> for Key {
fn from(uuid: Uuid) -> Key {
let key = Key(*uuid.as_bytes());
key
}
}
impl From<Key> for Uuid {
fn from(key: Key) -> Uuid {
Uuid::from_bytes(key.0)
}
}
impl AsRef<[u8]> for Key {
fn as_ref(&self) -> &[u8] {
&self.0[..]
}
}
/// KVStorage is an on-disk storage backend which uses LMDB via the `kv` crate.
pub struct KVStorage<'t> {
store: Store,

39
taskchampion/src/utils.rs Normal file
View file

@ -0,0 +1,39 @@
use std::convert::TryInto;
use uuid::Uuid;
/// A representation of a UUID as a key. This is just a newtype wrapping the 128-bit packed form
/// of a UUID.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct Key(uuid::Bytes);
impl From<&[u8]> for Key {
fn from(bytes: &[u8]) -> Key {
Key(bytes.try_into().unwrap())
}
}
impl From<&Uuid> for Key {
fn from(uuid: &Uuid) -> Key {
let key = Key(*uuid.as_bytes());
key
}
}
impl From<Uuid> for Key {
fn from(uuid: Uuid) -> Key {
let key = Key(*uuid.as_bytes());
key
}
}
impl From<Key> for Uuid {
fn from(key: Key) -> Uuid {
Uuid::from_bytes(key.0)
}
}
impl AsRef<[u8]> for Key {
fn as_ref(&self) -> &[u8] {
&self.0[..]
}
}