diff --git a/core/src/inmemory.rs b/core/src/inmemory.rs index 0fa90f4..6dacdd1 100644 --- a/core/src/inmemory.rs +++ b/core/src/inmemory.rs @@ -38,14 +38,16 @@ impl InMemoryStorage { } struct InnerTxn<'a> { + client_id: Uuid, guard: MutexGuard<'a, Inner>, written: bool, committed: bool, } impl Storage for InMemoryStorage { - fn txn(&self) -> anyhow::Result> { + fn txn(&self, client_id: Uuid) -> anyhow::Result> { Ok(Box::new(InnerTxn { + client_id, guard: self.0.lock().expect("poisoned lock"), written: false, committed: false, @@ -54,16 +56,16 @@ impl Storage for InMemoryStorage { } impl<'a> StorageTxn for InnerTxn<'a> { - fn get_client(&mut self, client_id: Uuid) -> anyhow::Result> { - Ok(self.guard.clients.get(&client_id).cloned()) + fn get_client(&mut self) -> anyhow::Result> { + Ok(self.guard.clients.get(&self.client_id).cloned()) } - fn new_client(&mut self, client_id: Uuid, latest_version_id: Uuid) -> anyhow::Result<()> { - if self.guard.clients.contains_key(&client_id) { - return Err(anyhow::anyhow!("Client {} already exists", client_id)); + fn new_client(&mut self, latest_version_id: Uuid) -> anyhow::Result<()> { + if self.guard.clients.contains_key(&self.client_id) { + return Err(anyhow::anyhow!("Client {} already exists", self.client_id)); } self.guard.clients.insert( - client_id, + self.client_id, Client { latest_version_id, snapshot: None, @@ -73,64 +75,57 @@ impl<'a> StorageTxn for InnerTxn<'a> { Ok(()) } - fn set_snapshot( - &mut self, - client_id: Uuid, - snapshot: Snapshot, - data: Vec, - ) -> anyhow::Result<()> { + fn set_snapshot(&mut self, snapshot: Snapshot, data: Vec) -> anyhow::Result<()> { let client = self .guard .clients - .get_mut(&client_id) + .get_mut(&self.client_id) .ok_or_else(|| anyhow::anyhow!("no such client"))?; client.snapshot = Some(snapshot); - self.guard.snapshots.insert(client_id, data); + self.guard.snapshots.insert(self.client_id, data); self.written = true; Ok(()) } - fn get_snapshot_data( - &mut self, - client_id: Uuid, - version_id: Uuid, - ) -> anyhow::Result>> { + fn get_snapshot_data(&mut self, version_id: Uuid) -> anyhow::Result>> { // sanity check - let client = self.guard.clients.get(&client_id); + let client = self.guard.clients.get(&self.client_id); let client = client.ok_or_else(|| anyhow::anyhow!("no such client"))?; if Some(&version_id) != client.snapshot.as_ref().map(|snap| &snap.version_id) { return Err(anyhow::anyhow!("unexpected snapshot_version_id")); } - Ok(self.guard.snapshots.get(&client_id).cloned()) + Ok(self.guard.snapshots.get(&self.client_id).cloned()) } fn get_version_by_parent( &mut self, - client_id: Uuid, parent_version_id: Uuid, ) -> anyhow::Result> { - if let Some(parent_version_id) = self.guard.children.get(&(client_id, parent_version_id)) { + if let Some(parent_version_id) = self + .guard + .children + .get(&(self.client_id, parent_version_id)) + { Ok(self .guard .versions - .get(&(client_id, *parent_version_id)) + .get(&(self.client_id, *parent_version_id)) .cloned()) } else { Ok(None) } } - fn get_version( - &mut self, - client_id: Uuid, - version_id: Uuid, - ) -> anyhow::Result> { - Ok(self.guard.versions.get(&(client_id, version_id)).cloned()) + fn get_version(&mut self, version_id: Uuid) -> anyhow::Result> { + Ok(self + .guard + .versions + .get(&(self.client_id, version_id)) + .cloned()) } fn add_version( &mut self, - client_id: Uuid, version_id: Uuid, parent_version_id: Uuid, history_segment: Vec, @@ -142,19 +137,21 @@ impl<'a> StorageTxn for InnerTxn<'a> { history_segment, }; - if let Some(client) = self.guard.clients.get_mut(&client_id) { + if let Some(client) = self.guard.clients.get_mut(&self.client_id) { client.latest_version_id = version_id; if let Some(ref mut snap) = client.snapshot { snap.versions_since += 1; } } else { - return Err(anyhow::anyhow!("Client {} does not exist", client_id)); + return Err(anyhow::anyhow!("Client {} does not exist", self.client_id)); } self.guard .children - .insert((client_id, parent_version_id), version_id); - self.guard.versions.insert((client_id, version_id), version); + .insert((self.client_id, parent_version_id), version_id); + self.guard + .versions + .insert((self.client_id, version_id), version); self.written = true; Ok(()) @@ -182,8 +179,8 @@ mod test { #[test] fn test_get_client_empty() -> anyhow::Result<()> { let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let maybe_client = txn.get_client(Uuid::new_v4())?; + let mut txn = storage.txn(Uuid::new_v4())?; + let maybe_client = txn.get_client()?; assert!(maybe_client.is_none()); Ok(()) } @@ -191,20 +188,20 @@ mod test { #[test] fn test_client_storage() -> anyhow::Result<()> { let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - let latest_version_id = Uuid::new_v4(); - txn.new_client(client_id, latest_version_id)?; + let mut txn = storage.txn(client_id)?; - let client = txn.get_client(client_id)?.unwrap(); + let latest_version_id = Uuid::new_v4(); + txn.new_client(latest_version_id)?; + + let client = txn.get_client()?.unwrap(); assert_eq!(client.latest_version_id, latest_version_id); assert!(client.snapshot.is_none()); let latest_version_id = Uuid::new_v4(); - txn.add_version(client_id, latest_version_id, Uuid::new_v4(), vec![1, 1])?; + txn.add_version(latest_version_id, Uuid::new_v4(), vec![1, 1])?; - let client = txn.get_client(client_id)?.unwrap(); + let client = txn.get_client()?.unwrap(); assert_eq!(client.latest_version_id, latest_version_id); assert!(client.snapshot.is_none()); @@ -213,9 +210,9 @@ mod test { timestamp: Utc::now(), versions_since: 4, }; - txn.set_snapshot(client_id, snap.clone(), vec![1, 2, 3])?; + txn.set_snapshot(snap.clone(), vec![1, 2, 3])?; - let client = txn.get_client(client_id)?.unwrap(); + let client = txn.get_client()?.unwrap(); assert_eq!(client.latest_version_id, latest_version_id); assert_eq!(client.snapshot.unwrap(), snap); @@ -226,8 +223,9 @@ mod test { #[test] fn test_gvbp_empty() -> anyhow::Result<()> { let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let maybe_version = txn.get_version_by_parent(Uuid::new_v4(), Uuid::new_v4())?; + let client_id = Uuid::new_v4(); + let mut txn = storage.txn(client_id)?; + let maybe_version = txn.get_version_by_parent(Uuid::new_v4())?; assert!(maybe_version.is_none()); Ok(()) } @@ -235,20 +233,15 @@ mod test { #[test] fn test_add_version_and_get_version() -> anyhow::Result<()> { let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); + let mut txn = storage.txn(client_id)?; + let version_id = Uuid::new_v4(); let parent_version_id = Uuid::new_v4(); let history_segment = b"abc".to_vec(); - txn.new_client(client_id, parent_version_id)?; - txn.add_version( - client_id, - version_id, - parent_version_id, - history_segment.clone(), - )?; + txn.new_client(parent_version_id)?; + txn.add_version(version_id, parent_version_id, history_segment.clone())?; let expected = Version { version_id, @@ -256,12 +249,10 @@ mod test { history_segment, }; - let version = txn - .get_version_by_parent(client_id, parent_version_id)? - .unwrap(); + let version = txn.get_version_by_parent(parent_version_id)?.unwrap(); assert_eq!(version, expected); - let version = txn.get_version(client_id, version_id)?.unwrap(); + let version = txn.get_version(version_id)?.unwrap(); assert_eq!(version, expected); txn.commit()?; @@ -271,41 +262,40 @@ mod test { #[test] fn test_snapshots() -> anyhow::Result<()> { let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); + let mut txn = storage.txn(client_id)?; - txn.new_client(client_id, Uuid::new_v4())?; - assert!(txn.get_client(client_id)?.unwrap().snapshot.is_none()); + txn.new_client(Uuid::new_v4())?; + assert!(txn.get_client()?.unwrap().snapshot.is_none()); let snap = Snapshot { version_id: Uuid::new_v4(), timestamp: Utc::now(), versions_since: 3, }; - txn.set_snapshot(client_id, snap.clone(), vec![9, 8, 9])?; + txn.set_snapshot(snap.clone(), vec![9, 8, 9])?; assert_eq!( - txn.get_snapshot_data(client_id, snap.version_id)?.unwrap(), + txn.get_snapshot_data(snap.version_id)?.unwrap(), vec![9, 8, 9] ); - assert_eq!(txn.get_client(client_id)?.unwrap().snapshot, Some(snap)); + assert_eq!(txn.get_client()?.unwrap().snapshot, Some(snap)); let snap2 = Snapshot { version_id: Uuid::new_v4(), timestamp: Utc::now(), versions_since: 10, }; - txn.set_snapshot(client_id, snap2.clone(), vec![0, 2, 4, 6])?; + txn.set_snapshot(snap2.clone(), vec![0, 2, 4, 6])?; assert_eq!( - txn.get_snapshot_data(client_id, snap2.version_id)?.unwrap(), + txn.get_snapshot_data(snap2.version_id)?.unwrap(), vec![0, 2, 4, 6] ); - assert_eq!(txn.get_client(client_id)?.unwrap().snapshot, Some(snap2)); + assert_eq!(txn.get_client()?.unwrap().snapshot, Some(snap2)); // check that mismatched version is detected - assert!(txn.get_snapshot_data(client_id, Uuid::new_v4()).is_err()); + assert!(txn.get_snapshot_data(Uuid::new_v4()).is_err()); txn.commit()?; Ok(()) diff --git a/core/src/server.rs b/core/src/server.rs index c22fa31..09222d5 100644 --- a/core/src/server.rs +++ b/core/src/server.rs @@ -111,14 +111,12 @@ impl Server { client_id: ClientId, parent_version_id: VersionId, ) -> Result { - let mut txn = self.storage.txn()?; - let client = txn - .get_client(client_id)? - .ok_or(ServerError::NoSuchClient)?; + let mut txn = self.storage.txn(client_id)?; + let client = txn.get_client()?.ok_or(ServerError::NoSuchClient)?; // If a version with parentVersionId equal to the requested parentVersionId exists, it is // returned. - if let Some(version) = txn.get_version_by_parent(client_id, parent_version_id)? { + if let Some(version) = txn.get_version_by_parent(parent_version_id)? { return Ok(GetVersionResult::Success { version_id: version.version_id, parent_version_id: version.parent_version_id, @@ -152,10 +150,8 @@ impl Server { ) -> Result<(AddVersionResult, SnapshotUrgency), ServerError> { log::debug!("add_version(client_id: {client_id}, parent_version_id: {parent_version_id})"); - let mut txn = self.storage.txn()?; - let client = txn - .get_client(client_id)? - .ok_or(ServerError::NoSuchClient)?; + let mut txn = self.storage.txn(client_id)?; + let client = txn.get_client()?.ok_or(ServerError::NoSuchClient)?; // check if this version is acceptable, under the protection of the transaction if client.latest_version_id != NIL_VERSION_ID @@ -173,7 +169,7 @@ impl Server { log::debug!("add_version request accepted: new version_id: {version_id}"); // update the DB - txn.add_version(client_id, version_id, parent_version_id, history_segment)?; + txn.add_version(version_id, parent_version_id, history_segment)?; txn.commit()?; // calculate the urgency @@ -206,10 +202,8 @@ impl Server { ) -> Result<(), ServerError> { log::debug!("add_snapshot(client_id: {client_id}, version_id: {version_id})"); - let mut txn = self.storage.txn()?; - let client = txn - .get_client(client_id)? - .ok_or(ServerError::NoSuchClient)?; + let mut txn = self.storage.txn(client_id)?; + let client = txn.get_client()?.ok_or(ServerError::NoSuchClient)?; // NOTE: if the snapshot is rejected, this function logs about it and returns // Ok(()), as there's no reason to report an errot to the client / user. @@ -245,7 +239,7 @@ impl Server { } // get the parent version ID - if let Some(parent) = txn.get_version(client_id, vid)? { + if let Some(parent) = txn.get_version(vid)? { vid = parent.parent_version_id; } else { // this version does not exist; "this should not happen" but if it does, @@ -257,7 +251,6 @@ impl Server { log::debug!("accepting snapshot for version {version_id}"); txn.set_snapshot( - client_id, Snapshot { version_id, timestamp: Utc::now(), @@ -274,13 +267,11 @@ impl Server { &self, client_id: ClientId, ) -> Result)>, ServerError> { - let mut txn = self.storage.txn()?; - let client = txn - .get_client(client_id)? - .ok_or(ServerError::NoSuchClient)?; + let mut txn = self.storage.txn(client_id)?; + let client = txn.get_client()?.ok_or(ServerError::NoSuchClient)?; Ok(if let Some(snap) = client.snapshot { - txn.get_snapshot_data(client_id, snap.version_id)? + txn.get_snapshot_data(snap.version_id)? .map(|data| (snap.version_id, data)) } else { None @@ -288,8 +279,8 @@ impl Server { } /// Convenience method to get a transaction for the embedded storage. - pub fn txn(&self) -> Result, ServerError> { - Ok(self.storage.txn()?) + pub fn txn(&self, client_id: Uuid) -> Result, ServerError> { + Ok(self.storage.txn(client_id)?) } } @@ -303,14 +294,15 @@ mod test { fn setup(init: INIT) -> anyhow::Result<(Server, RES)> where - INIT: FnOnce(&mut dyn StorageTxn) -> anyhow::Result, + INIT: FnOnce(&mut dyn StorageTxn, Uuid) -> anyhow::Result, { let _ = env_logger::builder().is_test(true).try_init(); let storage = InMemoryStorage::new(); + let client_id = Uuid::new_v4(); let res; { - let mut txn = storage.txn()?; - res = init(txn.as_mut())?; + let mut txn = storage.txn(client_id)?; + res = init(txn.as_mut(), client_id)?; txn.commit()?; } Ok((Server::new(ServerConfig::default(), storage), res)) @@ -322,19 +314,17 @@ mod test { snapshot_version: Option, snapshot_days_ago: Option, ) -> anyhow::Result<(Server, Uuid, Vec)> { - let (server, (client_id, versions)) = setup(|txn| { - let client_id = Uuid::new_v4(); + let (server, (client_id, versions)) = setup(|txn, client_id| { let mut versions = vec![]; let mut version_id = Uuid::nil(); - txn.new_client(client_id, Uuid::nil())?; + txn.new_client(Uuid::nil())?; debug_assert!(num_versions < u8::MAX.into()); for vnum in 0..num_versions { let parent_version_id = version_id; version_id = Uuid::new_v4(); versions.push(version_id); txn.add_version( - client_id, version_id, parent_version_id, // Generate some unique data for this version. @@ -342,7 +332,6 @@ mod test { )?; if Some(vnum) == snapshot_version { txn.set_snapshot( - client_id, Snapshot { version_id, versions_since: 0, @@ -375,12 +364,12 @@ mod test { } // verify that the storage was updated - let mut txn = server.txn()?; - let client = txn.get_client(client_id)?.unwrap(); + let mut txn = server.txn(client_id)?; + let client = txn.get_client()?.unwrap(); assert_eq!(client.latest_version_id, new_version_id); let parent_version_id = existing_versions.last().cloned().unwrap_or_else(Uuid::nil); - let version = txn.get_version(client_id, new_version_id)?.unwrap(); + let version = txn.get_version(new_version_id)?.unwrap(); assert_eq!(version.version_id, new_version_id); assert_eq!(version.parent_version_id, parent_version_id); assert_eq!(version.history_segment, expected_history); @@ -439,9 +428,8 @@ mod test { #[test] fn get_child_version_not_found_initial_nil() -> anyhow::Result<()> { - let (server, client_id) = setup(|txn| { - let client_id = Uuid::new_v4(); - txn.new_client(client_id, NIL_VERSION_ID)?; + let (server, client_id) = setup(|txn, client_id| { + txn.new_client(NIL_VERSION_ID)?; Ok(client_id) })?; @@ -455,9 +443,8 @@ mod test { #[test] fn get_child_version_not_found_initial_continuing() -> anyhow::Result<()> { - let (server, client_id) = setup(|txn| { - let client_id = Uuid::new_v4(); - txn.new_client(client_id, NIL_VERSION_ID)?; + let (server, client_id) = setup(|txn, client_id| { + txn.new_client(NIL_VERSION_ID)?; Ok(client_id) })?; @@ -473,12 +460,11 @@ mod test { #[test] fn get_child_version_not_found_up_to_date() -> anyhow::Result<()> { - let (server, (client_id, parent_version_id)) = setup(|txn| { + let (server, (client_id, parent_version_id)) = setup(|txn, client_id| { // add a parent version, but not the requested child version - let client_id = Uuid::new_v4(); let parent_version_id = Uuid::new_v4(); - txn.new_client(client_id, parent_version_id)?; - txn.add_version(client_id, parent_version_id, NIL_VERSION_ID, vec![])?; + txn.new_client(parent_version_id)?; + txn.add_version(parent_version_id, NIL_VERSION_ID, vec![])?; Ok((client_id, parent_version_id)) })?; @@ -492,13 +478,11 @@ mod test { #[test] fn get_child_version_gone_not_latest() -> anyhow::Result<()> { - let (server, client_id) = setup(|txn| { - let client_id = Uuid::new_v4(); - + let (server, client_id) = setup(|txn, client_id| { // Add a parent version, but not the requested parent version let parent_version_id = Uuid::new_v4(); - txn.new_client(client_id, parent_version_id)?; - txn.add_version(client_id, parent_version_id, NIL_VERSION_ID, vec![])?; + txn.new_client(parent_version_id)?; + txn.add_version(parent_version_id, NIL_VERSION_ID, vec![])?; Ok(client_id) })?; @@ -512,22 +496,17 @@ mod test { #[test] fn get_child_version_found() -> anyhow::Result<()> { - let (server, (client_id, version_id, parent_version_id, history_segment)) = setup(|txn| { - let client_id = Uuid::new_v4(); - let version_id = Uuid::new_v4(); - let parent_version_id = Uuid::new_v4(); - let history_segment = b"abcd".to_vec(); + let (server, (client_id, version_id, parent_version_id, history_segment)) = + setup(|txn, client_id| { + let version_id = Uuid::new_v4(); + let parent_version_id = Uuid::new_v4(); + let history_segment = b"abcd".to_vec(); - txn.new_client(client_id, version_id)?; - txn.add_version( - client_id, - version_id, - parent_version_id, - history_segment.clone(), - )?; + txn.new_client(version_id)?; + txn.add_version(version_id, parent_version_id, history_segment.clone())?; - Ok((client_id, version_id, parent_version_id, history_segment)) - })?; + Ok((client_id, version_id, parent_version_id, history_segment)) + })?; assert_eq!( server.get_child_version(client_id, parent_version_id)?, GetVersionResult::Success { @@ -550,12 +529,9 @@ mod test { ); // verify that the storage wasn't updated - let mut txn = server.txn()?; - assert_eq!( - txn.get_client(client_id)?.unwrap().latest_version_id, - versions[2] - ); - assert_eq!(txn.get_version_by_parent(client_id, versions[2])?, None); + let mut txn = server.txn(client_id)?; + assert_eq!(txn.get_client()?.unwrap().latest_version_id, versions[2]); + assert_eq!(txn.get_version_by_parent(versions[2])?, None); Ok(()) } @@ -661,13 +637,12 @@ mod test { #[test] fn add_snapshot_success_latest() -> anyhow::Result<()> { - let (server, (client_id, version_id)) = setup(|txn| { - let client_id = Uuid::new_v4(); + let (server, (client_id, version_id)) = setup(|txn, client_id| { let version_id = Uuid::new_v4(); // set up a task DB with one version in it - txn.new_client(client_id, version_id)?; - txn.add_version(client_id, version_id, NIL_VERSION_ID, vec![])?; + txn.new_client(version_id)?; + txn.add_version(version_id, NIL_VERSION_ID, vec![])?; // add a snapshot for that version Ok((client_id, version_id)) @@ -675,13 +650,13 @@ mod test { server.add_snapshot(client_id, version_id, vec![1, 2, 3])?; // verify the snapshot - let mut txn = server.txn()?; - let client = txn.get_client(client_id)?.unwrap(); + let mut txn = server.txn(client_id)?; + let client = txn.get_client()?.unwrap(); let snapshot = client.snapshot.unwrap(); assert_eq!(snapshot.version_id, version_id); assert_eq!(snapshot.versions_since, 0); assert_eq!( - txn.get_snapshot_data(client_id, version_id).unwrap(), + txn.get_snapshot_data(version_id).unwrap(), Some(vec![1, 2, 3]) ); @@ -690,15 +665,14 @@ mod test { #[test] fn add_snapshot_success_older() -> anyhow::Result<()> { - let (server, (client_id, version_id_1)) = setup(|txn| { - let client_id = Uuid::new_v4(); + let (server, (client_id, version_id_1)) = setup(|txn, client_id| { let version_id_1 = Uuid::new_v4(); let version_id_2 = Uuid::new_v4(); // set up a task DB with two versions in it - txn.new_client(client_id, version_id_2)?; - txn.add_version(client_id, version_id_1, NIL_VERSION_ID, vec![])?; - txn.add_version(client_id, version_id_2, version_id_1, vec![])?; + txn.new_client(version_id_2)?; + txn.add_version(version_id_1, NIL_VERSION_ID, vec![])?; + txn.add_version(version_id_2, version_id_1, vec![])?; Ok((client_id, version_id_1)) })?; @@ -706,13 +680,13 @@ mod test { server.add_snapshot(client_id, version_id_1, vec![1, 2, 3])?; // verify the snapshot - let mut txn = server.txn()?; - let client = txn.get_client(client_id)?.unwrap(); + let mut txn = server.txn(client_id)?; + let client = txn.get_client()?.unwrap(); let snapshot = client.snapshot.unwrap(); assert_eq!(snapshot.version_id, version_id_1); assert_eq!(snapshot.versions_since, 0); assert_eq!( - txn.get_snapshot_data(client_id, version_id_1).unwrap(), + txn.get_snapshot_data(version_id_1).unwrap(), Some(vec![1, 2, 3]) ); @@ -721,15 +695,14 @@ mod test { #[test] fn add_snapshot_fails_no_such() -> anyhow::Result<()> { - let (server, client_id) = setup(|txn| { - let client_id = Uuid::new_v4(); + let (server, client_id) = setup(|txn, client_id| { let version_id_1 = Uuid::new_v4(); let version_id_2 = Uuid::new_v4(); // set up a task DB with two versions in it - txn.new_client(client_id, version_id_2)?; - txn.add_version(client_id, version_id_1, NIL_VERSION_ID, vec![])?; - txn.add_version(client_id, version_id_2, version_id_1, vec![])?; + txn.new_client(version_id_2)?; + txn.add_version(version_id_1, NIL_VERSION_ID, vec![])?; + txn.add_version(version_id_2, version_id_1, vec![])?; // add a snapshot for unknown version Ok(client_id) @@ -739,8 +712,8 @@ mod test { server.add_snapshot(client_id, version_id_unk, vec![1, 2, 3])?; // verify the snapshot does not exist - let mut txn = server.txn()?; - let client = txn.get_client(client_id)?.unwrap(); + let mut txn = server.txn(client_id)?; + let client = txn.get_client()?.unwrap(); assert!(client.snapshot.is_none()); Ok(()) @@ -748,16 +721,15 @@ mod test { #[test] fn add_snapshot_fails_too_old() -> anyhow::Result<()> { - let (server, (client_id, version_ids)) = setup(|txn| { - let client_id = Uuid::new_v4(); + let (server, (client_id, version_ids)) = setup(|txn, client_id| { let mut version_id = Uuid::new_v4(); let mut parent_version_id = Uuid::nil(); let mut version_ids = vec![]; // set up a task DB with 10 versions in it (oldest to newest) - txn.new_client(client_id, Uuid::nil())?; + txn.new_client(Uuid::nil())?; for _ in 0..10 { - txn.add_version(client_id, version_id, parent_version_id, vec![])?; + txn.add_version(version_id, parent_version_id, vec![])?; version_ids.push(version_id); parent_version_id = version_id; version_id = Uuid::new_v4(); @@ -769,8 +741,8 @@ mod test { server.add_snapshot(client_id, version_ids[0], vec![1, 2, 3])?; // verify the snapshot does not exist - let mut txn = server.txn()?; - let client = txn.get_client(client_id)?.unwrap(); + let mut txn = server.txn(client_id)?; + let client = txn.get_client()?.unwrap(); assert!(client.snapshot.is_none()); Ok(()) @@ -778,23 +750,21 @@ mod test { #[test] fn add_snapshot_fails_newer_exists() -> anyhow::Result<()> { - let (server, (client_id, version_ids)) = setup(|txn| { - let client_id = Uuid::new_v4(); + let (server, (client_id, version_ids)) = setup(|txn, client_id| { let mut version_id = Uuid::new_v4(); let mut parent_version_id = Uuid::nil(); let mut version_ids = vec![]; // set up a task DB with 5 versions in it (oldest to newest) and a snapshot of the // middle one - txn.new_client(client_id, Uuid::nil())?; + txn.new_client(Uuid::nil())?; for _ in 0..5 { - txn.add_version(client_id, version_id, parent_version_id, vec![])?; + txn.add_version(version_id, parent_version_id, vec![])?; version_ids.push(version_id); parent_version_id = version_id; version_id = Uuid::new_v4(); } txn.set_snapshot( - client_id, Snapshot { version_id: version_ids[2], versions_since: 2, @@ -810,13 +780,13 @@ mod test { server.add_snapshot(client_id, version_ids[0], vec![9, 9, 9])?; // verify the snapshot was not replaced - let mut txn = server.txn()?; - let client = txn.get_client(client_id)?.unwrap(); + let mut txn = server.txn(client_id)?; + let client = txn.get_client()?.unwrap(); let snapshot = client.snapshot.unwrap(); assert_eq!(snapshot.version_id, version_ids[2]); assert_eq!(snapshot.versions_since, 2); assert_eq!( - txn.get_snapshot_data(client_id, version_ids[2]).unwrap(), + txn.get_snapshot_data(version_ids[2]).unwrap(), Some(vec![1, 2, 3]) ); @@ -825,11 +795,9 @@ mod test { #[test] fn add_snapshot_fails_nil_version() -> anyhow::Result<()> { - let (server, client_id) = setup(|txn| { - let client_id = Uuid::new_v4(); - + let (server, client_id) = setup(|txn, client_id| { // just set up the client - txn.new_client(client_id, NIL_VERSION_ID)?; + txn.new_client(NIL_VERSION_ID)?; // add a snapshot for the nil version Ok(client_id) @@ -838,8 +806,8 @@ mod test { server.add_snapshot(client_id, NIL_VERSION_ID, vec![9, 9, 9])?; // verify the snapshot does not exist - let mut txn = server.txn()?; - let client = txn.get_client(client_id)?.unwrap(); + let mut txn = server.txn(client_id)?; + let client = txn.get_client()?.unwrap(); assert!(client.snapshot.is_none()); Ok(()) @@ -847,14 +815,12 @@ mod test { #[test] fn get_snapshot_found() -> anyhow::Result<()> { - let (server, (client_id, data, snapshot_version_id)) = setup(|txn| { - let client_id = Uuid::new_v4(); + let (server, (client_id, data, snapshot_version_id)) = setup(|txn, client_id| { let data = vec![1, 2, 3]; let snapshot_version_id = Uuid::new_v4(); - txn.new_client(client_id, snapshot_version_id)?; + txn.new_client(snapshot_version_id)?; txn.set_snapshot( - client_id, Snapshot { version_id: snapshot_version_id, versions_since: 3, @@ -874,9 +840,8 @@ mod test { #[test] fn get_snapshot_not_found() -> anyhow::Result<()> { - let (server, client_id) = setup(|txn| { - let client_id = Uuid::new_v4(); - txn.new_client(client_id, NIL_VERSION_ID)?; + let (server, client_id) = setup(|txn, client_id| { + txn.new_client(NIL_VERSION_ID)?; Ok(client_id) })?; diff --git a/core/src/storage.rs b/core/src/storage.rs index e0c9621..011f231 100644 --- a/core/src/storage.rs +++ b/core/src/storage.rs @@ -39,48 +39,38 @@ pub struct Version { /// in storage must be as if each were executed sequentially in some order. In particular, /// un-committed changes must not be read by another transaction. /// +/// Transactions with different client IDs cannot share any data, so it is safe to handle them +/// concurrently. +/// /// Changes in a transaction that is dropped without calling `commit` must not appear in any other /// transaction. pub trait StorageTxn { - /// Get information about the given client - fn get_client(&mut self, client_id: Uuid) -> anyhow::Result>; + /// Get information about the client for this transaction + fn get_client(&mut self) -> anyhow::Result>; - /// Create a new client with the given latest_version_id - fn new_client(&mut self, client_id: Uuid, latest_version_id: Uuid) -> anyhow::Result<()>; + /// Create the client for this transaction, with the given latest_version_id. The client must + /// not already exist. + fn new_client(&mut self, latest_version_id: Uuid) -> anyhow::Result<()>; /// Set the client's most recent snapshot. - fn set_snapshot( - &mut self, - client_id: Uuid, - snapshot: Snapshot, - data: Vec, - ) -> anyhow::Result<()>; + fn set_snapshot(&mut self, snapshot: Snapshot, data: Vec) -> anyhow::Result<()>; /// Get the data for the most recent snapshot. The version_id /// is used to verify that the snapshot is for the correct version. - fn get_snapshot_data( - &mut self, - client_id: Uuid, - version_id: Uuid, - ) -> anyhow::Result>>; + fn get_snapshot_data(&mut self, version_id: Uuid) -> anyhow::Result>>; /// Get a version, indexed by parent version id - fn get_version_by_parent( - &mut self, - client_id: Uuid, - parent_version_id: Uuid, - ) -> anyhow::Result>; + fn get_version_by_parent(&mut self, parent_version_id: Uuid) + -> anyhow::Result>; /// Get a version, indexed by its own version id - fn get_version(&mut self, client_id: Uuid, version_id: Uuid) - -> anyhow::Result>; + fn get_version(&mut self, version_id: Uuid) -> anyhow::Result>; /// Add a version (that must not already exist), and /// - update latest_version_id /// - increment snapshot.versions_since fn add_version( &mut self, - client_id: Uuid, version_id: Uuid, parent_version_id: Uuid, history_segment: Vec, @@ -94,6 +84,6 @@ pub trait StorageTxn { /// A trait for objects able to act as storage. Most of the interesting behavior is in the /// [`crate::storage::StorageTxn`] trait. pub trait Storage: Send + Sync { - /// Begin a transaction - fn txn(&self) -> anyhow::Result>; + /// Begin a transaction for the given client ID. + fn txn(&self, client_id: Uuid) -> anyhow::Result>; } diff --git a/server/src/api/add_snapshot.rs b/server/src/api/add_snapshot.rs index 1769ca7..745a5a9 100644 --- a/server/src/api/add_snapshot.rs +++ b/server/src/api/add_snapshot.rs @@ -70,9 +70,9 @@ mod test { // set up the storage contents.. { - let mut txn = storage.txn().unwrap(); - txn.new_client(client_id, version_id).unwrap(); - txn.add_version(client_id, version_id, NIL_VERSION_ID, vec![])?; + let mut txn = storage.txn(client_id).unwrap(); + txn.new_client(version_id).unwrap(); + txn.add_version(version_id, NIL_VERSION_ID, vec![])?; txn.commit()?; } @@ -114,8 +114,8 @@ mod test { // set up the storage contents.. { - let mut txn = storage.txn().unwrap(); - txn.new_client(client_id, NIL_VERSION_ID).unwrap(); + let mut txn = storage.txn(client_id).unwrap(); + txn.new_client(NIL_VERSION_ID).unwrap(); txn.commit().unwrap(); } diff --git a/server/src/api/add_version.rs b/server/src/api/add_version.rs index d6ac245..32cfad3 100644 --- a/server/src/api/add_version.rs +++ b/server/src/api/add_version.rs @@ -82,9 +82,11 @@ pub(crate) async fn service( } Err(ServerError::NoSuchClient) => { // Create a new client and repeat the `add_version` call. - let mut txn = server_state.server.txn().map_err(server_error_to_actix)?; - txn.new_client(client_id, NIL_VERSION_ID) - .map_err(failure_to_ise)?; + let mut txn = server_state + .server + .txn(client_id) + .map_err(server_error_to_actix)?; + txn.new_client(NIL_VERSION_ID).map_err(failure_to_ise)?; txn.commit().map_err(failure_to_ise)?; continue; } @@ -111,8 +113,8 @@ mod test { // set up the storage contents.. { - let mut txn = storage.txn().unwrap(); - txn.new_client(client_id, Uuid::nil()).unwrap(); + let mut txn = storage.txn(client_id).unwrap(); + txn.new_client(Uuid::nil()).unwrap(); txn.commit().unwrap(); } @@ -181,8 +183,8 @@ mod test { // Check that the client really was created { - let mut txn = server.server_state.server.txn().unwrap(); - let client = txn.get_client(client_id).unwrap().unwrap(); + let mut txn = server.server_state.server.txn(client_id).unwrap(); + let client = txn.get_client().unwrap().unwrap(); assert_eq!(client.latest_version_id, new_version_id); assert_eq!(client.snapshot, None); } @@ -197,8 +199,8 @@ mod test { // set up the storage contents.. { - let mut txn = storage.txn().unwrap(); - txn.new_client(client_id, version_id).unwrap(); + let mut txn = storage.txn(client_id).unwrap(); + txn.new_client(version_id).unwrap(); txn.commit().unwrap(); } diff --git a/server/src/api/get_child_version.rs b/server/src/api/get_child_version.rs index 4cf53ef..fef19c1 100644 --- a/server/src/api/get_child_version.rs +++ b/server/src/api/get_child_version.rs @@ -64,9 +64,9 @@ mod test { // set up the storage contents.. { - let mut txn = storage.txn().unwrap(); - txn.new_client(client_id, Uuid::new_v4()).unwrap(); - txn.add_version(client_id, version_id, parent_version_id, b"abcd".to_vec()) + let mut txn = storage.txn(client_id).unwrap(); + txn.new_client(Uuid::new_v4()).unwrap(); + txn.add_version(version_id, parent_version_id, b"abcd".to_vec()) .unwrap(); txn.commit().unwrap(); } @@ -128,9 +128,9 @@ mod test { // create the client and a single version. { - let mut txn = storage.txn().unwrap(); - txn.new_client(client_id, Uuid::new_v4()).unwrap(); - txn.add_version(client_id, test_version_id, NIL_VERSION_ID, b"vers".to_vec()) + let mut txn = storage.txn(client_id).unwrap(); + txn.new_client(Uuid::new_v4()).unwrap(); + txn.add_version(test_version_id, NIL_VERSION_ID, b"vers".to_vec()) .unwrap(); txn.commit().unwrap(); } diff --git a/server/src/api/get_snapshot.rs b/server/src/api/get_snapshot.rs index 6eff71a..0a1f030 100644 --- a/server/src/api/get_snapshot.rs +++ b/server/src/api/get_snapshot.rs @@ -48,8 +48,8 @@ mod test { // set up the storage contents.. { - let mut txn = storage.txn().unwrap(); - txn.new_client(client_id, Uuid::new_v4()).unwrap(); + let mut txn = storage.txn(client_id).unwrap(); + txn.new_client(Uuid::new_v4()).unwrap(); txn.commit().unwrap(); } @@ -75,10 +75,9 @@ mod test { // set up the storage contents.. { - let mut txn = storage.txn().unwrap(); - txn.new_client(client_id, Uuid::new_v4()).unwrap(); + let mut txn = storage.txn(client_id).unwrap(); + txn.new_client(Uuid::new_v4()).unwrap(); txn.set_snapshot( - client_id, Snapshot { version_id, versions_since: 3, diff --git a/sqlite/src/lib.rs b/sqlite/src/lib.rs index e64c0c5..7441b17 100644 --- a/sqlite/src/lib.rs +++ b/sqlite/src/lib.rs @@ -78,12 +78,12 @@ impl SqliteStorage { } impl Storage for SqliteStorage { - fn txn<'a>(&'a self) -> anyhow::Result> { + fn txn(&self, client_id: Uuid) -> anyhow::Result> { let con = self.new_connection()?; // Begin the transaction on this new connection. An IMMEDIATE connection is in // write (exclusive) mode from the start. con.execute("BEGIN IMMEDIATE", [])?; - let txn = Txn { con }; + let txn = Txn { con, client_id }; Ok(Box::new(txn)) } } @@ -93,6 +93,7 @@ struct Txn { // transactions by running `BEGIN ...` and `COMMIT` at appropriate times. So we will do // the same. con: Connection, + client_id: Uuid, } impl Txn { @@ -126,7 +127,7 @@ impl Txn { } impl StorageTxn for Txn { - fn get_client(&mut self, client_id: Uuid) -> anyhow::Result> { + fn get_client(&mut self) -> anyhow::Result> { let result: Option = self .con .query_row( @@ -138,7 +139,7 @@ impl StorageTxn for Txn { FROM clients WHERE client_id = ? LIMIT 1", - [&StoredUuid(client_id)], + [&StoredUuid(self.client_id)], |r| { let latest_version_id: StoredUuid = r.get(0)?; let snapshot_timestamp: Option = r.get(1)?; @@ -170,22 +171,17 @@ impl StorageTxn for Txn { Ok(result) } - fn new_client(&mut self, client_id: Uuid, latest_version_id: Uuid) -> anyhow::Result<()> { + fn new_client(&mut self, latest_version_id: Uuid) -> anyhow::Result<()> { self.con .execute( "INSERT OR REPLACE INTO clients (client_id, latest_version_id) VALUES (?, ?)", - params![&StoredUuid(client_id), &StoredUuid(latest_version_id)], + params![&StoredUuid(self.client_id), &StoredUuid(latest_version_id)], ) .context("Error creating/updating client")?; Ok(()) } - fn set_snapshot( - &mut self, - client_id: Uuid, - snapshot: Snapshot, - data: Vec, - ) -> anyhow::Result<()> { + fn set_snapshot(&mut self, snapshot: Snapshot, data: Vec) -> anyhow::Result<()> { self.con .execute( "UPDATE clients @@ -200,23 +196,19 @@ impl StorageTxn for Txn { snapshot.timestamp.timestamp(), snapshot.versions_since, data, - &StoredUuid(client_id), + &StoredUuid(self.client_id), ], ) .context("Error creating/updating snapshot")?; Ok(()) } - fn get_snapshot_data( - &mut self, - client_id: Uuid, - version_id: Uuid, - ) -> anyhow::Result>> { + fn get_snapshot_data(&mut self, version_id: Uuid) -> anyhow::Result>> { let r = self .con .query_row( "SELECT snapshot, snapshot_version_id FROM clients WHERE client_id = ?", - params![&StoredUuid(client_id)], + params![&StoredUuid(self.client_id)], |r| { let v: StoredUuid = r.get("snapshot_version_id")?; let d: Vec = r.get("snapshot")?; @@ -237,29 +229,25 @@ impl StorageTxn for Txn { fn get_version_by_parent( &mut self, - client_id: Uuid, + parent_version_id: Uuid, ) -> anyhow::Result> { self.get_version_impl( "SELECT version_id, parent_version_id, history_segment FROM versions WHERE parent_version_id = ? AND client_id = ?", - client_id, + self.client_id, parent_version_id) } - fn get_version( - &mut self, - client_id: Uuid, - version_id: Uuid, - ) -> anyhow::Result> { + fn get_version(&mut self, version_id: Uuid) -> anyhow::Result> { self.get_version_impl( "SELECT version_id, parent_version_id, history_segment FROM versions WHERE version_id = ? AND client_id = ?", - client_id, + self.client_id, version_id) } fn add_version( &mut self, - client_id: Uuid, + version_id: Uuid, parent_version_id: Uuid, history_segment: Vec, @@ -268,7 +256,7 @@ impl StorageTxn for Txn { "INSERT INTO versions (version_id, client_id, parent_version_id, history_segment) VALUES(?, ?, ?, ?)", params![ StoredUuid(version_id), - StoredUuid(client_id), + StoredUuid(self.client_id), StoredUuid(parent_version_id), history_segment ] @@ -281,7 +269,7 @@ impl StorageTxn for Txn { latest_version_id = ?, versions_since_snapshot = versions_since_snapshot + 1 WHERE client_id = ?", - params![StoredUuid(version_id), StoredUuid(client_id),], + params![StoredUuid(version_id), StoredUuid(self.client_id),], ) .context("Error updating client for new version")?; @@ -306,8 +294,9 @@ mod test { let tmp_dir = TempDir::new()?; let non_existant = tmp_dir.path().join("subdir"); let storage = SqliteStorage::new(non_existant)?; - let mut txn = storage.txn()?; - let maybe_client = txn.get_client(Uuid::new_v4())?; + let client_id = Uuid::new_v4(); + let mut txn = storage.txn(client_id)?; + let maybe_client = txn.get_client()?; assert!(maybe_client.is_none()); Ok(()) } @@ -316,8 +305,9 @@ mod test { fn test_get_client_empty() -> anyhow::Result<()> { let tmp_dir = TempDir::new()?; let storage = SqliteStorage::new(tmp_dir.path())?; - let mut txn = storage.txn()?; - let maybe_client = txn.get_client(Uuid::new_v4())?; + let client_id = Uuid::new_v4(); + let mut txn = storage.txn(client_id)?; + let maybe_client = txn.get_client()?; assert!(maybe_client.is_none()); Ok(()) } @@ -326,20 +316,20 @@ mod test { fn test_client_storage() -> anyhow::Result<()> { let tmp_dir = TempDir::new()?; let storage = SqliteStorage::new(tmp_dir.path())?; - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); - let latest_version_id = Uuid::new_v4(); - txn.new_client(client_id, latest_version_id)?; + let mut txn = storage.txn(client_id)?; - let client = txn.get_client(client_id)?.unwrap(); + let latest_version_id = Uuid::new_v4(); + txn.new_client(latest_version_id)?; + + let client = txn.get_client()?.unwrap(); assert_eq!(client.latest_version_id, latest_version_id); assert!(client.snapshot.is_none()); let latest_version_id = Uuid::new_v4(); - txn.add_version(client_id, latest_version_id, Uuid::new_v4(), vec![1, 1])?; + txn.add_version(latest_version_id, Uuid::new_v4(), vec![1, 1])?; - let client = txn.get_client(client_id)?.unwrap(); + let client = txn.get_client()?.unwrap(); assert_eq!(client.latest_version_id, latest_version_id); assert!(client.snapshot.is_none()); @@ -348,9 +338,9 @@ mod test { timestamp: "2014-11-28T12:00:09Z".parse::>().unwrap(), versions_since: 4, }; - txn.set_snapshot(client_id, snap.clone(), vec![1, 2, 3])?; + txn.set_snapshot(snap.clone(), vec![1, 2, 3])?; - let client = txn.get_client(client_id)?.unwrap(); + let client = txn.get_client()?.unwrap(); assert_eq!(client.latest_version_id, latest_version_id); assert_eq!(client.snapshot.unwrap(), snap); @@ -361,8 +351,9 @@ mod test { fn test_gvbp_empty() -> anyhow::Result<()> { let tmp_dir = TempDir::new()?; let storage = SqliteStorage::new(tmp_dir.path())?; - let mut txn = storage.txn()?; - let maybe_version = txn.get_version_by_parent(Uuid::new_v4(), Uuid::new_v4())?; + let client_id = Uuid::new_v4(); + let mut txn = storage.txn(client_id)?; + let maybe_version = txn.get_version_by_parent(Uuid::new_v4())?; assert!(maybe_version.is_none()); Ok(()) } @@ -371,18 +362,13 @@ mod test { fn test_add_version_and_get_version() -> anyhow::Result<()> { let tmp_dir = TempDir::new()?; let storage = SqliteStorage::new(tmp_dir.path())?; - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); + let mut txn = storage.txn(client_id)?; + let version_id = Uuid::new_v4(); let parent_version_id = Uuid::new_v4(); let history_segment = b"abc".to_vec(); - txn.add_version( - client_id, - version_id, - parent_version_id, - history_segment.clone(), - )?; + txn.add_version(version_id, parent_version_id, history_segment.clone())?; let expected = Version { version_id, @@ -390,12 +376,10 @@ mod test { history_segment, }; - let version = txn - .get_version_by_parent(client_id, parent_version_id)? - .unwrap(); + let version = txn.get_version_by_parent(parent_version_id)?.unwrap(); assert_eq!(version, expected); - let version = txn.get_version(client_id, version_id)?.unwrap(); + let version = txn.get_version(version_id)?.unwrap(); assert_eq!(version, expected); Ok(()) @@ -405,41 +389,40 @@ mod test { fn test_snapshots() -> anyhow::Result<()> { let tmp_dir = TempDir::new()?; let storage = SqliteStorage::new(tmp_dir.path())?; - let mut txn = storage.txn()?; - let client_id = Uuid::new_v4(); + let mut txn = storage.txn(client_id)?; - txn.new_client(client_id, Uuid::new_v4())?; - assert!(txn.get_client(client_id)?.unwrap().snapshot.is_none()); + txn.new_client(Uuid::new_v4())?; + assert!(txn.get_client()?.unwrap().snapshot.is_none()); let snap = Snapshot { version_id: Uuid::new_v4(), timestamp: "2013-10-08T12:00:09Z".parse::>().unwrap(), versions_since: 3, }; - txn.set_snapshot(client_id, snap.clone(), vec![9, 8, 9])?; + txn.set_snapshot(snap.clone(), vec![9, 8, 9])?; assert_eq!( - txn.get_snapshot_data(client_id, snap.version_id)?.unwrap(), + txn.get_snapshot_data(snap.version_id)?.unwrap(), vec![9, 8, 9] ); - assert_eq!(txn.get_client(client_id)?.unwrap().snapshot, Some(snap)); + assert_eq!(txn.get_client()?.unwrap().snapshot, Some(snap)); let snap2 = Snapshot { version_id: Uuid::new_v4(), timestamp: "2014-11-28T12:00:09Z".parse::>().unwrap(), versions_since: 10, }; - txn.set_snapshot(client_id, snap2.clone(), vec![0, 2, 4, 6])?; + txn.set_snapshot(snap2.clone(), vec![0, 2, 4, 6])?; assert_eq!( - txn.get_snapshot_data(client_id, snap2.version_id)?.unwrap(), + txn.get_snapshot_data(snap2.version_id)?.unwrap(), vec![0, 2, 4, 6] ); - assert_eq!(txn.get_client(client_id)?.unwrap().snapshot, Some(snap2)); + assert_eq!(txn.get_client()?.unwrap().snapshot, Some(snap2)); // check that mismatched version is detected - assert!(txn.get_snapshot_data(client_id, Uuid::new_v4()).is_err()); + assert!(txn.get_snapshot_data(Uuid::new_v4()).is_err()); Ok(()) } diff --git a/sqlite/tests/concurrency.rs b/sqlite/tests/concurrency.rs index 3250426..5a3a37a 100644 --- a/sqlite/tests/concurrency.rs +++ b/sqlite/tests/concurrency.rs @@ -12,8 +12,8 @@ fn add_version_concurrency() -> anyhow::Result<()> { { let con = SqliteStorage::new(tmp_dir.path())?; - let mut txn = con.txn()?; - txn.new_client(client_id, NIL_VERSION_ID)?; + let mut txn = con.txn(client_id)?; + txn.new_client(NIL_VERSION_ID)?; txn.commit()?; } @@ -25,12 +25,12 @@ fn add_version_concurrency() -> anyhow::Result<()> { let con = SqliteStorage::new(tmp_dir.path())?; for _ in 0..N { - let mut txn = con.txn()?; - let client = txn.get_client(client_id)?.unwrap(); + let mut txn = con.txn(client_id)?; + let client = txn.get_client()?.unwrap(); let version_id = Uuid::new_v4(); let parent_version_id = client.latest_version_id; std::thread::yield_now(); // Make failure more likely. - txn.add_version(client_id, version_id, parent_version_id, b"data".to_vec())?; + txn.add_version(version_id, parent_version_id, b"data".to_vec())?; txn.commit()?; } @@ -49,15 +49,13 @@ fn add_version_concurrency() -> anyhow::Result<()> { // same `parent_version_id`. { let con = SqliteStorage::new(tmp_dir.path())?; - let mut txn = con.txn()?; - let client = txn.get_client(client_id)?.unwrap(); + let mut txn = con.txn(client_id)?; + let client = txn.get_client()?.unwrap(); let mut n = 0; let mut version_id = client.latest_version_id; while version_id != NIL_VERSION_ID { - let version = txn - .get_version(client_id, version_id)? - .expect("version should exist"); + let version = txn.get_version(version_id)?.expect("version should exist"); n += 1; version_id = version.parent_version_id; }