diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 22ee9c3..5d53a10 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -106,7 +106,8 @@ jobs: - uses: obi1kenobi/cargo-semver-checks-action@v2 with: # exclude the binary package from semver checks, since it is not published as a crate. - exclude: taskchampion-sync-server + # exclude postgres temporarily until it is released + exclude: taskchampion-sync-server,taskchampion-sync-server-storage-postgres mdbook: runs-on: ubuntu-latest diff --git a/.github/workflows/rust-tests.yml b/.github/workflows/rust-tests.yml index 185d4db..7d0d4d1 100644 --- a/.github/workflows/rust-tests.yml +++ b/.github/workflows/rust-tests.yml @@ -11,13 +11,32 @@ jobs: test: strategy: matrix: + postgres: + - "17" rust: # MSRV - - "1.82.0" + - "1.85.0" - "stable" runs-on: ubuntu-latest - name: "rust ${{ matrix.rust }}" + name: "rust ${{ matrix.rust }} / postgres ${{ matrix.postgres }}" + + services: + # Service container for PostgreSQL + postgres: + image: "postgres:${{ matrix.postgres }}" + env: + POSTGRES_DB: test_db + POSTGRES_USER: test_user + POSTGRES_PASSWORD: test_password + ports: + - 5432:5432 + # Set health checks to ensure Postgres is ready before the job starts + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 steps: - uses: actions/checkout@v4 @@ -40,4 +59,6 @@ jobs: override: true - name: test + env: + TEST_DB_URL: postgresql://test_user:test_password@localhost:5432/test_db run: cargo test diff --git a/Cargo.lock b/Cargo.lock index f96e478..9446a2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -346,6 +346,28 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "bb8" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "212d8b8e1a22743d9241575c6ba822cf9c8fef34771c86ab7e477a4fbfd254e5" +dependencies = [ + "futures-util", + "parking_lot", + "tokio", +] + +[[package]] +name = "bb8-postgres" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e570e6557cd0f88d28d32afa76644873271a70dc22656df565b2021c4036aa9c" +dependencies = [ + "bb8", + "tokio", + "tokio-postgres", +] + [[package]] name = "bitflags" version = "2.9.1" @@ -388,6 +410,12 @@ version = "3.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.10.1" @@ -479,6 +507,16 @@ dependencies = [ "version_check", ] +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -557,6 +595,7 @@ checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", "crypto-common", + "subtle", ] [[package]] @@ -618,6 +657,12 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + [[package]] name = "fallible-iterator" version = "0.3.0" @@ -658,6 +703,21 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -827,6 +887,15 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + [[package]] name = "http" version = "0.2.12" @@ -1132,6 +1201,16 @@ version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "memchr" version = "2.7.5" @@ -1165,6 +1244,23 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "native-tls" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -1201,6 +1297,60 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" +[[package]] +name = "openssl" +version = "0.10.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8505734d46c8ab1e19a1dce3aef597ad87dcb4c37e7188231769bd6bd51cebf8" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "openssl-probe" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" + +[[package]] +name = "openssl-src" +version = "300.5.1+3.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "735230c832b28c000e3bc117119e6466a663ec73506bc0a9907ea4187508e42a" +dependencies = [ + "cc", +] + +[[package]] +name = "openssl-sys" +version = "0.9.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90096e2e47630d78b7d1c20952dc621f957103f8bc2c8359ec81290d75238571" +dependencies = [ + "cc", + "libc", + "openssl-src", + "pkg-config", + "vcpkg", +] + [[package]] name = "parking_lot" version = "0.12.4" @@ -1230,6 +1380,24 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "phf" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd6780a80ae0c52cc120a26a1a42c1ae51b247a253e4e06113d23d2c2edd078" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_shared" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67eabc2ef2a60eb7faa00097bd1ffdb5bd28e62bf39990626a582201b7a754e5" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -1263,6 +1431,48 @@ dependencies = [ "portable-atomic", ] +[[package]] +name = "postgres-native-tls" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1f39498473c92f7b6820ae970382c1d83178a3454c618161cb772e8598d9f6f" +dependencies = [ + "native-tls", + "tokio", + "tokio-native-tls", + "tokio-postgres", +] + +[[package]] +name = "postgres-protocol" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ff0abab4a9b844b93ef7b81f1efc0a366062aaef2cd702c76256b5dc075c54" +dependencies = [ + "base64", + "byteorder", + "bytes", + "fallible-iterator 0.2.0", + "hmac", + "md-5", + "memchr", + "rand", + "sha2", + "stringprep", +] + +[[package]] +name = "postgres-types" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613283563cd90e1dfc3518d548caee47e0e725455ed619881f5cf21f36de4b48" +dependencies = [ + "bytes", + "fallible-iterator 0.2.0", + "postgres-protocol", + "uuid", +] + [[package]] name = "potential_utf" version = "0.1.2" @@ -1401,7 +1611,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" dependencies = [ "bitflags", - "fallible-iterator", + "fallible-iterator 0.3.0", "fallible-streaming-iterator", "hashlink", "libsqlite3-sys", @@ -1439,12 +1649,44 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "schannel" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.219" @@ -1500,6 +1742,17 @@ dependencies = [ "digest", ] +[[package]] +name = "sha2" +version = "0.10.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "shlex" version = "1.3.0" @@ -1515,6 +1768,12 @@ dependencies = [ "libc", ] +[[package]] +name = "siphasher" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" + [[package]] name = "slab" version = "0.4.10" @@ -1543,12 +1802,29 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "stringprep" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", + "unicode-properties", +] + [[package]] name = "strsim" version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "subtle" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" + [[package]] name = "syn" version = "2.0.104" @@ -1609,6 +1885,28 @@ dependencies = [ "uuid", ] +[[package]] +name = "taskchampion-sync-server-storage-postgres" +version = "0.7.0-pre" +dependencies = [ + "anyhow", + "async-trait", + "bb8", + "bb8-postgres", + "chrono", + "env_logger", + "log", + "native-tls", + "postgres-native-tls", + "pretty_assertions", + "taskchampion-sync-server-core", + "tempfile", + "thiserror", + "tokio", + "tokio-postgres", + "uuid", +] + [[package]] name = "taskchampion-sync-server-storage-sqlite" version = "0.7.0-pre" @@ -1708,6 +2006,21 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tinyvec" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09b3661f17e86524eccd4371ab0429194e0d7c008abb45f7a7495b1719463c71" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.46.1" @@ -1739,6 +2052,42 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + +[[package]] +name = "tokio-postgres" +version = "0.7.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c95d533c83082bb6490e0189acaa0bbeef9084e60471b696ca6988cd0541fb0" +dependencies = [ + "async-trait", + "byteorder", + "bytes", + "fallible-iterator 0.2.0", + "futures-channel", + "futures-util", + "log", + "parking_lot", + "percent-encoding", + "phf", + "pin-project-lite", + "postgres-protocol", + "postgres-types", + "rand", + "socket2", + "tokio", + "tokio-util", + "whoami", +] + [[package]] name = "tokio-util" version = "0.7.15" @@ -1790,12 +2139,33 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" +[[package]] +name = "unicode-bidi" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5" + [[package]] name = "unicode-ident" version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" +[[package]] +name = "unicode-normalization" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956" +dependencies = [ + "tinyvec", +] + +[[package]] +name = "unicode-properties" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" + [[package]] name = "unicode-xid" version = "0.2.6" @@ -1864,6 +2234,12 @@ dependencies = [ "wit-bindgen-rt", ] +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasm-bindgen" version = "0.2.100" @@ -1922,6 +2298,27 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "web-sys" +version = "0.3.77" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "whoami" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6994d13118ab492c3c80c1f81928718159254c53c472bf9ce36f8dae4add02a7" +dependencies = [ + "redox_syscall", + "wasite", + "web-sys", +] + [[package]] name = "windows-core" version = "0.61.2" diff --git a/Cargo.toml b/Cargo.toml index 4bfe6d6..ae61945 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,8 +4,9 @@ members = [ "core", "server", "sqlite", + "postgres", ] -rust-version = "1.82.0" # MSRV +rust-version = "1.85.0" # MSRV [workspace.dependencies] async-trait = "0.1.88" @@ -26,3 +27,6 @@ tempfile = "3" pretty_assertions = "1" temp-env = "0.3" tokio = { version = "*", features = ["rt", "macros"] } +tokio-postgres = { version = "0.7.13", features = ["with-uuid-1"] } +bb8 = "0.9.0" +bb8-postgres = { version = "0.9.0", features = ["with-uuid-1"] } diff --git a/README.md b/README.md index e13a875..67b0054 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,8 @@ for more on how to use this project. The repository is comprised of three crates: - `taskchampion-sync-server-core` implements the core of the protocol - - `taskchmpaion-sync-server-sqlite` implements an SQLite backend for the core + - `taskchampion-sync-server-storage-sqlite` implements an SQLite backend for the core + - `taskchampion-sync-server-storage-postgres` implements a Postgres backend for the core - `taskchampion-sync-server` implements a simple HTTP server for the protocol ### Building From Source diff --git a/RELEASING.md b/RELEASING.md index 0e21231..05c4c29 100644 --- a/RELEASING.md +++ b/RELEASING.md @@ -22,7 +22,11 @@ --- -For the next release, include the folowing in the release notes: +For the next release, + +- remove postgres from the exclusion list in `.github/workflows/checks.yml` after the release + +- include the folowing in the release notes: Running the Docker image for this server without specifying DATA_DIR defaulted to storing the server data in diff --git a/postgres/Cargo.toml b/postgres/Cargo.toml new file mode 100644 index 0000000..ba7b177 --- /dev/null +++ b/postgres/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "taskchampion-sync-server-storage-postgres" +version = "0.7.0-pre" +authors = ["Dustin J. Mitchell "] +edition = "2021" +description = "Postgres backend for TaskChampion-sync-server" +homepage = "https://github.com/GothenburgBitFactory/taskchampion" +repository = "https://github.com/GothenburgBitFactory/taskchampion-sync-server" +license = "MIT" + +[dependencies] +anyhow.workspace = true +async-trait.workspace = true +bb8-postgres.workspace = true +bb8.workspace = true +chrono.workspace = true +env_logger.workspace = true +log.workspace = true +taskchampion-sync-server-core = { path = "../core", version = "0.7.0-pre" } +thiserror.workspace = true +tokio-postgres.workspace = true +tokio.workspace = true +uuid.workspace = true +native-tls = { version = "0.2.14", features = ["vendored"] } +postgres-native-tls = "0.5.1" + +[dev-dependencies] +tempfile.workspace = true +pretty_assertions.workspace = true diff --git a/postgres/README.md b/postgres/README.md new file mode 100644 index 0000000..0a9fdeb --- /dev/null +++ b/postgres/README.md @@ -0,0 +1,4 @@ +# taskchampion-sync-server-storage-postgres + +This crate implements a Postgres storage backend for the +`taskchampion-sync-server-core`. diff --git a/postgres/schema.sql b/postgres/schema.sql new file mode 100644 index 0000000..4544563 --- /dev/null +++ b/postgres/schema.sql @@ -0,0 +1,17 @@ +CREATE TABLE clients ( + client_id UUID PRIMARY KEY, + latest_version_id UUID default '00000000-0000-0000-0000-000000000000', + snapshot_version_id UUID, + versions_since_snapshot INTEGER, + snapshot_timestamp BIGINT, + snapshot BYTEA); + +CREATE TABLE versions ( + client_id UUID NOT NULL, + FOREIGN KEY(client_id) REFERENCES clients (client_id) ON DELETE CASCADE, + version_id UUID NOT NULL, + parent_version_id UUID, + history_segment BYTEA, + CONSTRAINT versions_pkey PRIMARY KEY (client_id, version_id) +); +CREATE INDEX versions_by_parent ON versions (parent_version_id); diff --git a/postgres/src/lib.rs b/postgres/src/lib.rs new file mode 100644 index 0000000..0e76ac4 --- /dev/null +++ b/postgres/src/lib.rs @@ -0,0 +1,669 @@ +//! This crate implements a Postgres storage backend for the TaskChampion sync server. +//! +//! Use the [`PostgresStorage`] type as an implementation of the [`Storage`] trait. +//! +//! This implementation is tested with Postgres version 17 but should work with any recent version. +//! +//! ## Schema Setup +//! +//! The database identified by the connection string must already exist and be set up with the +//! following schema (also available in `postgres/schema.sql` in the repository): +//! +//! ```sql +#![doc=include_str!("../schema.sql")] +//! ``` +//! +//! ## Integration with External Applications +//! +//! The schema is stable, and any changes to the schema will be made in a major version with +//! migration instructions provided. +//! +//! An external application may: +//! - Add additional tables to the database +//! - Add additional columns to the `clients` table. If those columns do not have default +//! values, calls to [`Txn::new_client`] will fail. It is possible to configure +//! `taskchampion-sync-server` to never call this method. +//! - Insert rows into the `clients` table, using default values for all columns except +//! `client_id` and application-specific columns. +//! - Delete rows from the `clients` table, using `CASCADE` to ensure any associated data +//! is also deleted. + +use anyhow::Context; +use bb8::PooledConnection; +use bb8_postgres::PostgresConnectionManager; +use chrono::{TimeZone, Utc}; +use postgres_native_tls::MakeTlsConnector; +use taskchampion_sync_server_core::{Client, Snapshot, Storage, StorageTxn, Version}; +use uuid::Uuid; + +#[cfg(test)] +mod testing; + +/// A storage backend which uses Postgres. +pub struct PostgresStorage { + pool: bb8::Pool>, +} + +impl PostgresStorage { + pub async fn new(connection_string: impl ToString) -> anyhow::Result { + let connector = native_tls::TlsConnector::new()?; + let connector = postgres_native_tls::MakeTlsConnector::new(connector); + let manager = PostgresConnectionManager::new_from_stringlike(connection_string, connector)?; + let pool = bb8::Pool::builder().build(manager).await?; + Ok(Self { pool }) + } +} + +#[async_trait::async_trait] +impl Storage for PostgresStorage { + async fn txn(&self, client_id: Uuid) -> anyhow::Result> { + let db_client = self.pool.get_owned().await?; + + db_client + .execute("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE", &[]) + .await?; + + Ok(Box::new(Txn { + client_id, + db_client: Some(db_client), + })) + } +} + +struct Txn { + client_id: Uuid, + /// The DB client or, if `commit` has been called, None. This ensures queries aren't executed + /// after commit, and also frees connections back to the pool as quickly as possible. + db_client: Option>>, +} + +impl Txn { + /// Get the db_client, or panic if it is gone (after commit). + fn db_client(&self) -> &tokio_postgres::Client { + let Some(db_client) = &self.db_client else { + panic!("Cannot use a postgres Txn after commit"); + }; + db_client + } + + /// Implementation for queries from the versions table + async fn get_version_impl( + &mut self, + query: &'static str, + client_id: Uuid, + version_id_arg: Uuid, + ) -> anyhow::Result> { + Ok(self + .db_client() + .query_opt(query, &[&version_id_arg, &client_id]) + .await + .context("error getting version")? + .map(|r| Version { + version_id: r.get(0), + parent_version_id: r.get(1), + history_segment: r.get("history_segment"), + })) + } +} + +#[async_trait::async_trait(?Send)] +impl StorageTxn for Txn { + async fn get_client(&mut self) -> anyhow::Result> { + Ok(self + .db_client() + .query_opt( + "SELECT + latest_version_id, + snapshot_timestamp, + versions_since_snapshot, + snapshot_version_id + FROM clients + WHERE client_id = $1 + LIMIT 1", + &[&self.client_id], + ) + .await + .context("error getting client")? + .map(|r| { + let latest_version_id: Uuid = r.get(0); + let snapshot_timestamp: Option = r.get(1); + let versions_since_snapshot: Option = r.get(2); + let snapshot_version_id: Option = r.get(3); + + // if all of the relevant fields are non-NULL, return a snapshot + let snapshot = match ( + snapshot_timestamp, + versions_since_snapshot, + snapshot_version_id, + ) { + (Some(ts), Some(vs), Some(v)) => Some(Snapshot { + version_id: v, + timestamp: Utc.timestamp_opt(ts, 0).unwrap(), + versions_since: vs as u32, + }), + _ => None, + }; + Client { + latest_version_id, + snapshot, + } + })) + } + + async fn new_client(&mut self, latest_version_id: Uuid) -> anyhow::Result<()> { + self.db_client() + .execute( + "INSERT INTO clients (client_id, latest_version_id) VALUES ($1, $2)", + &[&self.client_id, &latest_version_id], + ) + .await + .context("error creating/updating client")?; + Ok(()) + } + + async fn set_snapshot(&mut self, snapshot: Snapshot, data: Vec) -> anyhow::Result<()> { + let timestamp = snapshot.timestamp.timestamp(); + self.db_client() + .execute( + "UPDATE clients + SET snapshot_version_id = $1, + versions_since_snapshot = $2, + snapshot_timestamp = $3, + snapshot = $4 + WHERE client_id = $5", + &[ + &snapshot.version_id, + &(snapshot.versions_since as i32), + ×tamp, + &data, + &self.client_id, + ], + ) + .await + .context("error setting snapshot")?; + Ok(()) + } + + async fn get_snapshot_data(&mut self, version_id: Uuid) -> anyhow::Result>> { + Ok(self + .db_client() + .query_opt( + "SELECT snapshot + FROM clients + WHERE client_id = $1 and snapshot_version_id = $2 + LIMIT 1", + &[&self.client_id, &version_id], + ) + .await + .context("error getting snapshot data")? + .map(|r| r.get(0))) + } + + async fn get_version_by_parent( + &mut self, + parent_version_id: Uuid, + ) -> anyhow::Result> { + self.get_version_impl( + "SELECT version_id, parent_version_id, history_segment + FROM versions + WHERE parent_version_id = $1 AND client_id = $2", + self.client_id, + parent_version_id, + ) + .await + } + + async fn get_version(&mut self, version_id: Uuid) -> anyhow::Result> { + self.get_version_impl( + "SELECT version_id, parent_version_id, history_segment + FROM versions + WHERE version_id = $1 AND client_id = $2", + self.client_id, + version_id, + ) + .await + } + + async fn add_version( + &mut self, + version_id: Uuid, + parent_version_id: Uuid, + history_segment: Vec, + ) -> anyhow::Result<()> { + self.db_client() + .execute( + "INSERT INTO versions (version_id, client_id, parent_version_id, history_segment) + VALUES ($1, $2, $3, $4)", + &[ + &version_id, + &self.client_id, + &parent_version_id, + &history_segment, + ], + ) + .await + .context("error inserting new version")?; + let rows_modified = self + .db_client() + .execute( + "UPDATE clients + SET latest_version_id = $1, + versions_since_snapshot = versions_since_snapshot + 1 + WHERE client_id = $2 and latest_version_id = $3", + &[&version_id, &self.client_id, &parent_version_id], + ) + .await + .context("error updating latest_version_id")?; + + // If no rows were modified, this operation failed. + if rows_modified == 0 { + anyhow::bail!("clients.latest_version_id does not match parent_version_id"); + } + Ok(()) + } + + async fn commit(&mut self) -> anyhow::Result<()> { + self.db_client().execute("COMMIT", &[]).await?; + self.db_client = None; + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::testing::with_db; + + async fn make_client(db_client: &tokio_postgres::Client) -> anyhow::Result { + let client_id = Uuid::new_v4(); + db_client + .execute("insert into clients (client_id) values ($1)", &[&client_id]) + .await?; + Ok(client_id) + } + + async fn make_version( + db_client: &tokio_postgres::Client, + client_id: Uuid, + parent_version_id: Uuid, + history_segment: &[u8], + ) -> anyhow::Result { + let version_id = Uuid::new_v4(); + db_client + .execute( + "insert into versions + (version_id, client_id, parent_version_id, history_segment) + values ($1, $2, $3, $4)", + &[ + &version_id, + &client_id, + &parent_version_id, + &history_segment, + ], + ) + .await?; + Ok(version_id) + } + + async fn set_client_latest_version_id( + db_client: &tokio_postgres::Client, + client_id: Uuid, + latest_version_id: Uuid, + ) -> anyhow::Result<()> { + db_client + .execute( + "update clients set latest_version_id = $1 where client_id = $2", + &[&latest_version_id, &client_id], + ) + .await?; + Ok(()) + } + + async fn set_client_snapshot( + db_client: &tokio_postgres::Client, + client_id: Uuid, + snapshot_version_id: Uuid, + versions_since_snapshot: u32, + snapshot_timestamp: i64, + snapshot: &[u8], + ) -> anyhow::Result<()> { + db_client + .execute( + " + update clients + set snapshot_version_id = $1, + versions_since_snapshot = $2, + snapshot_timestamp = $3, + snapshot = $4 + where client_id = $5", + &[ + &snapshot_version_id, + &(versions_since_snapshot as i32), + &snapshot_timestamp, + &snapshot, + &client_id, + ], + ) + .await?; + Ok(()) + } + + #[tokio::test] + async fn test_get_client_none() -> anyhow::Result<()> { + with_db(async |connection_string, _db_client| { + let storage = PostgresStorage::new(connection_string).await?; + let client_id = Uuid::new_v4(); + let mut txn = storage.txn(client_id).await?; + assert_eq!(txn.get_client().await?, None); + Ok(()) + }) + .await + } + + #[tokio::test] + async fn test_get_client_exists_empty() -> anyhow::Result<()> { + with_db(async |connection_string, db_client| { + let storage = PostgresStorage::new(connection_string).await?; + let client_id = make_client(&db_client).await?; + let mut txn = storage.txn(client_id).await?; + assert_eq!( + txn.get_client().await?, + Some(Client { + latest_version_id: Uuid::nil(), + snapshot: None + }) + ); + Ok(()) + }) + .await + } + + #[tokio::test] + async fn test_get_client_exists_latest() -> anyhow::Result<()> { + with_db(async |connection_string, db_client| { + let storage = PostgresStorage::new(connection_string).await?; + let client_id = make_client(&db_client).await?; + let latest_version_id = Uuid::new_v4(); + set_client_latest_version_id(&db_client, client_id, latest_version_id).await?; + let mut txn = storage.txn(client_id).await?; + assert_eq!( + txn.get_client().await?, + Some(Client { + latest_version_id, + snapshot: None + }) + ); + Ok(()) + }) + .await + } + + #[tokio::test] + async fn test_get_client_exists_with_snapshot() -> anyhow::Result<()> { + with_db(async |connection_string, db_client| { + let storage = PostgresStorage::new(connection_string).await?; + let client_id = make_client(&db_client).await?; + let snapshot_version_id = Uuid::new_v4(); + let versions_since_snapshot = 10; + let snapshot_timestamp = 10000000; + let snapshot = b"abcd"; + set_client_snapshot( + &db_client, + client_id, + snapshot_version_id, + versions_since_snapshot, + snapshot_timestamp, + snapshot, + ) + .await?; + let mut txn = storage.txn(client_id).await?; + assert_eq!( + txn.get_client().await?, + Some(Client { + latest_version_id: Uuid::nil(), + snapshot: Some(Snapshot { + version_id: snapshot_version_id, + timestamp: Utc.timestamp_opt(snapshot_timestamp, 0).unwrap(), + versions_since: versions_since_snapshot, + }) + }) + ); + Ok(()) + }) + .await + } + + #[tokio::test] + async fn test_new_client() -> anyhow::Result<()> { + with_db(async |connection_string, _db_client| { + let storage = PostgresStorage::new(connection_string).await?; + let client_id = Uuid::new_v4(); + let latest_version_id = Uuid::new_v4(); + + let mut txn1 = storage.txn(client_id).await?; + txn1.new_client(latest_version_id).await?; + + // Client is not visible yet as txn1 is not committed. + let mut txn2 = storage.txn(client_id).await?; + assert_eq!(txn2.get_client().await?, None); + + txn1.commit().await?; + + // Client is now visible. + let mut txn2 = storage.txn(client_id).await?; + assert_eq!( + txn2.get_client().await?, + Some(Client { + latest_version_id, + snapshot: None + }) + ); + + Ok(()) + }) + .await + } + + #[tokio::test] + async fn test_set_snapshot() -> anyhow::Result<()> { + with_db(async |connection_string, db_client| { + let storage = PostgresStorage::new(connection_string).await?; + let client_id = make_client(&db_client).await?; + let mut txn = storage.txn(client_id).await?; + let snapshot_version_id = Uuid::new_v4(); + let versions_since_snapshot = 10; + let snapshot_timestamp = 10000000; + let snapshot = b"abcd"; + + txn.set_snapshot( + Snapshot { + version_id: snapshot_version_id, + timestamp: Utc.timestamp_opt(snapshot_timestamp, 0).unwrap(), + versions_since: versions_since_snapshot, + }, + snapshot.to_vec(), + ) + .await?; + txn.commit().await?; + + txn = storage.txn(client_id).await?; + assert_eq!( + txn.get_client().await?, + Some(Client { + latest_version_id: Uuid::nil(), + snapshot: Some(Snapshot { + version_id: snapshot_version_id, + timestamp: Utc.timestamp_opt(snapshot_timestamp, 0).unwrap(), + versions_since: versions_since_snapshot, + }) + }) + ); + + let row = db_client + .query_one( + "select snapshot from clients where client_id = $1", + &[&client_id], + ) + .await?; + assert_eq!(row.get::<_, &[u8]>(0), b"abcd"); + + Ok(()) + }) + .await + } + + #[tokio::test] + async fn test_get_snapshot_none() -> anyhow::Result<()> { + with_db(async |connection_string, db_client| { + let storage = PostgresStorage::new(connection_string).await?; + let client_id = make_client(&db_client).await?; + let mut txn = storage.txn(client_id).await?; + assert_eq!(txn.get_snapshot_data(Uuid::new_v4()).await?, None); + + Ok(()) + }) + .await + } + + #[tokio::test] + async fn test_get_snapshot_mismatched_version() -> anyhow::Result<()> { + with_db(async |connection_string, db_client| { + let storage = PostgresStorage::new(connection_string).await?; + let client_id = make_client(&db_client).await?; + let mut txn = storage.txn(client_id).await?; + + let snapshot_version_id = Uuid::new_v4(); + let versions_since_snapshot = 10; + let snapshot_timestamp = 10000000; + let snapshot = b"abcd"; + txn.set_snapshot( + Snapshot { + version_id: snapshot_version_id, + timestamp: Utc.timestamp_opt(snapshot_timestamp, 0).unwrap(), + versions_since: versions_since_snapshot, + }, + snapshot.to_vec(), + ) + .await?; + + assert_eq!(txn.get_snapshot_data(Uuid::new_v4()).await?, None); + + Ok(()) + }) + .await + } + + #[tokio::test] + async fn test_get_version() -> anyhow::Result<()> { + with_db(async |connection_string, db_client| { + let storage = PostgresStorage::new(connection_string).await?; + let client_id = make_client(&db_client).await?; + let parent_version_id = Uuid::new_v4(); + let version_id = make_version(&db_client, client_id, parent_version_id, b"v1").await?; + + let mut txn = storage.txn(client_id).await?; + + // Different parent doesn't exist. + assert_eq!(txn.get_version_by_parent(Uuid::new_v4()).await?, None); + + // Different version doesn't exist. + assert_eq!(txn.get_version(Uuid::new_v4()).await?, None); + + let version = Version { + version_id, + parent_version_id, + history_segment: b"v1".to_vec(), + }; + + // Version found by parent. + assert_eq!( + txn.get_version_by_parent(parent_version_id).await?, + Some(version.clone()) + ); + + // Version found by ID. + assert_eq!(txn.get_version(version_id).await?, Some(version)); + + Ok(()) + }) + .await + } + + #[tokio::test] + async fn test_add_version() -> anyhow::Result<()> { + with_db(async |connection_string, db_client| { + let storage = PostgresStorage::new(connection_string).await?; + let client_id = make_client(&db_client).await?; + let mut txn = storage.txn(client_id).await?; + let version_id = Uuid::new_v4(); + txn.add_version(version_id, Uuid::nil(), b"v1".to_vec()) + .await?; + assert_eq!( + txn.get_version(version_id).await?, + Some(Version { + version_id, + parent_version_id: Uuid::nil(), + history_segment: b"v1".to_vec() + }) + ); + Ok(()) + }) + .await + } + + #[tokio::test] + /// When an add_version call specifies an incorrect `parent_version_id, it fails. This is + /// typically avoided by calling `get_client` beforehand, which (due to repeatable reads) + /// allows the caller to check the `latest_version_id` before calling `add_version`. + async fn test_add_version_mismatch() -> anyhow::Result<()> { + with_db(async |connection_string, db_client| { + let storage = PostgresStorage::new(connection_string).await?; + let client_id = make_client(&db_client).await?; + let latest_version_id = Uuid::new_v4(); + set_client_latest_version_id(&db_client, client_id, latest_version_id).await?; + + let mut txn = storage.txn(client_id).await?; + let version_id = Uuid::new_v4(); + let parent_version_id = Uuid::new_v4(); // != latest_version_id + let res = txn + .add_version(version_id, parent_version_id, b"v1".to_vec()) + .await; + assert!(res.is_err()); + Ok(()) + }) + .await + } + + #[tokio::test] + /// Adding versions to two different clients can proceed concurrently. + async fn test_add_version_no_conflict_different_clients() -> anyhow::Result<()> { + with_db(async |connection_string, db_client| { + let storage = PostgresStorage::new(connection_string).await?; + + // Clients 1 and 2 do not interfere with each other; if these are the same client, then + // this will deadlock as one transaction waits for the other. If the postgres storage + // implementation serialized _all_ transactions across clients, that would limit its + // scalability. + // + // So the asertion here is "does not deadlock". + + let client_id1 = make_client(&db_client).await?; + let mut txn1 = storage.txn(client_id1).await?; + let version_id1 = Uuid::new_v4(); + txn1.add_version(version_id1, Uuid::nil(), b"v1".to_vec()) + .await?; + + let client_id2 = make_client(&db_client).await?; + let mut txn2 = storage.txn(client_id2).await?; + let version_id2 = Uuid::new_v4(); + txn2.add_version(version_id2, Uuid::nil(), b"v2".to_vec()) + .await?; + + txn1.commit().await?; + txn2.commit().await?; + + Ok(()) + }) + .await + } +} diff --git a/postgres/src/testing.rs b/postgres/src/testing.rs new file mode 100644 index 0000000..acb7727 --- /dev/null +++ b/postgres/src/testing.rs @@ -0,0 +1,76 @@ +use std::{future::Future, sync::LazyLock}; +use tokio::{sync::Mutex, task}; +use tokio_postgres::NoTls; + +// An async mutex used to ensure exclusive access to the database. +static DB_LOCK: LazyLock> = std::sync::LazyLock::new(|| Mutex::new(())); + +/// Call the given function with a DB client, pointing to an initialized DB. +/// +/// This serializes use of the database so that two tests are not simultaneously +/// modifying it. +/// +/// The function's future need not be `Send`. +pub(crate) async fn with_db(f: F) -> anyhow::Result<()> +where + F: FnOnce(String, tokio_postgres::Client) -> FUT, + FUT: Future> + 'static, +{ + let _ = env_logger::builder().is_test(true).try_init(); + + let Ok(connection_string) = std::env::var("TEST_DB_URL") else { + // If this is run in a GitHub action, then we really don't want to skip the tests. + if std::env::var("GITHUB_ACTIONS").is_ok() { + panic!("TEST_DB_URL must be set in GitHub actions"); + } + // Skip the test. + return Ok(()); + }; + + // Serialize use of the DB. + let _db_guard = DB_LOCK.lock().await; + + let local_set = task::LocalSet::new(); + local_set + .run_until(async move { + let (client, connection) = tokio_postgres::connect(&connection_string, NoTls).await?; + let conn_join_handle = tokio::spawn(async move { + if let Err(e) = connection.await { + log::warn!("connection error: {e}"); + } + }); + + // Set up the DB. + client + .execute("drop schema if exists public cascade", &[]) + .await?; + client.execute("create schema public", &[]).await?; + client.simple_query(include_str!("../schema.sql")).await?; + + // Run the test in its own task, so that we can handle all failure cases. This task must be + // local because the future typically uses `StorageTxn` which is not `Send`. + let test_join_handle = tokio::task::spawn_local(f(connection_string.clone(), client)); + + // Wait for the test task to complete. + let test_res = test_join_handle.await?; + + conn_join_handle.await?; + + // Clean up the DB. + + let (client, connection) = tokio_postgres::connect(&connection_string, NoTls).await?; + let conn_join_handle = tokio::spawn(async move { + if let Err(e) = connection.await { + log::warn!("connection error: {e}"); + } + }); + client + .execute("drop schema if exists public cascade", &[]) + .await?; + drop(client); + conn_join_handle.await?; + + test_res + }) + .await +}