feat(voyager): nicer logs and more stack (#1981)
- emit a log for every event
- use `serde_stacker` in `pg-queue` (may be removed in the future; more
  testing is needed to determine whether it's necessary)
- make tokio's worker-thread stack size configurable (the default is 2 MiB,
  which we blow through almost immediately in debug builds); see the sketch
  below
- `Box::pin` a few things to cut down on stack usage
- add a `noop()` `QueueMsg` constructor fn for consistency
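
The last three bullets are all about stack headroom. As a rough sketch of what they amount to (the `VOYAGER_STACK_SIZE` variable, the 8 MiB fallback, and `deeply_nested` are illustrative stand-ins, not voyager's actual code or config), tokio's runtime builder exposes the worker-thread stack size, and `Box::pin` moves a large future's state machine onto the heap:

use std::env;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Default to 8 MiB instead of tokio's 2 MiB per-worker default;
    // override via an (illustrative) environment variable.
    let stack_size = env::var("VOYAGER_STACK_SIZE")
        .ok()
        .and_then(|s| s.parse::<usize>().ok())
        .unwrap_or(8 * 1024 * 1024);

    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .thread_stack_size(stack_size)
        .build()?;

    rt.block_on(async {
        // `Box::pin` heap-allocates the future's (potentially huge) state
        // machine, so awaiting it only needs a pointer's worth of stack.
        Box::pin(deeply_nested()).await;
    });

    Ok(())
}

async fn deeply_nested() {
    // Stand-in for a future with a large state machine: the buffer is held
    // across an await point, so it becomes part of the future itself.
    let buf = [0u8; 64 * 1024];
    tokio::task::yield_now().await;
    let _ = buf[0];
}
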
benluelo authored May 29, 2024
2 parents 19099cb + 4e8d006 commit fa66f98
Showing 19 changed files with 668 additions and 336 deletions.
33 changes: 33 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

12 changes: 6 additions & 6 deletions lib/block-message/src/chain_impls/ethereum.rs
@@ -20,7 +20,7 @@ use futures::StreamExt;
 use queue_msg::{
     aggregate,
     aggregation::{do_aggregate, UseAggregate},
-    conc, data, fetch, queue_msg, QueueMsg,
+    conc, data, fetch, noop, queue_msg, QueueMsg,
 };
 use serde::{Deserialize, Serialize};
 use unionlabs::{
@@ -135,7 +135,7 @@ where

     if from_block == to_block {
         tracing::debug!(%from_block, %to_block, %from_slot, %to_slot, "beacon block range is empty");
-        QueueMsg::Noop
+        noop()
     } else {
         tracing::debug!(%from_block, %to_block, "fetching block range");
         // REVIEW: Surely transactions and events can be fetched in parallel?
@@ -162,7 +162,7 @@ where

                 match IBCHandlerEvents::decode_log(&log.into()) {
                     Ok(event) => {
-                        tracing::info!(?event, "found IBCHandler event");
+                        tracing::debug!(?event, "found IBCHandler event");
                         Some(mk_aggregate_event(c, event, event_height, tx_hash).await)
                     }
                     Err(e) => {
@@ -482,7 +482,7 @@ where
                 },
             ))
         }
-        IBCHandlerEvents::ClientEvent(IBCClientEvents::ClientRegisteredFilter(_)) => QueueMsg::Noop,
+        IBCHandlerEvents::ClientEvent(IBCClientEvents::ClientRegisteredFilter(_)) => noop(),
         IBCHandlerEvents::ClientEvent(IBCClientEvents::ClientUpdatedFilter(
             ClientUpdatedFilter {
                 client_id,
@@ -539,8 +539,8 @@ where
                 raw_event,
             )
         }
-        IBCHandlerEvents::PacketEvent(IBCPacketEvents::TimeoutPacketFilter(_)) => QueueMsg::Noop,
-        IBCHandlerEvents::OwnableEvent(_) => QueueMsg::Noop,
+        IBCHandlerEvents::PacketEvent(IBCPacketEvents::TimeoutPacketFilter(_)) => noop(),
+        IBCHandlerEvents::OwnableEvent(_) => noop(),
     }
 }


11 changes: 6 additions & 5 deletions lib/pg-queue/Cargo.toml
@@ -11,8 +11,9 @@ workspace = true
 test-include = []

 [dependencies]
-serde = { workspace = true }
-serde_json = { workspace = true }
-sqlx = { workspace = true, features = ["postgres", "migrate", "macros", "json", "runtime-tokio"] }
-tokio = { workspace = true, features = ["time"] }
-tracing = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true, features = ["unbounded_depth"] }
+serde_stacker = "0.1.11"
+sqlx = { workspace = true, features = ["postgres", "migrate", "macros", "json", "runtime-tokio"] }
+tokio = { workspace = true, features = ["time"] }
+tracing = { workspace = true }
33 changes: 23 additions & 10 deletions lib/pg-queue/src/lib.rs
@@ -8,11 +8,14 @@ use std::{
 };

 use serde::{de::DeserializeOwned, Serialize};
-use sqlx::{migrate::Migrator, query, query_as, types::Json, Acquire, Postgres};
+use sqlx::{migrate::Migrator, query, types::Json, Acquire, Postgres};
 use tracing::Instrument;

 pub static MIGRATOR: Migrator = sqlx::migrate!(); // defaults to "./migrations"

+// TODO: Remove
+pub use serde_stacker;
+
 /// A fifo queue backed by a postgres table. Not suitable for high-throughput, but enough for ~1k items/sec.
 ///
 /// The queue assumes the following database schema:
@@ -92,14 +95,14 @@ impl<T: DeserializeOwned + Serialize + Unpin + Send + Sync> Queue<T> {

         let mut tx = conn.begin().await?;

-        #[derive(Debug)]
-        struct Record<T> {
-            id: i64,
-            item: Json<T>,
-        }
+        // #[derive(Debug)]
+        // struct Record<T> {
+        //     id: i64,
+        //     item: String,
+        // }

-        let row = query_as!(
-            Record::<T>,
+        let row = query!(
+            // Record::<T>,
             "
             UPDATE queue
             SET status = 'done'::status
@@ -111,15 +114,25 @@
             FOR UPDATE SKIP LOCKED
             LIMIT 1
             )
-            RETURNING id, item as \"item: Json<T>\"",
+            RETURNING id, item::text as \"item!: String\"",
         )
         .fetch_optional(tx.as_mut())
         .await?;

         match row {
             Some(row) => {
                 let span = tracing::info_span!("processing item", id = row.id);
-                let (r, res) = f(row.item.0).instrument(span).await;
+
+                tracing::trace!(%row.item);
+
+                let mut deserializer: serde_json::Deserializer<serde_json::de::StrRead> =
+                    serde_json::Deserializer::from_str(&row.item);
+                deserializer.disable_recursion_limit();
+                let deserializer = serde_stacker::Deserializer::new(&mut deserializer);
+
+                let json = T::deserialize(deserializer).unwrap();
+
+                let (r, res) = f(json).instrument(span).await;

                 match res {
                     Err(error) => {
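
The dequeue path above disables serde_json's fixed recursion limit (hence the `unbounded_depth` feature added in the Cargo.toml change) and hands deserialization to `serde_stacker`, which grows the stack on demand. A minimal, self-contained sketch of that same pattern, using a throwaway deeply nested document rather than voyager's `QueueMsg` types (the depth of 2,000 is arbitrary):

use serde::Deserialize;
use serde_json::Value;

fn main() {
    // A document nested far beyond serde_json's default recursion limit (128).
    let depth = 2_000;
    let input = "[".repeat(depth) + &"]".repeat(depth);

    let mut de = serde_json::Deserializer::from_str(&input);
    // Requires serde_json's "unbounded_depth" feature.
    de.disable_recursion_limit();
    // serde_stacker grows the stack whenever it is close to running out,
    // instead of relying on a fixed depth cap.
    let de = serde_stacker::Deserializer::new(&mut de);

    let value = Value::deserialize(de).expect("deeply nested JSON should parse");
    assert!(value.is_array());
}
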
31 changes: 19 additions & 12 deletions lib/queue-msg/src/lib.rs
@@ -1,4 +1,5 @@
 #![feature(trait_alias, extract_if)]
+// #![warn(clippy::large_futures, clippy::large_stack_frames)]

 use std::{
     collections::VecDeque,
@@ -208,6 +209,12 @@ pub fn void<T: QueueMessageTypes>(t: impl Into<QueueMsg<T>>) -> QueueMsg<T> {
     QueueMsg::Void(Box::new(t.into()))
 }

+#[inline]
+#[must_use = "constructing an instruction has no effect"]
+pub fn noop<T: QueueMessageTypes>() -> QueueMsg<T> {
+    QueueMsg::Noop
+}
+
 pub trait QueueMsgTypesTraits = Debug
     + Clone
     + PartialEq
@@ -364,7 +371,7 @@ impl<T: QueueMessageTypes> QueueMsg<T> {
             Ok(msg.handle(store, depth + 1).await?.map(|msg| match msg {
                 QueueMsg::Data(data) => {
                     tracing::debug!(data = %serde_json::to_string(&data).unwrap(), "voiding data");
-                    QueueMsg::Noop
+                    noop()
                 }
                 msg => void(msg),
             }))
@@ -448,37 +455,37 @@ mod tests {

     impl HandleEffect<UnitMessageTypes> for () {
         async fn handle(self, _: &()) -> Result<QueueMsg<UnitMessageTypes>, QueueError> {
-            Ok(QueueMsg::Noop)
+            Ok(noop())
         }
     }

     impl HandleEvent<UnitMessageTypes> for () {
         fn handle(self, _: &()) -> Result<QueueMsg<UnitMessageTypes>, QueueError> {
-            Ok(QueueMsg::Noop)
+            Ok(noop())
         }
     }

     impl HandleData<UnitMessageTypes> for () {
         fn handle(self, _: &()) -> Result<QueueMsg<UnitMessageTypes>, QueueError> {
-            Ok(QueueMsg::Noop)
+            Ok(noop())
         }
     }

     impl HandleFetch<UnitMessageTypes> for () {
         async fn handle(self, _: &()) -> Result<QueueMsg<UnitMessageTypes>, QueueError> {
-            Ok(QueueMsg::Noop)
+            Ok(noop())
         }
     }

     impl HandleWait<UnitMessageTypes> for () {
         async fn handle(self, _: &()) -> Result<QueueMsg<UnitMessageTypes>, QueueError> {
-            Ok(QueueMsg::Noop)
+            Ok(noop())
         }
     }

     impl HandleAggregate<UnitMessageTypes> for () {
         fn handle(self, _: VecDeque<()>) -> Result<QueueMsg<UnitMessageTypes>, QueueError> {
-            Ok(QueueMsg::Noop)
+            Ok(noop())
         }
     }

@@ -629,22 +636,22 @@ mod tests {
                 seq([fetch(()), fetch(()), effect(())]),
                 seq([fetch(()), repeat(None, fetch(()))]),
                 seq([fetch(()), repeat(None, fetch(()))]),
-                QueueMsg::Noop,
+                noop(),
             ],
             vec_deque![
                 seq([fetch(()), repeat(None, fetch(()))]),
                 seq([fetch(()), repeat(None, fetch(()))]),
-                QueueMsg::Noop,
+                noop(),
                 seq([fetch(()), effect(())]),
             ],
             vec_deque![
                 seq([fetch(()), repeat(None, fetch(()))]),
-                QueueMsg::Noop,
+                noop(),
                 seq([fetch(()), effect(())]),
                 repeat(None, fetch(())),
             ],
             vec_deque![
-                QueueMsg::Noop,
+                noop(),
                 seq([fetch(()), effect(())]),
                 repeat(None, fetch(())),
                 repeat(None, fetch(())),
@@ -668,7 +675,7 @@ mod tests {
             vec_deque![
                 seq([fetch(()), repeat(None, fetch(()))]),
                 seq([fetch(()), repeat(None, fetch(()))]),
-                QueueMsg::Noop,
+                noop(),
             ],
         ],
     )
10 changes: 5 additions & 5 deletions lib/relay-message/src/aggregate.rs
@@ -5,7 +5,7 @@ use macros::apply;
 use queue_msg::{
     aggregate,
     aggregation::{do_aggregate, UseAggregate},
-    conc, defer_relative, effect, fetch, queue_msg, seq, wait, HandleAggregate, QueueError,
+    conc, defer_relative, effect, fetch, noop, queue_msg, seq, wait, HandleAggregate, QueueError,
     QueueMessageTypes, QueueMsg,
 };
 use unionlabs::{
@@ -2396,7 +2396,7 @@ where
                 destination_channel = %packet.destination_channel,
                 "packet received, cancelling timeout"
             );
-            QueueMsg::Noop
+            noop()
         } else {
             seq([
                 // void(wait(id(
@@ -2593,7 +2593,7 @@ where
         assert_eq!(this_chain_id, next_connection_sequence_chain_id);

         if next_connection_sequence >= sequence {
-            QueueMsg::Noop
+            noop()
         } else {
             seq([
                 defer_relative(3),
@@ -2650,7 +2650,7 @@ where
         assert_eq!(this_chain_id, next_client_sequence_chain_id);

         if next_client_sequence >= sequence {
-            QueueMsg::Noop
+            noop()
         } else {
             seq([
                 defer_relative(3),
@@ -2710,7 +2710,7 @@ where
         assert_eq!(connection_id, path_connection_id);

         if connection.state == connection::state::State::Open {
-            QueueMsg::Noop
+            noop()
         } else {
             seq([
                 defer_relative(3),
15 changes: 9 additions & 6 deletions lib/relay-message/src/effect.rs
@@ -1,6 +1,6 @@
 use chain_utils::GetChain;
 use macros::apply;
-use queue_msg::{queue_msg, HandleEffect, QueueError, QueueMessageTypes, QueueMsg};
+use queue_msg::{noop, queue_msg, HandleEffect, QueueError, QueueMessageTypes, QueueMsg};
 use unionlabs::{
     ibc::core::{
         channel::{
@@ -55,11 +55,14 @@ impl HandleEffect<RelayMessageTypes> for AnyLightClientIdentified<AnyEffect> {
         any_lc! {
             |msg| {
                 store
-                    .with_chain(&msg.chain_id, move |c| async move { msg.t.handle(&c).await })
-                    .map_err(|e| QueueError::Fatal(Box::new(e)))?
-                    .await
-                    .map_err(|e: <Hc as ChainExt>::MsgError| QueueError::Retry(Box::new(e)))
-                    .map(|()| QueueMsg::Noop)
+                    .with_chain(
+                        &msg.chain_id,
+                        move |c| async move { msg.t.handle(&c).await },
+                    )
+                    .map_err(|e| QueueError::Fatal(Box::new(e)))?
+                    .await
+                    .map_err(|e: <Hc as ChainExt>::MsgError| QueueError::Retry(Box::new(e)))
+                    .map(|()| noop())
             }
         }
     }
(Remaining changed files not shown.)
