Merge pull request #289 from taskchampion/issue23

Add support for snapshots
This commit is contained in:
Dustin J. Mitchell 2021-10-09 09:40:57 -04:00 committed by GitHub
commit f109056340
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 1742 additions and 190 deletions

View file

@ -0,0 +1,2 @@
- The SQLite server storage schema has changed incompatibly, in order to add support for snapshots.
As this is not currently ready for production usage, no migration path is provided except deleting the existing database.

1
Cargo.lock generated
View file

@ -3016,6 +3016,7 @@ dependencies = [
"actix-rt",
"actix-web",
"anyhow",
"chrono",
"clap",
"env_logger 0.8.4",
"futures",

View file

@ -19,5 +19,6 @@
* [Tasks](./tasks.md)
* [Synchronization and the Sync Server](./sync.md)
* [Synchronization Model](./sync-model.md)
* [Snapshots](./snapshots.md)
* [Server-Replica Protocol](./sync-protocol.md)
* [Planned Functionality](./plans.md)

48
docs/src/snapshots.md Normal file
View file

@ -0,0 +1,48 @@
# Snapshots
The basic synchronization model described in the previous page has a few shortcomings:
* servers must store an ever-increasing quantity of versions
* a new replica must download all versions since the beginning in order to derive the current state
Snapshots allow TaskChampion to avoid both of these issues.
A snapshot is a copy of the task database at a specific version.
It is created by a replica, encrypted, and stored on the server.
A new replica can simply download a recent snapshot and apply any additional versions synchronized since that snapshot was made.
Servers can delete and reclaim space used by older versions, as long as newer snapshots are available.
## Snapshot Heuristics
A server implementation must answer a few questions:
* How often should snapshots be made?
* When can versions be deleted?
* When can snapshots be deleted?
A critical invariant is that at least one snapshot must exist for any database that does not have a child of the nil version.
This ensures that a new replica can always derive the latest state.
Aside from that invariant, the server implementation can vary in its answers to these questions, with the following considerations:
Snapshots should be made frequently enough that a new replica can initialize quickly.
Existing replicas will fail to synchronize if they request a child version that has been deleted.
This failure can cause data loss if the replica had local changes.
It's conceivable that replicas may not sync for weeks or months if, for example, they are located on a home computer while the user is on holiday.
## Requesting New Snapshots
The server requests snapshots from replicas, indicating an urgency for the request.
Some replicas, such as those running on PCs or servers, can produce a snapshot even at low urgency.
Other replicas, in more restricted environments such as mobile devices, will only produce a snapshot at high urgency.
This saves resources in these restricted environments.
A snapshot must be made on a replica with no unsynchronized operations.
As such, it only makes sense to request a snapshot in response to a successful AddVersion request.
## Handling Deleted Versions
When a replica requests a child version, the response must distinguish two cases:
1. No such child version exists because the replica is up-to-date.
1. No such child version exists because it has been deleted, and the replica must re-initialize itself.
The details of this logic are covered in the [Server-Replica Protocol](./sync-protocol.md).

View file

@ -7,26 +7,36 @@ The protocol builds on the model presented in the previous chapter, and in parti
## Clients
From the server's perspective, replicas are indistinguishable, so this protocol uses the term "client" to refer generically to all replicas replicating a single task history.
From the server's perspective, replicas accessing the same task history are indistinguishable, so this protocol uses the term "client" to refer generically to all replicas replicating a single task history.
Each client is identified and authenticated with a "client key", known only to the server and to the replicas replicating the task history.
## Server
For each client, the server is responsible for storing the task history, in the form of a branch-free sequence of versions.
It also stores the latest snapshot, if any exists.
* versions: a set of {versionId: UUID, parentVersionId: UUID, historySegment: bytes}
* latestVersionId: UUID
* snapshotVersionId: UUID
* snapshot: bytes
For each client, it stores a set of versions as well as the latest version ID, defaulting to the nil UUID.
Each version has a version ID, a parent version ID, and a history segment (opaque data containing the operations for that version).
The server should maintain the following invariants:
The server should maintain the following invariants for each client:
1. Given a client c, c.latestVersion is nil or exists in the set of versions.
1. Given versions v1 and v2 for a client, with v1.versionId != v2.versionId and v1.parentVersionId != nil, v1.parentVersionId != v2.parentVersionId.
1. latestVersionId is nil or exists in the set of versions.
2. Given versions v1 and v2 for a client, with v1.versionId != v2.versionId and v1.parentVersionId != nil, v1.parentVersionId != v2.parentVersionId.
In other words, versions do not branch.
3. If snapshotVersionId is nil, then there is a version with parentVersionId == nil.
4. If snapshotVersionId is not nil, then there is a version with parentVersionId = snapshotVersionId.
Note that versions form a linked list beginning with the version stored in he client.
Note that versions form a linked list beginning with the latestVersionId stored for the client.
This linked list need not continue back to a version with v.parentVersionId = nil.
It may end at any point when v.parentVersionId is not found in the set of Versions.
This observation allows the server to discard older versions.
The third invariant prevents the server from discarding versions if there is no snapshot.
The fourth invariant prevents the server from discarding versions newer than the snapshot.
## Transactions
@ -45,6 +55,7 @@ If it already has one or more versions for the client, then it accepts the versi
If the version is accepted, the server generates a new version ID for it.
The version is added to the set of versions for the client, the client's latest version ID is set to the new version ID.
The new version ID is returned in the response to the client.
The response may also include a request for a snapshot, with associated urgency.
If the version is not accepted, the server makes no changes, but responds to the client with a conflict indication containing the latest version ID.
The client may then "rebase" its operations and try again.
@ -61,7 +72,32 @@ If found, it returns the version's
* parent version ID (matching that in the request), and
* history segment.
If not found, the server returns a negative response.
The response is either a version (success, _not-found_, or _gone_, as determined by the first of the following to apply:
* If a version with parentVersionId equal to the requested parentVersionId exists, it is returned.
* If the requested parentVersionId is the nil UUID ..
* ..and snapshotVersionId is nil, the response is _not-found_ (the client has no versions).
* ..and snapshotVersionId is not nil, the response is _gone_ (the first version has been deleted).
* If a version with versionId equal to the requested parentVersionId exists, the response is _not-found_ (the client is up-to-date)
* Otherwise, the response is _gone_ (the requested version has been deleted).
### AddSnapshot
The AddSnapshot transaction requests that the server store a new snapshot, generated by the client.
The request contains the following:
* version ID at which the snapshot was made
* snapshot data (opaque to the server)
The server should validate that the snapshot is for an existing version and is newer than any existing snapshot.
It may also validate that the snapshot is for a "recent" version (e.g., one of the last 5 versions).
If a snapshot already exists for the given version, the server may keep or discard the new snapshot but should return a success indication to the client.
The server response is empty.
### GetSnapshot
The GetSnapshot transaction requests that the server provide the latest snapshot.
The response contains the snapshot version ID and the snapshot data, if those exist.
## HTTP Representation
@ -79,6 +115,7 @@ The content-type must be `application/vnd.taskchampion.history-segment`.
The success response is a 200 OK with an empty body.
The new version ID appears in the `X-Version-Id` header.
If included, a snapshot request appears in the `X-Snapshot-Request` header with value `urgency=low` or `urgency=high`.
On conflict, the response is a 409 CONFLICT with an empty body.
The expected parent version ID appears in the `X-Parent-Version-Id` header.
@ -88,8 +125,40 @@ Other error responses (4xx or 5xx) may be returned and should be treated appropr
### GetChildVersion
The request is a `GET` to `<origin>/v1/client/get-child-version/<parentVersionId>`.
The response is 404 NOT FOUND if no such version exists.
Otherwise, the response is a 200 OK.
The response is determined as described above.
The _not-found_ response is 404 NOT FOUND.
The _gone_ response is 410 GONE.
Neither has a response body.
On success, the response is a 200 OK.
The version's history segment is returned in the response body, with content-type `application/vnd.taskchampion.history-segment`.
The version ID appears in the `X-Version-Id` header.
The response body may be encoded, in accordance with any `Accept-Encoding` header in the request.
On failure, a client should treat a 404 NOT FOUND as indicating that it is up-to-date.
Clients should treat a 410 GONE as a synchronization error.
If the client has pending changes to send to the server, based on a now-removed version, then those changes cannot be reconciled and will be lost.
The client should, optionally after consulting the user, download and apply the latest snapshot.
### AddSnapshot
The request is a `POST` to `<origin>/v1/client/add-snapshot/<versionId>`.
The request body contains the snapshot data, optionally encoded using any encoding supported by actix-web.
The content-type must be `application/vnd.taskchampion.snapshot`.
If the version is invalid, as described above, the response should be 400 BAD REQUEST.
The server response should be 200 OK on success.
### GetSnapshot
The request is a `GET` to `<origin>/v1/client/snapshot`.
The response is a 200 OK.
The snapshot is returned in the response body, with content-type `application/vnd.taskchampion.snapshot`.
The version ID appears in the `X-Version-Id` header.
The response body may be encoded, in accordance with any `Accept-Encoding` header in the request.
After downloading and decrypting a snapshot, a client must replace its entire local task database with the content of the snapshot.
Any local operations that had not yet been synchronized must be discarded.
After the snapshot is applied, the client should begin the synchronization process again, starting from the snapshot version.

View file

@ -19,6 +19,7 @@ clap = "^2.33.0"
log = "^0.4.14"
env_logger = "^0.8.3"
rusqlite = { version = "0.25", features = ["bundled"] }
chrono = { version = "^0.4.10", features = ["serde"] }
[dev-dependencies]
actix-rt = "^1.1.1"

View file

@ -0,0 +1,191 @@
use crate::api::{client_key_header, failure_to_ise, ServerState, SNAPSHOT_CONTENT_TYPE};
use crate::server::{add_snapshot, VersionId, NO_VERSION_ID};
use actix_web::{error, post, web, HttpMessage, HttpRequest, HttpResponse, Result};
use futures::StreamExt;
/// Max snapshot size: 100MB
const MAX_SIZE: usize = 100 * 1024 * 1024;
/// Add a new snapshot, after checking prerequisites. The snapshot should be transmitted in the
/// request entity body and must have content-type `application/vnd.taskchampion.snapshot`. The
/// content can be encoded in any of the formats supported by actix-web.
///
/// On success, the response is a 200 OK. Even in a 200 OK, the snapshot may not appear in a
/// subsequent `GetSnapshot` call.
///
/// Returns other 4xx or 5xx responses on other errors.
#[post("/v1/client/add-snapshot/{version_id}")]
pub(crate) async fn service(
req: HttpRequest,
server_state: web::Data<ServerState>,
web::Path((version_id,)): web::Path<(VersionId,)>,
mut payload: web::Payload,
) -> Result<HttpResponse> {
// check content-type
if req.content_type() != SNAPSHOT_CONTENT_TYPE {
return Err(error::ErrorBadRequest("Bad content-type"));
}
let client_key = client_key_header(&req)?;
// read the body in its entirety
let mut body = web::BytesMut::new();
while let Some(chunk) = payload.next().await {
let chunk = chunk?;
// limit max size of in-memory payload
if (body.len() + chunk.len()) > MAX_SIZE {
return Err(error::ErrorBadRequest("Snapshot over maximum allowed size"));
}
body.extend_from_slice(&chunk);
}
if body.is_empty() {
return Err(error::ErrorBadRequest("No snapshot supplied"));
}
// note that we do not open the transaction until the body has been read
// completely, to avoid blocking other storage access while that data is
// in transit.
let mut txn = server_state.txn().map_err(failure_to_ise)?;
// get, or create, the client
let client = match txn.get_client(client_key).map_err(failure_to_ise)? {
Some(client) => client,
None => {
txn.new_client(client_key, NO_VERSION_ID)
.map_err(failure_to_ise)?;
txn.get_client(client_key).map_err(failure_to_ise)?.unwrap()
}
};
add_snapshot(txn, client_key, client, version_id, body.to_vec()).map_err(failure_to_ise)?;
Ok(HttpResponse::Ok().body(""))
}
#[cfg(test)]
mod test {
use super::*;
use crate::storage::{InMemoryStorage, Storage};
use crate::Server;
use actix_web::{http::StatusCode, test, App};
use pretty_assertions::assert_eq;
use uuid::Uuid;
#[actix_rt::test]
async fn test_success() -> anyhow::Result<()> {
let client_key = Uuid::new_v4();
let version_id = Uuid::new_v4();
let storage: Box<dyn Storage> = Box::new(InMemoryStorage::new());
// set up the storage contents..
{
let mut txn = storage.txn().unwrap();
txn.new_client(client_key, version_id).unwrap();
txn.add_version(client_key, version_id, NO_VERSION_ID, vec![])?;
}
let server = Server::new(storage);
let mut app = test::init_service(App::new().service(server.service())).await;
let uri = format!("/v1/client/add-snapshot/{}", version_id);
let req = test::TestRequest::post()
.uri(&uri)
.header("Content-Type", "application/vnd.taskchampion.snapshot")
.header("X-Client-Key", client_key.to_string())
.set_payload(b"abcd".to_vec())
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::OK);
// read back that snapshot
let uri = "/v1/client/snapshot";
let req = test::TestRequest::get()
.uri(uri)
.header("X-Client-Key", client_key.to_string())
.to_request();
let mut resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::OK);
use futures::StreamExt;
let (bytes, _) = resp.take_body().into_future().await;
assert_eq!(bytes.unwrap().unwrap().as_ref(), b"abcd");
Ok(())
}
#[actix_rt::test]
async fn test_not_added_200() {
let client_key = Uuid::new_v4();
let version_id = Uuid::new_v4();
let storage: Box<dyn Storage> = Box::new(InMemoryStorage::new());
// set up the storage contents..
{
let mut txn = storage.txn().unwrap();
txn.new_client(client_key, NO_VERSION_ID).unwrap();
}
let server = Server::new(storage);
let mut app = test::init_service(App::new().service(server.service())).await;
// add a snapshot for a nonexistent version
let uri = format!("/v1/client/add-snapshot/{}", version_id);
let req = test::TestRequest::post()
.uri(&uri)
.header("Content-Type", "application/vnd.taskchampion.snapshot")
.header("X-Client-Key", client_key.to_string())
.set_payload(b"abcd".to_vec())
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::OK);
// read back, seeing no snapshot
let uri = "/v1/client/snapshot";
let req = test::TestRequest::get()
.uri(uri)
.header("X-Client-Key", client_key.to_string())
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}
#[actix_rt::test]
async fn test_bad_content_type() {
let client_key = Uuid::new_v4();
let version_id = Uuid::new_v4();
let storage: Box<dyn Storage> = Box::new(InMemoryStorage::new());
let server = Server::new(storage);
let mut app = test::init_service(App::new().service(server.service())).await;
let uri = format!("/v1/client/add-snapshot/{}", version_id);
let req = test::TestRequest::post()
.uri(&uri)
.header("Content-Type", "not/correct")
.header("X-Client-Key", client_key.to_string())
.set_payload(b"abcd".to_vec())
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
}
#[actix_rt::test]
async fn test_empty_body() {
let client_key = Uuid::new_v4();
let version_id = Uuid::new_v4();
let storage: Box<dyn Storage> = Box::new(InMemoryStorage::new());
let server = Server::new(storage);
let mut app = test::init_service(App::new().service(server.service())).await;
let uri = format!("/v1/client/add-snapshot/{}", version_id);
let req = test::TestRequest::post()
.uri(&uri)
.header(
"Content-Type",
"application/vnd.taskchampion.history-segment",
)
.header("X-Client-Key", client_key.to_string())
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
}
}

View file

@ -1,8 +1,8 @@
use crate::api::{
client_key_header, failure_to_ise, ServerState, HISTORY_SEGMENT_CONTENT_TYPE,
PARENT_VERSION_ID_HEADER, VERSION_ID_HEADER,
PARENT_VERSION_ID_HEADER, SNAPSHOT_REQUEST_HEADER, VERSION_ID_HEADER,
};
use crate::server::{add_version, AddVersionResult, VersionId, NO_VERSION_ID};
use crate::server::{add_version, AddVersionResult, SnapshotUrgency, VersionId, NO_VERSION_ID};
use actix_web::{error, post, web, HttpMessage, HttpRequest, HttpResponse, Result};
use futures::StreamExt;
@ -18,6 +18,9 @@ const MAX_SIZE: usize = 100 * 1024 * 1024;
/// the version cannot be added due to a conflict, the response is a 409 CONFLICT with the expected
/// parent version ID in the `X-Parent-Version-Id` header.
///
/// If included, a snapshot request appears in the `X-Snapshot-Request` header with value
/// `urgency=low` or `urgency=high`.
///
/// Returns other 4xx or 5xx responses on other errors.
#[post("/v1/client/add-version/{parent_version_id}")]
pub(crate) async fn service(
@ -63,15 +66,30 @@ pub(crate) async fn service(
}
};
let result = add_version(txn, client_key, client, parent_version_id, body.to_vec())
.map_err(failure_to_ise)?;
let (result, snap_urgency) =
add_version(txn, client_key, client, parent_version_id, body.to_vec())
.map_err(failure_to_ise)?;
Ok(match result {
AddVersionResult::Ok(version_id) => HttpResponse::Ok()
.header(VERSION_ID_HEADER, version_id.to_string())
.body(""),
AddVersionResult::ExpectedParentVersion(parent_version_id) => HttpResponse::Conflict()
.header(PARENT_VERSION_ID_HEADER, parent_version_id.to_string())
.body(""),
AddVersionResult::Ok(version_id) => {
let mut rb = HttpResponse::Ok();
rb.header(VERSION_ID_HEADER, version_id.to_string());
match snap_urgency {
SnapshotUrgency::None => {}
SnapshotUrgency::Low => {
rb.header(SNAPSHOT_REQUEST_HEADER, "urgency=low");
}
SnapshotUrgency::High => {
rb.header(SNAPSHOT_REQUEST_HEADER, "urgency=high");
}
};
rb.finish()
}
AddVersionResult::ExpectedParentVersion(parent_version_id) => {
let mut rb = HttpResponse::Conflict();
rb.header(PARENT_VERSION_ID_HEADER, parent_version_id.to_string());
rb.finish()
}
})
}
@ -117,6 +135,10 @@ mod test {
let new_version_id = resp.headers().get("X-Version-Id").unwrap();
assert!(new_version_id != &version_id.to_string());
// Shapshot should be requested, since there is no existing snapshot
let snapshot_request = resp.headers().get("X-Snapshot-Request").unwrap();
assert_eq!(snapshot_request, "urgency=high");
assert_eq!(resp.headers().get("X-Parent-Version-Id"), None);
}

View file

@ -2,7 +2,7 @@ use crate::api::{
client_key_header, failure_to_ise, ServerState, HISTORY_SEGMENT_CONTENT_TYPE,
PARENT_VERSION_ID_HEADER, VERSION_ID_HEADER,
};
use crate::server::{get_child_version, VersionId};
use crate::server::{get_child_version, GetVersionResult, VersionId};
use actix_web::{error, get, web, HttpRequest, HttpResponse, Result};
/// Get a child version.
@ -23,27 +23,31 @@ pub(crate) async fn service(
let client_key = client_key_header(&req)?;
txn.get_client(client_key)
let client = txn
.get_client(client_key)
.map_err(failure_to_ise)?
.ok_or_else(|| error::ErrorNotFound("no such client"))?;
let result = get_child_version(txn, client_key, parent_version_id).map_err(failure_to_ise)?;
if let Some(result) = result {
Ok(HttpResponse::Ok()
return match get_child_version(txn, client_key, client, parent_version_id)
.map_err(failure_to_ise)?
{
GetVersionResult::Success {
version_id,
parent_version_id,
history_segment,
} => Ok(HttpResponse::Ok()
.content_type(HISTORY_SEGMENT_CONTENT_TYPE)
.header(VERSION_ID_HEADER, result.version_id.to_string())
.header(
PARENT_VERSION_ID_HEADER,
result.parent_version_id.to_string(),
)
.body(result.history_segment))
} else {
Err(error::ErrorNotFound("no such version"))
}
.header(VERSION_ID_HEADER, version_id.to_string())
.header(PARENT_VERSION_ID_HEADER, parent_version_id.to_string())
.body(history_segment)),
GetVersionResult::NotFound => Err(error::ErrorNotFound("no such version")),
GetVersionResult::Gone => Err(error::ErrorGone("version has been deleted")),
};
}
#[cfg(test)]
mod test {
use crate::server::NO_VERSION_ID;
use crate::storage::{InMemoryStorage, Storage};
use crate::Server;
use actix_web::{http::StatusCode, test, App};
@ -113,7 +117,7 @@ mod test {
}
#[actix_rt::test]
async fn test_version_not_found() {
async fn test_version_not_found_and_gone() {
let client_key = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let storage: Box<dyn Storage> = Box::new(InMemoryStorage::new());
@ -126,12 +130,26 @@ mod test {
let server = Server::new(storage);
let mut app = test::init_service(App::new().service(server.service())).await;
// the child of an unknown parent_version_id is GONE
let uri = format!("/v1/client/get-child-version/{}", parent_version_id);
let req = test::TestRequest::get()
.uri(&uri)
.header("X-Client-Key", client_key.to_string())
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::GONE);
assert_eq!(resp.headers().get("X-Version-Id"), None);
assert_eq!(resp.headers().get("X-Parent-Version-Id"), None);
// but the child of the nil parent_version_id is NOT FOUND, since
// there is no snapshot. The tests in crate::server test more
// corner cases.
let uri = format!("/v1/client/get-child-version/{}", NO_VERSION_ID);
let req = test::TestRequest::get()
.uri(&uri)
.header("X-Client-Key", client_key.to_string())
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
assert_eq!(resp.headers().get("X-Version-Id"), None);
assert_eq!(resp.headers().get("X-Parent-Version-Id"), None);

View file

@ -0,0 +1,111 @@
use crate::api::{
client_key_header, failure_to_ise, ServerState, SNAPSHOT_CONTENT_TYPE, VERSION_ID_HEADER,
};
use crate::server::get_snapshot;
use actix_web::{error, get, web, HttpRequest, HttpResponse, Result};
/// Get a snapshot.
///
/// If a snapshot for this client exists, it is returned with content-type
/// `application/vnd.taskchampion.snapshot`. The `X-Version-Id` header contains the version of the
/// snapshot.
///
/// If no snapshot exists, returns a 404 with no content. Returns other 4xx or 5xx responses on
/// other errors.
#[get("/v1/client/snapshot")]
pub(crate) async fn service(
req: HttpRequest,
server_state: web::Data<ServerState>,
) -> Result<HttpResponse> {
let mut txn = server_state.txn().map_err(failure_to_ise)?;
let client_key = client_key_header(&req)?;
let client = txn
.get_client(client_key)
.map_err(failure_to_ise)?
.ok_or_else(|| error::ErrorNotFound("no such client"))?;
if let Some((version_id, data)) =
get_snapshot(txn, client_key, client).map_err(failure_to_ise)?
{
Ok(HttpResponse::Ok()
.content_type(SNAPSHOT_CONTENT_TYPE)
.header(VERSION_ID_HEADER, version_id.to_string())
.body(data))
} else {
Err(error::ErrorNotFound("no snapshot"))
}
}
#[cfg(test)]
mod test {
use crate::storage::{InMemoryStorage, Snapshot, Storage};
use crate::Server;
use actix_web::{http::StatusCode, test, App};
use chrono::{TimeZone, Utc};
use pretty_assertions::assert_eq;
use uuid::Uuid;
#[actix_rt::test]
async fn test_not_found() {
let client_key = Uuid::new_v4();
let storage: Box<dyn Storage> = Box::new(InMemoryStorage::new());
// set up the storage contents..
{
let mut txn = storage.txn().unwrap();
txn.new_client(client_key, Uuid::new_v4()).unwrap();
}
let server = Server::new(storage);
let mut app = test::init_service(App::new().service(server.service())).await;
let uri = "/v1/client/snapshot";
let req = test::TestRequest::get()
.uri(uri)
.header("X-Client-Key", client_key.to_string())
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}
#[actix_rt::test]
async fn test_success() {
let client_key = Uuid::new_v4();
let version_id = Uuid::new_v4();
let snapshot_data = vec![1, 2, 3, 4];
let storage: Box<dyn Storage> = Box::new(InMemoryStorage::new());
// set up the storage contents..
{
let mut txn = storage.txn().unwrap();
txn.new_client(client_key, Uuid::new_v4()).unwrap();
txn.set_snapshot(
client_key,
Snapshot {
version_id,
versions_since: 3,
timestamp: Utc.ymd(2001, 9, 9).and_hms(1, 46, 40),
},
snapshot_data.clone(),
)
.unwrap();
}
let server = Server::new(storage);
let mut app = test::init_service(App::new().service(server.service())).await;
let uri = "/v1/client/snapshot";
let req = test::TestRequest::get()
.uri(uri)
.header("X-Client-Key", client_key.to_string())
.to_request();
let mut resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::OK);
use futures::StreamExt;
let (bytes, _) = resp.take_body().into_future().await;
assert_eq!(bytes.unwrap().unwrap().as_ref(), snapshot_data);
}
}

View file

@ -3,13 +3,18 @@ use crate::storage::Storage;
use actix_web::{error, http::StatusCode, web, HttpRequest, Result, Scope};
use std::sync::Arc;
mod add_snapshot;
mod add_version;
mod get_child_version;
mod get_snapshot;
/// The content-type for history segments (opaque blobs of bytes)
pub(crate) const HISTORY_SEGMENT_CONTENT_TYPE: &str =
"application/vnd.taskchampion.history-segment";
/// The content-type for snapshots (opaque blobs of bytes)
pub(crate) const SNAPSHOT_CONTENT_TYPE: &str = "application/vnd.taskchampion.snapshot";
/// The header name for version ID
pub(crate) const VERSION_ID_HEADER: &str = "X-Version-Id";
@ -19,6 +24,9 @@ pub(crate) const CLIENT_KEY_HEADER: &str = "X-Client-Key";
/// The header name for parent version ID
pub(crate) const PARENT_VERSION_ID_HEADER: &str = "X-Parent-Version-Id";
/// The header name for parent version ID
pub(crate) const SNAPSHOT_REQUEST_HEADER: &str = "X-Snapshot-Request";
/// The type containing a reference to the Storage object in the Actix state.
pub(crate) type ServerState = Arc<dyn Storage>;
@ -26,6 +34,8 @@ pub(crate) fn api_scope() -> Scope {
web::scope("")
.service(get_child_version::service)
.service(add_version::service)
.service(get_snapshot::service)
.service(add_snapshot::service)
}
/// Convert a failure::Error to an Actix ISE

View file

@ -35,3 +35,10 @@ impl Server {
.service(api_scope())
}
}
#[cfg(test)]
mod test {
pub(crate) fn init_logging() {
let _ = env_logger::builder().is_test(true).try_init();
}
}

View file

@ -1,35 +1,75 @@
//! This module implements the core logic of the server: handling transactions, upholding
//! invariants, and so on.
use crate::storage::{Client, StorageTxn};
//! invariants, and so on. This does not implement the HTTP-specific portions; those
//! are in [`crate::api`]. See the protocol documentation for details.
use crate::storage::{Client, Snapshot, StorageTxn};
use chrono::Utc;
use uuid::Uuid;
/// The distinguished value for "no version"
pub const NO_VERSION_ID: VersionId = Uuid::nil();
/// Number of versions to search back from the latest to find the
/// version for a newly-added snapshot. Snapshots for versions older
/// than this will be rejected.
const SNAPSHOT_SEARCH_LEN: i32 = 5;
/// Maximum number of days between snapshots
const SNAPSHOT_DAYS: i64 = 14;
/// Maximum number of versions between snapshots
const SNAPSHOT_VERSIONS: u32 = 30;
pub(crate) type HistorySegment = Vec<u8>;
pub(crate) type ClientKey = Uuid;
pub(crate) type VersionId = Uuid;
/// Response to get_child_version
/// Response to get_child_version. See the protocol documentation.
#[derive(Clone, PartialEq, Debug)]
pub(crate) struct GetVersionResult {
pub(crate) version_id: Uuid,
pub(crate) parent_version_id: Uuid,
pub(crate) history_segment: HistorySegment,
pub(crate) enum GetVersionResult {
NotFound,
Gone,
Success {
version_id: Uuid,
parent_version_id: Uuid,
history_segment: HistorySegment,
},
}
/// Implementation of the GetChildVersion protocol transaction
pub(crate) fn get_child_version<'a>(
mut txn: Box<dyn StorageTxn + 'a>,
client_key: ClientKey,
client: Client,
parent_version_id: VersionId,
) -> anyhow::Result<Option<GetVersionResult>> {
Ok(txn
.get_version_by_parent(client_key, parent_version_id)?
.map(|version| GetVersionResult {
) -> anyhow::Result<GetVersionResult> {
// If a version with parentVersionId equal to the requested parentVersionId exists, it is returned.
if let Some(version) = txn.get_version_by_parent(client_key, parent_version_id)? {
return Ok(GetVersionResult::Success {
version_id: version.version_id,
parent_version_id: version.parent_version_id,
history_segment: version.history_segment,
}))
});
}
// If the requested parentVersionId is the nil UUID ..
if parent_version_id == NO_VERSION_ID {
return Ok(match client.snapshot {
// ..and snapshotVersionId is nil, the response is _not-found_ (the client has no
// versions).
None => GetVersionResult::NotFound,
// ..and snapshotVersionId is not nil, the response is _gone_ (the first version has
// been deleted).
Some(_) => GetVersionResult::Gone,
});
}
// If a version with versionId equal to the requested parentVersionId exists, the response is _not-found_ (the client is up-to-date)
if txn.get_version(client_key, parent_version_id)?.is_some() {
return Ok(GetVersionResult::NotFound);
}
// Otherwise, the response is _gone_ (the requested version has been deleted).
Ok(GetVersionResult::Gone)
}
/// Response to add_version
@ -41,13 +81,51 @@ pub(crate) enum AddVersionResult {
ExpectedParentVersion(VersionId),
}
/// Urgency of a snapshot for a client; used to create the `X-Snapshot-Request` header.
#[derive(PartialEq, Debug, Clone, Copy, Eq, PartialOrd, Ord)]
pub(crate) enum SnapshotUrgency {
/// Don't need a snapshot right now.
None,
/// A snapshot would be good, but can wait for other replicas to provide it.
Low,
/// A snapshot is needed right now.
High,
}
impl SnapshotUrgency {
/// Calculate the urgency for a snapshot based on its age in days
fn for_days(days: i64) -> Self {
if days >= SNAPSHOT_DAYS * 3 / 2 {
SnapshotUrgency::High
} else if days >= SNAPSHOT_DAYS {
SnapshotUrgency::Low
} else {
SnapshotUrgency::None
}
}
/// Calculate the urgency for a snapshot based on its age in versions
fn for_versions_since(versions_since: u32) -> Self {
if versions_since >= SNAPSHOT_VERSIONS * 3 / 2 {
SnapshotUrgency::High
} else if versions_since >= SNAPSHOT_VERSIONS {
SnapshotUrgency::Low
} else {
SnapshotUrgency::None
}
}
}
/// Implementation of the AddVersion protocol transaction
pub(crate) fn add_version<'a>(
mut txn: Box<dyn StorageTxn + 'a>,
client_key: ClientKey,
client: Client,
parent_version_id: VersionId,
history_segment: HistorySegment,
) -> anyhow::Result<AddVersionResult> {
) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> {
log::debug!(
"add_version(client_key: {}, parent_version_id: {})",
client_key,
@ -57,8 +135,9 @@ pub(crate) fn add_version<'a>(
// check if this version is acceptable, under the protection of the transaction
if client.latest_version_id != NO_VERSION_ID && parent_version_id != client.latest_version_id {
log::debug!("add_version request rejected: mismatched latest_version_id");
return Ok(AddVersionResult::ExpectedParentVersion(
client.latest_version_id,
return Ok((
AddVersionResult::ExpectedParentVersion(client.latest_version_id),
SnapshotUrgency::None,
));
}
@ -71,30 +150,261 @@ pub(crate) fn add_version<'a>(
// update the DB
txn.add_version(client_key, version_id, parent_version_id, history_segment)?;
txn.set_client_latest_version_id(client_key, version_id)?;
txn.commit()?;
Ok(AddVersionResult::Ok(version_id))
// calculate the urgency
let time_urgency = match client.snapshot {
None => SnapshotUrgency::High,
Some(Snapshot { timestamp, .. }) => {
SnapshotUrgency::for_days((Utc::now() - timestamp).num_days())
}
};
println!("{:?}", client.snapshot);
let version_urgency = match client.snapshot {
None => SnapshotUrgency::High,
Some(Snapshot { versions_since, .. }) => {
SnapshotUrgency::for_versions_since(versions_since)
}
};
Ok((
AddVersionResult::Ok(version_id),
std::cmp::max(time_urgency, version_urgency),
))
}
/// Implementation of the AddSnapshot protocol transaction
pub(crate) fn add_snapshot<'a>(
mut txn: Box<dyn StorageTxn + 'a>,
client_key: ClientKey,
client: Client,
version_id: VersionId,
data: Vec<u8>,
) -> anyhow::Result<()> {
log::debug!(
"add_snapshot(client_key: {}, version_id: {})",
client_key,
version_id,
);
// NOTE: if the snapshot is rejected, this function logs about it and returns
// Ok(()), as there's no reason to report an errot to the client / user.
let last_snapshot = client.snapshot.map(|snap| snap.version_id);
if Some(version_id) == last_snapshot {
log::debug!(
"rejecting snapshot for version {}: already exists",
version_id
);
return Ok(());
}
// look for this version in the history of this client, starting at the latest version, and
// only iterating for a limited number of versions.
let mut search_len = SNAPSHOT_SEARCH_LEN;
let mut vid = client.latest_version_id;
loop {
if vid == version_id && version_id != NO_VERSION_ID {
// the new snapshot is for a recent version, so proceed
break;
}
if Some(vid) == last_snapshot {
// the new snapshot is older than the last snapshot, so ignore it
log::debug!(
"rejecting snapshot for version {}: newer snapshot already exists or no such version",
version_id
);
return Ok(());
}
search_len -= 1;
if search_len <= 0 || vid == NO_VERSION_ID {
// this should not happen in normal operation, so warn about it
log::warn!(
"rejecting snapshot for version {}: version is too old or no such version",
version_id
);
return Ok(());
}
// get the parent version ID
if let Some(parent) = txn.get_version(client_key, vid)? {
vid = parent.parent_version_id;
} else {
// this version does not exist; "this should not happen" but if it does,
// we don't need a snapshot earlier than the missing version.
log::warn!(
"rejecting snapshot for version {}: newer versions have already been deleted",
version_id
);
return Ok(());
}
}
log::warn!("accepting snapshot for version {}", version_id);
txn.set_snapshot(
client_key,
Snapshot {
version_id,
timestamp: Utc::now(),
versions_since: 0,
},
data,
)?;
txn.commit()?;
Ok(())
}
/// Implementation of the GetSnapshot protocol transaction
pub(crate) fn get_snapshot<'a>(
mut txn: Box<dyn StorageTxn + 'a>,
client_key: ClientKey,
client: Client,
) -> anyhow::Result<Option<(Uuid, Vec<u8>)>> {
Ok(if let Some(snap) = client.snapshot {
txn.get_snapshot_data(client_key, snap.version_id)?
.map(|data| (snap.version_id, data))
} else {
None
})
}
#[cfg(test)]
mod test {
use super::*;
use crate::storage::{InMemoryStorage, Storage};
use crate::storage::{InMemoryStorage, Snapshot, Storage};
use crate::test::init_logging;
use chrono::{Duration, TimeZone, Utc};
use pretty_assertions::assert_eq;
#[test]
fn gcv_not_found() -> anyhow::Result<()> {
fn snapshot_urgency_max() {
use SnapshotUrgency::*;
assert_eq!(std::cmp::max(None, None), None);
assert_eq!(std::cmp::max(None, Low), Low);
assert_eq!(std::cmp::max(None, High), High);
assert_eq!(std::cmp::max(Low, None), Low);
assert_eq!(std::cmp::max(Low, Low), Low);
assert_eq!(std::cmp::max(Low, High), High);
assert_eq!(std::cmp::max(High, None), High);
assert_eq!(std::cmp::max(High, Low), High);
assert_eq!(std::cmp::max(High, High), High);
}
#[test]
fn snapshot_urgency_for_days() {
use SnapshotUrgency::*;
assert_eq!(SnapshotUrgency::for_days(0), None);
assert_eq!(SnapshotUrgency::for_days(SNAPSHOT_DAYS), Low);
assert_eq!(SnapshotUrgency::for_days(SNAPSHOT_DAYS * 2), High);
}
#[test]
fn snapshot_urgency_for_versions_since() {
use SnapshotUrgency::*;
assert_eq!(SnapshotUrgency::for_versions_since(0), None);
assert_eq!(SnapshotUrgency::for_versions_since(SNAPSHOT_VERSIONS), Low);
assert_eq!(
SnapshotUrgency::for_versions_since(SNAPSHOT_VERSIONS * 2),
High
);
}
#[test]
fn get_child_version_not_found_initial() -> anyhow::Result<()> {
init_logging();
let storage = InMemoryStorage::new();
let txn = storage.txn()?;
let mut txn = storage.txn()?;
let client_key = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
assert_eq!(get_child_version(txn, client_key, parent_version_id)?, None);
txn.new_client(client_key, NO_VERSION_ID)?;
// when no snapshot exists, the first version is NotFound
let client = txn.get_client(client_key)?.unwrap();
assert_eq!(
get_child_version(txn, client_key, client, NO_VERSION_ID)?,
GetVersionResult::NotFound
);
Ok(())
}
#[test]
fn gcv_found() -> anyhow::Result<()> {
fn get_child_version_gone_initial() -> anyhow::Result<()> {
init_logging();
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_key = Uuid::new_v4();
txn.new_client(client_key, Uuid::new_v4())?;
txn.set_snapshot(
client_key,
Snapshot {
version_id: Uuid::new_v4(),
versions_since: 0,
timestamp: Utc.ymd(2001, 9, 9).and_hms(1, 46, 40),
},
vec![1, 2, 3],
)?;
// when a snapshot exists, the first version is GONE
let client = txn.get_client(client_key)?.unwrap();
assert_eq!(
get_child_version(txn, client_key, client, NO_VERSION_ID)?,
GetVersionResult::Gone
);
Ok(())
}
#[test]
fn get_child_version_not_found_up_to_date() -> anyhow::Result<()> {
init_logging();
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_key = Uuid::new_v4();
// add a parent version, but not the requested child version
let parent_version_id = Uuid::new_v4();
txn.new_client(client_key, parent_version_id)?;
txn.add_version(client_key, parent_version_id, NO_VERSION_ID, vec![])?;
let client = txn.get_client(client_key)?.unwrap();
assert_eq!(
get_child_version(txn, client_key, client, parent_version_id)?,
GetVersionResult::NotFound
);
Ok(())
}
#[test]
fn get_child_version_gone() -> anyhow::Result<()> {
init_logging();
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_key = Uuid::new_v4();
// make up a parent version id, but neither that version
// nor its child exists (presumed to have been deleted)
let parent_version_id = Uuid::new_v4();
txn.new_client(client_key, Uuid::new_v4())?;
let client = txn.get_client(client_key)?.unwrap();
assert_eq!(
get_child_version(txn, client_key, client, parent_version_id)?,
GetVersionResult::Gone
);
Ok(())
}
#[test]
fn get_child_version_found() -> anyhow::Result<()> {
init_logging();
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_key = Uuid::new_v4();
@ -102,6 +412,7 @@ mod test {
let parent_version_id = Uuid::new_v4();
let history_segment = b"abcd".to_vec();
txn.new_client(client_key, version_id)?;
txn.add_version(
client_key,
version_id,
@ -109,95 +420,473 @@ mod test {
history_segment.clone(),
)?;
let client = txn.get_client(client_key)?.unwrap();
assert_eq!(
get_child_version(txn, client_key, parent_version_id)?,
Some(GetVersionResult {
get_child_version(txn, client_key, client, parent_version_id)?,
GetVersionResult::Success {
version_id,
parent_version_id,
history_segment,
})
}
);
Ok(())
}
#[test]
fn av_conflict() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
/// Utility setup function for add_version tests
fn av_setup(
storage: &InMemoryStorage,
num_versions: u32,
snapshot_version: Option<u32>,
snapshot_days_ago: Option<i64>,
) -> anyhow::Result<(Uuid, Vec<Uuid>)> {
init_logging();
let mut txn = storage.txn()?;
let client_key = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let history_segment = b"abcd".to_vec();
let existing_parent_version_id = Uuid::new_v4();
let client = Client {
latest_version_id: existing_parent_version_id,
};
let mut versions = vec![];
let mut version_id = Uuid::nil();
txn.new_client(client_key, Uuid::nil())?;
for vnum in 0..num_versions {
let parent_version_id = version_id;
version_id = Uuid::new_v4();
versions.push(version_id);
txn.add_version(
client_key,
version_id,
parent_version_id,
vec![0, 0, vnum as u8],
)?;
if Some(vnum) == snapshot_version {
txn.set_snapshot(
client_key,
Snapshot {
version_id,
versions_since: 0,
timestamp: Utc::now() - Duration::days(snapshot_days_ago.unwrap_or(0)),
},
vec![vnum as u8],
)?;
}
}
Ok((client_key, versions))
}
/// Utility function to check the results of an add_version call
fn av_success_check(
storage: &InMemoryStorage,
client_key: Uuid,
existing_versions: &[Uuid],
result: (AddVersionResult, SnapshotUrgency),
expected_history: Vec<u8>,
expected_urgency: SnapshotUrgency,
) -> anyhow::Result<()> {
if let AddVersionResult::Ok(new_version_id) = result.0 {
// check that it invented a new version ID
for v in existing_versions {
assert_ne!(&new_version_id, v);
}
// verify that the storage was updated
let mut txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();
assert_eq!(client.latest_version_id, new_version_id);
let parent_version_id = existing_versions.last().cloned().unwrap_or_else(Uuid::nil);
let version = txn.get_version(client_key, new_version_id)?.unwrap();
assert_eq!(version.version_id, new_version_id);
assert_eq!(version.parent_version_id, parent_version_id);
assert_eq!(version.history_segment, expected_history);
} else {
panic!("did not get Ok from add_version: {:?}", result);
}
assert_eq!(result.1, expected_urgency);
Ok(())
}
#[test]
fn add_version_conflict() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
let (client_key, versions) = av_setup(&storage, 3, None, None)?;
let mut txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();
// try to add a child of a version other than the latest
assert_eq!(
add_version(txn, client_key, client, parent_version_id, history_segment)?,
AddVersionResult::ExpectedParentVersion(existing_parent_version_id)
add_version(txn, client_key, client, versions[1], vec![3, 6, 9])?.0,
AddVersionResult::ExpectedParentVersion(versions[2])
);
// verify that the storage wasn't updated
txn = storage.txn()?;
assert_eq!(txn.get_client(client_key)?, None);
assert_eq!(
txn.get_version_by_parent(client_key, parent_version_id)?,
None
txn.get_client(client_key)?.unwrap().latest_version_id,
versions[2]
);
assert_eq!(txn.get_version_by_parent(client_key, versions[2])?, None);
Ok(())
}
#[test]
fn add_version_with_existing_history() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
let (client_key, versions) = av_setup(&storage, 1, None, None)?;
let mut txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();
let result = add_version(txn, client_key, client, versions[0], vec![3, 6, 9])?;
av_success_check(
&storage,
client_key,
&versions,
result,
vec![3, 6, 9],
// urgency=high because there are no snapshots yet
SnapshotUrgency::High,
)?;
Ok(())
}
#[test]
fn add_version_with_no_history() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
let (client_key, versions) = av_setup(&storage, 0, None, None)?;
let mut txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();
let parent_version_id = Uuid::nil();
let result = add_version(txn, client_key, client, parent_version_id, vec![3, 6, 9])?;
av_success_check(
&storage,
client_key,
&versions,
result,
vec![3, 6, 9],
// urgency=high because there are no snapshots yet
SnapshotUrgency::High,
)?;
Ok(())
}
#[test]
fn add_version_success_recent_snapshot() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
let (client_key, versions) = av_setup(&storage, 1, Some(0), None)?;
let mut txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();
let result = add_version(txn, client_key, client, versions[0], vec![1, 2, 3])?;
av_success_check(
&storage,
client_key,
&versions,
result,
vec![1, 2, 3],
// no snapshot request since the previous version has a snapshot
SnapshotUrgency::None,
)?;
Ok(())
}
#[test]
fn add_version_success_aged_snapshot() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
// one snapshot, but it was 50 days ago
let (client_key, versions) = av_setup(&storage, 1, Some(0), Some(50))?;
let mut txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();
let result = add_version(txn, client_key, client, versions[0], vec![1, 2, 3])?;
av_success_check(
&storage,
client_key,
&versions,
result,
vec![1, 2, 3],
// urgency=high due to days since the snapshot
SnapshotUrgency::High,
)?;
Ok(())
}
#[test]
fn add_version_success_snapshot_many_versions_ago() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
// one snapshot, but it was 50 versions ago
let (client_key, versions) = av_setup(&storage, 50, Some(0), None)?;
let mut txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();
let result = add_version(txn, client_key, client, versions[49], vec![1, 2, 3])?;
av_success_check(
&storage,
client_key,
&versions,
result,
vec![1, 2, 3],
// urgency=high due to number of versions since the snapshot
SnapshotUrgency::High,
)?;
Ok(())
}
#[test]
fn add_snapshot_success_latest() -> anyhow::Result<()> {
init_logging();
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_key = Uuid::new_v4();
let version_id = Uuid::new_v4();
// set up a task DB with one version in it
txn.new_client(client_key, version_id)?;
txn.add_version(client_key, version_id, NO_VERSION_ID, vec![])?;
// add a snapshot for that version
let client = txn.get_client(client_key)?.unwrap();
add_snapshot(txn, client_key, client, version_id, vec![1, 2, 3])?;
// verify the snapshot
let mut txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();
let snapshot = client.snapshot.unwrap();
assert_eq!(snapshot.version_id, version_id);
assert_eq!(snapshot.versions_since, 0);
assert_eq!(
txn.get_snapshot_data(client_key, version_id).unwrap(),
Some(vec![1, 2, 3])
);
Ok(())
}
fn test_av_success(latest_version_id_nil: bool) -> anyhow::Result<()> {
#[test]
fn add_snapshot_success_older() -> anyhow::Result<()> {
init_logging();
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_key = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let history_segment = b"abcd".to_vec();
let latest_version_id = if latest_version_id_nil {
Uuid::nil()
} else {
parent_version_id
};
let version_id_1 = Uuid::new_v4();
let version_id_2 = Uuid::new_v4();
txn.new_client(client_key, latest_version_id)?;
// set up a task DB with two versions in it
txn.new_client(client_key, version_id_2)?;
txn.add_version(client_key, version_id_1, NO_VERSION_ID, vec![])?;
txn.add_version(client_key, version_id_2, version_id_1, vec![])?;
// add a snapshot for version 1
let client = txn.get_client(client_key)?.unwrap();
add_snapshot(txn, client_key, client, version_id_1, vec![1, 2, 3])?;
let result = add_version(
txn,
client_key,
client,
parent_version_id,
history_segment.clone(),
)?;
if let AddVersionResult::Ok(new_version_id) = result {
// check that it invented a new version ID
assert!(new_version_id != parent_version_id);
// verify that the storage was updated
txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();
assert_eq!(client.latest_version_id, new_version_id);
let version = txn
.get_version_by_parent(client_key, parent_version_id)?
.unwrap();
assert_eq!(version.version_id, new_version_id);
assert_eq!(version.parent_version_id, parent_version_id);
assert_eq!(version.history_segment, history_segment);
} else {
panic!("did not get Ok from add_version");
}
// verify the snapshot
let mut txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();
let snapshot = client.snapshot.unwrap();
assert_eq!(snapshot.version_id, version_id_1);
assert_eq!(snapshot.versions_since, 0);
assert_eq!(
txn.get_snapshot_data(client_key, version_id_1).unwrap(),
Some(vec![1, 2, 3])
);
Ok(())
}
#[test]
fn av_success_with_existing_history() -> anyhow::Result<()> {
test_av_success(true)
fn add_snapshot_fails_no_such() -> anyhow::Result<()> {
init_logging();
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_key = Uuid::new_v4();
let version_id_1 = Uuid::new_v4();
let version_id_2 = Uuid::new_v4();
// set up a task DB with two versions in it
txn.new_client(client_key, version_id_2)?;
txn.add_version(client_key, version_id_1, NO_VERSION_ID, vec![])?;
txn.add_version(client_key, version_id_2, version_id_1, vec![])?;
// add a snapshot for unknown version
let client = txn.get_client(client_key)?.unwrap();
let version_id_unk = Uuid::new_v4();
add_snapshot(txn, client_key, client, version_id_unk, vec![1, 2, 3])?;
// verify the snapshot does not exist
let mut txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();
assert!(client.snapshot.is_none());
Ok(())
}
#[test]
fn av_success_nil_latest_version_id() -> anyhow::Result<()> {
test_av_success(false)
fn add_snapshot_fails_too_old() -> anyhow::Result<()> {
init_logging();
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_key = Uuid::new_v4();
let mut version_id = Uuid::new_v4();
let mut parent_version_id = Uuid::nil();
let mut version_ids = vec![];
// set up a task DB with 10 versions in it (oldest to newest)
txn.new_client(client_key, Uuid::nil())?;
for _ in 0..10 {
txn.add_version(client_key, version_id, parent_version_id, vec![])?;
version_ids.push(version_id);
parent_version_id = version_id;
version_id = Uuid::new_v4();
}
// add a snapshot for the earliest of those
let client = txn.get_client(client_key)?.unwrap();
add_snapshot(txn, client_key, client, version_ids[0], vec![1, 2, 3])?;
// verify the snapshot does not exist
let mut txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();
assert!(client.snapshot.is_none());
Ok(())
}
#[test]
fn add_snapshot_fails_newer_exists() -> anyhow::Result<()> {
init_logging();
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_key = Uuid::new_v4();
let mut version_id = Uuid::new_v4();
let mut parent_version_id = Uuid::nil();
let mut version_ids = vec![];
// set up a task DB with 5 versions in it (oldest to newest) and a snapshot of the middle
// one
txn.new_client(client_key, Uuid::nil())?;
for _ in 0..5 {
txn.add_version(client_key, version_id, parent_version_id, vec![])?;
version_ids.push(version_id);
parent_version_id = version_id;
version_id = Uuid::new_v4();
}
txn.set_snapshot(
client_key,
Snapshot {
version_id: version_ids[2],
versions_since: 2,
timestamp: Utc.ymd(2001, 9, 9).and_hms(1, 46, 40),
},
vec![1, 2, 3],
)?;
// add a snapshot for the earliest of those
let client = txn.get_client(client_key)?.unwrap();
add_snapshot(txn, client_key, client, version_ids[0], vec![9, 9, 9])?;
// verify the snapshot was not replaced
let mut txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();
let snapshot = client.snapshot.unwrap();
assert_eq!(snapshot.version_id, version_ids[2]);
assert_eq!(snapshot.versions_since, 2);
assert_eq!(
txn.get_snapshot_data(client_key, version_ids[2]).unwrap(),
Some(vec![1, 2, 3])
);
Ok(())
}
#[test]
fn add_snapshot_fails_nil_version() -> anyhow::Result<()> {
init_logging();
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_key = Uuid::new_v4();
// just set up the client
txn.new_client(client_key, NO_VERSION_ID)?;
// add a snapshot for the nil version
let client = txn.get_client(client_key)?.unwrap();
add_snapshot(txn, client_key, client, NO_VERSION_ID, vec![9, 9, 9])?;
// verify the snapshot does not exist
let mut txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();
assert!(client.snapshot.is_none());
Ok(())
}
#[test]
fn get_snapshot_found() -> anyhow::Result<()> {
init_logging();
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_key = Uuid::new_v4();
let data = vec![1, 2, 3];
let snapshot_version_id = Uuid::new_v4();
txn.new_client(client_key, snapshot_version_id)?;
txn.set_snapshot(
client_key,
Snapshot {
version_id: snapshot_version_id,
versions_since: 3,
timestamp: Utc.ymd(2001, 9, 9).and_hms(1, 46, 40),
},
data.clone(),
)?;
let client = txn.get_client(client_key)?.unwrap();
assert_eq!(
get_snapshot(txn, client_key, client)?,
Some((snapshot_version_id, data.clone()))
);
Ok(())
}
#[test]
fn get_snapshot_not_found() -> anyhow::Result<()> {
init_logging();
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_key = Uuid::new_v4();
txn.new_client(client_key, NO_VERSION_ID)?;
let client = txn.get_client(client_key)?.unwrap();
assert_eq!(get_snapshot(txn, client_key, client)?, None);
Ok(())
}
}

View file

@ -1,4 +1,4 @@
use super::{Client, Storage, StorageTxn, Uuid, Version};
use super::{Client, Snapshot, Storage, StorageTxn, Uuid, Version};
use std::collections::HashMap;
use std::sync::{Mutex, MutexGuard};
@ -6,8 +6,14 @@ struct Inner {
/// Clients, indexed by client_key
clients: HashMap<Uuid, Client>,
/// Versions, indexed by (client_key, parent_version_id)
/// Snapshot data, indexed by client key
snapshots: HashMap<Uuid, Vec<u8>>,
/// Versions, indexed by (client_key, version_id)
versions: HashMap<(Uuid, Uuid), Version>,
/// Child versions, indexed by (client_key, parent_version_id)
children: HashMap<(Uuid, Uuid), Uuid>,
}
pub struct InMemoryStorage(Mutex<Inner>);
@ -17,7 +23,9 @@ impl InMemoryStorage {
pub fn new() -> Self {
Self(Mutex::new(Inner {
clients: HashMap::new(),
snapshots: HashMap::new(),
versions: HashMap::new(),
children: HashMap::new(),
}))
}
}
@ -42,23 +50,44 @@ impl<'a> StorageTxn for InnerTxn<'a> {
if self.0.clients.get(&client_key).is_some() {
return Err(anyhow::anyhow!("Client {} already exists", client_key));
}
self.0
.clients
.insert(client_key, Client { latest_version_id });
self.0.clients.insert(
client_key,
Client {
latest_version_id,
snapshot: None,
},
);
Ok(())
}
fn set_client_latest_version_id(
fn set_snapshot(
&mut self,
client_key: Uuid,
latest_version_id: Uuid,
snapshot: Snapshot,
data: Vec<u8>,
) -> anyhow::Result<()> {
if let Some(client) = self.0.clients.get_mut(&client_key) {
client.latest_version_id = latest_version_id;
Ok(())
} else {
Err(anyhow::anyhow!("Client {} does not exist", client_key))
let mut client = self
.0
.clients
.get_mut(&client_key)
.ok_or_else(|| anyhow::anyhow!("no such client"))?;
client.snapshot = Some(snapshot);
self.0.snapshots.insert(client_key, data);
Ok(())
}
fn get_snapshot_data(
&mut self,
client_key: Uuid,
version_id: Uuid,
) -> anyhow::Result<Option<Vec<u8>>> {
// sanity check
let client = self.0.clients.get(&client_key);
let client = client.ok_or_else(|| anyhow::anyhow!("no such client"))?;
if Some(&version_id) != client.snapshot.as_ref().map(|snap| &snap.version_id) {
return Err(anyhow::anyhow!("unexpected snapshot_version_id"));
}
Ok(self.0.snapshots.get(&client_key).cloned())
}
fn get_version_by_parent(
@ -66,11 +95,23 @@ impl<'a> StorageTxn for InnerTxn<'a> {
client_key: Uuid,
parent_version_id: Uuid,
) -> anyhow::Result<Option<Version>> {
Ok(self
.0
.versions
.get(&(client_key, parent_version_id))
.cloned())
if let Some(parent_version_id) = self.0.children.get(&(client_key, parent_version_id)) {
Ok(self
.0
.versions
.get(&(client_key, *parent_version_id))
.cloned())
} else {
Ok(None)
}
}
fn get_version(
&mut self,
client_key: Uuid,
version_id: Uuid,
) -> anyhow::Result<Option<Version>> {
Ok(self.0.versions.get(&(client_key, version_id)).cloned())
}
fn add_version(
@ -86,9 +127,21 @@ impl<'a> StorageTxn for InnerTxn<'a> {
parent_version_id,
history_segment,
};
if let Some(client) = self.0.clients.get_mut(&client_key) {
client.latest_version_id = version_id;
if let Some(ref mut snap) = client.snapshot {
snap.versions_since += 1;
}
} else {
return Err(anyhow::anyhow!("Client {} does not exist", client_key));
}
self.0
.versions
.insert((client_key, version.parent_version_id), version);
.children
.insert((client_key, parent_version_id), version_id);
self.0.versions.insert((client_key, version_id), version);
Ok(())
}
@ -96,3 +149,139 @@ impl<'a> StorageTxn for InnerTxn<'a> {
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
use chrono::Utc;
#[test]
fn test_get_client_empty() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let maybe_client = txn.get_client(Uuid::new_v4())?;
assert!(maybe_client.is_none());
Ok(())
}
#[test]
fn test_client_storage() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_key = Uuid::new_v4();
let latest_version_id = Uuid::new_v4();
txn.new_client(client_key, latest_version_id)?;
let client = txn.get_client(client_key)?.unwrap();
assert_eq!(client.latest_version_id, latest_version_id);
assert!(client.snapshot.is_none());
let latest_version_id = Uuid::new_v4();
txn.add_version(client_key, latest_version_id, Uuid::new_v4(), vec![1, 1])?;
let client = txn.get_client(client_key)?.unwrap();
assert_eq!(client.latest_version_id, latest_version_id);
assert!(client.snapshot.is_none());
let snap = Snapshot {
version_id: Uuid::new_v4(),
timestamp: Utc::now(),
versions_since: 4,
};
txn.set_snapshot(client_key, snap.clone(), vec![1, 2, 3])?;
let client = txn.get_client(client_key)?.unwrap();
assert_eq!(client.latest_version_id, latest_version_id);
assert_eq!(client.snapshot.unwrap(), snap);
Ok(())
}
#[test]
fn test_gvbp_empty() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let maybe_version = txn.get_version_by_parent(Uuid::new_v4(), Uuid::new_v4())?;
assert!(maybe_version.is_none());
Ok(())
}
#[test]
fn test_add_version_and_get_version() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_key = Uuid::new_v4();
let version_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let history_segment = b"abc".to_vec();
txn.new_client(client_key, parent_version_id)?;
txn.add_version(
client_key,
version_id,
parent_version_id,
history_segment.clone(),
)?;
let expected = Version {
version_id,
parent_version_id,
history_segment,
};
let version = txn
.get_version_by_parent(client_key, parent_version_id)?
.unwrap();
assert_eq!(version, expected);
let version = txn.get_version(client_key, version_id)?.unwrap();
assert_eq!(version, expected);
Ok(())
}
#[test]
fn test_snapshots() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_key = Uuid::new_v4();
txn.new_client(client_key, Uuid::new_v4())?;
assert!(txn.get_client(client_key)?.unwrap().snapshot.is_none());
let snap = Snapshot {
version_id: Uuid::new_v4(),
timestamp: Utc::now(),
versions_since: 3,
};
txn.set_snapshot(client_key, snap.clone(), vec![9, 8, 9])?;
assert_eq!(
txn.get_snapshot_data(client_key, snap.version_id)?.unwrap(),
vec![9, 8, 9]
);
assert_eq!(txn.get_client(client_key)?.unwrap().snapshot, Some(snap));
let snap2 = Snapshot {
version_id: Uuid::new_v4(),
timestamp: Utc::now(),
versions_since: 10,
};
txn.set_snapshot(client_key, snap2.clone(), vec![0, 2, 4, 6])?;
assert_eq!(
txn.get_snapshot_data(client_key, snap2.version_id)?
.unwrap(),
vec![0, 2, 4, 6]
);
assert_eq!(txn.get_client(client_key)?.unwrap().snapshot, Some(snap2));
// check that mismatched version is detected
assert!(txn.get_snapshot_data(client_key, Uuid::new_v4()).is_err());
Ok(())
}
}

View file

@ -1,4 +1,4 @@
use serde::{Deserialize, Serialize};
use chrono::{DateTime, Utc};
use uuid::Uuid;
#[cfg(debug_assertions)]
@ -10,12 +10,27 @@ pub use inmemory::InMemoryStorage;
mod sqlite;
pub use self::sqlite::SqliteStorage;
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
#[derive(Clone, PartialEq, Debug)]
pub struct Client {
/// The latest version for this client (may be the nil version)
pub latest_version_id: Uuid,
/// Data about the latest snapshot for this client
pub snapshot: Option<Snapshot>,
}
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
#[derive(Clone, PartialEq, Debug)]
pub struct Snapshot {
/// ID of the version at which this snapshot was made
pub version_id: Uuid,
/// Timestamp at which this snapshot was set
pub timestamp: DateTime<Utc>,
/// Number of versions since this snapshot was made
pub versions_since: u32,
}
#[derive(Clone, PartialEq, Debug)]
pub struct Version {
pub version_id: Uuid,
pub parent_version_id: Uuid,
@ -29,13 +44,22 @@ pub trait StorageTxn {
/// Create a new client with the given latest_version_id
fn new_client(&mut self, client_key: Uuid, latest_version_id: Uuid) -> anyhow::Result<()>;
/// Set the client's latest_version_id
fn set_client_latest_version_id(
/// Set the client's most recent snapshot.
fn set_snapshot(
&mut self,
client_key: Uuid,
latest_version_id: Uuid,
snapshot: Snapshot,
data: Vec<u8>,
) -> anyhow::Result<()>;
/// Get the data for the most recent snapshot. The version_id
/// is used to verify that the snapshot is for the correct version.
fn get_snapshot_data(
&mut self,
client_key: Uuid,
version_id: Uuid,
) -> anyhow::Result<Option<Vec<u8>>>;
/// Get a version, indexed by parent version id
fn get_version_by_parent(
&mut self,
@ -43,7 +67,16 @@ pub trait StorageTxn {
parent_version_id: Uuid,
) -> anyhow::Result<Option<Version>>;
/// Add a version (that must not already exist)
/// Get a version, indexed by its own version id
fn get_version(
&mut self,
client_key: Uuid,
version_id: Uuid,
) -> anyhow::Result<Option<Version>>;
/// Add a version (that must not already exist), and
/// - update latest_version_id
/// - increment snapshot.versions_since
fn add_version(
&mut self,
client_key: Uuid,

View file

@ -1,5 +1,6 @@
use super::{Client, Storage, StorageTxn, Uuid, Version};
use super::{Client, Snapshot, Storage, StorageTxn, Uuid, Version};
use anyhow::Context;
use chrono::{TimeZone, Utc};
use rusqlite::types::{FromSql, ToSql};
use rusqlite::{params, Connection, OptionalExtension};
use std::path::Path;
@ -30,24 +31,6 @@ impl ToSql for StoredUuid {
}
}
/// Stores [`Client`] in SQLite
impl FromSql for Client {
fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
let o: Client = serde_json::from_str(value.as_str()?)
.map_err(|_| rusqlite::types::FromSqlError::InvalidType)?;
Ok(o)
}
}
/// Parses Operation stored as JSON in string column
impl ToSql for Client {
fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
let s = serde_json::to_string(&self)
.map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
Ok(s.into())
}
}
/// An on-disk storage backend which uses SQLite
pub struct SqliteStorage {
db_file: std::path::PathBuf,
@ -69,11 +52,19 @@ impl SqliteStorage {
let txn = con.transaction()?;
let queries = vec![
"CREATE TABLE IF NOT EXISTS clients (client_key STRING PRIMARY KEY, latest_version_id STRING);",
"CREATE TABLE IF NOT EXISTS clients (
client_key STRING PRIMARY KEY,
latest_version_id STRING,
snapshot_version_id STRING,
versions_since_snapshot INTEGER,
snapshot_timestamp INTEGER,
snapshot BLOB);",
"CREATE TABLE IF NOT EXISTS versions (version_id STRING PRIMARY KEY, client_key STRING, parent_version_id STRING, history_segment BLOB);",
"CREATE INDEX IF NOT EXISTS versions_by_parent ON versions (parent_version_id);",
];
for q in queries {
txn.execute(q, []).context("Creating table")?;
txn.execute(q, [])
.context("Error while creating SQLite tables")?;
}
txn.commit()?;
}
@ -100,6 +91,34 @@ impl Txn {
.transaction()
.map_err(|_e| SqliteError::CreateTransactionFailed)
}
/// Implementation for queries from the versions table
fn get_version_impl(
&mut self,
query: &'static str,
client_key: Uuid,
version_id_arg: Uuid,
) -> anyhow::Result<Option<Version>> {
let t = self.get_txn()?;
let r = t
.query_row(
query,
params![&StoredUuid(version_id_arg), &StoredUuid(client_key)],
|r| {
let version_id: StoredUuid = r.get("version_id")?;
let parent_version_id: StoredUuid = r.get("parent_version_id")?;
Ok(Version {
version_id: version_id.0,
parent_version_id: parent_version_id.0,
history_segment: r.get("history_segment")?,
})
},
)
.optional()
.context("Error getting version")?;
Ok(r)
}
}
impl StorageTxn for Txn {
@ -107,17 +126,42 @@ impl StorageTxn for Txn {
let t = self.get_txn()?;
let result: Option<Client> = t
.query_row(
"SELECT latest_version_id FROM clients WHERE client_key = ? LIMIT 1",
"SELECT
latest_version_id,
snapshot_timestamp,
versions_since_snapshot,
snapshot_version_id
FROM clients
WHERE client_key = ?
LIMIT 1",
[&StoredUuid(client_key)],
|r| {
let latest_version_id: StoredUuid = r.get(0)?;
let snapshot_timestamp: Option<i64> = r.get(1)?;
let versions_since_snapshot: Option<u32> = r.get(2)?;
let snapshot_version_id: Option<StoredUuid> = r.get(3)?;
// if all of the relevant fields are non-NULL, return a snapshot
let snapshot = match (
snapshot_timestamp,
versions_since_snapshot,
snapshot_version_id,
) {
(Some(ts), Some(vs), Some(v)) => Some(Snapshot {
version_id: v.0,
timestamp: Utc.timestamp(ts, 0),
versions_since: vs,
}),
_ => None,
};
Ok(Client {
latest_version_id: latest_version_id.0,
snapshot,
})
},
)
.optional()
.context("Get client query")?;
.context("Error getting client")?;
Ok(result)
}
@ -129,18 +173,66 @@ impl StorageTxn for Txn {
"INSERT OR REPLACE INTO clients (client_key, latest_version_id) VALUES (?, ?)",
params![&StoredUuid(client_key), &StoredUuid(latest_version_id)],
)
.context("Create client query")?;
.context("Error creating/updating client")?;
t.commit()?;
Ok(())
}
fn set_client_latest_version_id(
fn set_snapshot(
&mut self,
client_key: Uuid,
latest_version_id: Uuid,
snapshot: Snapshot,
data: Vec<u8>,
) -> anyhow::Result<()> {
// Implementation is same as new_client
self.new_client(client_key, latest_version_id)
let t = self.get_txn()?;
t.execute(
"UPDATE clients
SET
snapshot_version_id = ?,
snapshot_timestamp = ?,
versions_since_snapshot = ?,
snapshot = ?
WHERE client_key = ?",
params![
&StoredUuid(snapshot.version_id),
snapshot.timestamp.timestamp(),
snapshot.versions_since,
data,
&StoredUuid(client_key),
],
)
.context("Error creating/updating snapshot")?;
t.commit()?;
Ok(())
}
fn get_snapshot_data(
&mut self,
client_key: Uuid,
version_id: Uuid,
) -> anyhow::Result<Option<Vec<u8>>> {
let t = self.get_txn()?;
let r = t
.query_row(
"SELECT snapshot, snapshot_version_id FROM clients WHERE client_key = ?",
params![&StoredUuid(client_key)],
|r| {
let v: StoredUuid = r.get("snapshot_version_id")?;
let d: Vec<u8> = r.get("snapshot")?;
Ok((v.0, d))
},
)
.optional()
.context("Error getting snapshot")?;
r.map(|(v, d)| {
if v != version_id {
return Err(anyhow::anyhow!("unexpected snapshot_version_id"));
}
Ok(d)
})
.transpose()
}
fn get_version_by_parent(
@ -148,24 +240,21 @@ impl StorageTxn for Txn {
client_key: Uuid,
parent_version_id: Uuid,
) -> anyhow::Result<Option<Version>> {
let t = self.get_txn()?;
let r = t.query_row(
self.get_version_impl(
"SELECT version_id, parent_version_id, history_segment FROM versions WHERE parent_version_id = ? AND client_key = ?",
params![&StoredUuid(parent_version_id), &StoredUuid(client_key)],
|r| {
let version_id: StoredUuid = r.get("version_id")?;
let parent_version_id: StoredUuid = r.get("parent_version_id")?;
client_key,
parent_version_id)
}
Ok(Version{
version_id: version_id.0,
parent_version_id: parent_version_id.0,
history_segment: r.get("history_segment")?,
})}
)
.optional()
.context("Get version query")
?;
Ok(r)
fn get_version(
&mut self,
client_key: Uuid,
version_id: Uuid,
) -> anyhow::Result<Option<Version>> {
self.get_version_impl(
"SELECT version_id, parent_version_id, history_segment FROM versions WHERE version_id = ? AND client_key = ?",
client_key,
version_id)
}
fn add_version(
@ -184,9 +273,19 @@ impl StorageTxn for Txn {
StoredUuid(client_key),
StoredUuid(parent_version_id),
history_segment
],
]
)
.context("Add version query")?;
.context("Error adding version")?;
t.execute(
"UPDATE clients
SET
latest_version_id = ?,
versions_since_snapshot = versions_since_snapshot + 1
WHERE client_key = ?",
params![StoredUuid(version_id), StoredUuid(client_key),],
)
.context("Error updating client for new version")?;
t.commit()?;
Ok(())
}
@ -203,6 +302,7 @@ impl StorageTxn for Txn {
#[cfg(test)]
mod test {
use super::*;
use chrono::DateTime;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
@ -239,12 +339,25 @@ mod test {
let client = txn.get_client(client_key)?.unwrap();
assert_eq!(client.latest_version_id, latest_version_id);
assert!(client.snapshot.is_none());
let latest_version_id = Uuid::new_v4();
txn.set_client_latest_version_id(client_key, latest_version_id)?;
txn.add_version(client_key, latest_version_id, Uuid::new_v4(), vec![1, 1])?;
let client = txn.get_client(client_key)?.unwrap();
assert_eq!(client.latest_version_id, latest_version_id);
assert!(client.snapshot.is_none());
let snap = Snapshot {
version_id: Uuid::new_v4(),
timestamp: "2014-11-28T12:00:09Z".parse::<DateTime<Utc>>().unwrap(),
versions_since: 4,
};
txn.set_snapshot(client_key, snap.clone(), vec![1, 2, 3])?;
let client = txn.get_client(client_key)?.unwrap();
assert_eq!(client.latest_version_id, latest_version_id);
assert_eq!(client.snapshot.unwrap(), snap);
Ok(())
}
@ -260,7 +373,7 @@ mod test {
}
#[test]
fn test_add_version_and_gvbp() -> anyhow::Result<()> {
fn test_add_version_and_get_version() -> anyhow::Result<()> {
let tmp_dir = TempDir::new()?;
let storage = SqliteStorage::new(&tmp_dir.path())?;
let mut txn = storage.txn()?;
@ -275,18 +388,65 @@ mod test {
parent_version_id,
history_segment.clone(),
)?;
let expected = Version {
version_id,
parent_version_id,
history_segment,
};
let version = txn
.get_version_by_parent(client_key, parent_version_id)?
.unwrap();
assert_eq!(version, expected);
let version = txn.get_version(client_key, version_id)?.unwrap();
assert_eq!(version, expected);
Ok(())
}
#[test]
fn test_snapshots() -> anyhow::Result<()> {
let tmp_dir = TempDir::new()?;
let storage = SqliteStorage::new(&tmp_dir.path())?;
let mut txn = storage.txn()?;
let client_key = Uuid::new_v4();
txn.new_client(client_key, Uuid::new_v4())?;
assert!(txn.get_client(client_key)?.unwrap().snapshot.is_none());
let snap = Snapshot {
version_id: Uuid::new_v4(),
timestamp: "2013-10-08T12:00:09Z".parse::<DateTime<Utc>>().unwrap(),
versions_since: 3,
};
txn.set_snapshot(client_key, snap.clone(), vec![9, 8, 9])?;
assert_eq!(
version,
Version {
version_id,
parent_version_id,
history_segment,
}
txn.get_snapshot_data(client_key, snap.version_id)?.unwrap(),
vec![9, 8, 9]
);
assert_eq!(txn.get_client(client_key)?.unwrap().snapshot, Some(snap));
let snap2 = Snapshot {
version_id: Uuid::new_v4(),
timestamp: "2014-11-28T12:00:09Z".parse::<DateTime<Utc>>().unwrap(),
versions_since: 10,
};
txn.set_snapshot(client_key, snap2.clone(), vec![0, 2, 4, 6])?;
assert_eq!(
txn.get_snapshot_data(client_key, snap2.version_id)?
.unwrap(),
vec![0, 2, 4, 6]
);
assert_eq!(txn.get_client(client_key)?.unwrap().snapshot, Some(snap2));
// check that mismatched version is detected
assert!(txn.get_snapshot_data(client_key, Uuid::new_v4()).is_err());
Ok(())
}
}