[breaking] Add snapshot support to server storage

This refactors the storage API pretty substantially, and represents a
breaking change to the schema used by the SQLite storage.
This commit is contained in:
Dustin J. Mitchell 2021-09-29 02:48:39 +00:00
parent 8d2be3b495
commit 2570956710
7 changed files with 312 additions and 65 deletions

View file

@ -0,0 +1,2 @@
- The SQLite server storage schema has changed incompatibly, in order to add support for snapshots.
As this is not currently ready for production usage, no migration path is provided except deleting the existing database.

1
Cargo.lock generated
View file

@ -3016,6 +3016,7 @@ dependencies = [
"actix-rt", "actix-rt",
"actix-web", "actix-web",
"anyhow", "anyhow",
"chrono",
"clap", "clap",
"env_logger 0.8.4", "env_logger 0.8.4",
"futures", "futures",

View file

@ -19,6 +19,7 @@ clap = "^2.33.0"
log = "^0.4.14" log = "^0.4.14"
env_logger = "^0.8.3" env_logger = "^0.8.3"
rusqlite = { version = "0.25", features = ["bundled"] } rusqlite = { version = "0.25", features = ["bundled"] }
chrono = { version = "^0.4.10", features = ["serde"] }
[dev-dependencies] [dev-dependencies]
actix-rt = "^1.1.1" actix-rt = "^1.1.1"

View file

@ -71,7 +71,6 @@ pub(crate) fn add_version<'a>(
// update the DB // update the DB
txn.add_version(client_key, version_id, parent_version_id, history_segment)?; txn.add_version(client_key, version_id, parent_version_id, history_segment)?;
txn.set_client_latest_version_id(client_key, version_id)?;
txn.commit()?; txn.commit()?;
Ok(AddVersionResult::Ok(version_id)) Ok(AddVersionResult::Ok(version_id))
@ -102,6 +101,7 @@ mod test {
let parent_version_id = Uuid::new_v4(); let parent_version_id = Uuid::new_v4();
let history_segment = b"abcd".to_vec(); let history_segment = b"abcd".to_vec();
txn.new_client(client_key, version_id)?;
txn.add_version( txn.add_version(
client_key, client_key,
version_id, version_id,
@ -130,6 +130,7 @@ mod test {
let existing_parent_version_id = Uuid::new_v4(); let existing_parent_version_id = Uuid::new_v4();
let client = Client { let client = Client {
latest_version_id: existing_parent_version_id, latest_version_id: existing_parent_version_id,
snapshot: None,
}; };
assert_eq!( assert_eq!(

View file

@ -1,4 +1,4 @@
use super::{Client, Storage, StorageTxn, Uuid, Version}; use super::{Client, Snapshot, Storage, StorageTxn, Uuid, Version};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Mutex, MutexGuard}; use std::sync::{Mutex, MutexGuard};
@ -6,6 +6,9 @@ struct Inner {
/// Clients, indexed by client_key /// Clients, indexed by client_key
clients: HashMap<Uuid, Client>, clients: HashMap<Uuid, Client>,
/// Snapshot data, indexed by client key
snapshots: HashMap<Uuid, Vec<u8>>,
/// Versions, indexed by (client_key, version_id) /// Versions, indexed by (client_key, version_id)
versions: HashMap<(Uuid, Uuid), Version>, versions: HashMap<(Uuid, Uuid), Version>,
@ -20,6 +23,7 @@ impl InMemoryStorage {
pub fn new() -> Self { pub fn new() -> Self {
Self(Mutex::new(Inner { Self(Mutex::new(Inner {
clients: HashMap::new(), clients: HashMap::new(),
snapshots: HashMap::new(),
versions: HashMap::new(), versions: HashMap::new(),
children: HashMap::new(), children: HashMap::new(),
})) }))
@ -46,23 +50,44 @@ impl<'a> StorageTxn for InnerTxn<'a> {
if self.0.clients.get(&client_key).is_some() { if self.0.clients.get(&client_key).is_some() {
return Err(anyhow::anyhow!("Client {} already exists", client_key)); return Err(anyhow::anyhow!("Client {} already exists", client_key));
} }
self.0 self.0.clients.insert(
.clients client_key,
.insert(client_key, Client { latest_version_id }); Client {
latest_version_id,
snapshot: None,
},
);
Ok(()) Ok(())
} }
fn set_client_latest_version_id( fn set_snapshot(
&mut self, &mut self,
client_key: Uuid, client_key: Uuid,
latest_version_id: Uuid, snapshot: Snapshot,
data: Vec<u8>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
if let Some(client) = self.0.clients.get_mut(&client_key) { let mut client = self
client.latest_version_id = latest_version_id; .0
Ok(()) .clients
} else { .get_mut(&client_key)
Err(anyhow::anyhow!("Client {} does not exist", client_key)) .ok_or_else(|| anyhow::anyhow!("no such client"))?;
client.snapshot = Some(snapshot);
self.0.snapshots.insert(client_key, data);
Ok(())
}
fn get_snapshot_data(
&mut self,
client_key: Uuid,
version_id: Uuid,
) -> anyhow::Result<Option<Vec<u8>>> {
// sanity check
let client = self.0.clients.get(&client_key);
let client = client.ok_or_else(|| anyhow::anyhow!("no such client"))?;
if Some(&version_id) != client.snapshot.as_ref().map(|snap| &snap.version_id) {
return Err(anyhow::anyhow!("unexpected snapshot_version_id"));
} }
Ok(self.0.snapshots.get(&client_key).cloned())
} }
fn get_version_by_parent( fn get_version_by_parent(
@ -102,12 +127,21 @@ impl<'a> StorageTxn for InnerTxn<'a> {
parent_version_id, parent_version_id,
history_segment, history_segment,
}; };
if let Some(client) = self.0.clients.get_mut(&client_key) {
client.latest_version_id = version_id;
if let Some(ref mut snap) = client.snapshot {
snap.versions_since += 1;
}
} else {
return Err(anyhow::anyhow!("Client {} does not exist", client_key));
}
self.0 self.0
.children .children
.insert((client_key, version.parent_version_id), version.version_id); .insert((client_key, parent_version_id), version_id);
self.0 self.0.versions.insert((client_key, version_id), version);
.versions
.insert((client_key, version.version_id), version);
Ok(()) Ok(())
} }
@ -119,15 +153,7 @@ impl<'a> StorageTxn for InnerTxn<'a> {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
use chrono::Utc;
#[test]
fn test_emtpy_dir() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let maybe_client = txn.get_client(Uuid::new_v4())?;
assert!(maybe_client.is_none());
Ok(())
}
#[test] #[test]
fn test_get_client_empty() -> anyhow::Result<()> { fn test_get_client_empty() -> anyhow::Result<()> {
@ -149,12 +175,25 @@ mod test {
let client = txn.get_client(client_key)?.unwrap(); let client = txn.get_client(client_key)?.unwrap();
assert_eq!(client.latest_version_id, latest_version_id); assert_eq!(client.latest_version_id, latest_version_id);
assert!(client.snapshot.is_none());
let latest_version_id = Uuid::new_v4(); let latest_version_id = Uuid::new_v4();
txn.set_client_latest_version_id(client_key, latest_version_id)?; txn.add_version(client_key, latest_version_id, Uuid::new_v4(), vec![1, 1])?;
let client = txn.get_client(client_key)?.unwrap(); let client = txn.get_client(client_key)?.unwrap();
assert_eq!(client.latest_version_id, latest_version_id); assert_eq!(client.latest_version_id, latest_version_id);
assert!(client.snapshot.is_none());
let snap = Snapshot {
version_id: Uuid::new_v4(),
timestamp: Utc::now(),
versions_since: 4,
};
txn.set_snapshot(client_key, snap.clone(), vec![1, 2, 3])?;
let client = txn.get_client(client_key)?.unwrap();
assert_eq!(client.latest_version_id, latest_version_id);
assert_eq!(client.snapshot.unwrap(), snap);
Ok(()) Ok(())
} }
@ -177,6 +216,8 @@ mod test {
let version_id = Uuid::new_v4(); let version_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4(); let parent_version_id = Uuid::new_v4();
let history_segment = b"abc".to_vec(); let history_segment = b"abc".to_vec();
txn.new_client(client_key, parent_version_id)?;
txn.add_version( txn.add_version(
client_key, client_key,
version_id, version_id,
@ -200,4 +241,47 @@ mod test {
Ok(()) Ok(())
} }
#[test]
fn test_snapshots() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_key = Uuid::new_v4();
txn.new_client(client_key, Uuid::new_v4())?;
assert!(txn.get_client(client_key)?.unwrap().snapshot.is_none());
let snap = Snapshot {
version_id: Uuid::new_v4(),
timestamp: Utc::now(),
versions_since: 3,
};
txn.set_snapshot(client_key, snap.clone(), vec![9, 8, 9])?;
assert_eq!(
txn.get_snapshot_data(client_key, snap.version_id)?.unwrap(),
vec![9, 8, 9]
);
assert_eq!(txn.get_client(client_key)?.unwrap().snapshot, Some(snap));
let snap2 = Snapshot {
version_id: Uuid::new_v4(),
timestamp: Utc::now(),
versions_since: 10,
};
txn.set_snapshot(client_key, snap2.clone(), vec![0, 2, 4, 6])?;
assert_eq!(
txn.get_snapshot_data(client_key, snap2.version_id)?
.unwrap(),
vec![0, 2, 4, 6]
);
assert_eq!(txn.get_client(client_key)?.unwrap().snapshot, Some(snap2));
// check that mismatched version is detected
assert!(txn.get_snapshot_data(client_key, Uuid::new_v4()).is_err());
Ok(())
}
} }

View file

@ -1,4 +1,4 @@
use serde::{Deserialize, Serialize}; use chrono::{DateTime, Utc};
use uuid::Uuid; use uuid::Uuid;
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
@ -10,12 +10,27 @@ pub use inmemory::InMemoryStorage;
mod sqlite; mod sqlite;
pub use self::sqlite::SqliteStorage; pub use self::sqlite::SqliteStorage;
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] #[derive(Clone, PartialEq, Debug)]
pub struct Client { pub struct Client {
/// The latest version for this client (may be the nil version)
pub latest_version_id: Uuid, pub latest_version_id: Uuid,
/// Data about the latest snapshot for this client
pub snapshot: Option<Snapshot>,
} }
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] #[derive(Clone, PartialEq, Debug)]
pub struct Snapshot {
/// ID of the version at which this snapshot was made
pub version_id: Uuid,
/// Timestamp at which this snapshot was set
pub timestamp: DateTime<Utc>,
/// Number of versions since this snapshot was made
pub versions_since: u32,
}
#[derive(Clone, PartialEq, Debug)]
pub struct Version { pub struct Version {
pub version_id: Uuid, pub version_id: Uuid,
pub parent_version_id: Uuid, pub parent_version_id: Uuid,
@ -29,13 +44,22 @@ pub trait StorageTxn {
/// Create a new client with the given latest_version_id /// Create a new client with the given latest_version_id
fn new_client(&mut self, client_key: Uuid, latest_version_id: Uuid) -> anyhow::Result<()>; fn new_client(&mut self, client_key: Uuid, latest_version_id: Uuid) -> anyhow::Result<()>;
/// Set the client's latest_version_id /// Set the client's most recent snapshot.
fn set_client_latest_version_id( fn set_snapshot(
&mut self, &mut self,
client_key: Uuid, client_key: Uuid,
latest_version_id: Uuid, snapshot: Snapshot,
data: Vec<u8>,
) -> anyhow::Result<()>; ) -> anyhow::Result<()>;
/// Get the data for the most recent snapshot. The version_id
/// is used to verify that the snapshot is for the correct version.
fn get_snapshot_data(
&mut self,
client_key: Uuid,
version_id: Uuid,
) -> anyhow::Result<Option<Vec<u8>>>;
/// Get a version, indexed by parent version id /// Get a version, indexed by parent version id
fn get_version_by_parent( fn get_version_by_parent(
&mut self, &mut self,
@ -50,7 +74,9 @@ pub trait StorageTxn {
version_id: Uuid, version_id: Uuid,
) -> anyhow::Result<Option<Version>>; ) -> anyhow::Result<Option<Version>>;
/// Add a version (that must not already exist) /// Add a version (that must not already exist), and
/// - update latest_version_id
/// - increment snapshot.versions_since
fn add_version( fn add_version(
&mut self, &mut self,
client_key: Uuid, client_key: Uuid,

View file

@ -1,5 +1,6 @@
use super::{Client, Storage, StorageTxn, Uuid, Version}; use super::{Client, Snapshot, Storage, StorageTxn, Uuid, Version};
use anyhow::Context; use anyhow::Context;
use chrono::{TimeZone, Utc};
use rusqlite::types::{FromSql, ToSql}; use rusqlite::types::{FromSql, ToSql};
use rusqlite::{params, Connection, OptionalExtension}; use rusqlite::{params, Connection, OptionalExtension};
use std::path::Path; use std::path::Path;
@ -30,24 +31,6 @@ impl ToSql for StoredUuid {
} }
} }
/// Stores [`Client`] in SQLite
impl FromSql for Client {
fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
let o: Client = serde_json::from_str(value.as_str()?)
.map_err(|_| rusqlite::types::FromSqlError::InvalidType)?;
Ok(o)
}
}
/// Parses Operation stored as JSON in string column
impl ToSql for Client {
fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
let s = serde_json::to_string(&self)
.map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
Ok(s.into())
}
}
/// An on-disk storage backend which uses SQLite /// An on-disk storage backend which uses SQLite
pub struct SqliteStorage { pub struct SqliteStorage {
db_file: std::path::PathBuf, db_file: std::path::PathBuf,
@ -69,12 +52,19 @@ impl SqliteStorage {
let txn = con.transaction()?; let txn = con.transaction()?;
let queries = vec![ let queries = vec![
"CREATE TABLE IF NOT EXISTS clients (client_key STRING PRIMARY KEY, latest_version_id STRING);", "CREATE TABLE IF NOT EXISTS clients (
client_key STRING PRIMARY KEY,
latest_version_id STRING,
snapshot_version_id STRING,
versions_since_snapshot INTEGER,
snapshot_timestamp INTEGER,
snapshot BLOB);",
"CREATE TABLE IF NOT EXISTS versions (version_id STRING PRIMARY KEY, client_key STRING, parent_version_id STRING, history_segment BLOB);", "CREATE TABLE IF NOT EXISTS versions (version_id STRING PRIMARY KEY, client_key STRING, parent_version_id STRING, history_segment BLOB);",
"CREATE INDEX IF NOT EXISTS versions_by_parent ON versions (parent_version_id);", "CREATE INDEX IF NOT EXISTS versions_by_parent ON versions (parent_version_id);",
]; ];
for q in queries { for q in queries {
txn.execute(q, []).context("Creating table")?; txn.execute(q, [])
.context("Error while creating SQLite tables")?;
} }
txn.commit()?; txn.commit()?;
} }
@ -126,7 +116,7 @@ impl Txn {
}, },
) )
.optional() .optional()
.context("Get version query")?; .context("Error getting version")?;
Ok(r) Ok(r)
} }
} }
@ -136,17 +126,42 @@ impl StorageTxn for Txn {
let t = self.get_txn()?; let t = self.get_txn()?;
let result: Option<Client> = t let result: Option<Client> = t
.query_row( .query_row(
"SELECT latest_version_id FROM clients WHERE client_key = ? LIMIT 1", "SELECT
latest_version_id,
snapshot_timestamp,
versions_since_snapshot,
snapshot_version_id
FROM clients
WHERE client_key = ?
LIMIT 1",
[&StoredUuid(client_key)], [&StoredUuid(client_key)],
|r| { |r| {
let latest_version_id: StoredUuid = r.get(0)?; let latest_version_id: StoredUuid = r.get(0)?;
let snapshot_timestamp: Option<i64> = r.get(1)?;
let versions_since_snapshot: Option<u32> = r.get(2)?;
let snapshot_version_id: Option<StoredUuid> = r.get(3)?;
// if all of the relevant fields are non-NULL, return a snapshot
let snapshot = match (
snapshot_timestamp,
versions_since_snapshot,
snapshot_version_id,
) {
(Some(ts), Some(vs), Some(v)) => Some(Snapshot {
version_id: v.0,
timestamp: Utc.timestamp(ts, 0),
versions_since: vs,
}),
_ => None,
};
Ok(Client { Ok(Client {
latest_version_id: latest_version_id.0, latest_version_id: latest_version_id.0,
snapshot,
}) })
}, },
) )
.optional() .optional()
.context("Get client query")?; .context("Error getting client")?;
Ok(result) Ok(result)
} }
@ -158,18 +173,66 @@ impl StorageTxn for Txn {
"INSERT OR REPLACE INTO clients (client_key, latest_version_id) VALUES (?, ?)", "INSERT OR REPLACE INTO clients (client_key, latest_version_id) VALUES (?, ?)",
params![&StoredUuid(client_key), &StoredUuid(latest_version_id)], params![&StoredUuid(client_key), &StoredUuid(latest_version_id)],
) )
.context("Create client query")?; .context("Error creating/updating client")?;
t.commit()?; t.commit()?;
Ok(()) Ok(())
} }
fn set_client_latest_version_id( fn set_snapshot(
&mut self, &mut self,
client_key: Uuid, client_key: Uuid,
latest_version_id: Uuid, snapshot: Snapshot,
data: Vec<u8>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// Implementation is same as new_client let t = self.get_txn()?;
self.new_client(client_key, latest_version_id)
t.execute(
"UPDATE clients
SET
snapshot_version_id = ?,
snapshot_timestamp = ?,
versions_since_snapshot = ?,
snapshot = ?
WHERE client_key = ?",
params![
&StoredUuid(snapshot.version_id),
snapshot.timestamp.timestamp(),
snapshot.versions_since,
data,
&StoredUuid(client_key),
],
)
.context("Error creating/updating snapshot")?;
t.commit()?;
Ok(())
}
fn get_snapshot_data(
&mut self,
client_key: Uuid,
version_id: Uuid,
) -> anyhow::Result<Option<Vec<u8>>> {
let t = self.get_txn()?;
let r = t
.query_row(
"SELECT snapshot, snapshot_version_id FROM clients WHERE client_key = ?",
params![&StoredUuid(client_key)],
|r| {
let v: StoredUuid = r.get("snapshot_version_id")?;
let d: Vec<u8> = r.get("snapshot")?;
Ok((v.0, d))
},
)
.optional()
.context("Error getting snapshot")?;
r.map(|(v, d)| {
if v != version_id {
return Err(anyhow::anyhow!("unexpected snapshot_version_id"));
}
Ok(d)
})
.transpose()
} }
fn get_version_by_parent( fn get_version_by_parent(
@ -182,6 +245,7 @@ impl StorageTxn for Txn {
client_key, client_key,
parent_version_id) parent_version_id)
} }
fn get_version( fn get_version(
&mut self, &mut self,
client_key: Uuid, client_key: Uuid,
@ -209,9 +273,19 @@ impl StorageTxn for Txn {
StoredUuid(client_key), StoredUuid(client_key),
StoredUuid(parent_version_id), StoredUuid(parent_version_id),
history_segment history_segment
], ]
) )
.context("Add version query")?; .context("Error adding version")?;
t.execute(
"UPDATE clients
SET
latest_version_id = ?,
versions_since_snapshot = versions_since_snapshot + 1
WHERE client_key = ?",
params![StoredUuid(version_id), StoredUuid(client_key),],
)
.context("Error updating client for new version")?;
t.commit()?; t.commit()?;
Ok(()) Ok(())
} }
@ -228,6 +302,7 @@ impl StorageTxn for Txn {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
use chrono::DateTime;
use pretty_assertions::assert_eq; use pretty_assertions::assert_eq;
use tempfile::TempDir; use tempfile::TempDir;
@ -264,12 +339,25 @@ mod test {
let client = txn.get_client(client_key)?.unwrap(); let client = txn.get_client(client_key)?.unwrap();
assert_eq!(client.latest_version_id, latest_version_id); assert_eq!(client.latest_version_id, latest_version_id);
assert!(client.snapshot.is_none());
let latest_version_id = Uuid::new_v4(); let latest_version_id = Uuid::new_v4();
txn.set_client_latest_version_id(client_key, latest_version_id)?; txn.add_version(client_key, latest_version_id, Uuid::new_v4(), vec![1, 1])?;
let client = txn.get_client(client_key)?.unwrap(); let client = txn.get_client(client_key)?.unwrap();
assert_eq!(client.latest_version_id, latest_version_id); assert_eq!(client.latest_version_id, latest_version_id);
assert!(client.snapshot.is_none());
let snap = Snapshot {
version_id: Uuid::new_v4(),
timestamp: "2014-11-28T12:00:09Z".parse::<DateTime<Utc>>().unwrap(),
versions_since: 4,
};
txn.set_snapshot(client_key, snap.clone(), vec![1, 2, 3])?;
let client = txn.get_client(client_key)?.unwrap();
assert_eq!(client.latest_version_id, latest_version_id);
assert_eq!(client.snapshot.unwrap(), snap);
Ok(()) Ok(())
} }
@ -317,4 +405,48 @@ mod test {
Ok(()) Ok(())
} }
#[test]
fn test_snapshots() -> anyhow::Result<()> {
let tmp_dir = TempDir::new()?;
let storage = SqliteStorage::new(&tmp_dir.path())?;
let mut txn = storage.txn()?;
let client_key = Uuid::new_v4();
txn.new_client(client_key, Uuid::new_v4())?;
assert!(txn.get_client(client_key)?.unwrap().snapshot.is_none());
let snap = Snapshot {
version_id: Uuid::new_v4(),
timestamp: "2013-10-08T12:00:09Z".parse::<DateTime<Utc>>().unwrap(),
versions_since: 3,
};
txn.set_snapshot(client_key, snap.clone(), vec![9, 8, 9])?;
assert_eq!(
txn.get_snapshot_data(client_key, snap.version_id)?.unwrap(),
vec![9, 8, 9]
);
assert_eq!(txn.get_client(client_key)?.unwrap().snapshot, Some(snap));
let snap2 = Snapshot {
version_id: Uuid::new_v4(),
timestamp: "2014-11-28T12:00:09Z".parse::<DateTime<Utc>>().unwrap(),
versions_since: 10,
};
txn.set_snapshot(client_key, snap2.clone(), vec![0, 2, 4, 6])?;
assert_eq!(
txn.get_snapshot_data(client_key, snap2.version_id)?
.unwrap(),
vec![0, 2, 4, 6]
);
assert_eq!(txn.get_client(client_key)?.unwrap().snapshot, Some(snap2));
// check that mismatched version is detected
assert!(txn.get_snapshot_data(client_key, Uuid::new_v4()).is_err());
Ok(())
}
} }