(
+ self,
+ start_from_checkpoint: u64,
+ genesis_checkpoint: u64,
+ persistent: P,
+ ) -> Indexer<P, D, M>
+ where
+ P: Persistent<R>,
+ {
+ Indexer {
+ name: self.name,
+ storage: persistent,
+ datasource: self.datasource.into(),
+ backfill_strategy: self.backfill_strategy,
+ disable_live_task: self.disable_live_task,
+ start_from_checkpoint,
+ data_mapper: self.data_mapper,
+ genesis_checkpoint,
+ }
+ }
+
+ pub fn with_backfill_strategy(mut self, backfill: BackfillStrategy) -> Self {
+ self.backfill_strategy = backfill;
+ self
+ }
+
+ pub fn disable_live_task(mut self) -> Self {
+ self.disable_live_task = true;
+ self
+ }
+}
+
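+/// An `Indexer` drives a single datasource: on `start` it reconciles task
+/// state in the progress store, optionally registers a live task, creates
+/// backfill tasks, and runs them all via `Datasource::start_ingestion_task`.
+///
+/// Illustrative usage (the `datasource`, `mapper` and `store` values here are
+/// hypothetical; see main.rs in this change for real ones):
+/// ```ignore
+/// let indexer = IndexerBuilder::new("MyIndexer", datasource, mapper)
+///     .with_backfill_strategy(BackfillStrategy::Partitioned { task_size: 1000 })
+///     .build(start_from_checkpoint, genesis_checkpoint, store);
+/// indexer.start().await?;
+/// ```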
+pub struct Indexer<P, D, M> {
+ name: String,
+ storage: P,
+ datasource: Arc<D>,
+ data_mapper: M,
+ backfill_strategy: BackfillStrategy,
+ disable_live_task: bool,
+ start_from_checkpoint: u64,
+ genesis_checkpoint: u64,
+}
+
+impl<P, D, M> Indexer<P, D, M>
+{
+ pub async fn start<T, R>(mut self) -> Result<(), Error>
+ where
+ D: Datasource<T> + 'static + Send + Sync,
+ M: DataMapper<T, R> + 'static + Clone,
+ P: Persistent<R> + 'static,
+ T: Send,
+ {
+ // Update tasks first
+ let tasks = self.storage.tasks(&self.name)?;
+ // Create checkpoint workers based on the backfill config and existing tasks in the DB.
+ match tasks.live_task() {
+ None => {
+ // if disable_live_task is set, we should not have any live task in the db
+ if !self.disable_live_task {
+ // Scenario 1: No task in database, start live task and backfill tasks
+ self.storage.register_task(
+ format!("{} - Live", self.name),
+ self.start_from_checkpoint,
+ i64::MAX,
+ )?;
+ }
+
+ // Create backfill tasks
+ if self.start_from_checkpoint != self.genesis_checkpoint {
+ self.create_backfill_tasks(self.genesis_checkpoint)?
+ }
+ }
+ Some(mut live_task) => {
+ if self.disable_live_task {
+ // TODO: delete task
+ // self.storage.delete_task(live_task.task_name.clone())?;
+ } else if self.start_from_checkpoint > live_task.checkpoint {
+ // Scenario 2: there are existing tasks in DB and start_from_checkpoint > current checkpoint
+ // create backfill task to finish at start_from_checkpoint
+ // update live task to start from start_from_checkpoint and finish at u64::MAX
+ self.create_backfill_tasks(live_task.checkpoint)?;
+ live_task.checkpoint = self.start_from_checkpoint;
+ self.storage.update_task(live_task)?;
+ } else {
+ // Scenario 3: start_from_checkpoint < current checkpoint
+ // ignore start_from_checkpoint and resume all tasks as they are.
+ }
+ }
+ }
+
+ // get updated tasks from storage and start workers
+ let updated_tasks = self.storage.tasks(&self.name)?;
+ // Start the live task worker.
+ // Tasks are ordered by target checkpoint in descending order, so the live (realtime) task always comes first.
+ // When the live task is enabled, tasks won't be empty here, so the unwrap below is safe.
+ let backfill_tasks;
+ let live_task_future = if self.disable_live_task {
+ backfill_tasks = updated_tasks;
+ None
+ } else {
+ let (_live_task, _backfill_tasks) = updated_tasks.split_first().unwrap();
+
+ backfill_tasks = _backfill_tasks.to_vec();
+ let live_task = _live_task;
+
+ Some(self.datasource.start_ingestion_task(
+ live_task.task_name.clone(),
+ live_task.checkpoint,
+ live_task.target_checkpoint,
+ self.storage.clone(),
+ self.data_mapper.clone(),
+ ))
+ };
+
+ let backfill_tasks = backfill_tasks.to_vec();
+ let storage_clone = self.storage.clone();
+ let data_mapper_clone = self.data_mapper.clone();
+ let datasource_clone = self.datasource.clone();
+
+ let handle = spawn_monitored_task!(async {
+ // Execute backfill tasks one by one.
+ for backfill_task in backfill_tasks {
+ datasource_clone
+ .start_ingestion_task(
+ backfill_task.task_name.clone(),
+ backfill_task.checkpoint,
+ backfill_task.target_checkpoint,
+ storage_clone.clone(),
+ data_mapper_clone.clone(),
+ )
+ .await
+ .expect("Backfill task failed");
+ }
+ });
+
+ if let Some(live_task_future) = live_task_future {
+ live_task_future.await?;
+ }
+
+ tokio::try_join!(handle)?;
+
+ Ok(())
+ }
+
+ // Create backfill tasks according to backfill strategy
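+ // For example (illustrative numbers): with genesis_checkpoint = 0,
+ // start_from_checkpoint = 2500 and BackfillStrategy::Partitioned { task_size: 1000 },
+ // this registers tasks covering [1, 1000], [1001, 2000] and [2001, 2500].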
+ fn create_backfill_tasks<R>(&mut self, mut current_cp: u64) -> Result<(), Error>
+ where
+ P: Persistent<R> + 'static,
+ {
+ match self.backfill_strategy {
+ BackfillStrategy::Simple => self.storage.register_task(
+ format!("{} - backfill - {}", self.name, self.start_from_checkpoint),
+ current_cp + 1,
+ self.start_from_checkpoint as i64,
+ ),
+ BackfillStrategy::Partitioned { task_size } => {
+ while current_cp < self.start_from_checkpoint {
+ let target_cp = min(current_cp + task_size, self.start_from_checkpoint);
+ self.storage.register_task(
+ format!("{} - backfill - {target_cp}", self.name),
+ current_cp + 1,
+ target_cp as i64,
+ )?;
+ current_cp = target_cp;
+ }
+ Ok(())
+ }
+ BackfillStrategy::Disabled => Ok(()),
+ }
+ }
+}
+
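+/// Storage for processed records: `write` receives the output of
+/// `DataMapper::map`. See `PgBridgePersistent` in sui_bridge_indexer.rs for
+/// the Postgres-backed implementation added in this change.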
+pub trait Persistent<T>: IndexerProgressStore + Sync + Send + Clone {
+ fn write(&self, data: Vec<T>) -> Result<(), Error>;
+}
+
+#[async_trait]
+pub trait IndexerProgressStore: Send {
+ async fn load_progress(&self, task_name: String) -> anyhow::Result<u64>;
+ async fn save_progress(
+ &mut self,
+ task_name: String,
+ checkpoint_number: u64,
+ ) -> anyhow::Result<()>;
+
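+ /// Returns all unfinished tasks whose names start with `task_prefix`.
+ /// Task names follow the `"{name} - Live"` / `"{name} - backfill - {cp}"`
+ /// convention used by `Indexer` when registering tasks.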
+ fn tasks(&self, task_prefix: &str) -> Result<Vec<Task>, Error>;
+
+ fn register_task(
+ &mut self,
+ task_name: String,
+ checkpoint: u64,
+ target_checkpoint: i64,
+ ) -> Result<(), anyhow::Error>;
+
+ fn update_task(&mut self, task: Task) -> Result<(), Error>;
+}
+
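+/// A source of raw data of type `T`. Implementations only need to provide
+/// `start_data_retrieval`; the default `start_ingestion_task` drains the
+/// returned channel, maps each batch with the `DataMapper`, writes the result
+/// through `Persistent` and saves progress, including for empty batches.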
+#[async_trait]
+pub trait Datasource<T: Send> {
+ async fn start_ingestion_task<M, P, R>(
+ &self,
+ task_name: String,
+ starting_checkpoint: u64,
+ target_checkpoint: u64,
+ mut storage: P,
+ data_mapper: M,
+ ) -> Result<(), Error>
+ where
+ M: DataMapper<T, R> + 'static,
+ P: Persistent<R> + 'static,
+ {
+ // todo: add metrics for number of tasks
+ let (join_handle, mut data_channel) = self
+ .start_data_retrieval(task_name.clone(), starting_checkpoint, target_checkpoint)
+ .await?;
+ while let Some((block_number, data)) = data_channel.recv().await {
+ if !data.is_empty() {
+ let processed_data = data.into_iter().try_fold(vec![], |mut result, d| {
+ result.append(&mut data_mapper.map(d)?);
+ Ok::<Vec<R>, Error>(result)
+ })?;
+ // TODO: we might be able to write data and progress in a single transaction.
+ storage.write(processed_data)?;
+ }
+ storage
+ .save_progress(task_name.clone(), block_number)
+ .await?;
+ }
+ join_handle.abort();
+ join_handle.await?
+ }
+
+ async fn start_data_retrieval(
+ &self,
+ task_name: String,
+ starting_checkpoint: u64,
+ target_checkpoint: u64,
+ ) -> Result<(JoinHandle<Result<(), Error>>, Receiver<(u64, Vec<T>)>), Error>;
+}
+
+pub struct SuiCheckpointDatasource {
+ remote_store_url: String,
+ concurrency: usize,
+ checkpoint_path: PathBuf,
+ metrics: DataIngestionMetrics,
+}
+impl SuiCheckpointDatasource {
+ pub fn new(
+ remote_store_url: String,
+ concurrency: usize,
+ checkpoint_path: PathBuf,
+ metrics: DataIngestionMetrics,
+ ) -> Self {
+ SuiCheckpointDatasource {
+ remote_store_url,
+ concurrency,
+ checkpoint_path,
+ metrics,
+ }
+ }
+}
+
+#[async_trait]
+impl Datasource<CheckpointTxnData> for SuiCheckpointDatasource {
+ async fn start_data_retrieval(
+ &self,
+ task_name: String,
+ starting_checkpoint: u64,
+ target_checkpoint: u64,
+ ) -> Result<
+ (
+ JoinHandle<Result<(), Error>>,
+ Receiver<(u64, Vec<CheckpointTxnData>)>,
+ ),
+ Error,
+ > {
+ let (exit_sender, exit_receiver) = oneshot::channel();
+ let progress_store = PerTaskInMemProgressStore {
+ current_checkpoint: starting_checkpoint,
+ exit_checkpoint: target_checkpoint,
+ exit_sender: Some(exit_sender),
+ };
+ let mut executor = IndexerExecutor::new(progress_store, 1, self.metrics.clone());
+ let (data_sender, data_receiver) = metered_channel::channel(
+ 1000,
+ &mysten_metrics::get_metrics()
+ .unwrap()
+ .channel_inflight
+ .with_label_values(&[&task_name]),
+ );
+ let worker = IndexerWorker::new(data_sender);
+ let worker_pool = WorkerPool::new(worker, task_name, self.concurrency);
+ executor.register(worker_pool).await?;
+ let checkpoint_path = self.checkpoint_path.clone();
+ let remote_store_url = self.remote_store_url.clone();
+ let join_handle = spawn_monitored_task!(async {
+ executor
+ .run(
+ checkpoint_path,
+ Some(remote_store_url),
+ vec![], // optional remote store access options
+ ReaderOptions::default(),
+ exit_receiver,
+ )
+ .await?;
+ Ok(())
+ });
+ Ok((join_handle, data_receiver))
+ }
+}
+
+pub enum BackfillStrategy {
+ /// Backfill the whole missing range in a single task.
+ Simple,
+ /// Backfill in tasks of at most `task_size` checkpoints each.
+ Partitioned { task_size: u64 },
+ /// Skip backfill entirely.
+ Disabled,
+}
+
+pub trait DataMapper<T, R>: Sync + Send {
+ fn map(&self, data: T) -> Result<Vec<R>, anyhow::Error>;
+}
+
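+/// In-memory progress store scoped to a single ingestion task. `save` fires
+/// `exit_sender` once `exit_checkpoint` is reached, which signals the
+/// `IndexerExecutor` to shut down when a bounded backfill range completes.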
+struct PerTaskInMemProgressStore {
+ pub current_checkpoint: u64,
+ pub exit_checkpoint: u64,
+ pub exit_sender: Option<oneshot::Sender<()>>,
+}
+
+#[async_trait]
+impl ProgressStore for PerTaskInMemProgressStore {
+ async fn load(
+ &mut self,
+ _task_name: String,
+ ) -> Result<CheckpointSequenceNumber, anyhow::Error> {
+ Ok(self.current_checkpoint)
+ }
+
+ async fn save(
+ &mut self,
+ _task_name: String,
+ checkpoint_number: CheckpointSequenceNumber,
+ ) -> anyhow::Result<()> {
+ if checkpoint_number >= self.exit_checkpoint {
+ if let Some(sender) = self.exit_sender.take() {
+ let _ = sender.send(());
+ }
+ }
+ self.current_checkpoint = checkpoint_number;
+ Ok(())
+ }
+}
+
+pub struct IndexerWorker<T> {
+ data_sender: metered_channel::Sender<(u64, Vec<T>)>,
+}
+
+impl<T> IndexerWorker<T> {
+ pub fn new(data_sender: metered_channel::Sender<(u64, Vec<T>)>) -> Self {
+ Self { data_sender }
+ }
+}
+
+pub type CheckpointTxnData = (CheckpointTransaction, u64, u64);
+
+#[async_trait]
+impl Worker for IndexerWorker<CheckpointTxnData> {
+ async fn process_checkpoint(&self, checkpoint: CheckpointData) -> anyhow::Result<()> {
+ info!(
+ "Received checkpoint [{}] {}: {}",
+ checkpoint.checkpoint_summary.epoch,
+ checkpoint.checkpoint_summary.sequence_number,
+ checkpoint.transactions.len(),
+ );
+ let checkpoint_num = checkpoint.checkpoint_summary.sequence_number;
+ let timestamp_ms = checkpoint.checkpoint_summary.timestamp_ms;
+
+ let transactions = checkpoint
+ .transactions
+ .into_iter()
+ .map(|tx| (tx, checkpoint_num, timestamp_ms))
+ .collect();
+ Ok(self
+ .data_sender
+ .send((checkpoint_num, transactions))
+ .await?)
+ }
+}
diff --git a/crates/sui-bridge-indexer/src/latest_eth_syncer.rs b/crates/sui-bridge-indexer/src/latest_eth_syncer.rs
deleted file mode 100644
index 150353969f736..0000000000000
--- a/crates/sui-bridge-indexer/src/latest_eth_syncer.rs
+++ /dev/null
@@ -1,168 +0,0 @@
-// Copyright (c) Mysten Labs, Inc.
-// SPDX-License-Identifier: Apache-2.0
-
-//! The EthSyncer module is responsible for synchronizing Events emitted on Ethereum blockchain from
-//! concerned contracts. Each contract is associated with a start block number, and the syncer will
-//! only query from that block number onwards. The syncer also keeps track of the last
-//! block on Ethereum and will only query for events up to that block number.
-
-use ethers::providers::{Http, JsonRpcClient, Middleware, Provider};
-use ethers::types::Address as EthAddress;
-use mysten_metrics::metered_channel::{channel, Receiver, Sender};
-use mysten_metrics::spawn_logged_monitored_task;
-use std::cmp::min;
-use std::collections::HashMap;
-use std::sync::Arc;
-use std::time::Instant;
-use sui_bridge::error::BridgeResult;
-use sui_bridge::eth_client::EthClient;
-use sui_bridge::retry_with_max_elapsed_time;
-use sui_bridge::types::RawEthLog;
-use tokio::task::JoinHandle;
-use tokio::time::{self, Duration};
-use tracing::{error, info};
-
-use crate::metrics::BridgeIndexerMetrics;
-
-const ETH_LOG_QUERY_MAX_BLOCK_RANGE: u64 = 1000;
-const ETH_EVENTS_CHANNEL_SIZE: usize = 1000;
-const BLOCK_QUERY_INTERVAL: Duration = Duration::from_secs(2);
-
-pub struct LatestEthSyncer {
- eth_client: Arc>,
- provider: Arc>,
- contract_addresses: EthTargetAddresses,
-}
-
-/// Map from contract address to their start block.
-pub type EthTargetAddresses = HashMap;
-
-#[allow(clippy::new_without_default)]
-impl LatestEthSyncer
-where
- P: JsonRpcClient + 'static,
-{
- pub fn new(
- eth_client: Arc>,
- provider: Arc>,
- contract_addresses: EthTargetAddresses,
- ) -> Self {
- Self {
- eth_client,
- provider,
- contract_addresses,
- }
- }
-
- pub async fn run(
- self,
- metrics: BridgeIndexerMetrics,
- ) -> BridgeResult<(
- Vec>,
- Receiver<(EthAddress, u64, Vec)>,
- )> {
- let (eth_evnets_tx, eth_events_rx) = channel(
- ETH_EVENTS_CHANNEL_SIZE,
- &mysten_metrics::get_metrics()
- .unwrap()
- .channel_inflight
- .with_label_values(&["eth_events_queue"]),
- );
-
- let mut task_handles = vec![];
- for (contract_address, start_block) in self.contract_addresses {
- let eth_events_tx_clone = eth_evnets_tx.clone();
- let eth_client_clone = self.eth_client.clone();
- let provider_clone = self.provider.clone();
- let metrics_clone = metrics.clone();
- task_handles.push(spawn_logged_monitored_task!(
- Self::run_event_listening_task(
- contract_address,
- start_block,
- provider_clone,
- eth_events_tx_clone,
- eth_client_clone,
- metrics_clone,
- )
- ));
- }
- Ok((task_handles, eth_events_rx))
- }
-
- async fn run_event_listening_task(
- contract_address: EthAddress,
- mut start_block: u64,
- provider: Arc>,
- events_sender: Sender<(EthAddress, u64, Vec)>,
- eth_client: Arc>,
- metrics: BridgeIndexerMetrics,
- ) {
- info!(contract_address=?contract_address, "Starting eth events listening task from block {start_block}");
- let mut interval = time::interval(BLOCK_QUERY_INTERVAL);
- interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
- loop {
- interval.tick().await;
- let Ok(Ok(new_block)) =
- retry_with_max_elapsed_time!(provider.get_block_number(), Duration::from_secs(600))
- else {
- error!("Failed to get latest block from eth client after retry");
- continue;
- };
-
- let new_block: u64 = new_block.as_u64();
-
- if new_block < start_block {
- info!(
- contract_address=?contract_address,
- "New block {} is smaller than start block {}, ignore",
- new_block,
- start_block,
- );
- continue;
- }
-
- // Each query does at most ETH_LOG_QUERY_MAX_BLOCK_RANGE blocks.
- let end_block = min(start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1, new_block);
- let timer = Instant::now();
- let Ok(Ok(events)) = retry_with_max_elapsed_time!(
- eth_client.get_raw_events_in_range(contract_address, start_block, end_block),
- Duration::from_secs(600)
- ) else {
- error!("Failed to get events from eth client after retry");
- continue;
- };
- info!(
- ?contract_address,
- start_block,
- end_block,
- "Querying eth events took {:?}",
- timer.elapsed()
- );
- let len = events.len();
- let last_block = events.last().map(|e| e.block_number);
-
- // Note 1: we always events to the channel even when it is empty. This is because of
- // how `eth_getLogs` api is designed - we want cursor to move forward continuously.
-
- // Note 2: it's extremely critical to make sure the Logs we send via this channel
- // are complete per block height. Namely, we should never send a partial list
- // of events for a block. Otherwise, we may end up missing events.
- events_sender
- .send((contract_address, end_block, events))
- .await
- .expect("All Eth event channel receivers are closed");
- if len != 0 {
- info!(
- ?contract_address,
- start_block, end_block, "Observed {len} new Eth events",
- );
- }
- if let Some(last_block) = last_block {
- metrics
- .last_synced_unfinalized_eth_block
- .set(last_block as i64);
- }
- start_block = end_block + 1;
- }
- }
-}
diff --git a/crates/sui-bridge-indexer/src/lib.rs b/crates/sui-bridge-indexer/src/lib.rs
index 2aed0d793e3c2..3a4320e9b316c 100644
--- a/crates/sui-bridge-indexer/src/lib.rs
+++ b/crates/sui-bridge-indexer/src/lib.rs
@@ -1,14 +1,14 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
-use crate::models::TokenTransferData as DBTokenTransferData;
-use crate::models::{SuiErrorTransactions, TokenTransfer as DBTokenTransfer};
use std::fmt::{Display, Formatter};
+
use sui_types::base_types::{SuiAddress, TransactionDigest};
+use crate::models::TokenTransferData as DBTokenTransferData;
+use crate::models::{SuiErrorTransactions, TokenTransfer as DBTokenTransfer};
+
pub mod config;
-pub mod eth_worker;
-pub mod latest_eth_syncer;
pub mod metrics;
pub mod models;
pub mod postgres_manager;
@@ -16,9 +16,13 @@ pub mod schema;
pub mod sui_checkpoint_ingestion;
pub mod sui_transaction_handler;
pub mod sui_transaction_queries;
-pub mod sui_worker;
pub mod types;
+pub mod indexer_builder;
+
+pub mod eth_bridge_indexer;
+pub mod sui_bridge_indexer;
+
#[derive(Clone)]
pub enum ProcessedTxnData {
TokenTransfer(TokenTransfer),
@@ -102,7 +106,6 @@ impl SuiTxnError {
#[derive(Clone)]
pub(crate) enum TokenTransferStatus {
- DepositedUnfinalized,
Deposited,
Approved,
Claimed,
@@ -111,7 +114,6 @@ pub(crate) enum TokenTransferStatus {
impl Display for TokenTransferStatus {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let str = match self {
- TokenTransferStatus::DepositedUnfinalized => "DepositedUnfinalized",
TokenTransferStatus::Deposited => "Deposited",
TokenTransferStatus::Approved => "Approved",
TokenTransferStatus::Claimed => "Claimed",
diff --git a/crates/sui-bridge-indexer/src/main.rs b/crates/sui-bridge-indexer/src/main.rs
index ae873496f0d9e..889383e7f0239 100644
--- a/crates/sui-bridge-indexer/src/main.rs
+++ b/crates/sui-bridge-indexer/src/main.rs
@@ -1,30 +1,36 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
+use std::env;
+use std::path::PathBuf;
+use std::sync::Arc;
+
use anyhow::Result;
use clap::*;
+use ethers::providers::Http;
+use ethers::providers::Middleware;
+use ethers::providers::Provider;
+use sui_bridge_indexer::eth_bridge_indexer::EthSubscriptionDatasource;
+use sui_bridge_indexer::eth_bridge_indexer::EthSyncDatasource;
+use sui_bridge_indexer::indexer_builder::BackfillStrategy;
+use tokio::task::JoinHandle;
+use tracing::info;
+
+use mysten_metrics::metered_channel::channel;
use mysten_metrics::spawn_logged_monitored_task;
use mysten_metrics::start_prometheus_server;
-use std::collections::HashSet;
-use std::env;
-use std::path::PathBuf;
-use std::sync::Arc;
-use sui_bridge::eth_client::EthClient;
use sui_bridge::metrics::BridgeMetrics;
-use sui_bridge_indexer::eth_worker::EthBridgeWorker;
+use sui_bridge_indexer::config::IndexerConfig;
+use sui_bridge_indexer::eth_bridge_indexer::EthDataMapper;
+use sui_bridge_indexer::indexer_builder::{IndexerBuilder, SuiCheckpointDatasource};
use sui_bridge_indexer::metrics::BridgeIndexerMetrics;
use sui_bridge_indexer::postgres_manager::{get_connection_pool, read_sui_progress_store};
+use sui_bridge_indexer::sui_bridge_indexer::{PgBridgePersistent, SuiBridgeDataMapper};
use sui_bridge_indexer::sui_transaction_handler::handle_sui_transactions_loop;
use sui_bridge_indexer::sui_transaction_queries::start_sui_tx_polling_task;
+use sui_config::Config;
use sui_data_ingestion_core::DataIngestionMetrics;
use sui_sdk::SuiClientBuilder;
-use tokio::task::JoinHandle;
-
-use mysten_metrics::metered_channel::channel;
-use sui_bridge_indexer::config::IndexerConfig;
-use sui_bridge_indexer::sui_checkpoint_ingestion::SuiCheckpointSyncer;
-use sui_config::Config;
-use tracing::info;
#[derive(Parser, Clone, Debug)]
struct Args {
@@ -50,7 +56,6 @@ async fn main() -> Result<()> {
.join("config.yaml")
};
let config = IndexerConfig::load(&config_path)?;
- let config_clone = config.clone();
// Init metrics server
let registry_service = start_prometheus_server(
@@ -70,35 +75,56 @@ async fn main() -> Result<()> {
let ingestion_metrics = DataIngestionMetrics::new(®istry);
let bridge_metrics = Arc::new(BridgeMetrics::new(®istry));
- // unwrap safe: db_url must be set in `load_config` above
let db_url = config.db_url.clone();
+ let datastore = PgBridgePersistent::new(get_connection_pool(db_url.clone()));
- // TODO: retry_with_max_elapsed_time
- let eth_worker = EthBridgeWorker::new(
- get_connection_pool(db_url.clone()),
- bridge_metrics.clone(),
- indexer_meterics.clone(),
- config.clone(),
- )?;
-
- let eth_client = Arc::new(
- EthClient::::new(
- &config.eth_rpc_url,
- HashSet::from_iter(vec![eth_worker.bridge_address()]),
- bridge_metrics.clone(),
- )
- .await?,
+ let provider = Arc::new(
+ Provider::::try_from(config.eth_rpc_url.clone())?
+ .interval(std::time::Duration::from_millis(2000)),
);
- let unfinalized_handle = eth_worker
- .start_indexing_unfinalized_events(eth_client.clone())
- .await?;
- let finalized_handle = eth_worker
- .start_indexing_finalized_events(eth_client.clone())
- .await?;
- let handles = vec![unfinalized_handle, finalized_handle];
+ let current_block = provider.get_block_number().await?.as_u64();
+ let subscription_end_block = u64::MAX;
+
+ // Start the eth subscription indexer
+
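+ // Note on how the two eth indexers split the work, as configured below: the
+ // subscription indexer tails new blocks from current_block with backfill
+ // disabled, while the sync indexer backfills [start_block, current_block]
+ // in partitioned tasks and runs no live task.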
+ let eth_subscription_datasource = EthSubscriptionDatasource::new(
+ config.eth_sui_bridge_contract_address.clone(),
+ config.eth_ws_url.clone(),
+ indexer_meterics.clone(),
+ )?;
+ let eth_subscription_indexer = IndexerBuilder::new(
+ "EthBridgeSubscriptionIndexer",
+ eth_subscription_datasource,
+ EthDataMapper {
+ metrics: indexer_meterics.clone(),
+ },
+ )
+ .with_backfill_strategy(BackfillStrategy::Disabled)
+ .build(current_block, subscription_end_block, datastore.clone());
+ let subscription_indexer_fut = spawn_logged_monitored_task!(eth_subscription_indexer.start());
+
+ // Start the eth sync indexer
+ let eth_sync_datasource = EthSyncDatasource::new(
+ config.eth_sui_bridge_contract_address.clone(),
+ config.eth_rpc_url.clone(),
+ indexer_meterics.clone(),
+ bridge_metrics.clone(),
+ )?;
+ let eth_sync_indexer = IndexerBuilder::new(
+ "EthBridgeSyncIndexer",
+ eth_sync_datasource,
+ EthDataMapper {
+ metrics: indexer_meterics.clone(),
+ },
+ )
+ .with_backfill_strategy(BackfillStrategy::Partitioned { task_size: 1000 })
+ .disable_live_task()
+ .build(current_block, config.start_block, datastore.clone());
+ let sync_indexer_fut = spawn_logged_monitored_task!(eth_sync_indexer.start());
if let Some(sui_rpc_url) = config.sui_rpc_url.clone() {
+ // TODO: implement a Datasource for the Sui RPC.
start_processing_sui_checkpoints_by_querying_txns(
sui_rpc_url,
db_url.clone(),
@@ -107,13 +133,30 @@ async fn main() -> Result<()> {
)
.await?;
} else {
- let pg_pool = get_connection_pool(db_url.clone());
- SuiCheckpointSyncer::new(pg_pool, config.bridge_genesis_checkpoint)
- .start(&config_clone, indexer_meterics, ingestion_metrics)
- .await?;
+ let sui_checkpoint_datasource = SuiCheckpointDatasource::new(
+ config.remote_store_url,
+ config.concurrency as usize,
+ config.checkpoints_path.clone().into(),
+ ingestion_metrics.clone(),
+ );
+ let indexer = IndexerBuilder::new(
+ "SuiBridgeIndexer",
+ sui_checkpoint_datasource,
+ SuiBridgeDataMapper {
+ metrics: indexer_meterics.clone(),
+ },
+ )
+ .build(
+ config
+ .resume_from_checkpoint
+ .unwrap_or(config.bridge_genesis_checkpoint),
+ config.bridge_genesis_checkpoint,
+ datastore,
+ );
+ indexer.start().await?;
}
// We are not waiting for the sui tasks to finish here, which is ok.
- futures::future::join_all(handles).await;
+ futures::future::join_all(vec![subscription_indexer_fut, sync_indexer_fut]).await;
Ok(())
}
diff --git a/crates/sui-bridge-indexer/src/metrics.rs b/crates/sui-bridge-indexer/src/metrics.rs
index f4ccf5dfbedfc..b57a3121f4ae3 100644
--- a/crates/sui-bridge-indexer/src/metrics.rs
+++ b/crates/sui-bridge-indexer/src/metrics.rs
@@ -18,9 +18,8 @@ pub struct BridgeIndexerMetrics {
pub(crate) total_eth_token_transfer_claimed: IntCounter,
pub(crate) total_eth_bridge_txn_other: IntCounter,
pub(crate) last_committed_sui_checkpoint: IntGauge,
- pub(crate) last_committed_eth_block: IntGauge,
- pub(crate) last_synced_unfinalized_eth_block: IntGauge,
- pub(crate) last_committed_unfinalized_eth_block: IntGauge,
+ pub(crate) latest_committed_eth_block: IntGauge,
+ pub(crate) last_synced_eth_block: IntGauge,
}
impl BridgeIndexerMetrics {
@@ -86,21 +85,15 @@ impl BridgeIndexerMetrics {
registry,
)
.unwrap(),
- last_committed_eth_block: register_int_gauge_with_registry!(
+ latest_committed_eth_block: register_int_gauge_with_registry!(
"last_committed_eth_block",
"The latest eth block that indexer committed to DB",
registry,
)
.unwrap(),
- last_synced_unfinalized_eth_block: register_int_gauge_with_registry!(
- "last_synced_unfinalized_eth_block",
- "The latest unfinalized block that indexer synced",
- registry,
- )
- .unwrap(),
- last_committed_unfinalized_eth_block: register_int_gauge_with_registry!(
- "last_committed_unfinalized_eth_block",
- "The latest unfinalized block that indexer comitted to DB",
+ last_synced_eth_block: register_int_gauge_with_registry!(
+ "last_synced_eth_block",
+ "The last eth block that indexer committed to DB",
registry,
)
.unwrap(),
diff --git a/crates/sui-bridge-indexer/src/sui_bridge_indexer.rs b/crates/sui-bridge-indexer/src/sui_bridge_indexer.rs
new file mode 100644
index 0000000000000..41f219beb18f1
--- /dev/null
+++ b/crates/sui-bridge-indexer/src/sui_bridge_indexer.rs
@@ -0,0 +1,291 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use anyhow::{anyhow, Error};
+use async_trait::async_trait;
+use diesel::dsl::now;
+use diesel::{Connection, OptionalExtension, QueryDsl, RunQueryDsl, SelectableHelper};
+use diesel::{ExpressionMethods, TextExpressionMethods};
+use tracing::info;
+
+use sui_bridge::events::{
+ MoveTokenDepositedEvent, MoveTokenTransferApproved, MoveTokenTransferClaimed,
+};
+use sui_types::effects::TransactionEffectsAPI;
+use sui_types::event::Event;
+use sui_types::execution_status::ExecutionStatus;
+use sui_types::full_checkpoint_content::CheckpointTransaction;
+use sui_types::{BRIDGE_ADDRESS, SUI_BRIDGE_OBJECT_ID};
+
+use crate::indexer_builder::{CheckpointTxnData, DataMapper, IndexerProgressStore, Persistent};
+use crate::metrics::BridgeIndexerMetrics;
+use crate::postgres_manager::PgPool;
+use crate::schema::progress_store::{columns, dsl};
+use crate::schema::{sui_error_transactions, token_transfer, token_transfer_data};
+use crate::sui_checkpoint_ingestion::Task;
+use crate::{
+ models, schema, BridgeDataSource, ProcessedTxnData, SuiTxnError, TokenTransfer,
+ TokenTransferData, TokenTransferStatus,
+};
+
+/// Persistent layer impl
+#[derive(Clone)]
+pub struct PgBridgePersistent {
+ pool: PgPool,
+}
+
+impl PgBridgePersistent {
+ pub fn new(pool: PgPool) -> Self {
+ Self { pool }
+ }
+}
+
+// TODO: this is shared between SUI and ETH; move it to a separate file.
+impl Persistent<ProcessedTxnData> for PgBridgePersistent {
+ fn write(&self, data: Vec<ProcessedTxnData>) -> Result<(), Error> {
+ if data.is_empty() {
+ return Ok(());
+ }
+ let connection = &mut self.pool.get()?;
+ connection.transaction(|conn| {
+ for d in data {
+ match d {
+ ProcessedTxnData::TokenTransfer(t) => {
+ diesel::insert_into(token_transfer::table)
+ .values(&t.to_db())
+ .on_conflict_do_nothing()
+ .execute(conn)?;
+
+ if let Some(d) = t.to_data_maybe() {
+ diesel::insert_into(token_transfer_data::table)
+ .values(&d)
+ .on_conflict_do_nothing()
+ .execute(conn)?;
+ }
+ }
+ ProcessedTxnData::Error(e) => {
+ diesel::insert_into(sui_error_transactions::table)
+ .values(&e.to_db())
+ .on_conflict_do_nothing()
+ .execute(conn)?;
+ }
+ }
+ }
+ Ok(())
+ })
+ }
+}
+
+#[async_trait]
+impl IndexerProgressStore for PgBridgePersistent {
+ async fn load_progress(&self, task_name: String) -> anyhow::Result<u64> {
+ let mut conn = self.pool.get()?;
+ let cp: Option<models::ProgressStore> = dsl::progress_store
+ .find(&task_name)
+ .select(models::ProgressStore::as_select())
+ .first(&mut conn)
+ .optional()?;
+ Ok(cp
+ .ok_or(anyhow!("Cannot found progress for task {task_name}"))?
+ .checkpoint as u64)
+ }
+
+ async fn save_progress(
+ &mut self,
+ task_name: String,
+ checkpoint_number: u64,
+ ) -> anyhow::Result<()> {
+ let mut conn = self.pool.get()?;
+ diesel::insert_into(schema::progress_store::table)
+ .values(&models::ProgressStore {
+ task_name,
+ checkpoint: checkpoint_number as i64,
+ // Target checkpoint and timestamp will only be written for new entries
+ target_checkpoint: i64::MAX,
+ // Timestamp is defaulted to current time in DB if None
+ timestamp: None,
+ })
+ .on_conflict(dsl::task_name)
+ .do_update()
+ .set((
+ columns::checkpoint.eq(checkpoint_number as i64),
+ columns::timestamp.eq(now),
+ ))
+ .execute(&mut conn)?;
+ Ok(())
+ }
+
+ fn tasks(&self, prefix: &str) -> Result<Vec<Task>, anyhow::Error> {
+ let mut conn = self.pool.get()?;
+ // get all unfinished tasks
+ let cp: Vec<models::ProgressStore> = dsl::progress_store
+ // TODO: using `like` could be error prone; change the progress store schema to store the task name properly.
+ .filter(columns::task_name.like(format!("{prefix} - %")))
+ .filter(columns::checkpoint.lt(columns::target_checkpoint))
+ .order_by(columns::target_checkpoint.desc())
+ .load(&mut conn)?;
+ Ok(cp.into_iter().map(|d| d.into()).collect())
+ }
+
+ fn register_task(
+ &mut self,
+ task_name: String,
+ checkpoint: u64,
+ target_checkpoint: i64,
+ ) -> Result<(), anyhow::Error> {
+ let mut conn = self.pool.get()?;
+ diesel::insert_into(schema::progress_store::table)
+ .values(models::ProgressStore {
+ task_name,
+ checkpoint: checkpoint as i64,
+ target_checkpoint,
+ // Timestamp is defaulted to current time in DB if None
+ timestamp: None,
+ })
+ .execute(&mut conn)?;
+ Ok(())
+ }
+
+ fn update_task(&mut self, task: Task) -> Result<(), anyhow::Error> {
+ let mut conn = self.pool.get()?;
+ diesel::update(dsl::progress_store.filter(columns::task_name.eq(task.task_name)))
+ .set((
+ columns::checkpoint.eq(task.checkpoint as i64),
+ columns::target_checkpoint.eq(task.target_checkpoint as i64),
+ columns::timestamp.eq(now),
+ ))
+ .execute(&mut conn)?;
+ Ok(())
+ }
+}
+
+/// Data mapper impl
+#[derive(Clone)]
+pub struct SuiBridgeDataMapper {
+ pub metrics: BridgeIndexerMetrics,
+}
+
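+// Maps a checkpoint transaction to zero or more ProcessedTxnData records:
+// transactions that do not touch the bridge object are skipped, bridge events
+// are decoded via process_sui_event, and failed transactions are recorded as
+// ProcessedTxnData::Error.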
+impl DataMapper<CheckpointTxnData, ProcessedTxnData> for SuiBridgeDataMapper {
+ fn map(
+ &self,
+ (data, checkpoint_num, timestamp_ms): CheckpointTxnData,
+ ) -> Result<Vec<ProcessedTxnData>, Error> {
+ self.metrics.total_sui_bridge_transactions.inc();
+ if !data
+ .input_objects
+ .iter()
+ .any(|obj| obj.id() == SUI_BRIDGE_OBJECT_ID)
+ {
+ return Ok(vec![]);
+ }
+
+ match &data.events {
+ Some(events) => {
+ let token_transfers = events.data.iter().try_fold(vec![], |mut result, ev| {
+ if let Some(data) = process_sui_event(ev, &data, checkpoint_num, timestamp_ms)?
+ {
+ result.push(data);
+ }
+ Ok::<_, anyhow::Error>(result)
+ })?;
+
+ if !token_transfers.is_empty() {
+ info!(
+ "SUI: Extracted {} bridge token transfer data entries for tx {}.",
+ token_transfers.len(),
+ data.transaction.digest()
+ );
+ }
+ Ok(token_transfers)
+ }
+ None => {
+ if let ExecutionStatus::Failure { error, command } = data.effects.status() {
+ Ok(vec![ProcessedTxnData::Error(SuiTxnError {
+ tx_digest: *data.transaction.digest(),
+ sender: data.transaction.sender_address(),
+ timestamp_ms,
+ failure_status: error.to_string(),
+ cmd_idx: command.map(|idx| idx as u64),
+ })])
+ } else {
+ Ok(vec![])
+ }
+ }
+ }
+ }
+}
+
+fn process_sui_event(
+ ev: &Event,
+ tx: &CheckpointTransaction,
+ checkpoint: u64,
+ timestamp_ms: u64,
+) -> Result