Skip to content

Commit

Permalink
cherry-pick: indexer reader: event and txn query in blocking tasks (#…
Browse files Browse the repository at this point in the history
…17170)

## Description 

event and txn query should be done in blocking tasks instead.

## Test plan 

tnt release tests

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
gegaowp authored Apr 16, 2024
1 parent db6e04d commit 637f269
Showing 1 changed file with 56 additions and 32 deletions.
88 changes: 56 additions & 32 deletions crates/sui-indexer/src/indexer_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -778,12 +778,20 @@ impl IndexerReader {
is_descending: bool,
) -> IndexerResult<Vec<SuiTransactionBlockResponse>> {
let cursor_tx_seq = if let Some(cursor) = cursor {
Some(self.run_query(|conn| {
transactions::dsl::transactions
.select(transactions::tx_sequence_number)
.filter(transactions::dsl::transaction_digest.eq(cursor.into_inner().to_vec()))
.first::<i64>(conn)
})?)
let tx_seq = self
.spawn_blocking(move |this| {
this.run_query(|conn| {
transactions::dsl::transactions
.select(transactions::tx_sequence_number)
.filter(
transactions::dsl::transaction_digest
.eq(cursor.into_inner().to_vec()),
)
.first::<i64>(conn)
})
})
.await?;
Some(tx_seq)
} else {
None
};
Expand Down Expand Up @@ -961,14 +969,17 @@ impl IndexerReader {
);

tracing::debug!("query transaction blocks: {}", query);

let tx_sequence_numbers = self
.run_query(|conn| diesel::sql_query(query.clone()).load::<TxSequenceNumber>(conn))?
.spawn_blocking(move |this| {
this.run_query(|conn| {
diesel::sql_query(query.clone()).load::<TxSequenceNumber>(conn)
})
})
.await?
.into_iter()
.map(|tsn| tsn.tx_sequence_number)
.collect::<Vec<_>>();

self.multi_get_transaction_block_response_by_sequence_numbers(
.collect::<Vec<i64>>();
self.multi_get_transaction_block_response_by_sequence_numbers_in_blocking_task(
tx_sequence_numbers,
options,
Some(is_descending),
Expand All @@ -988,15 +999,21 @@ impl IndexerReader {
.await
}

async fn multi_get_transaction_block_response_by_sequence_numbers(
async fn multi_get_transaction_block_response_by_sequence_numbers_in_blocking_task(
&self,
tx_sequence_numbers: Vec<i64>,
options: sui_json_rpc_types::SuiTransactionBlockResponseOptions,
// Some(true) for desc, Some(false) for asc, None for undefined order
is_descending: Option<bool>,
) -> Result<Vec<sui_json_rpc_types::SuiTransactionBlockResponse>, IndexerError> {
let stored_txes: Vec<StoredTransaction> =
self.multi_get_transactions_with_sequence_numbers(tx_sequence_numbers, is_descending)?;
let stored_txes: Vec<StoredTransaction> = self
.spawn_blocking(move |this| {
this.multi_get_transactions_with_sequence_numbers(
tx_sequence_numbers,
is_descending,
)
})
.await?;
self.stored_transaction_to_transaction_block(stored_txes, options)
.await
}
Expand Down Expand Up @@ -1093,25 +1110,32 @@ impl IndexerReader {
tx_digest,
event_seq,
} = cursor;
(
self.run_query(|conn| {
transactions::dsl::transactions
.select(transactions::tx_sequence_number)
.filter(
transactions::dsl::transaction_digest
.eq(tx_digest.into_inner().to_vec()),
)
.first::<i64>(conn)
})?,
event_seq,
)

let tx_seq = self
.spawn_blocking(move |this| {
this.run_query(|conn| {
transactions::dsl::transactions
.select(transactions::tx_sequence_number)
.filter(
transactions::dsl::transaction_digest
.eq(tx_digest.into_inner().to_vec()),
)
.first::<i64>(conn)
})
})
.await?;
(tx_seq, event_seq)
} else if descending_order {
let max_tx_seq: i64 = self.run_query(|conn| {
events::dsl::events
.select(events::tx_sequence_number)
.order(events::dsl::tx_sequence_number.desc())
.first::<i64>(conn)
})?;
let max_tx_seq: i64 = self
.spawn_blocking(move |this| {
this.run_query(|conn| {
events::dsl::events
.select(events::tx_sequence_number)
.order(events::dsl::tx_sequence_number.desc())
.first::<i64>(conn)
})
})
.await?;
(max_tx_seq + 1, 0)
} else {
(-1, 0)
Expand Down

0 comments on commit 637f269

Please sign in to comment.