Refactor working-set support, add pending tasks

This refactors the working-set support so that taskdb knows how to
rebuild the working set (in a single transaction), while replica decides
which tasks should be in that set.

This also adds support for automatically adding tasks to the working set
when they are marked pending.  Note that tasks are not *removed* from
the working set automatically; that only happens during a gc operation.
Dustin J. Mitchell 2020-11-22 18:18:53 -05:00
parent 39a0dfe798
commit 03e4fc7cee
5 changed files with 120 additions and 44 deletions
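
As a rough usage sketch of the new behavior (types, method names, and construction are taken from the code and tests in the diff below; imports, module paths, and error handling details are elided):

    fn example() -> Fallible<()> {
        // An in-memory replica, constructed as in the tests below.
        let mut rep = Replica::new(DB::new_inmemory().into());
        let uuid = Uuid::new_v4();
        rep.new_task(uuid.clone(), Status::Pending, "example".into())?;
        {
            // Marking the task pending adds it to the working set immediately.
            let mut tm = rep.get_task_mut(&uuid)?.unwrap();
            tm.status(Status::Pending)?;
        }
        // As the new test demonstrates, the task is now reachable by its
        // working-set index.
        assert!(rep.get_working_set_task(1)?.is_some());
        // gc() rebuilds the working set in a single storage transaction,
        // renumbering entries and keeping only tasks that still belong there.
        rep.gc()?;
        Ok(())
    }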

@@ -35,6 +35,21 @@ impl Replica {
})
}
/// Return true if this status string is such that the task should be included in
/// the working set.
fn is_working_set_status(status: Option<&String>) -> bool {
if let Some(status) = status {
status == "pending"
} else {
false
}
}
/// Add the given uuid to the working set, returning its index.
fn add_to_working_set(&mut self, uuid: &Uuid) -> Fallible<u64> {
self.taskdb.add_to_working_set(uuid)
}
/// Get all tasks represented as a map keyed by UUID
pub fn all_tasks<'a>(&'a mut self) -> Fallible<HashMap<Uuid, Task>> {
Ok(self
@@ -72,6 +87,17 @@ impl Replica {
Ok(self.taskdb.get_task(&uuid)?.map(|t| (&t).into()))
}
/// Get an existing task by its working set index
pub fn get_working_set_task(&mut self, i: u64) -> Fallible<Option<Task>> {
let working_set = self.taskdb.working_set()?;
if (i as usize) < working_set.len() {
if let Some(uuid) = working_set[i as usize] {
return Ok(self.taskdb.get_task(&uuid)?.map(|t| (&t).into()));
}
}
return Ok(None);
}
/// Create a new task. The task must not already exist.
pub fn new_task(
&mut self,
@@ -115,9 +141,10 @@ impl Replica {
}
/// Perform "garbage collection" on this replica. In particular, this renumbers the working
/// set.
/// set to contain only pending tasks.
pub fn gc(&mut self) -> Fallible<()> {
self.taskdb.rebuild_working_set()?;
self.taskdb
.rebuild_working_set(|t| Replica::is_working_set_status(t.get("status")))?;
Ok(())
}
}
@@ -177,9 +204,14 @@ impl<'a> TaskMut<'a> {
)
}
/// Set the task's status
/// Set the task's status. This also adds the task to the working set if the
/// new status puts it in that set.
pub fn status(&mut self, status: Status) -> Fallible<()> {
self.set_string("status", Some(String::from(status.as_ref())))
let status = String::from(status.as_ref());
if Replica::is_working_set_status(Some(&status)) {
self.replica.add_to_working_set(&self.uuid)?;
}
self.set_string("status", Some(status))
}
/// Set the task's description
@@ -326,6 +358,22 @@ mod tests {
assert_eq!(t.project, Some("work".into()));
}
#[test]
fn set_pending_adds_to_working_set() {
let mut rep = Replica::new(DB::new_inmemory().into());
let uuid = Uuid::new_v4();
rep.new_task(uuid.clone(), Status::Pending, "to-be-pending".into())
.unwrap();
let mut tm = rep.get_task_mut(&uuid).unwrap().unwrap();
tm.status(Status::Pending).unwrap();
let t = rep.get_working_set_task(1).unwrap().unwrap();
assert_eq!(t.status, Status::Pending);
assert_eq!(t.description, String::from("to-be-pending"));
}
#[test]
fn get_does_not_exist() {
let mut rep = Replica::new(DB::new_inmemory().into());

@@ -104,24 +104,27 @@ impl DB {
txn.get_task(uuid)
}
/// Rebuild the working set. This renumbers the pending tasks to eliminate gaps, and also
/// finds any tasks whose statuses changed without being noticed.
pub fn rebuild_working_set(&mut self) -> Fallible<()> {
// TODO: this logic belongs in Replica
// TODO: it's every status but Completed and Deleted, I think?
/// Rebuild the working set using a function to identify tasks that should be in the set. This
/// renumbers the existing working-set tasks to eliminate gaps, and also adds any tasks that
/// are not already in the working set but should be. The rebuild occurs in a single
/// transaction against the storage backend.
pub fn rebuild_working_set<F>(&mut self, in_working_set: F) -> Fallible<()>
where
F: Fn(&TaskMap) -> bool,
{
let mut txn = self.storage.txn()?;
let mut new_ws = vec![];
let mut seen = HashSet::new();
let pending = String::from("pending");
// The goal here is for existing working-set items to be "compressed" down to index
// 1, so we begin by scanning the current working set and inserting any still-pending
// tasks into the new list
// The goal here is for existing working-set items to be "compressed" down to index 1, so
// we begin by scanning the current working set and inserting any tasks that should still
// be in the set into new_ws, implicitly dropping any tasks that are no longer in the
// working set.
for elt in txn.get_working_set()? {
if let Some(uuid) = elt {
if let Some(task) = txn.get_task(&uuid)? {
if task.get("status") == Some(&pending) {
if in_working_set(&task) {
new_ws.push(uuid.clone());
seen.insert(uuid);
}
@@ -129,24 +132,43 @@ impl DB {
}
}
// Now go hunting for tasks that are pending and are not already in this list
// Now go hunting for tasks that should be in this list but are not, adding them at the
// end of the list.
for (uuid, task) in txn.all_tasks()? {
if !seen.contains(&uuid) {
if task.get("status") == Some(&pending) {
if in_working_set(&task) {
new_ws.push(uuid.clone());
}
}
}
// clear and re-write the entire working set, in order
txn.clear_working_set()?;
for uuid in new_ws.drain(0..new_ws.len()) {
txn.add_to_working_set(uuid)?;
txn.add_to_working_set(&uuid)?;
}
txn.commit()?;
Ok(())
}
/// Add the given uuid to the working set and return its index; if it is already in the working
/// set, its index is returned. This does *not* renumber any existing tasks.
pub fn add_to_working_set(&mut self, uuid: &Uuid) -> Fallible<u64> {
let mut txn = self.storage.txn()?;
// search for an existing entry for this task..
for (i, elt) in txn.get_working_set()?.iter().enumerate() {
if *elt == Some(*uuid) {
// (note that this drops the transaction with no changes made)
return Ok(i as u64);
}
}
// and if not found, add one
let i = txn.add_to_working_set(uuid)?;
txn.commit()?;
Ok(i)
}
/// Sync to the given server, pulling remote changes and pushing local changes.
pub fn sync(&mut self, username: &str, server: &mut Server) -> Fallible<()> {
let mut txn = self.storage.txn()?;
@@ -448,7 +470,7 @@ mod tests {
txn.clear_working_set()?;
for i in &[1usize, 3, 4] {
txn.add_to_working_set(uuids[*i])?;
txn.add_to_working_set(&uuids[*i])?;
}
txn.commit()?;
@@ -464,7 +486,13 @@ mod tests {
]
);
db.rebuild_working_set()?;
db.rebuild_working_set(|t| {
if let Some(status) = t.get("status") {
status == "pending"
} else {
false
}
})?;
// uuids[1] and uuids[4] are already in the working set, so are compressed
// to the top, and then uuids[0] is added.
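
The removed TODO above noted that the working set should arguably hold every status except completed and deleted; with the predicate now supplied by the caller, such a policy needs no further taskdb changes. A hedged sketch (this helper is hypothetical and not part of the diff, and the "completed"/"deleted" status strings are assumptions):

    // Hypothetical alternative policy: keep any task whose status is neither
    // completed nor deleted (tasks with no status at all are kept here too).
    fn rebuild_with_broader_policy(db: &mut DB) -> Fallible<()> {
        db.rebuild_working_set(|t| {
            !matches!(
                t.get("status").map(|s| s.as_str()),
                Some("completed") | Some("deleted")
            )
        })
    }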

@@ -109,9 +109,9 @@ impl<'t> TaskStorageTxn for Txn<'t> {
Ok(self.data_ref().working_set.clone())
}
fn add_to_working_set(&mut self, uuid: Uuid) -> Fallible<u64> {
fn add_to_working_set(&mut self, uuid: &Uuid) -> Fallible<u64> {
let working_set = &mut self.mut_data_ref().working_set;
working_set.push(Some(uuid));
working_set.push(Some(uuid.clone()));
Ok(working_set.len() as u64)
}
@@ -194,8 +194,8 @@ mod test {
{
let mut txn = storage.txn()?;
txn.add_to_working_set(uuid1.clone())?;
txn.add_to_working_set(uuid2.clone())?;
txn.add_to_working_set(&uuid1)?;
txn.add_to_working_set(&uuid2)?;
txn.commit()?;
}
@@ -216,15 +216,15 @@ mod test {
{
let mut txn = storage.txn()?;
txn.add_to_working_set(uuid1.clone())?;
txn.add_to_working_set(uuid2.clone())?;
txn.add_to_working_set(&uuid1)?;
txn.add_to_working_set(&uuid2)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
txn.remove_from_working_set(1)?;
txn.add_to_working_set(uuid1.clone())?;
txn.add_to_working_set(&uuid1)?;
txn.commit()?;
}
@@ -244,7 +244,7 @@ mod test {
{
let mut txn = storage.txn()?;
txn.add_to_working_set(uuid1.clone())?;
txn.add_to_working_set(&uuid1)?;
txn.commit()?;
}
@@ -267,16 +267,16 @@ mod test {
{
let mut txn = storage.txn()?;
txn.add_to_working_set(uuid1.clone())?;
txn.add_to_working_set(uuid2.clone())?;
txn.add_to_working_set(&uuid1)?;
txn.add_to_working_set(&uuid2)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
txn.clear_working_set()?;
txn.add_to_working_set(uuid2.clone())?;
txn.add_to_working_set(uuid1.clone())?;
txn.add_to_working_set(&uuid2)?;
txn.add_to_working_set(&uuid1)?;
txn.commit()?;
}

@@ -307,7 +307,7 @@ impl<'t> TaskStorageTxn for Txn<'t> {
Ok(res)
}
fn add_to_working_set(&mut self, uuid: Uuid) -> Fallible<u64> {
fn add_to_working_set(&mut self, uuid: &Uuid) -> Fallible<u64> {
let working_set_bucket = self.working_set_bucket();
let numbers_bucket = self.numbers_bucket();
let kvtxn = self.kvtxn();
@@ -321,7 +321,7 @@ impl<'t> TaskStorageTxn for Txn<'t> {
kvtxn.set(
working_set_bucket,
next_index.into(),
Msgpack::to_value_buf(uuid)?,
Msgpack::to_value_buf(uuid.clone())?,
)?;
kvtxn.set(
numbers_bucket,
@@ -666,8 +666,8 @@ mod test {
{
let mut txn = storage.txn()?;
txn.add_to_working_set(uuid1.clone())?;
txn.add_to_working_set(uuid2.clone())?;
txn.add_to_working_set(&uuid1)?;
txn.add_to_working_set(&uuid2)?;
txn.commit()?;
}
@@ -689,15 +689,15 @@ mod test {
{
let mut txn = storage.txn()?;
txn.add_to_working_set(uuid1.clone())?;
txn.add_to_working_set(uuid2.clone())?;
txn.add_to_working_set(&uuid1)?;
txn.add_to_working_set(&uuid2)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
txn.remove_from_working_set(1)?;
txn.add_to_working_set(uuid1.clone())?;
txn.add_to_working_set(&uuid1)?;
txn.commit()?;
}
@@ -718,7 +718,7 @@ mod test {
{
let mut txn = storage.txn()?;
txn.add_to_working_set(uuid1.clone())?;
txn.add_to_working_set(&uuid1)?;
txn.commit()?;
}
@@ -742,16 +742,16 @@ mod test {
{
let mut txn = storage.txn()?;
txn.add_to_working_set(uuid1.clone())?;
txn.add_to_working_set(uuid2.clone())?;
txn.add_to_working_set(&uuid1)?;
txn.add_to_working_set(&uuid2)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
txn.clear_working_set()?;
txn.add_to_working_set(uuid2.clone())?;
txn.add_to_working_set(uuid1.clone())?;
txn.add_to_working_set(&uuid2)?;
txn.add_to_working_set(&uuid1)?;
txn.commit()?;
}

@@ -70,7 +70,7 @@ pub trait TaskStorageTxn {
/// Add a task to the working set and return its (one-based) index. This index will be one greater
/// than the highest used index.
fn add_to_working_set(&mut self, uuid: Uuid) -> Fallible<u64>;
fn add_to_working_set(&mut self, uuid: &Uuid) -> Fallible<u64>;
/// Remove a task from the working set. Other tasks' indexes are not affected.
fn remove_from_working_set(&mut self, index: u64) -> Fallible<()>;