support avoiding snapshots

This commit is contained in:
Dustin J. Mitchell 2021-10-11 17:14:26 -04:00
parent b97f6dc4d5
commit ed3475d9ea
5 changed files with 85 additions and 31 deletions

View file

@ -6,7 +6,8 @@ pub(crate) fn execute<W: WriteColor>(
replica: &mut Replica, replica: &mut Replica,
server: &mut Box<dyn Server>, server: &mut Box<dyn Server>,
) -> Result<(), crate::Error> { ) -> Result<(), crate::Error> {
replica.sync(server)?; // TODO: configurable avoid_snapshots
replica.sync(server, false)?;
writeln!(w, "sync complete.")?; writeln!(w, "sync complete.")?;
Ok(()) Ok(())
} }

View file

@ -41,8 +41,8 @@ async fn cross_sync() -> anyhow::Result<()> {
t1.start()?; t1.start()?;
let t1 = t1.into_immut(); let t1 = t1.into_immut();
rep1.sync(&mut serv1)?; rep1.sync(&mut serv1, false)?;
rep2.sync(&mut serv2)?; rep2.sync(&mut serv2, false)?;
// those tasks should exist on rep2 now // those tasks should exist on rep2 now
let t12 = rep2 let t12 = rep2
@ -66,9 +66,9 @@ async fn cross_sync() -> anyhow::Result<()> {
t12.set_status(Status::Completed)?; t12.set_status(Status::Completed)?;
// sync those changes back and forth // sync those changes back and forth
rep1.sync(&mut serv1)?; // rep1 -> server rep1.sync(&mut serv1, false)?; // rep1 -> server
rep2.sync(&mut serv2)?; // server -> rep2, rep2 -> server rep2.sync(&mut serv2, false)?; // server -> rep2, rep2 -> server
rep1.sync(&mut serv1)?; // server -> rep1 rep1.sync(&mut serv1, false)?; // server -> rep1
let t1 = rep1 let t1 = rep1
.get_task(t1.get_uuid())? .get_task(t1.get_uuid())?

View file

@ -126,8 +126,21 @@ impl Replica {
/// Synchronize this replica against the given server. The working set is rebuilt after /// Synchronize this replica against the given server. The working set is rebuilt after
/// this occurs, but without renumbering, so any newly-pending tasks should appear in /// this occurs, but without renumbering, so any newly-pending tasks should appear in
/// the working set. /// the working set.
pub fn sync(&mut self, server: &mut Box<dyn Server>) -> anyhow::Result<()> { ///
self.taskdb.sync(server).context("Failed to synchronize")?; /// If `avoid_snapshots` is true, the sync operations produces a snapshot only when the server
/// indicate it is urgent (snapshot urgency "high"). This allows time for other replicas to
/// create a snapshot before this one does.
///
/// Set this to true on systems more constrained in CPU, memory, or bandwidth than a typical desktop
/// system
pub fn sync(
&mut self,
server: &mut Box<dyn Server>,
avoid_snapshots: bool,
) -> anyhow::Result<()> {
self.taskdb
.sync(server, avoid_snapshots)
.context("Failed to synchronize")?;
self.rebuild_working_set(false) self.rebuild_working_set(false)
.context("Failed to rebuild working set after sync")?; .context("Failed to rebuild working set after sync")?;
Ok(()) Ok(())

View file

@ -96,9 +96,20 @@ impl TaskDb {
} }
/// Sync to the given server, pulling remote changes and pushing local changes. /// Sync to the given server, pulling remote changes and pushing local changes.
pub fn sync(&mut self, server: &mut Box<dyn Server>) -> anyhow::Result<()> { ///
/// If `avoid_snapshots` is true, the sync operations produces a snapshot only when the server
/// indicate it is urgent (snapshot urgency "high"). This allows time for other replicas to
/// create a snapshot before this one does.
///
/// Set this to true on systems more constrained in CPU, memory, or bandwidth than a typical desktop
/// system
pub fn sync(
&mut self,
server: &mut Box<dyn Server>,
avoid_snapshots: bool,
) -> anyhow::Result<()> {
let mut txn = self.storage.txn()?; let mut txn = self.storage.txn()?;
sync::sync(server, txn.as_mut()) sync::sync(server, txn.as_mut(), avoid_snapshots)
} }
// functions for supporting tests // functions for supporting tests
@ -212,7 +223,7 @@ mod tests {
println!(" {:?} (ignored)", e); println!(" {:?} (ignored)", e);
} }
}, },
Action::Sync => db.sync(&mut server).unwrap(), Action::Sync => db.sync(&mut server, false).unwrap(),
} }
} }

View file

@ -11,7 +11,11 @@ struct Version {
} }
/// Sync to the given server, pulling remote changes and pushing local changes. /// Sync to the given server, pulling remote changes and pushing local changes.
pub(super) fn sync(server: &mut Box<dyn Server>, txn: &mut dyn StorageTxn) -> anyhow::Result<()> { pub(super) fn sync(
server: &mut Box<dyn Server>,
txn: &mut dyn StorageTxn,
avoid_snapshots: bool,
) -> anyhow::Result<()> {
// retry synchronizing until the server accepts our version (this allows for races between // retry synchronizing until the server accepts our version (this allows for races between
// replicas trying to sync to the same server). If the server insists on the same base // replicas trying to sync to the same server). If the server insists on the same base
// version twice, then we have diverged. // version twice, then we have diverged.
@ -64,8 +68,13 @@ pub(super) fn sync(server: &mut Box<dyn Server>, txn: &mut dyn StorageTxn) -> an
txn.set_base_version(new_version_id)?; txn.set_base_version(new_version_id)?;
txn.set_operations(vec![])?; txn.set_operations(vec![])?;
// TODO: configurable urgency levels // make a snapshot if the server indicates it is urgent enough
if snapshot_urgency != SnapshotUrgency::None { let base_urgency = if avoid_snapshots {
SnapshotUrgency::High
} else {
SnapshotUrgency::Low
};
if snapshot_urgency >= base_urgency {
let snapshot = snapshot::make_snapshot(txn)?; let snapshot = snapshot::make_snapshot(txn)?;
server.add_snapshot(new_version_id, snapshot)?; server.add_snapshot(new_version_id, snapshot)?;
} }
@ -171,10 +180,10 @@ mod test {
let mut server: Box<dyn Server> = TestServer::new().server(); let mut server: Box<dyn Server> = TestServer::new().server();
let mut db1 = newdb(); let mut db1 = newdb();
sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
let mut db2 = newdb(); let mut db2 = newdb();
sync(&mut server, db2.storage.txn()?.as_mut()).unwrap(); sync(&mut server, db2.storage.txn()?.as_mut(), false).unwrap();
// make some changes in parallel to db1 and db2.. // make some changes in parallel to db1 and db2..
let uuid1 = Uuid::new_v4(); let uuid1 = Uuid::new_v4();
@ -198,9 +207,9 @@ mod test {
.unwrap(); .unwrap();
// and synchronize those around // and synchronize those around
sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
sync(&mut server, db2.storage.txn()?.as_mut()).unwrap(); sync(&mut server, db2.storage.txn()?.as_mut(), false).unwrap();
sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
// now make updates to the same task on both sides // now make updates to the same task on both sides
@ -220,9 +229,9 @@ mod test {
.unwrap(); .unwrap();
// and synchronize those around // and synchronize those around
sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
sync(&mut server, db2.storage.txn()?.as_mut()).unwrap(); sync(&mut server, db2.storage.txn()?.as_mut(), false).unwrap();
sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
Ok(()) Ok(())
@ -233,10 +242,10 @@ mod test {
let mut server: Box<dyn Server> = TestServer::new().server(); let mut server: Box<dyn Server> = TestServer::new().server();
let mut db1 = newdb(); let mut db1 = newdb();
sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
let mut db2 = newdb(); let mut db2 = newdb();
sync(&mut server, db2.storage.txn()?.as_mut()).unwrap(); sync(&mut server, db2.storage.txn()?.as_mut(), false).unwrap();
// create and update a task.. // create and update a task..
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
@ -250,9 +259,9 @@ mod test {
.unwrap(); .unwrap();
// and synchronize those around // and synchronize those around
sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
sync(&mut server, db2.storage.txn()?.as_mut()).unwrap(); sync(&mut server, db2.storage.txn()?.as_mut(), false).unwrap();
sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
// delete and re-create the task on db1 // delete and re-create the task on db1
@ -275,9 +284,9 @@ mod test {
}) })
.unwrap(); .unwrap();
sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
sync(&mut server, db2.storage.txn()?.as_mut()).unwrap(); sync(&mut server, db2.storage.txn()?.as_mut(), false).unwrap();
sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
Ok(()) Ok(())
@ -301,7 +310,7 @@ mod test {
.unwrap(); .unwrap();
test_server.set_snapshot_urgency(SnapshotUrgency::High); test_server.set_snapshot_urgency(SnapshotUrgency::High);
sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); sync(&mut server, db1.storage.txn()?.as_mut(), false).unwrap();
// assert that a snapshot was added // assert that a snapshot was added
let base_version = db1.storage.txn()?.base_version()?; let base_version = db1.storage.txn()?.base_version()?;
@ -315,4 +324,24 @@ mod test {
Ok(()) Ok(())
} }
#[test]
fn test_sync_avoids_snapshot() -> anyhow::Result<()> {
let test_server = TestServer::new();
let mut server: Box<dyn Server> = test_server.server();
let mut db1 = newdb();
let uuid = Uuid::new_v4();
db1.apply(Operation::Create { uuid }).unwrap();
test_server.set_snapshot_urgency(SnapshotUrgency::Low);
sync(&mut server, db1.storage.txn()?.as_mut(), true).unwrap();
// assert that a snapshot was not added, because we indicated
// we wanted to avoid snapshots and it was only low urgency
assert_eq!(test_server.snapshot(), None);
Ok(())
}
} }