Skip to content

Commit

Permalink
feat(voyager): integrate pg-queue into voyager
Browse files Browse the repository at this point in the history
  • Loading branch information
benluelo committed Oct 10, 2023
1 parent ae305f2 commit 67ecc29
Show file tree
Hide file tree
Showing 21 changed files with 341 additions and 147 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ milagro_bls = { git = "https://github.com/datachainlab/milagro_bls", rev = "bc2b

contracts = { path = "generated/contracts", default-features = false }
serde-utils = { path = "lib/serde-utils", default-features = false }
pg-queue = { path = "lib/pg-queue", default-features = false }
ethereum-verifier = { path = "lib/ethereum-verifier", default-features = false }
cometbls-groth16-verifier = { path = "lib/cometbls-groth16-verifier", default-features = false }
ics008-wasm-client = { path = "lib/ics-008-wasm-client", default-features = false }
Expand Down
File renamed without changes.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

File renamed without changes.
File renamed without changes.
File renamed without changes.
158 changes: 158 additions & 0 deletions lib/pg-queue/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
use std::{future::Future, marker::PhantomData};

use serde::{de::DeserializeOwned, Serialize};
use sqlx::{migrate::Migrator, query, query_as, types::Json, Acquire, Postgres};

pub static MIGRATOR: Migrator = sqlx::migrate!(); // defaults to "./migrations"

/// A fifo queue backed by a postgres table. Not suitable for high-throughput, but enough for ~1k items/sec.
///
/// The queue assumes the following database schema:
///
/// id SERIAL AUTO INCREMENT
/// status 0..2
/// item JSONB
/// error TEXT
#[derive(Debug, Clone)]
pub struct Queue<T> {
__marker: PhantomData<fn() -> T>,
}

impl<T> Queue<T> {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
Self {
__marker: PhantomData,
}
}
}

impl<T: DeserializeOwned + Serialize + Unpin + Send + Sync> Queue<T> {
/// Enqueues a new item for processing. The item's processing status is set to 0, indicating that it is ready
/// for processing.
pub async fn enqueue<'a, A>(conn: A, item: T) -> Result<(), sqlx::Error>
where
A: Acquire<'a, Database = Postgres>,
{
let mut tx = conn.begin().await?;

let row = query!(
"INSERT INTO queue (item) VALUES ($1) RETURNING id",
Json(item) as _
)
.fetch_one(tx.as_mut())
.await?;

tx.commit().await?;

println!("enqueued item with id {}", row.id);

Ok(())
}

/// Processes the next value from the queue, calling `f` on the value. Dequeueing has the following properties:
/// - if `f` returns an error, the item is requeued.
/// - if `f` returns Ok(ProcessFlow::Fail), the item is permanently marked as failed.
/// - if `f` returns Ok(ProcessFlow::Continue), the item is requeued, but process returns with Ok(()).
/// - if `f` returns Ok(ProcessFlow::Success), the item is marked as processed.
///
/// Database atomicity is used to ensure that the queue is always in a consistent state, meaning that an item
/// process will always be retried until it reaches ProcessFlow::Fail or ProcessFlow::Success. `f` is responsible for
/// storing metadata in the job to determine if retrying should fail permanently.
pub async fn process<'a, F, Fut, A>(conn: A, f: F) -> Result<(), sqlx::Error>
where
F: (FnOnce(T) -> Fut) + 'a,
Fut: Future<Output = ProcessFlow<T>> + 'a,
A: Acquire<'a, Database = Postgres>,
{
let mut tx = conn.begin().await?;

#[derive(Debug)]
struct Record<T> {
id: i64,
item: Json<T>,
}

let row = query_as!(
Record::<T>,
"
UPDATE queue
SET status = 'done'::status
WHERE id = (
SELECT id
FROM queue
WHERE status = 'ready'::status
ORDER BY id ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING id, item as \"item: Json<T>\"",
)
.fetch_optional(tx.as_mut())
.await?;

match row {
Some(row) => {
// TODO: Use tracing
println!("processing item at row {}", row.id);

match f(row.item.0).await {
ProcessFlow::Fail(error) => {
// Insert error message in the queue
query!(
"UPDATE queue
SET status = 'failed'::status, message = $1
WHERE id = $2",
error,
row.id,
)
.execute(tx.as_mut())
.await?;
tx.commit().await?;
}
ProcessFlow::Success(new_msgs) => {
if !new_msgs.is_empty() {
let new_ids = query!(
"INSERT INTO queue (item)
SELECT * FROM UNNEST($1::JSONB[])
RETURNING id",
&*new_msgs
.into_iter()
.map(|t| serde_json::to_value(t).expect(
"queue message should have infallible serialization"
))
.collect::<Vec<_>>(),
)
.fetch_all(tx.as_mut())
.await?;

println!(
"inserted new messages with ids {}",
new_ids
.into_iter()
.map(|x| x.id.to_string())
.collect::<Vec<_>>()
.join(", ")
);
}

tx.commit().await?;
}
ProcessFlow::Requeue => {
tx.rollback().await?;
}
}
}
None => {
println!("queue is empty")
}
}
Ok(())
}
}

pub enum ProcessFlow<T> {
Success(Vec<T>),
Requeue,
Fail(String),
}
103 changes: 0 additions & 103 deletions lib/pgqueue/src/lib.rs

This file was deleted.

20 changes: 19 additions & 1 deletion networks/devnet.nix
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@
postgres = import ./services/postgres.nix { inherit lib pkgs; };
};

# hasura-services = import ./services/hasura.nix { inherit lib pkgs; migrations = self'.packages.hubble-migrations; };
hasura-services = import ./services/hasura.nix {
inherit lib pkgs;
};
# hubble-services = { hubble = import ./services/hubble.nix { inherit lib; image = self'.packages.hubble-image; }; };

devnet = {
Expand Down Expand Up @@ -94,6 +96,13 @@
build-evm = arion.build spec-evm;

build-cosmos = arion.build spec-cosmos;

build-voyager-queue = arion.build {
modules = [{
project.name = "postgres";
services = postgres-services;
}];
};
in
{
packages.devnet =
Expand Down Expand Up @@ -123,6 +132,15 @@
'';
};

packages.voyager-queue =
pkgs.writeShellApplication {
name = "postgres";
runtimeInputs = [ arion ];
text = ''
arion --prebuilt-file ${build-voyager-queue} up --build --force-recreate -V --always-recreate-deps --remove-orphans
'';
};

_module.args.networks = {
inherit devnet devnet-minimal union sepolia;
};
Expand Down
2 changes: 1 addition & 1 deletion tools/sqlx-cli/sqlx-cli.nix
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
{
packages = {
sqlx-cli = crane.lib.buildPackage {
name = name;
inherit name;
version = "0.7.1";
cargoExtraArgs = "-p sqlx-cli";
nativeBuildInputs = [ pkgs.pkg-config ];
Expand Down
5 changes: 4 additions & 1 deletion voyager-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,8 @@
},
"voyager": {
"hasura": null
},
"queue": {
"database_url": "postgres://postgres:postgrespassword@localhost:5432/default"
}
}
}
2 changes: 2 additions & 0 deletions voyager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ anyhow = "1.0.72"
pin-utils = "0.1.0"
tokio-stream = { version = "0.1.14" }
either = { version = "1.9.0", features = ["serde"] }
pg-queue.workspace = true

beacon-api = { workspace = true, default-features = false }
contracts.workspace = true
Expand All @@ -48,6 +49,7 @@ bitvec = "1.0.1"
displaydoc = { version = "0.2.4", default-features = false }
frame-support-procedural = "18.0.0"
hubble.workspace = true
sqlx = { version = "0.7.2", features = ["postgres"] }

[features]
eth-mainnet = [ "unionlabs/eth-mainnet" ]
Loading

0 comments on commit 67ecc29

Please sign in to comment.