refactor(hubble): remove sync_next and always use batch syncing (#1933)
A simplification of the current tendermint indexer. It can be simplified
more, but this already despaghettifies a lot.
KaiserKarel authored May 20, 2024
2 parents 2b1434d + ada6fc3 commit 0b7c142
Showing 3 changed files with 68 additions and 152 deletions.
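Before the diffs, a hypothetical, self-contained sketch of the control flow this commit converges on: both the fast-sync loop and the head-of-chain loop go through one batched helper, with batch_size = 1 standing in for the removed sync_next. The Vec-backed chain and database below are stand-ins for the tendermint-rpc client and the sqlx transaction used in hubble/src/tm.rs.

// Hypothetical sketch only; real types and RPC calls live in hubble/src/tm.rs.
const BATCH_SIZE: u64 = 20;

/// Returns None when `from` is past the chain head, otherwise the next height to fetch.
fn fetch_and_insert_blocks(
    chain: &[u64],
    db: &mut Vec<u64>,
    batch_size: u64,
    from: u64,
) -> Option<u64> {
    let head = *chain.last()?;
    if from > head {
        return None;
    }
    let to = (from + batch_size - 1).min(head);
    db.extend(from..=to);
    Some(to + 1)
}

fn main() {
    let chain: Vec<u64> = (1..=75).collect();
    let mut db = Vec::new();
    let mut height: u64 = 1;

    // Fast sync: full batches while a whole batch fits below the head.
    while height + BATCH_SIZE - 1 <= chain.len() as u64 {
        height = fetch_and_insert_blocks(&chain, &mut db, BATCH_SIZE, height)
            .expect("a full batch below the head never reaches the chain tip");
    }

    // Regular sync: one block at a time; None means we caught up to the head.
    while let Some(next) = fetch_and_insert_blocks(&chain, &mut db, 1, height) {
        height = next;
    }

    assert_eq!(db.len(), 75);
    println!("indexed up to height {}", height - 1);
}

The real helper additionally fetches headers, block results, and transactions over RPC and writes them in one Postgres transaction per batch, but the None-at-head contract is the same.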
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion hubble/Cargo.toml
@@ -27,12 +27,12 @@ contracts.workspace = true
ethers = { workspace = true, features = ["default", "std"] }
futures = { workspace = true, features = ["async-await"] }
hex.workspace = true
itertools = "0.13.0"
lazy_static = { workspace = true }
prometheus = { version = "0.13.3", features = ["process"] }
prost.workspace = true
protos = { workspace = true, features = ["client"] }
reqwest = { workspace = true, features = ["json", "blocking"] }
retry = "2.0.0"
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sqlx = { workspace = true, features = ["postgres", "runtime-tokio", "tls-rustls", "time", "macros", "json"] }
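The one-line hubble/Cargo.toml change appears to add itertools (the added/removed markers are not preserved in this rendering); tm.rs below uses its Either type so a single binding can hold either the many-header iterator from /blockchain or the single header from /block, and still be iterated more than once via clone(). A minimal, hypothetical sketch of that pattern:

// Hypothetical values; the point is that both arms are Iterator + Clone,
// so `impl Iterator<Item = u64> + Clone` names one concrete Either type.
use itertools::Either;

fn header_heights(batch: bool) -> impl Iterator<Item = u64> + Clone {
    if batch {
        Either::Left(100..105)               // stands in for /blockchain: many headers
    } else {
        Either::Right(std::iter::once(100))  // stands in for /block: exactly one header
    }
}

fn main() {
    let heights = header_heights(true);
    let first_pass: Vec<u64> = heights.clone().collect(); // iterate once...
    let sum: u64 = heights.sum();                          // ...and again, as tm.rs does
    println!("{first_pass:?} (sum {sum})");
}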
198 changes: 57 additions & 141 deletions hubble/src/tm.rs
@@ -1,11 +1,6 @@
use backon::{ExponentialBuilder, Retryable};
use color_eyre::eyre::{bail, Report};
use futures::{
future::{ready, TryFutureExt},
stream,
stream::TryStreamExt,
try_join,
};
use futures::{stream, stream::TryStreamExt};
use sqlx::{Acquire, Postgres};
use tendermint::block::Height;
use tendermint_rpc::{
@@ -71,29 +66,31 @@ impl Config {

// Fast sync protocol. We sync up to latest.height - batch-size + 1
if let Some(up_to) = should_fast_sync_up_to(&client, Self::BATCH_SIZE, height).await? {
info!(?chain_id.canonical, "starting fast sync protocol up to: {}", up_to);
debug!(?chain_id.canonical, "syncing with batch size {} up to height {}", Self::BATCH_SIZE, up_to);
loop {
let batch_end =
std::cmp::min(up_to.value(), height.value() + Self::BATCH_SIZE as u64);
if batch_end - height.value() != 20 {
break; // go back to the should_fast_sync_up_to. If this returns None, we continue to slow sync.
}

info!(?chain_id.canonical, "fast syncing for batch: {}..{}", height, batch_end);
debug!(?chain_id.canonical, "fast syncing for batch: {}..{}", height, batch_end);
let mut tx = pool.begin().await?;
height = batch_sync(&client, &mut tx, chain_id, Self::BATCH_SIZE, height).await?;
let next_height = fetch_and_insert_blocks(&client, &mut tx, chain_id, Self::BATCH_SIZE, height).await?.expect("batch sync with batch > 1 should error or succeed, but never reach head of chain");
tx.commit().await?;
info!(?chain_id.canonical, "indexed blocks {}..{}", height.value(), next_height.value());
height = next_height
}
}

info!(?chain_id.canonical, "continuing regular sync protocol");
info!(?chain_id.canonical, "syncing block by block");
let mut retry_count = 0;
loop {
debug!("starting regular sync protocol");
// Regular sync protocol. This fetches blocks one-by-one.
retry_count += 1;
let mut tx = pool.begin().await?;
match sync_next(&client, &mut tx, chain_id, height).await? {
match fetch_and_insert_blocks(&client, &mut tx, chain_id, 1, height).await? {
Some(h) => {
info!(?chain_id.canonical, "indexed block {}", &height);
height = h;
@@ -166,50 +163,71 @@ async fn should_fast_sync_up_to(
}
}

/// Uses batch processing to fast sync up to the provided height.
async fn batch_sync(
/// Fetches and inserts blocks into the database. Will attempt to use more optimal RPC calls (such as /blockchain)
/// if the batch size > 1.
///
/// Will return None if the node had no new blocks and no inserts were made; otherwise it returns the last height inserted.
async fn fetch_and_insert_blocks(
client: &HttpClient,
tx: &mut sqlx::Transaction<'_, Postgres>,
chain_id: ChainId,
batch_size: u32,
from: Height,
) -> Result<Height, Report> {
) -> Result<Option<Height>, Report> {
use itertools::Either;

let min = from.value() as u32;
let max = min + batch_size - 1_u32;
debug!("fetching batch of headers from {} to {}", min, max);

let headers = client.blockchain(min, max).await?;
let headers = if batch_size > 1 {
Either::Left(
client
.blockchain(min, max)
.await?
.block_metas
.into_iter()
.map(|meta| meta.header),
)
} else {
// We do need this arm, because client.blockchain will error if max > latest block (instead of just returning min..latest).
match client.block(min).await {
Err(err) => {
if is_height_exceeded_error(&err) {
return Ok(None);
} else {
return Err(err.into());
}
}
Ok(val) => Either::Right(std::iter::once(val.block.header)),
}
};

let submit_blocks = postgres::insert_batch_blocks(
tx,
stream::iter(headers.block_metas.clone().into_iter().map(|meta| {
stream::iter(headers.clone().into_iter().map(|header| {
PgBlock {
chain_id,
hash: meta.header.hash().to_string(),
height: meta.header.height.value() as i32,
time: meta.header.time.into(),
data: serde_json::to_value(&meta.header)
hash: header.hash().to_string(),
height: header.height.value() as i32,
time: header.time.into(),
data: serde_json::to_value(&header)
.unwrap()
.replace_escape_chars(),
}
})),
);

let block_results = stream::iter(
headers
.block_metas
.clone()
.into_iter()
.rev()
.map(Ok::<_, Report>),
)
.and_then(|meta| async {
debug!("fetching block results for height {}", meta.header.height);
let block = client.block_results(meta.header.height).await?;
let txs = fetch_transactions_for_block(client, meta.header.height, None).await?;
Ok((meta, block, txs))
})
.try_collect();
let block_results = stream::iter(headers.clone().into_iter().rev().map(Ok::<_, Report>))
.and_then(|header| async {
debug!("fetching block results for height {}", header.height);
let block = (|| client.block_results(header.height))
.retry(&ExponentialBuilder::default())
.await?;
let txs = fetch_transactions_for_block(client, header.height, None).await?;
Ok((header, block, txs))
})
.try_collect();

// let (submit_blocks, block_results) = join!(submit_blocks, block_results);
submit_blocks.await?;
@@ -219,10 +237,10 @@ async fn batch_sync(
let mut events = Vec::with_capacity(block_results.len() * 4 * 10);

let transactions =
block_results.into_iter().flat_map(|(meta, block, txs)| {
block_results.into_iter().flat_map(|(header, block, txs)| {
let block_height: i32 = block.height.value().try_into().unwrap();
let block_hash = meta.header.hash().to_string();
let time: OffsetDateTime = meta.header.time.into();
let block_hash = header.hash().to_string();
let time: OffsetDateTime = header.time.into();
let mut block_index = 0;
let finalize_block_events = block.events(chain_id, block_hash.clone(), time);

@@ -273,109 +291,7 @@ async fn batch_sync(
});
postgres::insert_batch_transactions(tx, stream::iter(transactions)).await?;
postgres::insert_batch_events(tx, stream::iter(events)).await?;
Ok((from.value() as u32 + headers.block_metas.len() as u32).into())
}

async fn sync_next(
client: &HttpClient,
tx: &mut sqlx::Transaction<'_, Postgres>,
chain_id: ChainId,
block_height: Height,
) -> Result<Option<Height>, Report> {
// If we're caught up indexing to the latest height, this will error. In that case,
// we retry until we obtain the next header.
debug!("fetching block header for height: {}", &block_height);
let header = match client.block(block_height).await {
Err(err) => {
if is_height_exceeded_error(&err) {
return Ok(None);
} else {
return Err(err.into());
}
}
Ok(val) => val.block.header,
};
debug!("fetching block results for height: {}", &block_height);

let (block, finalize_events) = match client.block_results(block_height).await {
Err(err) => {
if is_height_exceeded_error(&err) {
return Ok(None);
} else {
return Err(err.into());
}
}
Ok(block) => (
PgBlock {
chain_id,
hash: header.hash().to_string(),
height: block_height.value().try_into().unwrap(),
time: header.time.into(),
data: serde_json::to_value(&header)
.unwrap()
.replace_escape_chars(),
},
block.events(chain_id, header.hash().to_string(), header.time.into()),
),
};

let txs = fetch_transactions_for_block(client, block_height, None);
let (_, txs) = try_join!(
postgres::insert_batch_blocks(tx, stream::once(ready(block))).map_err(Report::from),
txs
)?;

let mut events = vec![];
let mut block_index = 0;

let txs: Vec<_> = txs
.iter()
.map(|tx| {
tx.tx_result
.events
.iter()
.enumerate()
.for_each(|(i, event)| {
events.push(PgEvent {
chain_id,
block_hash: header.hash().to_string(),
block_height: block_height.value().try_into().unwrap(),
time: header.time.into(),
data: serde_json::to_value(event).unwrap().replace_escape_chars(),
transaction_hash: Some(tx.hash.to_string()),
transaction_index: Some(i as i32),
block_index,
});
block_index += 1;
});
PgTransaction {
chain_id,
block_hash: header.hash().to_string(),
block_height: block_height.value().try_into().unwrap(),
time: header.time.into(),
data: serde_json::to_value(tx).unwrap().replace_escape_chars(),
hash: tx.hash.to_string(),
index: tx.index as i32,
}
})
.collect();

let events_len = events.len();

let events = events
.into_iter()
.chain(finalize_events)
.enumerate()
.map(|(i, e)| PgEvent {
block_index: i as i32,
..e
});

postgres::insert_batch_transactions(tx, stream::iter(txs)).await?;
postgres::insert_batch_events(tx, stream::iter(events)).await?;

debug!("found {} events for block {}", events_len, &block_height);
Ok(Some(block_height.increment()))
Ok(Some((from.value() as u32 + headers.len() as u32).into()))
}

async fn fetch_transactions_for_block(
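For reference, the batched path now wraps the block_results RPC call in a backon retry with exponential backoff, where the old batch_sync propagated the first error. A hypothetical standalone sketch of that retry shape, assuming a tokio runtime and the same backon call shape the diff uses ((|| fut).retry(&ExponentialBuilder::default()).await):

// Hypothetical stand-in for a flaky RPC such as client.block_results:
// fails twice, then succeeds. Assumes tokio (rt + macros) and the backon
// version pinned by this repo.
use std::sync::atomic::{AtomicU32, Ordering};
use backon::{ExponentialBuilder, Retryable};

static ATTEMPTS: AtomicU32 = AtomicU32::new(0);

async fn flaky_block_results() -> Result<u64, &'static str> {
    if ATTEMPTS.fetch_add(1, Ordering::SeqCst) < 2 {
        Err("rpc temporarily unavailable")
    } else {
        Ok(1_234)
    }
}

#[tokio::main]
async fn main() -> Result<(), &'static str> {
    // Same shape as the call in tm.rs:
    //   (|| client.block_results(header.height)).retry(&ExponentialBuilder::default()).await?
    let height = (|| flaky_block_results())
        .retry(&ExponentialBuilder::default())
        .await?;
    println!(
        "got block results at height {height} after {} attempts",
        ATTEMPTS.load(Ordering::SeqCst)
    );
    Ok(())
}

The diff keeps ExponentialBuilder::default(), so whatever backoff defaults backon ships are what the indexer uses; nothing is tuned here.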
