Merge branch 'main' into sqlstore

This commit is contained in:
dbr 2021-05-28 12:17:09 +10:00
commit aa2340965e
65 changed files with 1386 additions and 3608 deletions

View file

@ -4,6 +4,7 @@ use crate::storage::{Operation, Storage, TaskMap};
use crate::task::{Status, Task};
use crate::taskdb::TaskDb;
use crate::workingset::WorkingSet;
use anyhow::Context;
use chrono::Utc;
use log::trace;
use std::collections::HashMap;
@ -123,8 +124,10 @@ impl Replica {
/// this occurs, but without renumbering, so any newly-pending tasks should appear in
/// the working set.
pub fn sync(&mut self, server: &mut Box<dyn Server>) -> anyhow::Result<()> {
self.taskdb.sync(server)?;
self.taskdb.sync(server).context("Failed to synchronize")?;
self.rebuild_working_set(false)
.context("Failed to rebuild working set after sync")?;
Ok(())
}
/// Rebuild this replica's working set, based on whether tasks are pending or not. If

View file

@ -49,7 +49,10 @@ impl Server for RemoteServer {
parent_version_id: VersionId,
history_segment: HistorySegment,
) -> anyhow::Result<AddVersionResult> {
let url = format!("{}/client/add-version/{}", self.origin, parent_version_id);
let url = format!(
"{}/v1/client/add-version/{}",
self.origin, parent_version_id
);
let history_cleartext = HistoryCleartext {
parent_version_id,
history_segment,
@ -82,7 +85,7 @@ impl Server for RemoteServer {
parent_version_id: VersionId,
) -> anyhow::Result<GetVersionResult> {
let url = format!(
"{}/client/get-child-version/{}",
"{}/v1/client/get-child-version/{}",
self.origin, parent_version_id
);
match self

View file

@ -117,14 +117,14 @@ impl TaskDb {
{
let mut txn = self.storage.txn()?;
let mut new_ws = vec![];
let mut new_ws = vec![None]; // index 0 is always None
let mut seen = HashSet::new();
// The goal here is for existing working-set items to be "compressed' down to index 1, so
// we begin by scanning the current working set and inserting any tasks that should still
// be in the set into new_ws, implicitly dropping any tasks that are no longer in the
// working set.
for elt in txn.get_working_set()? {
for elt in txn.get_working_set()?.drain(1..) {
if let Some(uuid) = elt {
if let Some(task) = txn.get_task(uuid)? {
if in_working_set(&task) {
@ -144,14 +144,12 @@ impl TaskDb {
// if renumbering, clear the working set and re-add
if renumber {
txn.clear_working_set()?;
for elt in new_ws.drain(0..new_ws.len()) {
if let Some(uuid) = elt {
txn.add_to_working_set(uuid)?;
}
for elt in new_ws.drain(1..new_ws.len()).flatten() {
txn.add_to_working_set(elt)?;
}
} else {
// ..otherwise, just clear the None items determined above from the working set
for (i, elt) in new_ws.iter().enumerate() {
for (i, elt) in new_ws.iter().enumerate().skip(1) {
if elt.is_none() {
txn.set_working_set_item(i, None)?;
}

View file

@ -21,6 +21,10 @@ impl WorkingSet {
/// Create a new WorkingSet. Typically this is acquired via `replica.working_set()`
pub(crate) fn new(by_index: Vec<Option<Uuid>>) -> Self {
let mut by_uuid = HashMap::new();
// working sets are 1-indexed, so element 0 should always be None
assert!(by_index.is_empty() || by_index[0].is_none());
for (index, uuid) in by_index.iter().enumerate() {
if let Some(uuid) = uuid {
by_uuid.insert(*uuid, index);
@ -58,13 +62,7 @@ impl WorkingSet {
self.by_index
.iter()
.enumerate()
.filter_map(|(index, uuid)| {
if let Some(uuid) = uuid {
Some((index, *uuid))
} else {
None
}
})
.filter_map(|(index, uuid)| uuid.as_ref().map(|uuid| (index, *uuid)))
}
}