Request snapshots in AddVersion

This commit is contained in:
Dustin J. Mitchell 2021-10-03 02:14:36 +00:00
parent d1da8eee52
commit 7bb6ea6865
3 changed files with 328 additions and 78 deletions

View file

@ -1,8 +1,8 @@
use crate::api::{
client_key_header, failure_to_ise, ServerState, HISTORY_SEGMENT_CONTENT_TYPE,
PARENT_VERSION_ID_HEADER, VERSION_ID_HEADER,
PARENT_VERSION_ID_HEADER, SNAPSHOT_REQUEST_HEADER, VERSION_ID_HEADER,
};
use crate::server::{add_version, AddVersionResult, VersionId, NO_VERSION_ID};
use crate::server::{add_version, AddVersionResult, SnapshotUrgency, VersionId, NO_VERSION_ID};
use actix_web::{error, post, web, HttpMessage, HttpRequest, HttpResponse, Result};
use futures::StreamExt;
@ -18,6 +18,9 @@ const MAX_SIZE: usize = 100 * 1024 * 1024;
/// the version cannot be added due to a conflict, the response is a 409 CONFLICT with the expected
/// parent version ID in the `X-Parent-Version-Id` header.
///
/// If included, a snapshot request appears in the `X-Snapshot-Request` header with value
/// `urgency=low` or `urgency=high`.
///
/// Returns other 4xx or 5xx responses on other errors.
#[post("/v1/client/add-version/{parent_version_id}")]
pub(crate) async fn service(
@ -63,15 +66,30 @@ pub(crate) async fn service(
}
};
let result = add_version(txn, client_key, client, parent_version_id, body.to_vec())
.map_err(failure_to_ise)?;
let (result, snap_urgency) =
add_version(txn, client_key, client, parent_version_id, body.to_vec())
.map_err(failure_to_ise)?;
Ok(match result {
AddVersionResult::Ok(version_id) => HttpResponse::Ok()
.header(VERSION_ID_HEADER, version_id.to_string())
.body(""),
AddVersionResult::ExpectedParentVersion(parent_version_id) => HttpResponse::Conflict()
.header(PARENT_VERSION_ID_HEADER, parent_version_id.to_string())
.body(""),
AddVersionResult::Ok(version_id) => {
let mut rb = HttpResponse::Ok();
rb.header(VERSION_ID_HEADER, version_id.to_string());
match snap_urgency {
SnapshotUrgency::None => {}
SnapshotUrgency::Low => {
rb.header(SNAPSHOT_REQUEST_HEADER, "urgency=low");
}
SnapshotUrgency::High => {
rb.header(SNAPSHOT_REQUEST_HEADER, "urgency=high");
}
};
rb.finish()
}
AddVersionResult::ExpectedParentVersion(parent_version_id) => {
let mut rb = HttpResponse::Conflict();
rb.header(PARENT_VERSION_ID_HEADER, parent_version_id.to_string());
rb.finish()
}
})
}
@ -117,6 +135,10 @@ mod test {
let new_version_id = resp.headers().get("X-Version-Id").unwrap();
assert!(new_version_id != &version_id.to_string());
// Shapshot should be requested, since there is no existing snapshot
let snapshot_request = resp.headers().get("X-Snapshot-Request").unwrap();
assert_eq!(snapshot_request, "urgency=high");
assert_eq!(resp.headers().get("X-Parent-Version-Id"), None);
}

View file

@ -24,6 +24,9 @@ pub(crate) const CLIENT_KEY_HEADER: &str = "X-Client-Key";
/// The header name for parent version ID
pub(crate) const PARENT_VERSION_ID_HEADER: &str = "X-Parent-Version-Id";
/// The header name for parent version ID
pub(crate) const SNAPSHOT_REQUEST_HEADER: &str = "X-Snapshot-Request";
/// The type containing a reference to the Storage object in the Actix state.
pub(crate) type ServerState = Arc<dyn Storage>;

View file

@ -13,6 +13,12 @@ pub const NO_VERSION_ID: VersionId = Uuid::nil();
/// than this will be rejected.
const SNAPSHOT_SEARCH_LEN: i32 = 5;
/// Maximum number of days between snapshots
const SNAPSHOT_DAYS: i64 = 14;
/// Maximum number of versions between snapshots
const SNAPSHOT_VERSIONS: u32 = 30;
pub(crate) type HistorySegment = Vec<u8>;
pub(crate) type ClientKey = Uuid;
pub(crate) type VersionId = Uuid;
@ -75,6 +81,43 @@ pub(crate) enum AddVersionResult {
ExpectedParentVersion(VersionId),
}
/// Urgency of a snapshot for a client; used to create the `X-Snapshot-Request` header.
#[derive(PartialEq, Debug, Clone, Copy, Eq, PartialOrd, Ord)]
pub(crate) enum SnapshotUrgency {
/// Don't need a snapshot right now.
None,
/// A snapshot would be good, but can wait for other replicas to provide it.
Low,
/// A snapshot is needed right now.
High,
}
impl SnapshotUrgency {
/// Calculate the urgency for a snapshot based on its age in days
fn for_days(days: i64) -> Self {
if days >= SNAPSHOT_DAYS * 3 / 2 {
SnapshotUrgency::High
} else if days >= SNAPSHOT_DAYS {
SnapshotUrgency::Low
} else {
SnapshotUrgency::None
}
}
/// Calculate the urgency for a snapshot based on its age in versions
fn for_versions_since(versions_since: u32) -> Self {
if versions_since >= SNAPSHOT_VERSIONS * 3 / 2 {
SnapshotUrgency::High
} else if versions_since >= SNAPSHOT_VERSIONS {
SnapshotUrgency::Low
} else {
SnapshotUrgency::None
}
}
}
/// Implementation of the AddVersion protocol transaction
pub(crate) fn add_version<'a>(
mut txn: Box<dyn StorageTxn + 'a>,
@ -82,7 +125,7 @@ pub(crate) fn add_version<'a>(
client: Client,
parent_version_id: VersionId,
history_segment: HistorySegment,
) -> anyhow::Result<AddVersionResult> {
) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> {
log::debug!(
"add_version(client_key: {}, parent_version_id: {})",
client_key,
@ -92,8 +135,9 @@ pub(crate) fn add_version<'a>(
// check if this version is acceptable, under the protection of the transaction
if client.latest_version_id != NO_VERSION_ID && parent_version_id != client.latest_version_id {
log::debug!("add_version request rejected: mismatched latest_version_id");
return Ok(AddVersionResult::ExpectedParentVersion(
client.latest_version_id,
return Ok((
AddVersionResult::ExpectedParentVersion(client.latest_version_id),
SnapshotUrgency::None,
));
}
@ -108,7 +152,26 @@ pub(crate) fn add_version<'a>(
txn.add_version(client_key, version_id, parent_version_id, history_segment)?;
txn.commit()?;
Ok(AddVersionResult::Ok(version_id))
// calculate the urgency
let time_urgency = match client.snapshot {
None => SnapshotUrgency::High,
Some(Snapshot { timestamp, .. }) => {
SnapshotUrgency::for_days((Utc::now() - timestamp).num_days())
}
};
println!("{:?}", client.snapshot);
let version_urgency = match client.snapshot {
None => SnapshotUrgency::High,
Some(Snapshot { versions_since, .. }) => {
SnapshotUrgency::for_versions_since(versions_since)
}
};
Ok((
AddVersionResult::Ok(version_id),
std::cmp::max(time_urgency, version_urgency),
))
}
/// Implementation of the AddSnapshot protocol transaction
@ -214,11 +277,44 @@ mod test {
use super::*;
use crate::storage::{InMemoryStorage, Snapshot, Storage};
use crate::test::init_logging;
use chrono::{TimeZone, Utc};
use chrono::{Duration, TimeZone, Utc};
use pretty_assertions::assert_eq;
#[test]
fn gcv_not_found_initial() -> anyhow::Result<()> {
fn snapshot_urgency_max() {
use SnapshotUrgency::*;
assert_eq!(std::cmp::max(None, None), None);
assert_eq!(std::cmp::max(None, Low), Low);
assert_eq!(std::cmp::max(None, High), High);
assert_eq!(std::cmp::max(Low, None), Low);
assert_eq!(std::cmp::max(Low, Low), Low);
assert_eq!(std::cmp::max(Low, High), High);
assert_eq!(std::cmp::max(High, None), High);
assert_eq!(std::cmp::max(High, Low), High);
assert_eq!(std::cmp::max(High, High), High);
}
#[test]
fn snapshot_urgency_for_days() {
use SnapshotUrgency::*;
assert_eq!(SnapshotUrgency::for_days(0), None);
assert_eq!(SnapshotUrgency::for_days(SNAPSHOT_DAYS), Low);
assert_eq!(SnapshotUrgency::for_days(SNAPSHOT_DAYS * 2), High);
}
#[test]
fn snapshot_urgency_for_versions_since() {
use SnapshotUrgency::*;
assert_eq!(SnapshotUrgency::for_versions_since(0), None);
assert_eq!(SnapshotUrgency::for_versions_since(SNAPSHOT_VERSIONS), Low);
assert_eq!(
SnapshotUrgency::for_versions_since(SNAPSHOT_VERSIONS * 2),
High
);
}
#[test]
fn get_child_version_not_found_initial() -> anyhow::Result<()> {
init_logging();
let storage = InMemoryStorage::new();
@ -236,7 +332,7 @@ mod test {
}
#[test]
fn gcv_gone_initial() -> anyhow::Result<()> {
fn get_child_version_gone_initial() -> anyhow::Result<()> {
init_logging();
let storage = InMemoryStorage::new();
@ -264,7 +360,7 @@ mod test {
}
#[test]
fn gcv_not_found_up_to_date() -> anyhow::Result<()> {
fn get_child_version_not_found_up_to_date() -> anyhow::Result<()> {
init_logging();
let storage = InMemoryStorage::new();
@ -285,7 +381,7 @@ mod test {
}
#[test]
fn gcv_gone() -> anyhow::Result<()> {
fn get_child_version_gone() -> anyhow::Result<()> {
init_logging();
let storage = InMemoryStorage::new();
@ -306,7 +402,7 @@ mod test {
}
#[test]
fn gcv_found() -> anyhow::Result<()> {
fn get_child_version_found() -> anyhow::Result<()> {
init_logging();
let storage = InMemoryStorage::new();
@ -336,90 +432,221 @@ mod test {
Ok(())
}
#[test]
fn av_conflict() -> anyhow::Result<()> {
/// Utility setup function for add_version tests
fn av_setup(
storage: &InMemoryStorage,
num_versions: u32,
snapshot_version: Option<u32>,
snapshot_days_ago: Option<i64>,
) -> anyhow::Result<(Uuid, Vec<Uuid>)> {
init_logging();
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_key = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let history_segment = b"abcd".to_vec();
let existing_parent_version_id = Uuid::new_v4();
let client = Client {
latest_version_id: existing_parent_version_id,
snapshot: None,
};
let mut versions = vec![];
let mut version_id = Uuid::nil();
txn.new_client(client_key, Uuid::nil())?;
for vnum in 0..num_versions {
let parent_version_id = version_id;
version_id = Uuid::new_v4();
versions.push(version_id);
txn.add_version(
client_key,
version_id,
parent_version_id,
vec![0, 0, vnum as u8],
)?;
if Some(vnum) == snapshot_version {
txn.set_snapshot(
client_key,
Snapshot {
version_id,
versions_since: 0,
timestamp: Utc::now() - Duration::days(snapshot_days_ago.unwrap_or(0)),
},
vec![vnum as u8],
)?;
}
}
Ok((client_key, versions))
}
/// Utility function to check the results of an add_version call
fn av_success_check(
storage: &InMemoryStorage,
client_key: Uuid,
existing_versions: &[Uuid],
result: (AddVersionResult, SnapshotUrgency),
expected_history: Vec<u8>,
expected_urgency: SnapshotUrgency,
) -> anyhow::Result<()> {
if let AddVersionResult::Ok(new_version_id) = result.0 {
// check that it invented a new version ID
for v in existing_versions {
assert_ne!(&new_version_id, v);
}
// verify that the storage was updated
let mut txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();
assert_eq!(client.latest_version_id, new_version_id);
let parent_version_id = existing_versions.last().cloned().unwrap_or_else(Uuid::nil);
let version = txn.get_version(client_key, new_version_id)?.unwrap();
assert_eq!(version.version_id, new_version_id);
assert_eq!(version.parent_version_id, parent_version_id);
assert_eq!(version.history_segment, expected_history);
} else {
panic!("did not get Ok from add_version: {:?}", result);
}
assert_eq!(result.1, expected_urgency);
Ok(())
}
#[test]
fn add_version_conflict() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
let (client_key, versions) = av_setup(&storage, 3, None, None)?;
let mut txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();
// try to add a child of a version other than the latest
assert_eq!(
add_version(txn, client_key, client, parent_version_id, history_segment)?,
AddVersionResult::ExpectedParentVersion(existing_parent_version_id)
add_version(txn, client_key, client, versions[1], vec![3, 6, 9])?.0,
AddVersionResult::ExpectedParentVersion(versions[2])
);
// verify that the storage wasn't updated
txn = storage.txn()?;
assert_eq!(txn.get_client(client_key)?, None);
assert_eq!(
txn.get_version_by_parent(client_key, parent_version_id)?,
None
txn.get_client(client_key)?.unwrap().latest_version_id,
versions[2]
);
assert_eq!(txn.get_version_by_parent(client_key, versions[2])?, None);
Ok(())
}
fn test_av_success(latest_version_id_nil: bool) -> anyhow::Result<()> {
init_logging();
#[test]
fn add_version_with_existing_history() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
let mut txn = storage.txn()?;
let client_key = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let history_segment = b"abcd".to_vec();
let latest_version_id = if latest_version_id_nil {
Uuid::nil()
} else {
parent_version_id
};
let (client_key, versions) = av_setup(&storage, 1, None, None)?;
txn.new_client(client_key, latest_version_id)?;
let mut txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();
let result = add_version(
txn,
client_key,
client,
parent_version_id,
history_segment.clone(),
)?;
if let AddVersionResult::Ok(new_version_id) = result {
// check that it invented a new version ID
assert!(new_version_id != parent_version_id);
let result = add_version(txn, client_key, client, versions[0], vec![3, 6, 9])?;
// verify that the storage was updated
txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();
assert_eq!(client.latest_version_id, new_version_id);
let version = txn
.get_version_by_parent(client_key, parent_version_id)?
.unwrap();
assert_eq!(version.version_id, new_version_id);
assert_eq!(version.parent_version_id, parent_version_id);
assert_eq!(version.history_segment, history_segment);
} else {
panic!("did not get Ok from add_version");
}
av_success_check(
&storage,
client_key,
&versions,
result,
vec![3, 6, 9],
// urgency=high because there are no snapshots yet
SnapshotUrgency::High,
)?;
Ok(())
}
#[test]
fn av_success_with_existing_history() -> anyhow::Result<()> {
test_av_success(true)
fn add_version_with_no_history() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
let (client_key, versions) = av_setup(&storage, 0, None, None)?;
let mut txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();
let parent_version_id = Uuid::nil();
let result = add_version(txn, client_key, client, parent_version_id, vec![3, 6, 9])?;
av_success_check(
&storage,
client_key,
&versions,
result,
vec![3, 6, 9],
// urgency=high because there are no snapshots yet
SnapshotUrgency::High,
)?;
Ok(())
}
#[test]
fn av_success_nil_latest_version_id() -> anyhow::Result<()> {
test_av_success(false)
fn add_version_success_recent_snapshot() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
let (client_key, versions) = av_setup(&storage, 1, Some(0), None)?;
let mut txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();
let result = add_version(txn, client_key, client, versions[0], vec![1, 2, 3])?;
av_success_check(
&storage,
client_key,
&versions,
result,
vec![1, 2, 3],
// no snapshot request since the previous version has a snapshot
SnapshotUrgency::None,
)?;
Ok(())
}
#[test]
fn add_version_success_aged_snapshot() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
// one snapshot, but it was 50 days ago
let (client_key, versions) = av_setup(&storage, 1, Some(0), Some(50))?;
let mut txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();
let result = add_version(txn, client_key, client, versions[0], vec![1, 2, 3])?;
av_success_check(
&storage,
client_key,
&versions,
result,
vec![1, 2, 3],
// urgency=high due to days since the snapshot
SnapshotUrgency::High,
)?;
Ok(())
}
#[test]
fn add_version_success_snapshot_many_versions_ago() -> anyhow::Result<()> {
let storage = InMemoryStorage::new();
// one snapshot, but it was 50 versions ago
let (client_key, versions) = av_setup(&storage, 50, Some(0), None)?;
let mut txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();
let result = add_version(txn, client_key, client, versions[49], vec![1, 2, 3])?;
av_success_check(
&storage,
client_key,
&versions,
result,
vec![1, 2, 3],
// urgency=high due to number of versions since the snapshot
SnapshotUrgency::High,
)?;
Ok(())
}
#[test]
@ -580,8 +807,6 @@ mod test {
let client = txn.get_client(client_key)?.unwrap();
add_snapshot(txn, client_key, client, version_ids[0], vec![9, 9, 9])?;
println!("{:?}", version_ids);
// verify the snapshot was not replaced
let mut txn = storage.txn()?;
let client = txn.get_client(client_key)?.unwrap();