Skip to content

Commit

Permalink
[bridge-indexer] batch processing checkpoints and indexing progress m…
Browse files Browse the repository at this point in the history
…etrics (#19179)

## Description 

This PR does two things:
1. pull checkpoint batches to processed when there are multiple, rather
than one-by-one. This improves the efficiency especially for
write_process
2. add metrics to check indexing progress.

## Test plan 

Deployed in production.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
longbowlu committed Sep 6, 2024
1 parent fa776a0 commit 6110568
Show file tree
Hide file tree
Showing 12 changed files with 245 additions and 30 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions crates/sui-bridge-indexer/src/eth_bridge_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use async_trait::async_trait;
use ethers::prelude::Transaction;
use ethers::providers::{Http, Middleware, Provider, StreamExt, Ws};
use ethers::types::{Address as EthAddress, Block, Filter, H256};
use prometheus::{IntCounterVec, IntGaugeVec};
use sui_bridge::error::BridgeError;
use sui_bridge::eth_client::EthClient;
use sui_bridge::metered_eth_provider::MeteredEthHttpProvier;
Expand Down Expand Up @@ -145,6 +146,18 @@ impl Datasource<RawEthData> for EthSubscriptionDatasource {
fn get_genesis_height(&self) -> u64 {
self.genesis_block
}

fn get_tasks_remaining_checkpoints_metric(&self) -> &IntGaugeVec {
&self.indexer_metrics.tasks_remaining_checkpoints
}

fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec {
&self.indexer_metrics.tasks_processed_checkpoints
}

fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec {
&self.indexer_metrics.live_task_current_checkpoint
}
}

pub struct EthSyncDatasource {
Expand Down Expand Up @@ -267,6 +280,18 @@ impl Datasource<RawEthData> for EthSyncDatasource {
fn get_genesis_height(&self) -> u64 {
self.genesis_block
}

fn get_tasks_remaining_checkpoints_metric(&self) -> &IntGaugeVec {
&self.indexer_metrics.tasks_remaining_checkpoints
}

fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec {
&self.indexer_metrics.tasks_processed_checkpoints
}

fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec {
&self.indexer_metrics.live_task_current_checkpoint
}
}

#[derive(Clone)]
Expand Down
1 change: 1 addition & 0 deletions crates/sui-bridge-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod types;

pub mod eth_bridge_indexer;
pub mod sui_bridge_indexer;
pub mod sui_datasource;

#[derive(Clone)]
pub enum ProcessedTxnData {
Expand Down
3 changes: 2 additions & 1 deletion crates/sui-bridge-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ use sui_bridge_indexer::eth_bridge_indexer::EthDataMapper;
use sui_bridge_indexer::metrics::BridgeIndexerMetrics;
use sui_bridge_indexer::postgres_manager::{get_connection_pool, read_sui_progress_store};
use sui_bridge_indexer::sui_bridge_indexer::{PgBridgePersistent, SuiBridgeDataMapper};
use sui_bridge_indexer::sui_datasource::SuiCheckpointDatasource;
use sui_bridge_indexer::sui_transaction_handler::handle_sui_transactions_loop;
use sui_bridge_indexer::sui_transaction_queries::start_sui_tx_polling_task;
use sui_config::Config;
use sui_data_ingestion_core::DataIngestionMetrics;
use sui_indexer_builder::indexer_builder::{BackfillStrategy, IndexerBuilder};
use sui_indexer_builder::sui_datasource::SuiCheckpointDatasource;
use sui_sdk::SuiClientBuilder;

#[derive(Parser, Clone, Debug)]
Expand Down Expand Up @@ -137,6 +137,7 @@ async fn main() -> Result<()> {
config.checkpoints_path.clone().into(),
config.sui_bridge_genesis_checkpoint,
ingestion_metrics.clone(),
indexer_meterics.clone(),
);
let indexer = IndexerBuilder::new(
"SuiBridgeIndexer",
Expand Down
53 changes: 39 additions & 14 deletions crates/sui-bridge-indexer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
// SPDX-License-Identifier: Apache-2.0

use prometheus::{
register_int_counter_with_registry, register_int_gauge_with_registry, IntCounter, IntGauge,
Registry,
register_int_counter_vec_with_registry, register_int_counter_with_registry,
register_int_gauge_vec_with_registry, register_int_gauge_with_registry, IntCounter,
IntCounterVec, IntGauge, IntGaugeVec, Registry,
};

#[derive(Clone, Debug)]
Expand All @@ -20,83 +21,107 @@ pub struct BridgeIndexerMetrics {
pub(crate) last_committed_sui_checkpoint: IntGauge,
pub(crate) latest_committed_eth_block: IntGauge,
pub(crate) last_synced_eth_block: IntGauge,
pub(crate) tasks_remaining_checkpoints: IntGaugeVec,
pub(crate) tasks_processed_checkpoints: IntCounterVec,
pub(crate) live_task_current_checkpoint: IntGaugeVec,
}

impl BridgeIndexerMetrics {
pub fn new(registry: &Registry) -> Self {
Self {
total_sui_bridge_transactions: register_int_counter_with_registry!(
"total_sui_bridge_transactions",
"bridge_indexer_total_sui_bridge_transactions",
"Total number of sui bridge transactions",
registry,
)
.unwrap(),
total_sui_token_deposited: register_int_counter_with_registry!(
"total_sui_token_deposited",
"bridge_indexer_total_sui_token_deposited",
"Total number of sui token deposited transactions",
registry,
)
.unwrap(),
total_sui_token_transfer_approved: register_int_counter_with_registry!(
"total_sui_token_transfer_approved",
"bridge_indexer_total_sui_token_transfer_approved",
"Total number of sui token approved transactions",
registry,
)
.unwrap(),
total_sui_token_transfer_claimed: register_int_counter_with_registry!(
"total_sui_token_transfer_claimed",
"bridge_indexer_total_sui_token_transfer_claimed",
"Total number of sui token claimed transactions",
registry,
)
.unwrap(),
total_sui_bridge_txn_other: register_int_counter_with_registry!(
"total_sui_bridge_txn_other",
"bridge_indexer_total_sui_bridge_txn_other",
"Total number of other sui bridge transactions",
registry,
)
.unwrap(),
total_eth_bridge_transactions: register_int_counter_with_registry!(
"total_eth_bridge_transactions",
"bridge_indexer_total_eth_bridge_transactions",
"Total number of eth bridge transactions",
registry,
)
.unwrap(),
total_eth_token_deposited: register_int_counter_with_registry!(
"total_eth_token_deposited",
"bridge_indexer_total_eth_token_deposited",
"Total number of eth token deposited transactions",
registry,
)
.unwrap(),
total_eth_token_transfer_claimed: register_int_counter_with_registry!(
"total_eth_token_transfer_claimed",
"bridge_indexer_total_eth_token_transfer_claimed",
"Total number of eth token claimed transactions",
registry,
)
.unwrap(),
total_eth_bridge_txn_other: register_int_counter_with_registry!(
"total_eth_bridge_txn_other",
"bridge_indexer_total_eth_bridge_txn_other",
"Total number of other eth bridge transactions",
registry,
)
.unwrap(),
last_committed_sui_checkpoint: register_int_gauge_with_registry!(
"last_committed_sui_checkpoint",
"bridge_indexer_last_committed_sui_checkpoint",
"The latest sui checkpoint that indexer committed to DB",
registry,
)
.unwrap(),
latest_committed_eth_block: register_int_gauge_with_registry!(
"last_committed_eth_block",
"bridge_indexer_last_committed_eth_block",
"The latest eth block that indexer committed to DB",
registry,
)
.unwrap(),
last_synced_eth_block: register_int_gauge_with_registry!(
"last_synced_eth_block",
"bridge_indexer_last_synced_eth_block",
"The last eth block that indexer committed to DB",
registry,
)
.unwrap(),
tasks_remaining_checkpoints: register_int_gauge_vec_with_registry!(
"bridge_indexer_tasks_remaining_checkpoints",
"The remaining checkpoints for each task",
&["task_name"],
registry,
)
.unwrap(),
tasks_processed_checkpoints: register_int_counter_vec_with_registry!(
"bridge_indexer_tasks_processed_checkpoints",
"Total processed checkpoints for each task",
&["task_name"],
registry,
)
.unwrap(),
live_task_current_checkpoint: register_int_gauge_vec_with_registry!(
"bridge_indexer_live_task_current_checkpoint",
"Current checkpoint of live task",
&["task_name"],
registry,
)
.unwrap(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/sui-bridge-indexer/src/sui_bridge_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use sui_bridge::events::{
MoveTokenDepositedEvent, MoveTokenTransferApproved, MoveTokenTransferClaimed,
};
use sui_indexer_builder::indexer_builder::{DataMapper, IndexerProgressStore, Persistent};
use sui_indexer_builder::sui_datasource::CheckpointTxnData;
use sui_indexer_builder::Task;
use sui_types::effects::TransactionEffectsAPI;
use sui_types::event::Event;
Expand All @@ -27,6 +26,7 @@ use crate::metrics::BridgeIndexerMetrics;
use crate::postgres_manager::PgPool;
use crate::schema::progress_store::{columns, dsl};
use crate::schema::{sui_error_transactions, token_transfer, token_transfer_data};
use crate::sui_datasource::CheckpointTxnData;
use crate::{
models, schema, BridgeDataSource, ProcessedTxnData, SuiTxnError, TokenTransfer,
TokenTransferData, TokenTransferStatus,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::indexer_builder::{DataSender, Datasource};
use anyhow::Error;
use async_trait::async_trait;
use mysten_metrics::{metered_channel, spawn_monitored_task};
use prometheus::IntCounterVec;
use prometheus::IntGaugeVec;
use std::path::PathBuf;
use std::sync::Arc;
use sui_data_ingestion_core::{
DataIngestionMetrics, IndexerExecutor, ProgressStore, ReaderOptions, Worker, WorkerPool,
};
use sui_indexer_builder::indexer_builder::{DataSender, Datasource};
use sui_sdk::SuiClient;
use sui_types::base_types::TransactionDigest;
use sui_types::full_checkpoint_content::CheckpointData as SuiCheckpointData;
Expand All @@ -18,7 +20,8 @@ use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use tokio::sync::oneshot;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tracing::info;

use crate::metrics::BridgeIndexerMetrics;

pub struct SuiCheckpointDatasource {
remote_store_url: String,
Expand All @@ -27,6 +30,7 @@ pub struct SuiCheckpointDatasource {
checkpoint_path: PathBuf,
genesis_checkpoint: u64,
metrics: DataIngestionMetrics,
indexer_metrics: BridgeIndexerMetrics,
}
impl SuiCheckpointDatasource {
pub fn new(
Expand All @@ -36,13 +40,15 @@ impl SuiCheckpointDatasource {
checkpoint_path: PathBuf,
genesis_checkpoint: u64,
metrics: DataIngestionMetrics,
indexer_metrics: BridgeIndexerMetrics,
) -> Self {
SuiCheckpointDatasource {
remote_store_url,
sui_client,
concurrency,
checkpoint_path,
metrics,
indexer_metrics,
genesis_checkpoint,
}
}
Expand Down Expand Up @@ -97,6 +103,18 @@ impl Datasource<CheckpointTxnData> for SuiCheckpointDatasource {
fn get_genesis_height(&self) -> u64 {
self.genesis_checkpoint
}

fn get_tasks_remaining_checkpoints_metric(&self) -> &IntGaugeVec {
&self.indexer_metrics.tasks_remaining_checkpoints
}

fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec {
&self.indexer_metrics.tasks_processed_checkpoints
}

fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec {
&self.indexer_metrics.live_task_current_checkpoint
}
}

struct PerTaskInMemProgressStore {
Expand Down Expand Up @@ -144,7 +162,7 @@ pub type CheckpointTxnData = (CheckpointTransaction, u64, u64);
#[async_trait]
impl Worker for IndexerWorker<CheckpointTxnData> {
async fn process_checkpoint(&self, checkpoint: SuiCheckpointData) -> anyhow::Result<()> {
info!(
tracing::trace!(
"Received checkpoint [{}] {}: {}",
checkpoint.checkpoint_summary.epoch,
checkpoint.checkpoint_summary.sequence_number,
Expand Down
1 change: 1 addition & 0 deletions crates/sui-indexer-builder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mysten-metrics.workspace = true
sui-types.workspace = true
sui-sdk.workspace = true
sui-data-ingestion-core.workspace = true
futures.workspace = true
tracing.workspace = true
prometheus.workspace = true
telemetry-subscribers.workspace = true
Expand Down
Loading

0 comments on commit 6110568

Please sign in to comment.