Skip to content

Commit

Permalink
indexer: index tx_affected_addresses (#19355)
Browse files Browse the repository at this point in the history
## Description
Introduce `tx_affected_addresses` table -- a combination of
`tx_recipients` and `tx_senders`, to power the sender-or-recipient query
in GraphQL.

## Test plan

```
cargo build -p sui-indexer
```

Tests based on reading this field will be included with the PR
introducing changes to GraphQL.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [x] Indexer: Index the addresses affected by a transaction (either
because they are a sender or a recipient of the transaction).
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
amnn committed Sep 13, 2024
1 parent dce2e06 commit aaa906f
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 50 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS tx_affected_addresses;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE tx_affected_addresses (
tx_sequence_number BIGINT NOT NULL,
affected BYTEA NOT NULL,
sender BYTEA NOT NULL,
PRIMARY KEY(affected, tx_sequence_number)
);

CREATE INDEX tx_affected_addresses_tx_sequence_number_index ON tx_affected_addresses (tx_sequence_number);
CREATE INDEX tx_affected_addresses_sender ON tx_affected_addresses (sender, affected, tx_sequence_number);
31 changes: 29 additions & 2 deletions crates/sui-indexer/src/models/tx_indices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@

use crate::{
schema::{
tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests, tx_input_objects,
tx_kinds, tx_recipients, tx_senders,
tx_affected_addresses, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects,
tx_digests, tx_input_objects, tx_kinds, tx_recipients, tx_senders,
},
types::TxIndex,
};
use diesel::prelude::*;
use itertools::Itertools;

#[derive(QueryableByName)]
pub struct TxSequenceNumber {
Expand All @@ -22,6 +23,14 @@ pub struct TxDigest {
pub transaction_digest: Vec<u8>,
}

#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)]
#[diesel(table_name = tx_affected_addresses)]
pub struct StoredTxAffected {
pub tx_sequence_number: i64,
pub affected: Vec<u8>,
pub sender: Vec<u8>,
}

#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)]
#[diesel(table_name = tx_senders)]
pub struct StoredTxSenders {
Expand Down Expand Up @@ -99,6 +108,7 @@ impl TxIndex {
pub fn split(
self: TxIndex,
) -> (
Vec<StoredTxAffected>,
Vec<StoredTxSenders>,
Vec<StoredTxRecipients>,
Vec<StoredTxInputObject>,
Expand All @@ -110,10 +120,24 @@ impl TxIndex {
Vec<StoredTxKind>,
) {
let tx_sequence_number = self.tx_sequence_number as i64;
let tx_affected_addresses = self
.recipients
.iter()
.chain(self.payers.iter())
.chain(std::iter::once(&self.sender))
.unique()
.map(|a| StoredTxAffected {
tx_sequence_number,
affected: a.to_vec(),
sender: self.sender.to_vec(),
})
.collect();

let tx_sender = StoredTxSenders {
tx_sequence_number,
sender: self.sender.to_vec(),
};

let tx_recipients = self
.recipients
.iter()
Expand All @@ -123,6 +147,7 @@ impl TxIndex {
sender: self.sender.to_vec(),
})
.collect();

let tx_input_objects = self
.input_objects
.iter()
Expand All @@ -132,6 +157,7 @@ impl TxIndex {
sender: self.sender.to_vec(),
})
.collect();

let tx_changed_objects = self
.changed_objects
.iter()
Expand Down Expand Up @@ -197,6 +223,7 @@ impl TxIndex {
};

(
tx_affected_addresses,
vec![tx_sender],
tx_recipients,
tx_input_objects,
Expand Down
9 changes: 9 additions & 0 deletions crates/sui-indexer/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,14 @@ diesel::table! {
}
}

diesel::table! {
tx_affected_addresses (affected, tx_sequence_number) {
tx_sequence_number -> Int8,
affected -> Bytea,
sender -> Bytea,
}
}

diesel::table! {
tx_calls_fun (package, module, func, tx_sequence_number) {
tx_sequence_number -> Int8,
Expand Down Expand Up @@ -375,6 +383,7 @@ diesel::allow_tables_to_appear_in_same_query!(
protocol_configs,
pruner_cp_watermark,
transactions,
tx_affected_addresses,
tx_calls_fun,
tx_calls_mod,
tx_calls_pkg,
Expand Down
123 changes: 75 additions & 48 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ use crate::schema::{
event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
event_struct_package, events, feature_flags, full_objects_history, objects, objects_history,
objects_snapshot, objects_version, packages, protocol_configs, pruner_cp_watermark,
transactions, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests,
tx_input_objects, tx_kinds, tx_recipients, tx_senders,
transactions, tx_affected_addresses, tx_calls_fun, tx_calls_mod, tx_calls_pkg,
tx_changed_objects, tx_digests, tx_input_objects, tx_kinds, tx_recipients, tx_senders,
};
use crate::store::transaction_with_retry;
use crate::types::EventIndex;
Expand Down Expand Up @@ -1025,56 +1025,76 @@ impl PgIndexerStore {
.checkpoint_db_commit_latency_tx_indices_chunks
.start_timer();
let len = indices.len();
let (senders, recipients, input_objects, changed_objects, pkgs, mods, funs, digests, kinds) =
indices.into_iter().map(|i| i.split()).fold(
let (
affected_addresses,
senders,
recipients,
input_objects,
changed_objects,
pkgs,
mods,
funs,
digests,
kinds,
) = indices.into_iter().map(|i| i.split()).fold(
(
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
),
|(
mut tx_affected_addresses,
mut tx_senders,
mut tx_recipients,
mut tx_input_objects,
mut tx_changed_objects,
mut tx_pkgs,
mut tx_mods,
mut tx_funs,
mut tx_digests,
mut tx_kinds,
),
index| {
tx_affected_addresses.extend(index.0);
tx_senders.extend(index.1);
tx_recipients.extend(index.2);
tx_input_objects.extend(index.3);
tx_changed_objects.extend(index.4);
tx_pkgs.extend(index.5);
tx_mods.extend(index.6);
tx_funs.extend(index.7);
tx_digests.extend(index.8);
tx_kinds.extend(index.9);
(
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
),
|(
mut tx_senders,
mut tx_recipients,
mut tx_input_objects,
mut tx_changed_objects,
mut tx_pkgs,
mut tx_mods,
mut tx_funs,
mut tx_digests,
mut tx_kinds,
),
index| {
tx_senders.extend(index.0);
tx_recipients.extend(index.1);
tx_input_objects.extend(index.2);
tx_changed_objects.extend(index.3);
tx_pkgs.extend(index.4);
tx_mods.extend(index.5);
tx_funs.extend(index.6);
tx_digests.extend(index.7);
tx_kinds.extend(index.8);
(
tx_senders,
tx_recipients,
tx_input_objects,
tx_changed_objects,
tx_pkgs,
tx_mods,
tx_funs,
tx_digests,
tx_kinds,
)
},
);
tx_affected_addresses,
tx_senders,
tx_recipients,
tx_input_objects,
tx_changed_objects,
tx_pkgs,
tx_mods,
tx_funs,
tx_digests,
tx_kinds,
)
},
);

transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
async {
diesel::insert_into(tx_affected_addresses::table)
.values(&affected_addresses)
.on_conflict_do_nothing()
.execute(conn)
.await?;

diesel::insert_into(tx_senders::table)
.values(&senders)
.on_conflict_do_nothing()
Expand Down Expand Up @@ -1383,6 +1403,13 @@ impl PgIndexerStore {
let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
async {
diesel::delete(
tx_affected_addresses::table
.filter(tx_affected_addresses::tx_sequence_number.between(min_tx, max_tx)),
)
.execute(conn)
.await?;

diesel::delete(
tx_senders::table
.filter(tx_senders::tx_sequence_number.between(min_tx, max_tx)),
Expand Down

0 comments on commit aaa906f

Please sign in to comment.