Skip to content

Commit

Permalink
feat(hubble): add fetch_transactions_for_block
Browse files Browse the repository at this point in the history
  • Loading branch information
KaiserKarel committed Oct 15, 2023
1 parent ce690af commit 62da951
Showing 1 changed file with 40 additions and 1 deletion.
41 changes: 40 additions & 1 deletion hubble/src/tm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hubble::hasura::*;
use tendermint::{block::Height, consensus::Params, genesis::Genesis, validator::Update};
use tendermint_rpc::{
dialect::v0_37::Event, endpoint::block_results::Response as BlockResponse, error::ErrorDetail,
response_error::Code, Client, Error, HttpClient,
response_error::Code, Client, Error, HttpClient, Order, query::{Query, EventType},
};
use tokio::time::{sleep, Duration};
use tracing::{debug, info};
Expand Down Expand Up @@ -185,6 +185,13 @@ async fn batch_sync<D: Datastore>(
let objects: Vec<_> = join_all(headers.block_metas.iter().rev().map(|header| async {
debug!("fetching block results for height {}", header.header.height);
let block = client.block_results(header.header.height).await?;
let len = if let Some(tx_results) = &block.txs_results {
Some(tx_results.len())
} else {
None
};

let txs = fetch_transactions_for_block(client, header.header.height, len).await?;
let events: Vec<_> = block
.events()
.enumerate()
Expand Down Expand Up @@ -310,6 +317,38 @@ async fn sync_next<D: Datastore>(
Ok(Some(height.increment()))
}

async fn fetch_transactions_for_block(client: &HttpClient, height: Height, expected: impl Into<Option<usize>>) -> Result<Vec<tendermint_rpc::endpoint::tx::Response>, Report> {
let query = Query::from(EventType::Tx).and_eq("tx.height", height.value());
let expected = expected.into();

let mut txs = if let Some(expected) = expected {
Vec::with_capacity(expected)
} else {
vec![]
};

for page in 0..u32::MAX {
let response = client.tx_search(query.clone(), false, page, 100, Order::Ascending).await?;
let len = response.txs.len();
txs.extend(response.txs);

// We always query for the maximum page size. If we get less items, we know pagination is done
if len < 100 {
break
}

// If we deduce the number from expected, we end pagination once we reach expected.
if txs.len() == expected.unwrap_or(usize::MAX) {
break
}
}

if let Some(expected) = expected {
assert_eq!(txs.len(), expected);
}
return Ok(txs);
}

fn into_many_blocks_input(
time: String,
value: (usize, StateChange),
Expand Down

0 comments on commit 62da951

Please sign in to comment.