make server operations fallible

This commit is contained in:
Dustin J. Mitchell 2020-11-24 18:12:48 -05:00
parent 549d3b9f6d
commit 75edd2773f
3 changed files with 27 additions and 22 deletions

View file

@ -1,4 +1,5 @@
use crate::server::{Blob, Server, VersionAdd};
use failure::Fallible;
use std::collections::HashMap;
pub(crate) struct TestServer {
@ -27,21 +28,22 @@ impl TestServer {
impl Server for TestServer {
/// Get a vector of all versions after `since_version`
fn get_versions(&self, username: &str, since_version: u64) -> Vec<Blob> {
self.users
.get(username)
.map(|user| user.get_versions(since_version))
.unwrap_or_else(|| vec![])
fn get_versions(&self, username: &str, since_version: u64) -> Fallible<Vec<Blob>> {
if let Some(user) = self.users.get(username) {
user.get_versions(since_version)
} else {
Ok(vec![])
}
}
/// Add a new version. If the given version number is incorrect, this responds with the
/// appropriate version and expects the caller to try again.
fn add_version(&mut self, username: &str, version: u64, blob: Blob) -> VersionAdd {
fn add_version(&mut self, username: &str, version: u64, blob: Blob) -> Fallible<VersionAdd> {
self.get_user_mut(username).add_version(version, blob)
}
fn add_snapshot(&mut self, username: &str, version: u64, blob: Blob) {
self.get_user_mut(username).add_snapshot(version, blob);
fn add_snapshot(&mut self, username: &str, version: u64, blob: Blob) -> Fallible<()> {
self.get_user_mut(username).add_snapshot(version, blob)
}
}
@ -53,29 +55,30 @@ impl User {
}
}
fn get_versions(&self, since_version: u64) -> Vec<Blob> {
fn get_versions(&self, since_version: u64) -> Fallible<Vec<Blob>> {
let last_version = self.versions.len();
if last_version == since_version as usize {
return vec![];
return Ok(vec![]);
}
self.versions[since_version as usize..last_version]
Ok(self.versions[since_version as usize..last_version]
.iter()
.map(|r| r.clone())
.collect::<Vec<Blob>>()
.collect::<Vec<Blob>>())
}
fn add_version(&mut self, version: u64, blob: Blob) -> VersionAdd {
fn add_version(&mut self, version: u64, blob: Blob) -> Fallible<VersionAdd> {
// of by one here: client wants to send version 1 first
let expected_version = self.versions.len() as u64 + 1;
if version != expected_version {
return VersionAdd::ExpectedVersion(expected_version);
return Ok(VersionAdd::ExpectedVersion(expected_version));
}
self.versions.push(blob);
VersionAdd::Ok
Ok(VersionAdd::Ok)
}
fn add_snapshot(&mut self, version: u64, blob: Blob) {
fn add_snapshot(&mut self, version: u64, blob: Blob) -> Fallible<()> {
self.snapshots.insert(version, blob);
Ok(())
}
}

View file

@ -1,3 +1,5 @@
use failure::Fallible;
/// A Blob is a hunk of encoded data that is sent to the server. The server does not interpret
/// this data at all.
pub type Blob = Vec<u8>;
@ -13,12 +15,12 @@ pub enum VersionAdd {
/// A value implementing this trait can act as a server against which a replica can sync.
pub trait Server {
/// Get a vector of all versions after `since_version`
fn get_versions(&self, username: &str, since_version: u64) -> Vec<Blob>;
fn get_versions(&self, username: &str, since_version: u64) -> Fallible<Vec<Blob>>;
/// Add a new version. If the given version number is incorrect, this responds with the
/// appropriate version and expects the caller to try again.
fn add_version(&mut self, username: &str, version: u64, blob: Blob) -> VersionAdd;
fn add_version(&mut self, username: &str, version: u64, blob: Blob) -> Fallible<VersionAdd>;
fn add_snapshot(&mut self, username: &str, version: u64, blob: Blob);
/// TODO: undefined yet
fn add_snapshot(&mut self, username: &str, version: u64, blob: Blob) -> Fallible<()>;
}

View file

@ -172,7 +172,7 @@ impl TaskDB {
// replicas trying to sync to the same server)
loop {
// first pull changes and "rebase" on top of them
let new_versions = server.get_versions(username, txn.base_version()?);
let new_versions = server.get_versions(username, txn.base_version()?)?;
for version_blob in new_versions {
let version_str = str::from_utf8(&version_blob).unwrap();
let version: Version = serde_json::from_str(version_str).unwrap();
@ -196,7 +196,7 @@ impl TaskDB {
let new_version_str = serde_json::to_string(&new_version).unwrap();
println!("sending version {:?} to server", new_version.version);
if let VersionAdd::Ok =
server.add_version(username, new_version.version, new_version_str.into())
server.add_version(username, new_version.version, new_version_str.into())?
{
txn.set_base_version(new_version.version)?;
txn.set_operations(vec![])?;