Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pg-queue): add pg-queue #777

Merged
merged 3 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,413 changes: 877 additions & 536 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ keypairs
kwallet
laddr
lahcen
launchbadge
lcfur
lcgadget
ldflags
Expand Down Expand Up @@ -528,6 +529,7 @@ snapshottypes
solomachine
spents
splitn
sqlx
srcs
stakingkeeper
stakingtypes
Expand Down
2 changes: 2 additions & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
./tools/hasura-cli/hasura-cli.nix
./tools/todo-comment.nix
./tools/iaviewer/iaviewer.nix
./tools/sqlx-cli/sqlx-cli.nix
./networks/e2e-setup.nix
./networks/devnet.nix
./networks/genesis/devnet-minimal.nix
Expand Down Expand Up @@ -299,6 +300,7 @@
protobuf
self'.packages.hasura-cli
self'.packages.tdc
self'.packages.sqlx-cli
solc
yarn
yq
Expand Down
2 changes: 1 addition & 1 deletion lib/chain-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ tracing = "0.1.37"
tokio = { version = "1.32.0", default-features = false }
typenum = { version = "1.16.0", default-features = false, features = ["const-generics", "no_std"] }
# TODO: Use a version here
prost = "*"
prost = "0.11.0"
sha2 = "0.10.6"
chrono = { version = "0.4.26", default-features = false, features = ["alloc"] }
hubble.workspace = true
Expand Down
2 changes: 2 additions & 0 deletions lib/pgqueue/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/target
/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions lib/pgqueue/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "pg-queue"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
serde = "1.0.188"
serde_json = "1.0.107"
sqlx = { version = "0.7.2", features = ["postgres", "migrate", "runtime-tokio"] }
5 changes: 5 additions & 0 deletions lib/pgqueue/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
To work on this package, make sure you are running postgres and set the DATABASE_URL variable. For devnet this should be:

```
export DATABASE_URL="postgres://postgres:postgrespassword@localhost:5432/default"
```
3 changes: 3 additions & 0 deletions lib/pgqueue/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
fn main() {
println!("cargo:rerun-if-changed=migrations");
}
1 change: 1 addition & 0 deletions lib/pgqueue/migrations/20231008225947_initial.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE TYPE status AS ENUM ('ready', 'done', 'failed');
7 changes: 7 additions & 0 deletions lib/pgqueue/migrations/20231009120059_queue.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE queue (
id BIGSERIAL PRIMARY KEY,
status status NOT NULL DEFAULT 'ready',
item JSONB NOT NULL,
-- Error message in case of permanent failure. If set, status MUST be 'failed'.
message TEXT CHECK (((message IS NULL) AND (status != 'failed'::status)) OR ((message IS NOT NULL) AND (status = 'failed'::status)))
);
103 changes: 103 additions & 0 deletions lib/pgqueue/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
use serde::Serialize;
use serde_json::Value;
use sqlx::{error::BoxDynError, migrate::Migrator, query, types::Json, Acquire, Postgres};

pub static MIGRATOR: Migrator = sqlx::migrate!(); // defaults to "./migrations"

/// A fifo queue backed by a postgres table. Not suitable for high-throughput, but enough for ~1k items/sec.
///
/// The queue assumes the following database schema:
///
/// id SERIAL AUTO INCREMENT
/// status 0..2
/// item JSONB
/// error TEXT
pub struct Queue {}

impl Queue {
/// Enqueues a new item for processing. The item's processing status is set to 0, indicating that it is ready
/// for processing.
pub async fn enqueue<'a, A, T: Serialize + Send + Sync>(
conn: A,
item: T,
) -> Result<i64, BoxDynError>
where
A: Acquire<'a, Database = Postgres>,
{
let mut tx = conn.begin().await?;
let row = query!(
"INSERT into queue (item) VALUES ($1) RETURNING id",
Json(item) as _
)
.fetch_one(tx.as_mut())
.await?;
tx.commit().await?;
Ok(row.id)
}

/// Processes the next value from the queue, calling `f` on the value. Dequeueing has the following properties:
/// - if `f` returns an error, the item is requeued.
/// - if `f` returns Ok(ProcessFlow::Fail), the item is permanently marked as failed.
/// - if `f` returns Ok(ProcessFlow::Continue), the item is requeued, but process returns with Ok(()).
/// - if `f` returns Ok(ProcessFlow::Success), the item is marked as processed.
///
/// Database atomicity is used to ensure that the queue is always in a consistent state, meaning that an item
/// process will always be retried until it reaches ProcessFlow::Fail or ProcessFlow::Success. `f` is responsible for
/// storing metadata in the job to determine if retrying should fail permanently.
pub async fn process<'a, A>(
conn: A,
f: impl FnOnce((i64, Value)) -> Result<ProcessFlow, BoxDynError>,
) -> Result<(), BoxDynError>
where
A: Acquire<'a, Database = Postgres>,
{
let mut tx = conn.begin().await?;

let row = query!(
"
UPDATE queue
SET status = 'done'::status
WHERE id = (
SELECT id
FROM queue
WHERE status = 'ready'::status
ORDER BY id ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING id, item;",
)
.fetch_one(tx.as_mut())
.await?;

match f((row.id, row.item))? {
ProcessFlow::Fail(error) => {
// Insert error message in the queue
query!(
"
UPDATE queue
SET status = 'failed'::status, message = $1
WHERE id = $2",
error,
row.id,
)
.execute(tx.as_mut())
.await?;
tx.commit().await?;
}
ProcessFlow::Success => {
tx.commit().await?;
}
ProcessFlow::Requeue => {
tx.rollback().await?;
}
}
Ok(())
}
}

pub enum ProcessFlow {
Success,
Requeue,
Fail(String),
}
8 changes: 4 additions & 4 deletions networks/devnet.nix
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,16 @@
};
};

# postgres-services = {
# postgres = import ./services/postgres.nix { inherit lib pkgs; };
# };
postgres-services = {
postgres = import ./services/postgres.nix { inherit lib pkgs; };
};

# hasura-services = import ./services/hasura.nix { inherit lib pkgs; migrations = self'.packages.hubble-migrations; };
# hubble-services = { hubble = import ./services/hubble.nix { inherit lib; image = self'.packages.hubble-image; }; };

devnet = {
project.name = "devnet";
services = sepolia-services // uniond-services;
services = sepolia-services // uniond-services // postgres-services;
};

devnet-minimal = {
Expand Down
25 changes: 25 additions & 0 deletions tools/sqlx-cli/sqlx-cli.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{ ... }: {
perSystem = { pkgs, crane, ... }:
let
name = "sqlx-cli";
in
{
packages = {
sqlx-cli = crane.lib.buildPackage {
name = name;
version = "0.7.1";
cargoExtraArgs = "-p sqlx-cli";
nativeBuildInputs = [ pkgs.pkg-config ];
buildInputs = [ pkgs.openssl ];
src = pkgs.fetchFromGitHub {
inherit name;
owner = "launchbadge";
repo = "sqlx";
rev = "b1387057e5e6c6b72eacd01f491cb45854616502";
sha256 = "sha256-jCMDJuE7iYCAWgIDRq4KVGrwbV3TM0Ws9GiFxFn+hVU=";
};
meta.mainProgram = "sqlx";
};
};
};
}
2 changes: 1 addition & 1 deletion voyager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ ethers = { version = "2.0.4", features = ["rustls", "ws"] }
futures = "0.3.28"
hex-literal = "0.4.1"
num-bigint = "0.4"
prost = "*"
prost = "0.11.0"
reqwest = { version = "0.11.17", default-features = false, features = ["tokio-rustls"] }
ripemd = "0.1.3"
serde_json = "1.0.96"
Expand Down
Loading