More WIP.

Workaround for !Send SQLite causing problems as each get_txn creates a new transaction
This commit is contained in:
dbr 2021-05-08 22:09:48 +10:00
parent 7ff54ed0de
commit 027225d2a3

View file

@ -1,9 +1,9 @@
use super::{Client, Storage, StorageTxn, Uuid, Version}; use super::{Client, Storage, StorageTxn, Uuid, Version};
use anyhow::Context;
use rusqlite::types::{FromSql, ToSql}; use rusqlite::types::{FromSql, ToSql};
use rusqlite::{params, Connection, OptionalExtension}; use rusqlite::{params, Connection, OptionalExtension};
use std::path::Path; use std::path::Path;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use anyhow::Context;
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
enum SqliteError { enum SqliteError {
@ -11,7 +11,6 @@ enum SqliteError {
TransactionAlreadyCommitted, TransactionAlreadyCommitted,
} }
/// Newtype to allow implementing `FromSql` for foreign `uuid::Uuid` /// Newtype to allow implementing `FromSql` for foreign `uuid::Uuid`
struct StoredUuid(Uuid); struct StoredUuid(Uuid);
@ -50,8 +49,6 @@ impl ToSql for Client {
} }
} }
/// DB Key for versions: concatenation of client_key and parent_version_id /// DB Key for versions: concatenation of client_key and parent_version_id
type VersionDbKey = [u8; 32]; type VersionDbKey = [u8; 32];
@ -89,7 +86,8 @@ impl SqliteStorage {
let txn = con.transaction()?; let txn = con.transaction()?;
let queries = vec![ let queries = vec![
"CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY AUTOINCREMENT, data STRING);", "CREATE TABLE IF NOT EXISTS clients (client_key STRING PRIMARY KEY, latest_version_id STRING);",
"CREATE TABLE IF NOT EXISTS versions (id STRING PRIMARY KEY, client_key STRING, parent STRING, history_segment STRING);",
]; ];
for q in queries { for q in queries {
txn.execute(q, []).context("Creating table")?; txn.execute(q, []).context("Creating table")?;
@ -104,7 +102,7 @@ impl SqliteStorage {
impl Storage for SqliteStorage { impl Storage for SqliteStorage {
fn txn<'a>(&'a self) -> anyhow::Result<Box<dyn StorageTxn + 'a>> { fn txn<'a>(&'a self) -> anyhow::Result<Box<dyn StorageTxn + 'a>> {
let con = self.new_connection()?; let con = self.new_connection()?;
let t = Txn{con, txn: None}; let t = Txn { con, txn: None };
Ok(Box::new(t)) Ok(Box::new(t))
} }
} }
@ -114,23 +112,28 @@ struct Txn<'t> {
txn: Option<rusqlite::Transaction<'t>>, txn: Option<rusqlite::Transaction<'t>>,
} }
impl <'t>Txn<'t> { impl<'t> Txn<'t> {
fn get_txn(&mut self) -> Result<rusqlite::Transaction, SqliteError> { fn get_txn(&mut self) -> Result<rusqlite::Transaction, SqliteError> {
Ok(self.con.transaction().unwrap()) Ok(self.con.transaction().unwrap())
} }
} }
impl<'t> StorageTxn for Txn<'t> {
impl <'t>StorageTxn for Txn<'t> {
fn get_client(&mut self, client_key: Uuid) -> anyhow::Result<Option<Client>> { fn get_client(&mut self, client_key: Uuid) -> anyhow::Result<Option<Client>> {
let t = self.get_txn()?; let t = self.get_txn()?;
let result: Option<Client> = t let result: Option<Client> = t
.query_row( .query_row(
"SELECT data FROM clients WHERE client_key = ? LIMIT 1", "SELECT latest_version_id FROM clients WHERE client_key = ? LIMIT 1",
[&StoredUuid(client_key)], [&StoredUuid(client_key)],
|r| r.get("data"), |r| {
let latest_version_id: StoredUuid = r.get(0)?;
Ok(Client {
latest_version_id: latest_version_id.0,
})
},
) )
.optional()?; .optional()
.context("Get client query")?;
Ok(result) Ok(result)
} }
@ -138,10 +141,9 @@ impl <'t>StorageTxn for Txn<'t> {
fn new_client(&mut self, client_key: Uuid, latest_version_id: Uuid) -> anyhow::Result<()> { fn new_client(&mut self, client_key: Uuid, latest_version_id: Uuid) -> anyhow::Result<()> {
let t = self.get_txn()?; let t = self.get_txn()?;
let client = Client{ latest_version_id };
t.execute( t.execute(
"INSERT OR REPLACE INTO clients (client_key, latest_version_id) VALUES (?, ?)", "INSERT OR REPLACE INTO clients (client_key, latest_version_id) VALUES (?, ?)",
params![&StoredUuid(latest_version_id), &client], params![&StoredUuid(client_key), &StoredUuid(latest_version_id)],
) )
.context("Create client query")?; .context("Create client query")?;
Ok(()) Ok(())
@ -173,16 +175,16 @@ impl <'t>StorageTxn for Txn<'t> {
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let t = self.get_txn()?; let t = self.get_txn()?;
let version = Version {
version_id,
parent_version_id,
history_segment,
};
t.execute( t.execute(
"INSERT INTO versions (client_key, id, parent, history_segment)", "INSERT INTO versions (id, client_key, parent, history_segment) VALUES(?, ?, ?, ?)",
params![], params![
).context("Add version query")?; StoredUuid(version_id),
StoredUuid(client_key),
StoredUuid(parent_version_id),
history_segment
],
)
.context("Add version query")?;
Ok(()) Ok(())
} }
@ -191,8 +193,7 @@ impl <'t>StorageTxn for Txn<'t> {
let t: rusqlite::Transaction = self let t: rusqlite::Transaction = self
.txn .txn
.take() .take()
.unwrap(); .ok_or(SqliteError::TransactionAlreadyCommitted)?;
//.ok_or(SqliteError::TransactionAlreadyCommitted)?;
t.commit().context("Committing transaction")?; t.commit().context("Committing transaction")?;
Ok(()) Ok(())
} }