Specifically, from f89cc35a4aa8b928cc63502f3dde4d47447b5551, with
addition of README.md, CONTRIBUTING.md, and a few other ancillary files.
This commit is contained in:
Dustin J. Mitchell 2024-04-07 21:43:46 -04:00
commit 31cb732f06
No known key found for this signature in database
18 changed files with 4884 additions and 0 deletions

1
.gitignore vendored Normal file
View file

@ -0,0 +1 @@
target/

76
CODE_OF_CONDUCT.md Normal file
View file

@ -0,0 +1,76 @@
# Contributor Covenant Code of Conduct
## Our Pledge
In the interest of fostering an open and welcoming environment, we as
contributors and maintainers pledge to making participation in our project and
our community a harassment-free experience for everyone, regardless of age, body
size, disability, ethnicity, sex characteristics, gender identity and expression,
level of experience, education, socio-economic status, nationality, personal
appearance, race, religion, or sexual identity and orientation.
## Our Standards
Examples of behavior that contributes to creating a positive environment
include:
* Using welcoming and inclusive language
* Being respectful of differing viewpoints and experiences
* Gracefully accepting constructive criticism
* Focusing on what is best for the community
* Showing empathy towards other community members
Examples of unacceptable behavior by participants include:
* The use of sexualized language or imagery and unwelcome sexual attention or
advances
* Trolling, insulting/derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or electronic
address, without explicit permission
* Other conduct which could reasonably be considered inappropriate in a
professional setting
## Our Responsibilities
Project maintainers are responsible for clarifying the standards of acceptable
behavior and are expected to take appropriate and fair corrective action in
response to any instances of unacceptable behavior.
Project maintainers have the right and responsibility to remove, edit, or
reject comments, commits, code, wiki edits, issues, and other contributions
that are not aligned to this Code of Conduct, or to ban temporarily or
permanently any contributor for other behaviors that they deem inappropriate,
threatening, offensive, or harmful.
## Scope
This Code of Conduct applies both within project spaces and in public spaces
when an individual is representing the project or its community. Examples of
representing a project or community include using an official project e-mail
address, posting via an official social media account, or acting as an appointed
representative at an online or offline event. Representation of a project may be
further defined and clarified by project maintainers.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported by contacting the project team at dustin@cs.uchicago.edu. All
complaints will be reviewed and investigated and will result in a response that
is deemed necessary and appropriate to the circumstances. The project team is
obligated to maintain confidentiality with regard to the reporter of an incident.
Further details of specific enforcement policies may be posted separately.
Project maintainers who do not follow or enforce the Code of Conduct in good
faith may face temporary or permanent repercussions as determined by other
members of the project's leadership.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4,
available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html
[homepage]: https://www.contributor-covenant.org
For answers to common questions about this code of conduct, see
https://www.contributor-covenant.org/faq

20
CONTRIBUTING.md Normal file
View file

@ -0,0 +1,20 @@
# Welcome
TaskChampion sync-server is very open to contributions, and we'd love to have your help!
A good starting point might be one of the issues tagged with ["good first issue"][first].
[first]: https://github.com/taskchampion/taskchampion-sync-server/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22
# Development Guide
This repository is a typical Rust application.
To work on it, you'll need to [install a recent version of Rust](https://www.rust-lang.org/tools/install) (the latest stable is always a good choice).
Once you've done that, run `cargo build` at the top level of this repository to build the binary.
Alternately, run `cargo test` to run the test suite.
## Making a Pull Request
We expect contributors to follow the [GitHub Flow](https://guides.github.com/introduction/flow/).
Aside from that, we have no particular requirements on pull requests.
Make your patch, double-check that it's complete (tests? docs? documentation comments?), and make a new pull request.

1916
Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

25
Cargo.toml Normal file
View file

@ -0,0 +1,25 @@
[package]
name = "taskchampion-sync-server"
version = "0.4.1"
authors = ["Dustin J. Mitchell <dustin@mozilla.com>"]
edition = "2021"
publish = false
[dependencies]
uuid = { version = "^1.8.0", features = ["serde", "v4"] }
actix-web = "^4.3.1"
anyhow = "1.0"
thiserror = "1.0"
futures = "^0.3.25"
serde_json = "^1.0"
serde = { version = "^1.0.147", features = ["derive"] }
clap = { version = "^4.3.0", features = ["string"] }
log = "^0.4.17"
env_logger = "^0.10.0"
rusqlite = { version = "0.29", features = ["bundled"] }
chrono = { version = "^0.4.22", features = ["serde"] }
[dev-dependencies]
actix-rt = "2"
tempfile = "3"
pretty_assertions = "1"

21
LICENSE Normal file
View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2020 Dustin J. Mitchell
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

10
README.md Normal file
View file

@ -0,0 +1,10 @@
TaskChampion Sync-Server
------------------------
TaskChampion is the task database [Taskwarrior](https://github.com/GothenburgBitFactory/taskwarrior) uses to store and sync tasks.
This repository implements a sync server against which Taskwarrior and other applications embedding TaskChampion can sync.
## Status
This repository was spun off from Taskwarrior itself after the 3.0.0 release.
It is still under development and currently best described as a refernce implementation of the Taskchampion sync protocol.

207
src/api/add_snapshot.rs Normal file
View file

@ -0,0 +1,207 @@
use crate::api::{client_id_header, failure_to_ise, ServerState, SNAPSHOT_CONTENT_TYPE};
use crate::server::{add_snapshot, VersionId, NIL_VERSION_ID};
use actix_web::{error, post, web, HttpMessage, HttpRequest, HttpResponse, Result};
use futures::StreamExt;
use std::sync::Arc;
/// Max snapshot size: 100MB
const MAX_SIZE: usize = 100 * 1024 * 1024;
/// Add a new snapshot, after checking prerequisites. The snapshot should be transmitted in the
/// request entity body and must have content-type `application/vnd.taskchampion.snapshot`. The
/// content can be encoded in any of the formats supported by actix-web.
///
/// On success, the response is a 200 OK. Even in a 200 OK, the snapshot may not appear in a
/// subsequent `GetSnapshot` call.
///
/// Returns other 4xx or 5xx responses on other errors.
#[post("/v1/client/add-snapshot/{version_id}")]
pub(crate) async fn service(
req: HttpRequest,
server_state: web::Data<Arc<ServerState>>,
path: web::Path<VersionId>,
mut payload: web::Payload,
) -> Result<HttpResponse> {
let version_id = path.into_inner();
// check content-type
if req.content_type() != SNAPSHOT_CONTENT_TYPE {
return Err(error::ErrorBadRequest("Bad content-type"));
}
let client_id = client_id_header(&req)?;
// read the body in its entirety
let mut body = web::BytesMut::new();
while let Some(chunk) = payload.next().await {
let chunk = chunk?;
// limit max size of in-memory payload
if (body.len() + chunk.len()) > MAX_SIZE {
return Err(error::ErrorBadRequest("Snapshot over maximum allowed size"));
}
body.extend_from_slice(&chunk);
}
if body.is_empty() {
return Err(error::ErrorBadRequest("No snapshot supplied"));
}
// note that we do not open the transaction until the body has been read
// completely, to avoid blocking other storage access while that data is
// in transit.
let mut txn = server_state.storage.txn().map_err(failure_to_ise)?;
// get, or create, the client
let client = match txn.get_client(client_id).map_err(failure_to_ise)? {
Some(client) => client,
None => {
txn.new_client(client_id, NIL_VERSION_ID)
.map_err(failure_to_ise)?;
txn.get_client(client_id).map_err(failure_to_ise)?.unwrap()
}
};
add_snapshot(
txn,
&server_state.config,
client_id,
client,
version_id,
body.to_vec(),
)
.map_err(failure_to_ise)?;
Ok(HttpResponse::Ok().body(""))
}
#[cfg(test)]
mod test {
use super::*;
use crate::api::CLIENT_ID_HEADER;
use crate::storage::{InMemoryStorage, Storage};
use crate::Server;
use actix_web::{http::StatusCode, test, App};
use pretty_assertions::assert_eq;
use uuid::Uuid;
#[actix_rt::test]
async fn test_success() -> anyhow::Result<()> {
let client_id = Uuid::new_v4();
let version_id = Uuid::new_v4();
let storage: Box<dyn Storage> = Box::new(InMemoryStorage::new());
// set up the storage contents..
{
let mut txn = storage.txn().unwrap();
txn.new_client(client_id, version_id).unwrap();
txn.add_version(client_id, version_id, NIL_VERSION_ID, vec![])?;
}
let server = Server::new(Default::default(), storage);
let app = App::new().configure(|sc| server.config(sc));
let mut app = test::init_service(app).await;
let uri = format!("/v1/client/add-snapshot/{}", version_id);
let req = test::TestRequest::post()
.uri(&uri)
.insert_header(("Content-Type", "application/vnd.taskchampion.snapshot"))
.insert_header((CLIENT_ID_HEADER, client_id.to_string()))
.set_payload(b"abcd".to_vec())
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::OK);
// read back that snapshot
let uri = "/v1/client/snapshot";
let req = test::TestRequest::get()
.uri(uri)
.append_header((CLIENT_ID_HEADER, client_id.to_string()))
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::OK);
use actix_web::body::MessageBody;
let bytes = resp.into_body().try_into_bytes().unwrap();
assert_eq!(bytes.as_ref(), b"abcd");
Ok(())
}
#[actix_rt::test]
async fn test_not_added_200() {
let client_id = Uuid::new_v4();
let version_id = Uuid::new_v4();
let storage: Box<dyn Storage> = Box::new(InMemoryStorage::new());
// set up the storage contents..
{
let mut txn = storage.txn().unwrap();
txn.new_client(client_id, NIL_VERSION_ID).unwrap();
}
let server = Server::new(Default::default(), storage);
let app = App::new().configure(|sc| server.config(sc));
let mut app = test::init_service(app).await;
// add a snapshot for a nonexistent version
let uri = format!("/v1/client/add-snapshot/{}", version_id);
let req = test::TestRequest::post()
.uri(&uri)
.append_header(("Content-Type", "application/vnd.taskchampion.snapshot"))
.append_header((CLIENT_ID_HEADER, client_id.to_string()))
.set_payload(b"abcd".to_vec())
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::OK);
// read back, seeing no snapshot
let uri = "/v1/client/snapshot";
let req = test::TestRequest::get()
.uri(uri)
.append_header((CLIENT_ID_HEADER, client_id.to_string()))
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}
#[actix_rt::test]
async fn test_bad_content_type() {
let client_id = Uuid::new_v4();
let version_id = Uuid::new_v4();
let storage: Box<dyn Storage> = Box::new(InMemoryStorage::new());
let server = Server::new(Default::default(), storage);
let app = App::new().configure(|sc| server.config(sc));
let mut app = test::init_service(app).await;
let uri = format!("/v1/client/add-snapshot/{}", version_id);
let req = test::TestRequest::post()
.uri(&uri)
.append_header(("Content-Type", "not/correct"))
.append_header((CLIENT_ID_HEADER, client_id.to_string()))
.set_payload(b"abcd".to_vec())
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
}
#[actix_rt::test]
async fn test_empty_body() {
let client_id = Uuid::new_v4();
let version_id = Uuid::new_v4();
let storage: Box<dyn Storage> = Box::new(InMemoryStorage::new());
let server = Server::new(Default::default(), storage);
let app = App::new().configure(|sc| server.config(sc));
let mut app = test::init_service(app).await;
let uri = format!("/v1/client/add-snapshot/{}", version_id);
let req = test::TestRequest::post()
.uri(&uri)
.append_header((
"Content-Type",
"application/vnd.taskchampion.history-segment",
))
.append_header((CLIENT_ID_HEADER, client_id.to_string()))
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
}
}

233
src/api/add_version.rs Normal file
View file

@ -0,0 +1,233 @@
use crate::api::{
client_id_header, failure_to_ise, ServerState, HISTORY_SEGMENT_CONTENT_TYPE,
PARENT_VERSION_ID_HEADER, SNAPSHOT_REQUEST_HEADER, VERSION_ID_HEADER,
};
use crate::server::{add_version, AddVersionResult, SnapshotUrgency, VersionId, NIL_VERSION_ID};
use actix_web::{error, post, web, HttpMessage, HttpRequest, HttpResponse, Result};
use futures::StreamExt;
use std::sync::Arc;
/// Max history segment size: 100MB
const MAX_SIZE: usize = 100 * 1024 * 1024;
/// Add a new version, after checking prerequisites. The history segment should be transmitted in
/// the request entity body and must have content-type
/// `application/vnd.taskchampion.history-segment`. The content can be encoded in any of the
/// formats supported by actix-web.
///
/// On success, the response is a 200 OK with the new version ID in the `X-Version-Id` header. If
/// the version cannot be added due to a conflict, the response is a 409 CONFLICT with the expected
/// parent version ID in the `X-Parent-Version-Id` header.
///
/// If included, a snapshot request appears in the `X-Snapshot-Request` header with value
/// `urgency=low` or `urgency=high`.
///
/// Returns other 4xx or 5xx responses on other errors.
#[post("/v1/client/add-version/{parent_version_id}")]
pub(crate) async fn service(
req: HttpRequest,
server_state: web::Data<Arc<ServerState>>,
path: web::Path<VersionId>,
mut payload: web::Payload,
) -> Result<HttpResponse> {
let parent_version_id = path.into_inner();
// check content-type
if req.content_type() != HISTORY_SEGMENT_CONTENT_TYPE {
return Err(error::ErrorBadRequest("Bad content-type"));
}
let client_id = client_id_header(&req)?;
// read the body in its entirety
let mut body = web::BytesMut::new();
while let Some(chunk) = payload.next().await {
let chunk = chunk?;
// limit max size of in-memory payload
if (body.len() + chunk.len()) > MAX_SIZE {
return Err(error::ErrorBadRequest("overflow"));
}
body.extend_from_slice(&chunk);
}
if body.is_empty() {
return Err(error::ErrorBadRequest("Empty body"));
}
// note that we do not open the transaction until the body has been read
// completely, to avoid blocking other storage access while that data is
// in transit.
let mut txn = server_state.storage.txn().map_err(failure_to_ise)?;
// get, or create, the client
let client = match txn.get_client(client_id).map_err(failure_to_ise)? {
Some(client) => client,
None => {
txn.new_client(client_id, NIL_VERSION_ID)
.map_err(failure_to_ise)?;
txn.get_client(client_id).map_err(failure_to_ise)?.unwrap()
}
};
let (result, snap_urgency) = add_version(
txn,
&server_state.config,
client_id,
client,
parent_version_id,
body.to_vec(),
)
.map_err(failure_to_ise)?;
Ok(match result {
AddVersionResult::Ok(version_id) => {
let mut rb = HttpResponse::Ok();
rb.append_header((VERSION_ID_HEADER, version_id.to_string()));
match snap_urgency {
SnapshotUrgency::None => {}
SnapshotUrgency::Low => {
rb.append_header((SNAPSHOT_REQUEST_HEADER, "urgency=low"));
}
SnapshotUrgency::High => {
rb.append_header((SNAPSHOT_REQUEST_HEADER, "urgency=high"));
}
};
rb.finish()
}
AddVersionResult::ExpectedParentVersion(parent_version_id) => {
let mut rb = HttpResponse::Conflict();
rb.append_header((PARENT_VERSION_ID_HEADER, parent_version_id.to_string()));
rb.finish()
}
})
}
#[cfg(test)]
mod test {
use crate::api::CLIENT_ID_HEADER;
use crate::storage::{InMemoryStorage, Storage};
use crate::Server;
use actix_web::{http::StatusCode, test, App};
use pretty_assertions::assert_eq;
use uuid::Uuid;
#[actix_rt::test]
async fn test_success() {
let client_id = Uuid::new_v4();
let version_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let storage: Box<dyn Storage> = Box::new(InMemoryStorage::new());
// set up the storage contents..
{
let mut txn = storage.txn().unwrap();
txn.new_client(client_id, Uuid::nil()).unwrap();
}
let server = Server::new(Default::default(), storage);
let app = App::new().configure(|sc| server.config(sc));
let mut app = test::init_service(app).await;
let uri = format!("/v1/client/add-version/{}", parent_version_id);
let req = test::TestRequest::post()
.uri(&uri)
.append_header((
"Content-Type",
"application/vnd.taskchampion.history-segment",
))
.append_header((CLIENT_ID_HEADER, client_id.to_string()))
.set_payload(b"abcd".to_vec())
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::OK);
// the returned version ID is random, but let's check that it's not
// the passed parent version ID, at least
let new_version_id = resp.headers().get("X-Version-Id").unwrap();
assert!(new_version_id != &version_id.to_string());
// Shapshot should be requested, since there is no existing snapshot
let snapshot_request = resp.headers().get("X-Snapshot-Request").unwrap();
assert_eq!(snapshot_request, "urgency=high");
assert_eq!(resp.headers().get("X-Parent-Version-Id"), None);
}
#[actix_rt::test]
async fn test_conflict() {
let client_id = Uuid::new_v4();
let version_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let storage: Box<dyn Storage> = Box::new(InMemoryStorage::new());
// set up the storage contents..
{
let mut txn = storage.txn().unwrap();
txn.new_client(client_id, version_id).unwrap();
}
let server = Server::new(Default::default(), storage);
let app = App::new().configure(|sc| server.config(sc));
let mut app = test::init_service(app).await;
let uri = format!("/v1/client/add-version/{}", parent_version_id);
let req = test::TestRequest::post()
.uri(&uri)
.append_header((
"Content-Type",
"application/vnd.taskchampion.history-segment",
))
.append_header((CLIENT_ID_HEADER, client_id.to_string()))
.set_payload(b"abcd".to_vec())
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::CONFLICT);
assert_eq!(resp.headers().get("X-Version-Id"), None);
assert_eq!(
resp.headers().get("X-Parent-Version-Id").unwrap(),
&version_id.to_string()
);
}
#[actix_rt::test]
async fn test_bad_content_type() {
let client_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let storage: Box<dyn Storage> = Box::new(InMemoryStorage::new());
let server = Server::new(Default::default(), storage);
let app = App::new().configure(|sc| server.config(sc));
let mut app = test::init_service(app).await;
let uri = format!("/v1/client/add-version/{}", parent_version_id);
let req = test::TestRequest::post()
.uri(&uri)
.append_header(("Content-Type", "not/correct"))
.append_header((CLIENT_ID_HEADER, client_id.to_string()))
.set_payload(b"abcd".to_vec())
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
}
#[actix_rt::test]
async fn test_empty_body() {
let client_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let storage: Box<dyn Storage> = Box::new(InMemoryStorage::new());
let server = Server::new(Default::default(), storage);
let app = App::new().configure(|sc| server.config(sc));
let mut app = test::init_service(app).await;
let uri = format!("/v1/client/add-version/{}", parent_version_id);
let req = test::TestRequest::post()
.uri(&uri)
.append_header((
"Content-Type",
"application/vnd.taskchampion.history-segment",
))
.append_header((CLIENT_ID_HEADER, client_id.to_string()))
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
}
}

View file

@ -0,0 +1,170 @@
use crate::api::{
client_id_header, failure_to_ise, ServerState, HISTORY_SEGMENT_CONTENT_TYPE,
PARENT_VERSION_ID_HEADER, VERSION_ID_HEADER,
};
use crate::server::{get_child_version, GetVersionResult, VersionId};
use actix_web::{error, get, web, HttpRequest, HttpResponse, Result};
use std::sync::Arc;
/// Get a child version.
///
/// On succcess, the response is the same sequence of bytes originally sent to the server,
/// with content-type `application/vnd.taskchampion.history-segment`. The `X-Version-Id` and
/// `X-Parent-Version-Id` headers contain the corresponding values.
///
/// If no such child exists, returns a 404 with no content.
/// Returns other 4xx or 5xx responses on other errors.
#[get("/v1/client/get-child-version/{parent_version_id}")]
pub(crate) async fn service(
req: HttpRequest,
server_state: web::Data<Arc<ServerState>>,
path: web::Path<VersionId>,
) -> Result<HttpResponse> {
let parent_version_id = path.into_inner();
let mut txn = server_state.storage.txn().map_err(failure_to_ise)?;
let client_id = client_id_header(&req)?;
let client = txn
.get_client(client_id)
.map_err(failure_to_ise)?
.ok_or_else(|| error::ErrorNotFound("no such client"))?;
return match get_child_version(
txn,
&server_state.config,
client_id,
client,
parent_version_id,
)
.map_err(failure_to_ise)?
{
GetVersionResult::Success {
version_id,
parent_version_id,
history_segment,
} => Ok(HttpResponse::Ok()
.content_type(HISTORY_SEGMENT_CONTENT_TYPE)
.append_header((VERSION_ID_HEADER, version_id.to_string()))
.append_header((PARENT_VERSION_ID_HEADER, parent_version_id.to_string()))
.body(history_segment)),
GetVersionResult::NotFound => Err(error::ErrorNotFound("no such version")),
GetVersionResult::Gone => Err(error::ErrorGone("version has been deleted")),
};
}
#[cfg(test)]
mod test {
use crate::api::CLIENT_ID_HEADER;
use crate::server::NIL_VERSION_ID;
use crate::storage::{InMemoryStorage, Storage};
use crate::Server;
use actix_web::{http::StatusCode, test, App};
use pretty_assertions::assert_eq;
use uuid::Uuid;
#[actix_rt::test]
async fn test_success() {
let client_id = Uuid::new_v4();
let version_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let storage: Box<dyn Storage> = Box::new(InMemoryStorage::new());
// set up the storage contents..
{
let mut txn = storage.txn().unwrap();
txn.new_client(client_id, Uuid::new_v4()).unwrap();
txn.add_version(client_id, version_id, parent_version_id, b"abcd".to_vec())
.unwrap();
}
let server = Server::new(Default::default(), storage);
let app = App::new().configure(|sc| server.config(sc));
let mut app = test::init_service(app).await;
let uri = format!("/v1/client/get-child-version/{}", parent_version_id);
let req = test::TestRequest::get()
.uri(&uri)
.append_header((CLIENT_ID_HEADER, client_id.to_string()))
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers().get("X-Version-Id").unwrap(),
&version_id.to_string()
);
assert_eq!(
resp.headers().get("X-Parent-Version-Id").unwrap(),
&parent_version_id.to_string()
);
assert_eq!(
resp.headers().get("Content-Type").unwrap(),
&"application/vnd.taskchampion.history-segment".to_string()
);
use actix_web::body::MessageBody;
let bytes = resp.into_body().try_into_bytes().unwrap();
assert_eq!(bytes.as_ref(), b"abcd");
}
#[actix_rt::test]
async fn test_client_not_found() {
let client_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let storage: Box<dyn Storage> = Box::new(InMemoryStorage::new());
let server = Server::new(Default::default(), storage);
let app = App::new().configure(|sc| server.config(sc));
let mut app = test::init_service(app).await;
let uri = format!("/v1/client/get-child-version/{}", parent_version_id);
let req = test::TestRequest::get()
.uri(&uri)
.append_header((CLIENT_ID_HEADER, client_id.to_string()))
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
assert_eq!(resp.headers().get("X-Version-Id"), None);
assert_eq!(resp.headers().get("X-Parent-Version-Id"), None);
}
#[actix_rt::test]
async fn test_version_not_found_and_gone() {
let client_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let storage: Box<dyn Storage> = Box::new(InMemoryStorage::new());
// create the client, but not the version
{
let mut txn = storage.txn().unwrap();
txn.new_client(client_id, Uuid::new_v4()).unwrap();
}
let server = Server::new(Default::default(), storage);
let app = App::new().configure(|sc| server.config(sc));
let mut app = test::init_service(app).await;
// the child of an unknown parent_version_id is GONE
let uri = format!("/v1/client/get-child-version/{}", parent_version_id);
let req = test::TestRequest::get()
.uri(&uri)
.append_header((CLIENT_ID_HEADER, client_id.to_string()))
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::GONE);
assert_eq!(resp.headers().get("X-Version-Id"), None);
assert_eq!(resp.headers().get("X-Parent-Version-Id"), None);
// but the child of the nil parent_version_id is NOT FOUND, since
// there is no snapshot. The tests in crate::server test more
// corner cases.
let uri = format!("/v1/client/get-child-version/{}", NIL_VERSION_ID);
let req = test::TestRequest::get()
.uri(&uri)
.append_header((CLIENT_ID_HEADER, client_id.to_string()))
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
assert_eq!(resp.headers().get("X-Version-Id"), None);
assert_eq!(resp.headers().get("X-Parent-Version-Id"), None);
}
}

115
src/api/get_snapshot.rs Normal file
View file

@ -0,0 +1,115 @@
use crate::api::{
client_id_header, failure_to_ise, ServerState, SNAPSHOT_CONTENT_TYPE, VERSION_ID_HEADER,
};
use crate::server::get_snapshot;
use actix_web::{error, get, web, HttpRequest, HttpResponse, Result};
use std::sync::Arc;
/// Get a snapshot.
///
/// If a snapshot for this client exists, it is returned with content-type
/// `application/vnd.taskchampion.snapshot`. The `X-Version-Id` header contains the version of the
/// snapshot.
///
/// If no snapshot exists, returns a 404 with no content. Returns other 4xx or 5xx responses on
/// other errors.
#[get("/v1/client/snapshot")]
pub(crate) async fn service(
req: HttpRequest,
server_state: web::Data<Arc<ServerState>>,
) -> Result<HttpResponse> {
let mut txn = server_state.storage.txn().map_err(failure_to_ise)?;
let client_id = client_id_header(&req)?;
let client = txn
.get_client(client_id)
.map_err(failure_to_ise)?
.ok_or_else(|| error::ErrorNotFound("no such client"))?;
if let Some((version_id, data)) =
get_snapshot(txn, &server_state.config, client_id, client).map_err(failure_to_ise)?
{
Ok(HttpResponse::Ok()
.content_type(SNAPSHOT_CONTENT_TYPE)
.append_header((VERSION_ID_HEADER, version_id.to_string()))
.body(data))
} else {
Err(error::ErrorNotFound("no snapshot"))
}
}
#[cfg(test)]
mod test {
use crate::api::CLIENT_ID_HEADER;
use crate::storage::{InMemoryStorage, Snapshot, Storage};
use crate::Server;
use actix_web::{http::StatusCode, test, App};
use chrono::{TimeZone, Utc};
use pretty_assertions::assert_eq;
use uuid::Uuid;
#[actix_rt::test]
async fn test_not_found() {
let client_id = Uuid::new_v4();
let storage: Box<dyn Storage> = Box::new(InMemoryStorage::new());
// set up the storage contents..
{
let mut txn = storage.txn().unwrap();
txn.new_client(client_id, Uuid::new_v4()).unwrap();
}
let server = Server::new(Default::default(), storage);
let app = App::new().configure(|sc| server.config(sc));
let mut app = test::init_service(app).await;
let uri = "/v1/client/snapshot";
let req = test::TestRequest::get()
.uri(uri)
.append_header((CLIENT_ID_HEADER, client_id.to_string()))
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}
#[actix_rt::test]
async fn test_success() {
let client_id = Uuid::new_v4();
let version_id = Uuid::new_v4();
let snapshot_data = vec![1, 2, 3, 4];
let storage: Box<dyn Storage> = Box::new(InMemoryStorage::new());
// set up the storage contents..
{
let mut txn = storage.txn().unwrap();
txn.new_client(client_id, Uuid::new_v4()).unwrap();
txn.set_snapshot(
client_id,
Snapshot {
version_id,
versions_since: 3,
timestamp: Utc.ymd(2001, 9, 9).and_hms(1, 46, 40),
},
snapshot_data.clone(),
)
.unwrap();
}
let server = Server::new(Default::default(), storage);
let app = App::new().configure(|sc| server.config(sc));
let mut app = test::init_service(app).await;
let uri = "/v1/client/snapshot";
let req = test::TestRequest::get()
.uri(uri)
.append_header((CLIENT_ID_HEADER, client_id.to_string()))
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::OK);
use actix_web::body::MessageBody;
let bytes = resp.into_body().try_into_bytes().unwrap();
assert_eq!(bytes.as_ref(), snapshot_data);
}
}

61
src/api/mod.rs Normal file
View file

@ -0,0 +1,61 @@
use crate::server::ClientId;
use crate::storage::Storage;
use crate::ServerConfig;
use actix_web::{error, http::StatusCode, web, HttpRequest, Result, Scope};
mod add_snapshot;
mod add_version;
mod get_child_version;
mod get_snapshot;
/// The content-type for history segments (opaque blobs of bytes)
pub(crate) const HISTORY_SEGMENT_CONTENT_TYPE: &str =
"application/vnd.taskchampion.history-segment";
/// The content-type for snapshots (opaque blobs of bytes)
pub(crate) const SNAPSHOT_CONTENT_TYPE: &str = "application/vnd.taskchampion.snapshot";
/// The header name for version ID
pub(crate) const VERSION_ID_HEADER: &str = "X-Version-Id";
/// The header name for client id
pub(crate) const CLIENT_ID_HEADER: &str = "X-Client-Id";
/// The header name for parent version ID
pub(crate) const PARENT_VERSION_ID_HEADER: &str = "X-Parent-Version-Id";
/// The header name for parent version ID
pub(crate) const SNAPSHOT_REQUEST_HEADER: &str = "X-Snapshot-Request";
/// The type containing a reference to the persistent state for the server
pub(crate) struct ServerState {
pub(crate) storage: Box<dyn Storage>,
pub(crate) config: ServerConfig,
}
pub(crate) fn api_scope() -> Scope {
web::scope("")
.service(get_child_version::service)
.service(add_version::service)
.service(get_snapshot::service)
.service(add_snapshot::service)
}
/// Convert a failure::Error to an Actix ISE
fn failure_to_ise(err: anyhow::Error) -> impl actix_web::ResponseError {
error::InternalError::new(err, StatusCode::INTERNAL_SERVER_ERROR)
}
/// Get the client id
fn client_id_header(req: &HttpRequest) -> Result<ClientId> {
fn badrequest() -> error::Error {
error::ErrorBadRequest("bad x-client-id")
}
if let Some(client_id_hdr) = req.headers().get(CLIENT_ID_HEADER) {
let client_id = client_id_hdr.to_str().map_err(|_| badrequest())?;
let client_id = ClientId::parse_str(client_id).map_err(|_| badrequest())?;
Ok(client_id)
} else {
Err(badrequest())
}
}

View file

@ -0,0 +1,77 @@
#![deny(clippy::all)]
use actix_web::{middleware::Logger, App, HttpServer};
use clap::{arg, builder::ValueParser, value_parser, Command};
use std::ffi::OsString;
use taskchampion_sync_server::storage::SqliteStorage;
use taskchampion_sync_server::{Server, ServerConfig};
#[actix_web::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
let defaults = ServerConfig::default();
let default_snapshot_versions = defaults.snapshot_versions.to_string();
let default_snapshot_days = defaults.snapshot_days.to_string();
let matches = Command::new("taskchampion-sync-server")
.version(env!("CARGO_PKG_VERSION"))
.about("Server for TaskChampion")
.arg(
arg!(-p --port <PORT> "Port on which to serve")
.help("Port on which to serve")
.value_parser(value_parser!(usize))
.default_value("8080"),
)
.arg(
arg!(-d --"data-dir" <DIR> "Directory in which to store data")
.value_parser(ValueParser::os_string())
.default_value("/var/lib/taskchampion-sync-server"),
)
.arg(
arg!(--"snapshot-versions" <NUM> "Target number of versions between snapshots")
.value_parser(value_parser!(u32))
.default_value(default_snapshot_versions),
)
.arg(
arg!(--"snapshot-days" <NUM> "Target number of days between snapshots")
.value_parser(value_parser!(i64))
.default_value(default_snapshot_days),
)
.get_matches();
let data_dir: &OsString = matches.get_one("data-dir").unwrap();
let port: usize = *matches.get_one("port").unwrap();
let snapshot_versions: u32 = *matches.get_one("snapshot-versions").unwrap();
let snapshot_days: i64 = *matches.get_one("snapshot-days").unwrap();
let config = ServerConfig::from_args(snapshot_days, snapshot_versions)?;
let server = Server::new(config, Box::new(SqliteStorage::new(data_dir)?));
log::warn!("Serving on port {}", port);
HttpServer::new(move || {
App::new()
.wrap(Logger::default())
.configure(|cfg| server.config(cfg))
})
.bind(format!("0.0.0.0:{}", port))?
.run()
.await?;
Ok(())
}
#[cfg(test)]
mod test {
use super::*;
use actix_web::{test, App};
use taskchampion_sync_server::storage::InMemoryStorage;
#[actix_rt::test]
async fn test_index_get() {
let server = Server::new(Default::default(), Box::new(InMemoryStorage::new()));
let app = App::new().configure(|sc| server.config(sc));
let mut app = test::init_service(app).await;
let req = test::TestRequest::get().uri("/").to_request();
let resp = test::call_service(&mut app, req).await;
assert!(resp.status().is_success());
}
}

72
src/lib.rs Normal file
View file

@ -0,0 +1,72 @@
#![deny(clippy::all)]
mod api;
mod server;
pub mod storage;
use crate::storage::Storage;
use actix_web::{get, middleware, web, Responder};
use api::{api_scope, ServerState};
use std::sync::Arc;
pub use server::ServerConfig;
#[get("/")]
async fn index() -> impl Responder {
format!("TaskChampion sync server v{}", env!("CARGO_PKG_VERSION"))
}
/// A Server represents a sync server.
#[derive(Clone)]
pub struct Server {
server_state: Arc<ServerState>,
}
impl Server {
/// Create a new sync server with the given storage implementation.
pub fn new(config: ServerConfig, storage: Box<dyn Storage>) -> Self {
Self {
server_state: Arc::new(ServerState { config, storage }),
}
}
/// Get an Actix-web service for this server.
pub fn config(&self, cfg: &mut web::ServiceConfig) {
cfg.service(
web::scope("")
.app_data(web::Data::new(self.server_state.clone()))
.wrap(
middleware::DefaultHeaders::new().add(("Cache-Control", "no-store, max-age=0")),
)
.service(index)
.service(api_scope()),
);
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::storage::InMemoryStorage;
use actix_web::{test, App};
use pretty_assertions::assert_eq;
pub(crate) fn init_logging() {
let _ = env_logger::builder().is_test(true).try_init();
}
#[actix_rt::test]
async fn test_cache_control() {
let server = Server::new(Default::default(), Box::new(InMemoryStorage::new()));
let app = App::new().configure(|sc| server.config(sc));
let mut app = test::init_service(app).await;
let req = test::TestRequest::get().uri("/").to_request();
let resp = test::call_service(&mut app, req).await;
assert!(resp.status().is_success());
assert_eq!(
resp.headers().get("Cache-Control").unwrap(),
&"no-store, max-age=0".to_string()
)
}
}

1048
src/server.rs Normal file

File diff suppressed because it is too large Load diff

286
src/storage/inmemory.rs Normal file
View file

@ -0,0 +1,286 @@
use super::{Client, Snapshot, Storage, StorageTxn, Uuid, Version};
use std::collections::HashMap;
use std::sync::{Mutex, MutexGuard};
struct Inner {
/// Clients, indexed by client_id
clients: HashMap<Uuid, Client>,
/// Snapshot data, indexed by client id
snapshots: HashMap<Uuid, Vec<u8>>,
/// Versions, indexed by (client_id, version_id)
versions: HashMap<(Uuid, Uuid), Version>,
/// Child versions, indexed by (client_id, parent_version_id)
children: HashMap<(Uuid, Uuid), Uuid>,
}
pub struct InMemoryStorage(Mutex<Inner>);
impl InMemoryStorage {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
Self(Mutex::new(Inner {
clients: HashMap::new(),
snapshots: HashMap::new(),
versions: HashMap::new(),
children: HashMap::new(),
}))
}
}
struct InnerTxn<'a>(MutexGuard<'a, Inner>);
/// In-memory storage for testing and experimentation.
///
/// NOTE: this does not implement transaction rollback.
impl Storage for InMemoryStorage {
fn txn<'a>(&'a self) -> anyhow::Result<Box<dyn StorageTxn + 'a>> {
Ok(Box::new(InnerTxn(self.0.lock().expect("poisoned lock"))))
}
}
impl<'a> StorageTxn for InnerTxn<'a> {
fn get_client(&mut self, client_id: Uuid) -> anyhow::Result<Option<Client>> {
Ok(self.0.clients.get(&client_id).cloned())
}
fn new_client(&mut self, client_id: Uuid, latest_version_id: Uuid) -> anyhow::Result<()> {
if self.0.clients.get(&client_id).is_some() {
return Err(anyhow::anyhow!("Client {} already exists", client_id));
}
self.0.clients.insert(
client_id,
Client {
latest_version_id,
snapshot: None,
},
);
Ok(())
}
fn set_snapshot(
&mut self,
client_id: Uuid,
snapshot: Snapshot,
data: Vec<u8>,
) -> anyhow::Result<()> {
let client = self
.0
.clients
.get_mut(&client_id)
.ok_or_else(|| anyhow::anyhow!("no such client"))?;
client.snapshot = Some(snapshot);
self.0.snapshots.insert(client_id, data);
Ok(())
}
fn get_snapshot_data(
&mut self,
client_id: Uuid,
version_id: Uuid,
) -> anyhow::Result<Option<Vec<u8>>> {
// sanity check
let client = self.0.clients.get(&client_id);
let client = client.ok_or_else(|| anyhow::anyhow!("no such client"))?;
if Some(&version_id) != client.snapshot.as_ref().map(|snap| &snap.version_id) {
return Err(anyhow::anyhow!("unexpected snapshot_version_id"));
}
Ok(self.0.snapshots.get(&client_id).cloned())
}
fn get_version_by_parent(
&mut self,
client_id: Uuid,
parent_version_id: Uuid,
) -> anyhow::Result<Option<Version>> {
if let Some(parent_version_id) = self.0.children.get(&(client_id, parent_version_id)) {
Ok(self
.0
.versions
.get(&(client_id, *parent_version_id))
.cloned())
} else {
Ok(None)
}
}
fn get_version(
&mut self,
client_id: Uuid,
version_id: Uuid,
) -> anyhow::Result<Option<Version>> {
Ok(self.0.versions.get(&(client_id, version_id)).cloned())
}
fn add_version(
&mut self,
client_id: Uuid,
version_id: Uuid,
parent_version_id: Uuid,
history_segment: Vec<u8>,
) -> anyhow::Result<()> {
// TODO: verify it doesn't exist (`.entry`?)
let version = Version {
version_id,
parent_version_id,
history_segment,
};
if let Some(client) = self.0.clients.get_mut(&client_id) {
client.latest_version_id = version_id;
if let Some(ref mut snap) = client.snapshot {
snap.versions_since += 1;
}
} else {
return Err(anyhow::anyhow!("Client {} does not exist", client_id));
}
self.0
.children
.insert((client_id, parent_version_id), version_id);
self.0.versions.insert((client_id, version_id), version);
Ok(())
}
fn commit(&mut self) -> anyhow::Result<()> {
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
use chrono::Utc;
#[test]
fn test_get_client_empty() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let maybe_client = txn.get_client(Uuid::new_v4())?;
assert!(maybe_client.is_none());
Ok(())
}
#[test]
fn test_client_storage() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_id = Uuid::new_v4();
let latest_version_id = Uuid::new_v4();
txn.new_client(client_id, latest_version_id)?;
let client = txn.get_client(client_id)?.unwrap();
assert_eq!(client.latest_version_id, latest_version_id);
assert!(client.snapshot.is_none());
let latest_version_id = Uuid::new_v4();
txn.add_version(client_id, latest_version_id, Uuid::new_v4(), vec![1, 1])?;
let client = txn.get_client(client_id)?.unwrap();
assert_eq!(client.latest_version_id, latest_version_id);
assert!(client.snapshot.is_none());
let snap = Snapshot {
version_id: Uuid::new_v4(),
timestamp: Utc::now(),
versions_since: 4,
};
txn.set_snapshot(client_id, snap.clone(), vec![1, 2, 3])?;
let client = txn.get_client(client_id)?.unwrap();
assert_eq!(client.latest_version_id, latest_version_id);
assert_eq!(client.snapshot.unwrap(), snap);
Ok(())
}
#[test]
fn test_gvbp_empty() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let maybe_version = txn.get_version_by_parent(Uuid::new_v4(), Uuid::new_v4())?;
assert!(maybe_version.is_none());
Ok(())
}
#[test]
fn test_add_version_and_get_version() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_id = Uuid::new_v4();
let version_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let history_segment = b"abc".to_vec();
txn.new_client(client_id, parent_version_id)?;
txn.add_version(
client_id,
version_id,
parent_version_id,
history_segment.clone(),
)?;
let expected = Version {
version_id,
parent_version_id,
history_segment,
};
let version = txn
.get_version_by_parent(client_id, parent_version_id)?
.unwrap();
assert_eq!(version, expected);
let version = txn.get_version(client_id, version_id)?.unwrap();
assert_eq!(version, expected);
Ok(())
}
#[test]
fn test_snapshots() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_id = Uuid::new_v4();
txn.new_client(client_id, Uuid::new_v4())?;
assert!(txn.get_client(client_id)?.unwrap().snapshot.is_none());
let snap = Snapshot {
version_id: Uuid::new_v4(),
timestamp: Utc::now(),
versions_since: 3,
};
txn.set_snapshot(client_id, snap.clone(), vec![9, 8, 9])?;
assert_eq!(
txn.get_snapshot_data(client_id, snap.version_id)?.unwrap(),
vec![9, 8, 9]
);
assert_eq!(txn.get_client(client_id)?.unwrap().snapshot, Some(snap));
let snap2 = Snapshot {
version_id: Uuid::new_v4(),
timestamp: Utc::now(),
versions_since: 10,
};
txn.set_snapshot(client_id, snap2.clone(), vec![0, 2, 4, 6])?;
assert_eq!(
txn.get_snapshot_data(client_id, snap2.version_id)?.unwrap(),
vec![0, 2, 4, 6]
);
assert_eq!(txn.get_client(client_id)?.unwrap().snapshot, Some(snap2));
// check that mismatched version is detected
assert!(txn.get_snapshot_data(client_id, Uuid::new_v4()).is_err());
Ok(())
}
}

95
src/storage/mod.rs Normal file
View file

@ -0,0 +1,95 @@
use chrono::{DateTime, Utc};
use uuid::Uuid;
#[cfg(debug_assertions)]
mod inmemory;
#[cfg(debug_assertions)]
pub use inmemory::InMemoryStorage;
mod sqlite;
pub use self::sqlite::SqliteStorage;
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Client {
/// The latest version for this client (may be the nil version)
pub latest_version_id: Uuid,
/// Data about the latest snapshot for this client
pub snapshot: Option<Snapshot>,
}
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Snapshot {
/// ID of the version at which this snapshot was made
pub version_id: Uuid,
/// Timestamp at which this snapshot was set
pub timestamp: DateTime<Utc>,
/// Number of versions since this snapshot was made
pub versions_since: u32,
}
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Version {
pub version_id: Uuid,
pub parent_version_id: Uuid,
pub history_segment: Vec<u8>,
}
pub trait StorageTxn {
/// Get information about the given client
fn get_client(&mut self, client_id: Uuid) -> anyhow::Result<Option<Client>>;
/// Create a new client with the given latest_version_id
fn new_client(&mut self, client_id: Uuid, latest_version_id: Uuid) -> anyhow::Result<()>;
/// Set the client's most recent snapshot.
fn set_snapshot(
&mut self,
client_id: Uuid,
snapshot: Snapshot,
data: Vec<u8>,
) -> anyhow::Result<()>;
/// Get the data for the most recent snapshot. The version_id
/// is used to verify that the snapshot is for the correct version.
fn get_snapshot_data(
&mut self,
client_id: Uuid,
version_id: Uuid,
) -> anyhow::Result<Option<Vec<u8>>>;
/// Get a version, indexed by parent version id
fn get_version_by_parent(
&mut self,
client_id: Uuid,
parent_version_id: Uuid,
) -> anyhow::Result<Option<Version>>;
/// Get a version, indexed by its own version id
fn get_version(&mut self, client_id: Uuid, version_id: Uuid)
-> anyhow::Result<Option<Version>>;
/// Add a version (that must not already exist), and
/// - update latest_version_id
/// - increment snapshot.versions_since
fn add_version(
&mut self,
client_id: Uuid,
version_id: Uuid,
parent_version_id: Uuid,
history_segment: Vec<u8>,
) -> anyhow::Result<()>;
/// Commit any changes made in the transaction. It is an error to call this more than
/// once. It is safe to skip this call for read-only operations.
fn commit(&mut self) -> anyhow::Result<()>;
}
/// A trait for objects able to act as storage. Most of the interesting behavior is in the
/// [`crate::storage::StorageTxn`] trait.
pub trait Storage: Send + Sync {
/// Begin a transaction
fn txn<'a>(&'a self) -> anyhow::Result<Box<dyn StorageTxn + 'a>>;
}

451
src/storage/sqlite.rs Normal file
View file

@ -0,0 +1,451 @@
use super::{Client, Snapshot, Storage, StorageTxn, Uuid, Version};
use anyhow::Context;
use chrono::{TimeZone, Utc};
use rusqlite::types::{FromSql, ToSql};
use rusqlite::{params, Connection, OptionalExtension};
use std::path::Path;
#[derive(Debug, thiserror::Error)]
enum SqliteError {
#[error("Failed to create SQLite transaction")]
CreateTransactionFailed,
}
/// Newtype to allow implementing `FromSql` for foreign `uuid::Uuid`
struct StoredUuid(Uuid);
/// Conversion from Uuid stored as a string (rusqlite's uuid feature stores as binary blob)
impl FromSql for StoredUuid {
fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
let u = Uuid::parse_str(value.as_str()?)
.map_err(|_| rusqlite::types::FromSqlError::InvalidType)?;
Ok(StoredUuid(u))
}
}
/// Store Uuid as string in database
impl ToSql for StoredUuid {
fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
let s = self.0.to_string();
Ok(s.into())
}
}
/// An on-disk storage backend which uses SQLite
pub struct SqliteStorage {
db_file: std::path::PathBuf,
}
impl SqliteStorage {
fn new_connection(&self) -> anyhow::Result<Connection> {
Ok(Connection::open(&self.db_file)?)
}
pub fn new<P: AsRef<Path>>(directory: P) -> anyhow::Result<SqliteStorage> {
std::fs::create_dir_all(&directory)?;
let db_file = directory.as_ref().join("taskchampion-sync-server.sqlite3");
let o = SqliteStorage { db_file };
{
let mut con = o.new_connection()?;
let txn = con.transaction()?;
let queries = vec![
"CREATE TABLE IF NOT EXISTS clients (
client_id STRING PRIMARY KEY,
latest_version_id STRING,
snapshot_version_id STRING,
versions_since_snapshot INTEGER,
snapshot_timestamp INTEGER,
snapshot BLOB);",
"CREATE TABLE IF NOT EXISTS versions (version_id STRING PRIMARY KEY, client_id STRING, parent_version_id STRING, history_segment BLOB);",
"CREATE INDEX IF NOT EXISTS versions_by_parent ON versions (parent_version_id);",
];
for q in queries {
txn.execute(q, [])
.context("Error while creating SQLite tables")?;
}
txn.commit()?;
}
Ok(o)
}
}
impl Storage for SqliteStorage {
fn txn<'a>(&'a self) -> anyhow::Result<Box<dyn StorageTxn + 'a>> {
let con = self.new_connection()?;
let t = Txn { con };
Ok(Box::new(t))
}
}
struct Txn {
con: Connection,
}
impl Txn {
fn get_txn(&mut self) -> Result<rusqlite::Transaction, SqliteError> {
self.con
.transaction()
.map_err(|_e| SqliteError::CreateTransactionFailed)
}
/// Implementation for queries from the versions table
fn get_version_impl(
&mut self,
query: &'static str,
client_id: Uuid,
version_id_arg: Uuid,
) -> anyhow::Result<Option<Version>> {
let t = self.get_txn()?;
let r = t
.query_row(
query,
params![&StoredUuid(version_id_arg), &StoredUuid(client_id)],
|r| {
let version_id: StoredUuid = r.get("version_id")?;
let parent_version_id: StoredUuid = r.get("parent_version_id")?;
Ok(Version {
version_id: version_id.0,
parent_version_id: parent_version_id.0,
history_segment: r.get("history_segment")?,
})
},
)
.optional()
.context("Error getting version")?;
Ok(r)
}
}
impl StorageTxn for Txn {
fn get_client(&mut self, client_id: Uuid) -> anyhow::Result<Option<Client>> {
let t = self.get_txn()?;
let result: Option<Client> = t
.query_row(
"SELECT
latest_version_id,
snapshot_timestamp,
versions_since_snapshot,
snapshot_version_id
FROM clients
WHERE client_id = ?
LIMIT 1",
[&StoredUuid(client_id)],
|r| {
let latest_version_id: StoredUuid = r.get(0)?;
let snapshot_timestamp: Option<i64> = r.get(1)?;
let versions_since_snapshot: Option<u32> = r.get(2)?;
let snapshot_version_id: Option<StoredUuid> = r.get(3)?;
// if all of the relevant fields are non-NULL, return a snapshot
let snapshot = match (
snapshot_timestamp,
versions_since_snapshot,
snapshot_version_id,
) {
(Some(ts), Some(vs), Some(v)) => Some(Snapshot {
version_id: v.0,
timestamp: Utc.timestamp(ts, 0),
versions_since: vs,
}),
_ => None,
};
Ok(Client {
latest_version_id: latest_version_id.0,
snapshot,
})
},
)
.optional()
.context("Error getting client")?;
Ok(result)
}
fn new_client(&mut self, client_id: Uuid, latest_version_id: Uuid) -> anyhow::Result<()> {
let t = self.get_txn()?;
t.execute(
"INSERT OR REPLACE INTO clients (client_id, latest_version_id) VALUES (?, ?)",
params![&StoredUuid(client_id), &StoredUuid(latest_version_id)],
)
.context("Error creating/updating client")?;
t.commit()?;
Ok(())
}
fn set_snapshot(
&mut self,
client_id: Uuid,
snapshot: Snapshot,
data: Vec<u8>,
) -> anyhow::Result<()> {
let t = self.get_txn()?;
t.execute(
"UPDATE clients
SET
snapshot_version_id = ?,
snapshot_timestamp = ?,
versions_since_snapshot = ?,
snapshot = ?
WHERE client_id = ?",
params![
&StoredUuid(snapshot.version_id),
snapshot.timestamp.timestamp(),
snapshot.versions_since,
data,
&StoredUuid(client_id),
],
)
.context("Error creating/updating snapshot")?;
t.commit()?;
Ok(())
}
fn get_snapshot_data(
&mut self,
client_id: Uuid,
version_id: Uuid,
) -> anyhow::Result<Option<Vec<u8>>> {
let t = self.get_txn()?;
let r = t
.query_row(
"SELECT snapshot, snapshot_version_id FROM clients WHERE client_id = ?",
params![&StoredUuid(client_id)],
|r| {
let v: StoredUuid = r.get("snapshot_version_id")?;
let d: Vec<u8> = r.get("snapshot")?;
Ok((v.0, d))
},
)
.optional()
.context("Error getting snapshot")?;
r.map(|(v, d)| {
if v != version_id {
return Err(anyhow::anyhow!("unexpected snapshot_version_id"));
}
Ok(d)
})
.transpose()
}
fn get_version_by_parent(
&mut self,
client_id: Uuid,
parent_version_id: Uuid,
) -> anyhow::Result<Option<Version>> {
self.get_version_impl(
"SELECT version_id, parent_version_id, history_segment FROM versions WHERE parent_version_id = ? AND client_id = ?",
client_id,
parent_version_id)
}
fn get_version(
&mut self,
client_id: Uuid,
version_id: Uuid,
) -> anyhow::Result<Option<Version>> {
self.get_version_impl(
"SELECT version_id, parent_version_id, history_segment FROM versions WHERE version_id = ? AND client_id = ?",
client_id,
version_id)
}
fn add_version(
&mut self,
client_id: Uuid,
version_id: Uuid,
parent_version_id: Uuid,
history_segment: Vec<u8>,
) -> anyhow::Result<()> {
let t = self.get_txn()?;
t.execute(
"INSERT INTO versions (version_id, client_id, parent_version_id, history_segment) VALUES(?, ?, ?, ?)",
params![
StoredUuid(version_id),
StoredUuid(client_id),
StoredUuid(parent_version_id),
history_segment
]
)
.context("Error adding version")?;
t.execute(
"UPDATE clients
SET
latest_version_id = ?,
versions_since_snapshot = versions_since_snapshot + 1
WHERE client_id = ?",
params![StoredUuid(version_id), StoredUuid(client_id),],
)
.context("Error updating client for new version")?;
t.commit()?;
Ok(())
}
fn commit(&mut self) -> anyhow::Result<()> {
// FIXME: Note the queries aren't currently run in a
// transaction, as storing the transaction object and a pooled
// connection in the `Txn` object is complex.
// https://github.com/taskchampion/taskchampion/pull/206#issuecomment-860336073
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
use chrono::DateTime;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
#[test]
fn test_emtpy_dir() -> anyhow::Result<()> {
let tmp_dir = TempDir::new()?;
let non_existant = tmp_dir.path().join("subdir");
let storage = SqliteStorage::new(non_existant)?;
let mut txn = storage.txn()?;
let maybe_client = txn.get_client(Uuid::new_v4())?;
assert!(maybe_client.is_none());
Ok(())
}
#[test]
fn test_get_client_empty() -> anyhow::Result<()> {
let tmp_dir = TempDir::new()?;
let storage = SqliteStorage::new(tmp_dir.path())?;
let mut txn = storage.txn()?;
let maybe_client = txn.get_client(Uuid::new_v4())?;
assert!(maybe_client.is_none());
Ok(())
}
#[test]
fn test_client_storage() -> anyhow::Result<()> {
let tmp_dir = TempDir::new()?;
let storage = SqliteStorage::new(tmp_dir.path())?;
let mut txn = storage.txn()?;
let client_id = Uuid::new_v4();
let latest_version_id = Uuid::new_v4();
txn.new_client(client_id, latest_version_id)?;
let client = txn.get_client(client_id)?.unwrap();
assert_eq!(client.latest_version_id, latest_version_id);
assert!(client.snapshot.is_none());
let latest_version_id = Uuid::new_v4();
txn.add_version(client_id, latest_version_id, Uuid::new_v4(), vec![1, 1])?;
let client = txn.get_client(client_id)?.unwrap();
assert_eq!(client.latest_version_id, latest_version_id);
assert!(client.snapshot.is_none());
let snap = Snapshot {
version_id: Uuid::new_v4(),
timestamp: "2014-11-28T12:00:09Z".parse::<DateTime<Utc>>().unwrap(),
versions_since: 4,
};
txn.set_snapshot(client_id, snap.clone(), vec![1, 2, 3])?;
let client = txn.get_client(client_id)?.unwrap();
assert_eq!(client.latest_version_id, latest_version_id);
assert_eq!(client.snapshot.unwrap(), snap);
Ok(())
}
#[test]
fn test_gvbp_empty() -> anyhow::Result<()> {
let tmp_dir = TempDir::new()?;
let storage = SqliteStorage::new(tmp_dir.path())?;
let mut txn = storage.txn()?;
let maybe_version = txn.get_version_by_parent(Uuid::new_v4(), Uuid::new_v4())?;
assert!(maybe_version.is_none());
Ok(())
}
#[test]
fn test_add_version_and_get_version() -> anyhow::Result<()> {
let tmp_dir = TempDir::new()?;
let storage = SqliteStorage::new(tmp_dir.path())?;
let mut txn = storage.txn()?;
let client_id = Uuid::new_v4();
let version_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let history_segment = b"abc".to_vec();
txn.add_version(
client_id,
version_id,
parent_version_id,
history_segment.clone(),
)?;
let expected = Version {
version_id,
parent_version_id,
history_segment,
};
let version = txn
.get_version_by_parent(client_id, parent_version_id)?
.unwrap();
assert_eq!(version, expected);
let version = txn.get_version(client_id, version_id)?.unwrap();
assert_eq!(version, expected);
Ok(())
}
#[test]
fn test_snapshots() -> anyhow::Result<()> {
let tmp_dir = TempDir::new()?;
let storage = SqliteStorage::new(tmp_dir.path())?;
let mut txn = storage.txn()?;
let client_id = Uuid::new_v4();
txn.new_client(client_id, Uuid::new_v4())?;
assert!(txn.get_client(client_id)?.unwrap().snapshot.is_none());
let snap = Snapshot {
version_id: Uuid::new_v4(),
timestamp: "2013-10-08T12:00:09Z".parse::<DateTime<Utc>>().unwrap(),
versions_since: 3,
};
txn.set_snapshot(client_id, snap.clone(), vec![9, 8, 9])?;
assert_eq!(
txn.get_snapshot_data(client_id, snap.version_id)?.unwrap(),
vec![9, 8, 9]
);
assert_eq!(txn.get_client(client_id)?.unwrap().snapshot, Some(snap));
let snap2 = Snapshot {
version_id: Uuid::new_v4(),
timestamp: "2014-11-28T12:00:09Z".parse::<DateTime<Utc>>().unwrap(),
versions_since: 10,
};
txn.set_snapshot(client_id, snap2.clone(), vec![0, 2, 4, 6])?;
assert_eq!(
txn.get_snapshot_data(client_id, snap2.version_id)?.unwrap(),
vec![0, 2, 4, 6]
);
assert_eq!(txn.get_client(client_id)?.unwrap().snapshot, Some(snap2));
// check that mismatched version is detected
assert!(txn.get_snapshot_data(client_id, Uuid::new_v4()).is_err());
Ok(())
}
}