address review comments

This commit is contained in:
Dustin J. Mitchell 2024-11-19 18:16:13 -05:00
parent dfd74429bb
commit e93d6738f4
No known key found for this signature in database
2 changed files with 18 additions and 38 deletions

View file

@ -150,17 +150,13 @@ impl Server {
parent_version_id: VersionId, parent_version_id: VersionId,
history_segment: HistorySegment, history_segment: HistorySegment,
) -> Result<(AddVersionResult, SnapshotUrgency), ServerError> { ) -> Result<(AddVersionResult, SnapshotUrgency), ServerError> {
log::debug!("add_version(client_id: {client_id}, parent_version_id: {parent_version_id})");
let mut txn = self.storage.txn()?; let mut txn = self.storage.txn()?;
let client = txn let client = txn
.get_client(client_id)? .get_client(client_id)?
.ok_or(ServerError::NoSuchClient)?; .ok_or(ServerError::NoSuchClient)?;
log::debug!(
"add_version(client_id: {}, parent_version_id: {})",
client_id,
parent_version_id,
);
// check if this version is acceptable, under the protection of the transaction // check if this version is acceptable, under the protection of the transaction
if client.latest_version_id != NIL_VERSION_ID if client.latest_version_id != NIL_VERSION_ID
&& parent_version_id != client.latest_version_id && parent_version_id != client.latest_version_id
@ -174,10 +170,7 @@ impl Server {
// invent a version ID // invent a version ID
let version_id = Uuid::new_v4(); let version_id = Uuid::new_v4();
log::debug!( log::debug!("add_version request accepted: new version_id: {version_id}");
"add_version request accepted: new version_id: {}",
version_id
);
// update the DB // update the DB
txn.add_version(client_id, version_id, parent_version_id, history_segment)?; txn.add_version(client_id, version_id, parent_version_id, history_segment)?;
@ -211,26 +204,19 @@ impl Server {
version_id: VersionId, version_id: VersionId,
data: Vec<u8>, data: Vec<u8>,
) -> Result<(), ServerError> { ) -> Result<(), ServerError> {
log::debug!("add_snapshot(client_id: {client_id}, version_id: {version_id})");
let mut txn = self.storage.txn()?; let mut txn = self.storage.txn()?;
let client = txn let client = txn
.get_client(client_id)? .get_client(client_id)?
.ok_or(ServerError::NoSuchClient)?; .ok_or(ServerError::NoSuchClient)?;
log::debug!(
"add_snapshot(client_id: {}, version_id: {})",
client_id,
version_id,
);
// NOTE: if the snapshot is rejected, this function logs about it and returns // NOTE: if the snapshot is rejected, this function logs about it and returns
// Ok(()), as there's no reason to report an errot to the client / user. // Ok(()), as there's no reason to report an errot to the client / user.
let last_snapshot = client.snapshot.map(|snap| snap.version_id); let last_snapshot = client.snapshot.map(|snap| snap.version_id);
if Some(version_id) == last_snapshot { if Some(version_id) == last_snapshot {
log::debug!( log::debug!("rejecting snapshot for version {version_id}: already exists");
"rejecting snapshot for version {}: already exists",
version_id
);
return Ok(()); return Ok(());
} }
@ -247,20 +233,14 @@ impl Server {
if Some(vid) == last_snapshot { if Some(vid) == last_snapshot {
// the new snapshot is older than the last snapshot, so ignore it // the new snapshot is older than the last snapshot, so ignore it
log::debug!( log::debug!("rejecting snapshot for version {version_id}: newer snapshot already exists or no such version");
"rejecting snapshot for version {}: newer snapshot already exists or no such version",
version_id
);
return Ok(()); return Ok(());
} }
search_len -= 1; search_len -= 1;
if search_len <= 0 || vid == NIL_VERSION_ID { if search_len <= 0 || vid == NIL_VERSION_ID {
// this should not happen in normal operation, so warn about it // this should not happen in normal operation, so warn about it
log::warn!( log::warn!("rejecting snapshot for version {version_id}: version is too old or no such version");
"rejecting snapshot for version {}: version is too old or no such version",
version_id
);
return Ok(()); return Ok(());
} }
@ -270,15 +250,12 @@ impl Server {
} else { } else {
// this version does not exist; "this should not happen" but if it does, // this version does not exist; "this should not happen" but if it does,
// we don't need a snapshot earlier than the missing version. // we don't need a snapshot earlier than the missing version.
log::warn!( log::warn!("rejecting snapshot for version {version_id}: newer versions have already been deleted");
"rejecting snapshot for version {}: newer versions have already been deleted",
version_id
);
return Ok(()); return Ok(());
} }
} }
log::debug!("accepting snapshot for version {}", version_id); log::debug!("accepting snapshot for version {version_id}");
txn.set_snapshot( txn.set_snapshot(
client_id, client_id,
Snapshot { Snapshot {
@ -346,6 +323,7 @@ mod test {
let mut version_id = Uuid::nil(); let mut version_id = Uuid::nil();
txn.new_client(client_id, Uuid::nil())?; txn.new_client(client_id, Uuid::nil())?;
debug_assert!(num_versions < u8::MAX.into());
for vnum in 0..num_versions { for vnum in 0..num_versions {
let parent_version_id = version_id; let parent_version_id = version_id;
version_id = Uuid::new_v4(); version_id = Uuid::new_v4();
@ -354,6 +332,7 @@ mod test {
client_id, client_id,
version_id, version_id,
parent_version_id, parent_version_id,
// Generate some unique data for this version.
vec![0, 0, vnum as u8], vec![0, 0, vnum as u8],
)?; )?;
if Some(vnum) == snapshot_version { if Some(vnum) == snapshot_version {
@ -364,6 +343,7 @@ mod test {
versions_since: 0, versions_since: 0,
timestamp: Utc::now() - Duration::days(snapshot_days_ago.unwrap_or(0)), timestamp: Utc::now() - Duration::days(snapshot_days_ago.unwrap_or(0)),
}, },
// Generate some unique data for this snapshot.
vec![vnum as u8], vec![vnum as u8],
)?; )?;
} }
@ -379,11 +359,11 @@ mod test {
server: &Server, server: &Server,
client_id: Uuid, client_id: Uuid,
existing_versions: &[Uuid], existing_versions: &[Uuid],
result: (AddVersionResult, SnapshotUrgency), (add_version_result, snapshot_urgency): (AddVersionResult, SnapshotUrgency),
expected_history: Vec<u8>, expected_history: Vec<u8>,
expected_urgency: SnapshotUrgency, expected_urgency: SnapshotUrgency,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
if let AddVersionResult::Ok(new_version_id) = result.0 { if let AddVersionResult::Ok(new_version_id) = add_version_result {
// check that it invented a new version ID // check that it invented a new version ID
for v in existing_versions { for v in existing_versions {
assert_ne!(&new_version_id, v); assert_ne!(&new_version_id, v);
@ -400,10 +380,10 @@ mod test {
assert_eq!(version.parent_version_id, parent_version_id); assert_eq!(version.parent_version_id, parent_version_id);
assert_eq!(version.history_segment, expected_history); assert_eq!(version.history_segment, expected_history);
} else { } else {
panic!("did not get Ok from add_version: {:?}", result); panic!("did not get Ok from add_version: {:?}", add_version_result);
} }
assert_eq!(result.1, expected_urgency); assert_eq!(snapshot_urgency, expected_urgency);
Ok(()) Ok(())
} }

View file

@ -82,7 +82,7 @@ pub(crate) async fn service(
Ok(rb.finish()) Ok(rb.finish())
} }
Err(ServerError::NoSuchClient) => { Err(ServerError::NoSuchClient) => {
// Create a new client and repeate the `add_version` call. // Create a new client and repeat the `add_version` call.
let mut txn = server_state.server.txn().map_err(server_error_to_actix)?; let mut txn = server_state.server.txn().map_err(server_error_to_actix)?;
txn.new_client(client_id, NIL_VERSION_ID) txn.new_client(client_id, NIL_VERSION_ID)
.map_err(failure_to_ise)?; .map_err(failure_to_ise)?;