From 59f45e9f8caa01934bf6f315a7202811085a6049 Mon Sep 17 00:00:00 2001 From: Ashok Menon Date: Fri, 13 Sep 2024 19:00:02 +0100 Subject: [PATCH] indexer: index tx_affected_addresses (#19355) ## Description Introduce `tx_affected_addresses` table -- a combination of `tx_recipients` and `tx_senders`, to power the sender-or-recipient query in GraphQL. ## Test plan ``` cargo build -p sui-indexer ``` Tests based on reading this field will be included with the PR introducing changes to GraphQL. --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [x] Indexer: Index the addresses affected by a transaction (either because they are a sender or a recipient of the transaction). - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- .../pg/2024-09-12-150939_tx_affected/down.sql | 1 + .../pg/2024-09-12-150939_tx_affected/up.sql | 9 ++ crates/sui-indexer/src/models/tx_indices.rs | 31 ++++- crates/sui-indexer/src/schema.rs | 9 ++ .../sui-indexer/src/store/pg_indexer_store.rs | 123 +++++++++++------- 5 files changed, 123 insertions(+), 50 deletions(-) create mode 100644 crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/down.sql create mode 100644 crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/up.sql diff --git a/crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/down.sql b/crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/down.sql new file mode 100644 index 0000000000000..98cc9c0a36ce9 --- /dev/null +++ b/crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS tx_affected_addresses; diff --git a/crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/up.sql b/crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/up.sql new file mode 100644 index 0000000000000..4f71554f1394a --- /dev/null +++ b/crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/up.sql @@ -0,0 +1,9 @@ +CREATE TABLE tx_affected_addresses ( + tx_sequence_number BIGINT NOT NULL, + affected BYTEA NOT NULL, + sender BYTEA NOT NULL, + PRIMARY KEY(affected, tx_sequence_number) +); + +CREATE INDEX tx_affected_addresses_tx_sequence_number_index ON tx_affected_addresses (tx_sequence_number); +CREATE INDEX tx_affected_addresses_sender ON tx_affected_addresses (sender, affected, tx_sequence_number); diff --git a/crates/sui-indexer/src/models/tx_indices.rs b/crates/sui-indexer/src/models/tx_indices.rs index 4f942c2e7af0b..de3ccc817826f 100644 --- a/crates/sui-indexer/src/models/tx_indices.rs +++ b/crates/sui-indexer/src/models/tx_indices.rs @@ -3,12 +3,13 @@ use crate::{ schema::{ - tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests, tx_input_objects, - tx_kinds, tx_recipients, tx_senders, + tx_affected_addresses, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, + tx_digests, tx_input_objects, tx_kinds, tx_recipients, tx_senders, }, types::TxIndex, }; use diesel::prelude::*; +use itertools::Itertools; #[derive(QueryableByName)] pub struct TxSequenceNumber { @@ -22,6 +23,14 @@ pub struct TxDigest { pub transaction_digest: Vec, } +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] +#[diesel(table_name = tx_affected_addresses)] +pub struct StoredTxAffected { + pub tx_sequence_number: i64, + pub affected: Vec, + pub sender: Vec, +} + #[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] #[diesel(table_name = tx_senders)] pub struct StoredTxSenders { @@ -99,6 +108,7 @@ impl TxIndex { pub fn split( self: TxIndex, ) -> ( + Vec, Vec, Vec, Vec, @@ -110,10 +120,24 @@ impl TxIndex { Vec, ) { let tx_sequence_number = self.tx_sequence_number as i64; + let tx_affected_addresses = self + .recipients + .iter() + .chain(self.payers.iter()) + .chain(std::iter::once(&self.sender)) + .unique() + .map(|a| StoredTxAffected { + tx_sequence_number, + affected: a.to_vec(), + sender: self.sender.to_vec(), + }) + .collect(); + let tx_sender = StoredTxSenders { tx_sequence_number, sender: self.sender.to_vec(), }; + let tx_recipients = self .recipients .iter() @@ -123,6 +147,7 @@ impl TxIndex { sender: self.sender.to_vec(), }) .collect(); + let tx_input_objects = self .input_objects .iter() @@ -132,6 +157,7 @@ impl TxIndex { sender: self.sender.to_vec(), }) .collect(); + let tx_changed_objects = self .changed_objects .iter() @@ -197,6 +223,7 @@ impl TxIndex { }; ( + tx_affected_addresses, vec![tx_sender], tx_recipients, tx_input_objects, diff --git a/crates/sui-indexer/src/schema.rs b/crates/sui-indexer/src/schema.rs index 21c809dcdd7d2..ebbbd4bbd7b1c 100644 --- a/crates/sui-indexer/src/schema.rs +++ b/crates/sui-indexer/src/schema.rs @@ -280,6 +280,14 @@ diesel::table! { } } +diesel::table! { + tx_affected_addresses (affected, tx_sequence_number) { + tx_sequence_number -> Int8, + affected -> Bytea, + sender -> Bytea, + } +} + diesel::table! { tx_calls_fun (package, module, func, tx_sequence_number) { tx_sequence_number -> Int8, @@ -375,6 +383,7 @@ diesel::allow_tables_to_appear_in_same_query!( protocol_configs, pruner_cp_watermark, transactions, + tx_affected_addresses, tx_calls_fun, tx_calls_mod, tx_calls_pkg, diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index 564dfca502a92..480e3d34d8adf 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -50,8 +50,8 @@ use crate::schema::{ event_senders, event_struct_instantiation, event_struct_module, event_struct_name, event_struct_package, events, feature_flags, full_objects_history, objects, objects_history, objects_snapshot, objects_version, packages, protocol_configs, pruner_cp_watermark, - transactions, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests, - tx_input_objects, tx_kinds, tx_recipients, tx_senders, + transactions, tx_affected_addresses, tx_calls_fun, tx_calls_mod, tx_calls_pkg, + tx_changed_objects, tx_digests, tx_input_objects, tx_kinds, tx_recipients, tx_senders, }; use crate::store::transaction_with_retry; use crate::types::EventIndex; @@ -1025,56 +1025,76 @@ impl PgIndexerStore { .checkpoint_db_commit_latency_tx_indices_chunks .start_timer(); let len = indices.len(); - let (senders, recipients, input_objects, changed_objects, pkgs, mods, funs, digests, kinds) = - indices.into_iter().map(|i| i.split()).fold( + let ( + affected_addresses, + senders, + recipients, + input_objects, + changed_objects, + pkgs, + mods, + funs, + digests, + kinds, + ) = indices.into_iter().map(|i| i.split()).fold( + ( + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + ), + |( + mut tx_affected_addresses, + mut tx_senders, + mut tx_recipients, + mut tx_input_objects, + mut tx_changed_objects, + mut tx_pkgs, + mut tx_mods, + mut tx_funs, + mut tx_digests, + mut tx_kinds, + ), + index| { + tx_affected_addresses.extend(index.0); + tx_senders.extend(index.1); + tx_recipients.extend(index.2); + tx_input_objects.extend(index.3); + tx_changed_objects.extend(index.4); + tx_pkgs.extend(index.5); + tx_mods.extend(index.6); + tx_funs.extend(index.7); + tx_digests.extend(index.8); + tx_kinds.extend(index.9); ( - Vec::new(), - Vec::new(), - Vec::new(), - Vec::new(), - Vec::new(), - Vec::new(), - Vec::new(), - Vec::new(), - Vec::new(), - ), - |( - mut tx_senders, - mut tx_recipients, - mut tx_input_objects, - mut tx_changed_objects, - mut tx_pkgs, - mut tx_mods, - mut tx_funs, - mut tx_digests, - mut tx_kinds, - ), - index| { - tx_senders.extend(index.0); - tx_recipients.extend(index.1); - tx_input_objects.extend(index.2); - tx_changed_objects.extend(index.3); - tx_pkgs.extend(index.4); - tx_mods.extend(index.5); - tx_funs.extend(index.6); - tx_digests.extend(index.7); - tx_kinds.extend(index.8); - ( - tx_senders, - tx_recipients, - tx_input_objects, - tx_changed_objects, - tx_pkgs, - tx_mods, - tx_funs, - tx_digests, - tx_kinds, - ) - }, - ); + tx_affected_addresses, + tx_senders, + tx_recipients, + tx_input_objects, + tx_changed_objects, + tx_pkgs, + tx_mods, + tx_funs, + tx_digests, + tx_kinds, + ) + }, + ); transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| { async { + diesel::insert_into(tx_affected_addresses::table) + .values(&affected_addresses) + .on_conflict_do_nothing() + .execute(conn) + .await?; + diesel::insert_into(tx_senders::table) .values(&senders) .on_conflict_do_nothing() @@ -1383,6 +1403,13 @@ impl PgIndexerStore { let (min_tx, max_tx) = (min_tx as i64, max_tx as i64); transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| { async { + diesel::delete( + tx_affected_addresses::table + .filter(tx_affected_addresses::tx_sequence_number.between(min_tx, max_tx)), + ) + .execute(conn) + .await?; + diesel::delete( tx_senders::table .filter(tx_senders::tx_sequence_number.between(min_tx, max_tx)),