Skip to content

Commit

Permalink
add height metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
longbowlu committed Sep 22, 2024
1 parent 8451dd0 commit 4329b4b
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 33 deletions.
3 changes: 1 addition & 2 deletions crates/sui-bridge-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ async fn start_processing_sui_checkpoints_by_querying_txns(
sui_rpc_url: String,
db_url: String,
indexer_metrics: BridgeIndexerMetrics,
bridge_metrics: Arc<BridgeMetrics>,
) -> Result<Vec<JoinHandle<()>>> {
let pg_pool = get_connection_pool(db_url.clone()).await;
let (tx, rx) = channel(
Expand All @@ -212,7 +211,7 @@ async fn start_processing_sui_checkpoints_by_querying_txns(
.expect("Failed to read cursor from sui progress store");
let sui_client = SuiClientBuilder::default().build(sui_rpc_url).await?;
handles.push(spawn_logged_monitored_task!(
start_sui_tx_polling_task(sui_client, cursor, tx, bridge_metrics),
start_sui_tx_polling_task(sui_client, cursor, tx),
"start_sui_tx_polling_task"
));
handles.push(spawn_logged_monitored_task!(
Expand Down
7 changes: 1 addition & 6 deletions crates/sui-bridge-indexer/src/sui_transaction_queries.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;
use std::time::Duration;
use sui_json_rpc_types::SuiTransactionBlockResponseOptions;
use sui_json_rpc_types::SuiTransactionBlockResponseQuery;
Expand All @@ -10,7 +9,7 @@ use sui_sdk::SuiClient;
use sui_types::digests::TransactionDigest;
use sui_types::SUI_BRIDGE_OBJECT_ID;

use sui_bridge::{metrics::BridgeMetrics, retry_with_max_elapsed_time};
use sui_bridge::retry_with_max_elapsed_time;
use tracing::{error, info};

use crate::types::RetrievedTransaction;
Expand All @@ -25,7 +24,6 @@ pub async fn start_sui_tx_polling_task(
Vec<RetrievedTransaction>,
Option<TransactionDigest>,
)>,
metrics: Arc<BridgeMetrics>,
) {
info!("Starting SUI transaction polling task from {:?}", cursor);
loop {
Expand Down Expand Up @@ -68,12 +66,9 @@ pub async fn start_sui_tx_polling_task(
tokio::time::sleep(QUERY_DURATION).await;
continue;
}
// Unwrap: txes is not empty
let ckp = txes.last().unwrap().checkpoint;
tx.send((txes, results.next_cursor))
.await
.expect("Failed to send transaction block to process");
metrics.last_synced_sui_checkpoint.set(ckp as i64);
cursor = results.next_cursor;
}
}
8 changes: 5 additions & 3 deletions crates/sui-bridge/src/eth_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ where
tracing::debug!("Last finalized block: {}", new_value);
metrics.last_finalized_eth_block.set(new_value as i64);

// TODO add a metrics for the last finalized block

if new_value > last_block_number {
last_finalized_block_sender
.send(new_value)
Expand All @@ -136,6 +134,7 @@ where
metrics: Arc<BridgeMetrics>,
) {
tracing::info!(contract_address=?contract_address, "Starting eth events listening task from block {start_block}");
let contract_address_str = contract_address.to_string();
let mut more_blocks = false;
loop {
// If no more known blocks, wait for the next finalized block.
Expand Down Expand Up @@ -198,7 +197,10 @@ where
);
}
if let Some(last_block) = last_block {
metrics.last_synced_eth_block.set(last_block as i64);
metrics
.last_synced_eth_blocks
.with_label_values(&[&contract_address_str])
.set(last_block as i64);
}
start_block = end_block + 1;
}
Expand Down
20 changes: 11 additions & 9 deletions crates/sui-bridge/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ pub struct BridgeMetrics {
pub(crate) err_requests: IntCounterVec,
pub(crate) requests_inflight: IntGaugeVec,

pub last_synced_sui_checkpoint: IntGauge,
pub(crate) last_synced_sui_checkpoints: IntGaugeVec,
pub(crate) last_finalized_eth_block: IntGauge,
pub(crate) last_synced_eth_block: IntGauge,
pub(crate) last_synced_eth_blocks: IntGaugeVec,

pub(crate) sui_watcher_received_events: IntCounter,
pub(crate) sui_watcher_received_actions: IntCounter,
Expand Down Expand Up @@ -262,21 +262,23 @@ impl BridgeMetrics {
registry,
)
.unwrap(),
last_synced_sui_checkpoint: register_int_gauge_with_registry!(
"last_synced_sui_checkpoint",
"The latest sui checkpoint that indexer synced",
last_synced_sui_checkpoints: register_int_gauge_vec_with_registry!(
"bridge_last_synced_sui_checkpoints",
"The latest sui checkpoints synced for each module",
&["module_name"],
registry,
)
.unwrap(),
last_synced_eth_block: register_int_gauge_with_registry!(
"bridge_last_synced_eth_block",
"The latest finalized eth block that indexer synced",
last_synced_eth_blocks: register_int_gauge_vec_with_registry!(
"bridge_last_synced_eth_blocks",
"The latest synced eth blocks synced for each contract",
&["contract_address"],
registry,
)
.unwrap(),
last_finalized_eth_block: register_int_gauge_with_registry!(
"bridge_last_finalized_eth_block",
"The latest finalized eth block that indexer observed",
"The latest finalized eth block observed",
registry,
)
.unwrap(),
Expand Down
13 changes: 8 additions & 5 deletions crates/sui-bridge/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,14 @@ async fn start_client_components(
.expect("Failed to start eth syncer");
all_handles.extend(task_handles);

let (task_handles, sui_events_rx) =
SuiSyncer::new(client_config.sui_client, sui_modules_to_watch)
.run(Duration::from_secs(2))
.await
.expect("Failed to start sui syncer");
let (task_handles, sui_events_rx) = SuiSyncer::new(
client_config.sui_client,
sui_modules_to_watch,
metrics.clone(),
)
.run(Duration::from_secs(2))
.await
.expect("Failed to start sui syncer");
all_handles.extend(task_handles);

let committee = Arc::new(
Expand Down
4 changes: 4 additions & 0 deletions crates/sui-bridge/src/sui_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,10 @@ where
}
}

pub async fn get_latest_checkpoint_sequence_number(&self) -> BridgeResult<u64> {
Ok(self.inner.get_latest_checkpoint_sequence_number().await?)
}

pub async fn execute_transaction_block_with_effects(
&self,
tx: sui_types::transaction::Transaction,
Expand Down
10 changes: 7 additions & 3 deletions crates/sui-bridge/src/sui_mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::types::{BridgeAction, BridgeActionStatus, IsBridgePaused};
pub struct SuiMockClient {
// the top two fields do not change during tests so we don't need them to be Arc<Mutex>>
chain_identifier: String,
latest_checkpoint_sequence_number: u64,
latest_checkpoint_sequence_number: Arc<Mutex<u64>>,
events: Arc<Mutex<HashMap<(ObjectID, Identifier, Option<EventID>), EventPage>>>,
past_event_query_params: Arc<Mutex<VecDeque<(ObjectID, Identifier, Option<EventID>)>>>,
events_by_tx_digest:
Expand All @@ -51,7 +51,7 @@ impl SuiMockClient {
pub fn default() -> Self {
Self {
chain_identifier: "".to_string(),
latest_checkpoint_sequence_number: 0,
latest_checkpoint_sequence_number: Arc::new(Mutex::new(0)),
events: Default::default(),
past_event_query_params: Default::default(),
events_by_tx_digest: Default::default(),
Expand Down Expand Up @@ -128,6 +128,10 @@ impl SuiMockClient {
*self.wildcard_transaction_response.lock().unwrap() = Some(response);
}

pub fn set_latest_checkpoint_sequence_number(&self, value: u64) {
*self.latest_checkpoint_sequence_number.lock().unwrap() = value;
}

pub fn add_gas_object_info(&self, gas_coin: GasCoin, object_ref: ObjectRef, owner: Owner) {
self.get_object_info
.lock()
Expand Down Expand Up @@ -196,7 +200,7 @@ impl SuiClientInner for SuiMockClient {
}

async fn get_latest_checkpoint_sequence_number(&self) -> Result<u64, Self::Error> {
Ok(self.latest_checkpoint_sequence_number)
Ok(*self.latest_checkpoint_sequence_number.lock().unwrap())
}

async fn get_mutable_bridge_object_arg(&self) -> Result<ObjectArg, Self::Error> {
Expand Down
65 changes: 60 additions & 5 deletions crates/sui-bridge/src/sui_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use crate::{
error::BridgeResult,
metrics::BridgeMetrics,
retry_with_max_elapsed_time,
sui_client::{SuiClient, SuiClientInner},
};
Expand All @@ -15,6 +16,7 @@ use sui_json_rpc_types::SuiEvent;
use sui_types::BRIDGE_PACKAGE_ID;
use sui_types::{event::EventID, Identifier};
use tokio::{
sync::Notify,
task::JoinHandle,
time::{self, Duration},
};
Expand All @@ -29,16 +31,22 @@ pub struct SuiSyncer<C> {
// The last transaction that the syncer has fully processed.
// Syncer will resume post this transaction (i.e. exclusive), when it starts.
cursors: SuiTargetModules,
metrics: Arc<BridgeMetrics>,
}

impl<C> SuiSyncer<C>
where
C: SuiClientInner + 'static,
{
pub fn new(sui_client: Arc<SuiClient<C>>, cursors: SuiTargetModules) -> Self {
pub fn new(
sui_client: Arc<SuiClient<C>>,
cursors: SuiTargetModules,
metrics: Arc<BridgeMetrics>,
) -> Self {
Self {
sui_client,
cursors,
metrics,
}
}

Expand All @@ -59,6 +67,7 @@ where

let mut task_handles = vec![];
for (module, cursor) in self.cursors {
let metrics = self.metrics.clone();
let events_rx_clone: mysten_metrics::metered_channel::Sender<(
Identifier,
Vec<SuiEvent>,
Expand All @@ -70,7 +79,8 @@ where
cursor,
events_rx_clone,
sui_client_clone,
query_interval
query_interval,
metrics,
)
));
}
Expand All @@ -85,10 +95,33 @@ where
events_sender: mysten_metrics::metered_channel::Sender<(Identifier, Vec<SuiEvent>)>,
sui_client: Arc<SuiClient<C>>,
query_interval: Duration,
metrics: Arc<BridgeMetrics>,
) {
tracing::info!(?module, ?cursor, "Starting sui events listening task");
let mut interval = time::interval(query_interval);
interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);

// Create a task to update metrics
let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
let sui_client_clone = sui_client.clone();
let last_synced_sui_checkpoints_metric = metrics
.last_synced_sui_checkpoints
.with_label_values(&[&module.to_string()]);
spawn_logged_monitored_task!(async move {
loop {
notify_clone.notified().await;
let Ok(Ok(latest_checkpoint_sequence_number)) = retry_with_max_elapsed_time!(
sui_client_clone.get_latest_checkpoint_sequence_number(),
Duration::from_secs(120)
) else {
tracing::error!("Failed to query latest checkpoint sequence number from sui client after retry");
continue;
};
last_synced_sui_checkpoints_metric.set(latest_checkpoint_sequence_number as i64);
}
});

loop {
interval.tick().await;
let Ok(Ok(events)) = retry_with_max_elapsed_time!(
Expand All @@ -101,6 +134,11 @@ where

let len = events.data.len();
if len != 0 {
if !events.has_next_page {
// If this is the last page, it means we have processed all events up to the latest checkpoint
// We can then update the latest checkpoint metric.
notify.notify_one();
}
events_sender
.send((module.clone(), events.data))
.await
Expand Down Expand Up @@ -129,7 +167,7 @@ mod tests {
telemetry_subscribers::init_for_testing();
let registry = Registry::new();
mysten_metrics::init_metrics(&registry);

let metrics = Arc::new(BridgeMetrics::new(&registry));
let mock = SuiMockClient::default();
let client = Arc::new(SuiClient::new_for_testing(mock.clone()));
let module_foo = Identifier::new("Foo").unwrap();
Expand All @@ -147,14 +185,15 @@ mod tests {
(module_bar.clone(), Some(cursor)),
]);
let interval = Duration::from_millis(200);
let (_handles, mut events_rx) = SuiSyncer::new(client, target_modules)
let (_handles, mut events_rx) = SuiSyncer::new(client, target_modules, metrics.clone())
.run(interval)
.await
.unwrap();

// Initially there are no events
assert_no_more_events(interval, &mut events_rx).await;

mock.set_latest_checkpoint_sequence_number(999);
// Module Foo has new events
let mut event_1: SuiEvent = SuiEvent::random_for_testing();
let package_id = BRIDGE_PACKAGE_ID;
Expand All @@ -180,6 +219,14 @@ mod tests {
assert_eq!(received_events[1].id, event_1.id);
// No more
assert_no_more_events(interval, &mut events_rx).await;
assert_eq!(
metrics
.last_synced_sui_checkpoints
.get_metric_with_label_values(&["Foo"])
.unwrap()
.get(),
999
);

// Module Bar has new events
let mut event_2: SuiEvent = SuiEvent::random_for_testing();
Expand All @@ -188,7 +235,7 @@ mod tests {
let module_bar_events_1 = EventPage {
data: vec![event_2.clone()],
next_cursor: Some(event_2.id),
has_next_page: false,
has_next_page: true, // Set to true so that the syncer will not update the last synced checkpoint
};
add_event_response(&mock, module_bar.clone(), event_2.id, empty_events.clone());

Expand All @@ -200,6 +247,14 @@ mod tests {
assert_eq!(received_events[0].id, event_2.id);
// No more
assert_no_more_events(interval, &mut events_rx).await;
assert_eq!(
metrics
.last_synced_sui_checkpoints
.get_metric_with_label_values(&["Bar"])
.unwrap()
.get(),
0, // Not updated
);

Ok(())
}
Expand Down

0 comments on commit 4329b4b

Please sign in to comment.