actually support synchronization

This commit is contained in:
Dustin J. Mitchell 2019-12-28 22:46:10 -05:00
parent 0a2293a9c5
commit 59f4e6abd7
7 changed files with 219 additions and 2 deletions

2
Cargo.lock generated
View file

@ -73,6 +73,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"num-integer 0.1.41 (registry+https://github.com/rust-lang/crates.io-index)",
"num-traits 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -163,6 +164,7 @@ dependencies = [
"chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
"failure 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"proptest 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)",
"uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
]

View file

@ -8,8 +8,9 @@ edition = "2018"
[dependencies]
uuid = { version = "0.8.1", features = ["serde", "v4"] }
serde = "1.0.104"
serde_json = "1.0"
chrono = "0.4.10"
chrono = { version = "0.4.10", features = ["serde"] }
failure = {version = "0.1.5", features = ["derive"] }
[dev-dependencies]

View file

@ -3,7 +3,9 @@
mod errors;
mod operation;
mod server;
mod taskdb;
pub use operation::Operation;
pub use server::Server;
pub use taskdb::DB;

View file

@ -1,8 +1,9 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;
#[derive(PartialEq, Clone, Debug)]
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum Operation {
Create {
uuid: Uuid,

87
src/server.rs Normal file
View file

@ -0,0 +1,87 @@
use std::collections::HashMap;
type Blob = Vec<u8>;
struct User {
// versions, indexed at v-1
versions: Vec<Blob>,
snapshots: HashMap<u64, Blob>,
}
pub struct Server {
users: HashMap<String, User>,
}
pub enum VersionAdd {
// OK, version added
Ok,
// Rejected, must be based on the the given version
ExpectedVersion(u64),
}
impl User {
fn new() -> User {
User {
versions: vec![],
snapshots: HashMap::new(),
}
}
fn get_versions(&self, since_version: u64) -> Vec<Blob> {
let last_version = self.versions.len();
if last_version == since_version as usize {
return vec![];
}
self.versions[since_version as usize..last_version]
.iter()
.map(|r| r.clone())
.collect::<Vec<Blob>>()
}
fn add_version(&mut self, version: u64, blob: Blob) -> VersionAdd {
// of by one here: client wants to send version 1 first
let expected_version = self.versions.len() as u64 + 1;
if version != expected_version {
return VersionAdd::ExpectedVersion(expected_version);
}
self.versions.push(blob);
VersionAdd::Ok
}
fn add_snapshot(&mut self, version: u64, blob: Blob) {
self.snapshots.insert(version, blob);
}
}
impl Server {
pub fn new() -> Server {
Server {
users: HashMap::new(),
}
}
fn get_user_mut(&mut self, username: &str) -> &mut User {
self.users
.entry(username.to_string())
.or_insert_with(User::new)
}
/// Get a vector of all versions after `since_version`
pub fn get_versions(&self, username: &str, since_version: u64) -> Vec<Blob> {
self.users
.get(username)
.map(|user| user.get_versions(since_version))
.unwrap_or_else(|| vec![])
}
/// Add a new version. If the given version number is incorrect, this responds with the
/// appropriate version and expects the caller to try again.
pub fn add_version(&mut self, username: &str, version: u64, blob: Blob) -> VersionAdd {
self.get_user_mut(username).add_version(version, blob)
}
pub fn add_snapshot(&mut self, username: &str, version: u64, blob: Blob) {
self.get_user_mut(username).add_snapshot(version, blob);
}
}

View file

@ -1,7 +1,10 @@
use crate::operation::Operation;
use crate::server::{Server, VersionAdd};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::str;
use uuid::Uuid;
#[derive(PartialEq, Debug, Clone)]
@ -18,6 +21,12 @@ pub struct DB {
operations: Vec<Operation>,
}
#[derive(Serialize, Deserialize, Debug)]
struct Version {
version: u64,
operations: Vec<Operation>,
}
impl DB {
/// Create a new, empty database
pub fn new() -> DB {
@ -60,6 +69,62 @@ impl DB {
pub fn tasks(&self) -> &HashMap<Uuid, HashMap<String, Value>> {
&self.tasks
}
/// Sync to the given server, pulling remote changes and pushing local changes.
pub fn sync(&mut self, username: &str, server: &mut Server) {
loop {
// first pull changes and "rebase" on top of them
let new_versions = server.get_versions(username, self.base_version);
for version_blob in new_versions {
let version_str = str::from_utf8(&version_blob).unwrap();
let version: Version = serde_json::from_str(version_str).unwrap();
assert_eq!(version.version, self.base_version + 1);
println!("applying version {:?} from server", version.version);
self.apply_version(version);
}
if self.operations.len() == 0 {
break;
}
// now make a version of our local changes and push those
let new_version = Version {
version: self.base_version + 1,
operations: self.operations.clone(),
};
let new_version_str = serde_json::to_string(&new_version).unwrap();
println!("sending version {:?} to server", new_version.version);
if let VersionAdd::Ok =
server.add_version(username, new_version.version, new_version_str.into())
{
break;
}
}
}
fn apply_version(&mut self, mut version: Version) {
for server_op in version.operations.drain(..) {
let mut new_local_ops = Vec::with_capacity(self.operations.len());
let mut svr_op = Some(server_op);
for local_op in self.operations.drain(..) {
if let Some(o) = svr_op {
let (new_server_op, new_local_op) = Operation::transform(o, local_op);
svr_op = new_server_op;
if let Some(o) = new_local_op {
new_local_ops.push(o);
}
} else {
new_local_ops.push(local_op);
}
}
if let Some(o) = svr_op {
self.apply(o);
}
self.operations = new_local_ops;
}
self.base_version = version.version;
}
}
#[cfg(test)]

59
tests/sync.rs Normal file
View file

@ -0,0 +1,59 @@
use chrono::Utc;
use ot::{Operation, Server, DB};
use uuid::Uuid;
#[test]
fn test_sync() {
let mut server = Server::new();
let mut db1 = DB::new();
db1.sync("me", &mut server);
let mut db2 = DB::new();
db2.sync("me", &mut server);
// make some changes in parallel to db1 and db2..
let uuid1 = Uuid::new_v4();
db1.apply(Operation::Create { uuid: uuid1 });
db1.apply(Operation::Update {
uuid: uuid1,
property: "title".into(),
value: "my first task".into(),
timestamp: Utc::now(),
});
let uuid2 = Uuid::new_v4();
db2.apply(Operation::Create { uuid: uuid2 });
db2.apply(Operation::Update {
uuid: uuid2,
property: "title".into(),
value: "my second task".into(),
timestamp: Utc::now(),
});
// and synchronize those around
db1.sync("me", &mut server);
db2.sync("me", &mut server);
db1.sync("me", &mut server);
assert_eq!(db1.tasks(), db2.tasks());
// now make updates to the same task on both sides
db1.apply(Operation::Update {
uuid: uuid2,
property: "priority".into(),
value: "H".into(),
timestamp: Utc::now(),
});
db2.apply(Operation::Update {
uuid: uuid2,
property: "project".into(),
value: "personal".into(),
timestamp: Utc::now(),
});
// and synchronize those around
db1.sync("me", &mut server);
db2.sync("me", &mut server);
db1.sync("me", &mut server);
assert_eq!(db1.tasks(), db2.tasks());
}