From 6d4be8441a2812ce38ee768d927f86ee022cf8f2 Mon Sep 17 00:00:00 2001 From: Ashok Menon Date: Thu, 12 Sep 2024 23:38:42 +0100 Subject: [PATCH 1/3] indexer: index tx_affected ## Description Introduce `tx_affected` table -- a combination of `tx_recipients` and `tx_senders`, to power the sender-or-recipient query in GraphQL. ## Test plan ``` cargo build -p sui-indexer ``` Tests based on reading this field will be included with the PR introducing changes to GraphQL. --- .../pg/2024-09-12-150939_tx_affected/down.sql | 1 + .../pg/2024-09-12-150939_tx_affected/up.sql | 8 ++ crates/sui-indexer/src/models/tx_indices.rs | 28 +++- crates/sui-indexer/src/schema.rs | 9 ++ .../sui-indexer/src/store/pg_indexer_store.rs | 123 +++++++++++------- 5 files changed, 119 insertions(+), 50 deletions(-) create mode 100644 crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/down.sql create mode 100644 crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/up.sql diff --git a/crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/down.sql b/crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/down.sql new file mode 100644 index 0000000000000..22d3eb6155a1a --- /dev/null +++ b/crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS tx_affected; diff --git a/crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/up.sql b/crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/up.sql new file mode 100644 index 0000000000000..6bde0a0c2838e --- /dev/null +++ b/crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/up.sql @@ -0,0 +1,8 @@ +CREATE TABLE tx_affected ( + tx_sequence_number BIGINT NOT NULL, + affected BYTEA NOT NULL, + sender BYTEA NOT NULL, + PRIMARY KEY(affected, tx_sequence_number) +); +CREATE INDEX tx_affected_tx_sequence_number_index ON tx_affected (tx_sequence_number); +CREATE INDEX tx_affected_sender ON tx_affected (sender, affected, tx_sequence_number); diff --git a/crates/sui-indexer/src/models/tx_indices.rs b/crates/sui-indexer/src/models/tx_indices.rs index 4f942c2e7af0b..4bd7577e43d22 100644 --- a/crates/sui-indexer/src/models/tx_indices.rs +++ b/crates/sui-indexer/src/models/tx_indices.rs @@ -3,8 +3,8 @@ use crate::{ schema::{ - tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests, tx_input_objects, - tx_kinds, tx_recipients, tx_senders, + tx_affected, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests, + tx_input_objects, tx_kinds, tx_recipients, tx_senders, }, types::TxIndex, }; @@ -22,6 +22,14 @@ pub struct TxDigest { pub transaction_digest: Vec, } +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] +#[diesel(table_name = tx_affected)] +pub struct StoredTxAffected { + pub tx_sequence_number: i64, + pub affected: Vec, + pub sender: Vec, +} + #[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] #[diesel(table_name = tx_senders)] pub struct StoredTxSenders { @@ -99,6 +107,7 @@ impl TxIndex { pub fn split( self: TxIndex, ) -> ( + Vec, Vec, Vec, Vec, @@ -110,6 +119,20 @@ impl TxIndex { Vec, ) { let tx_sequence_number = self.tx_sequence_number as i64; + let tx_affected = self + .recipients + .iter() + .map(|r| StoredTxAffected { + tx_sequence_number, + affected: r.to_vec(), + sender: self.sender.to_vec(), + }) + .chain(std::iter::once(StoredTxAffected { + tx_sequence_number, + affected: self.sender.to_vec(), + sender: self.sender.to_vec(), + })) + .collect(); let tx_sender = StoredTxSenders { tx_sequence_number, sender: self.sender.to_vec(), @@ -197,6 +220,7 @@ impl TxIndex { }; ( + tx_affected, vec![tx_sender], tx_recipients, tx_input_objects, diff --git a/crates/sui-indexer/src/schema.rs b/crates/sui-indexer/src/schema.rs index 21c809dcdd7d2..ca6d67a50b6e3 100644 --- a/crates/sui-indexer/src/schema.rs +++ b/crates/sui-indexer/src/schema.rs @@ -280,6 +280,14 @@ diesel::table! { } } +diesel::table! { + tx_affected (affected, tx_sequence_number) { + tx_sequence_number -> Int8, + affected -> Bytea, + sender -> Bytea, + } +} + diesel::table! { tx_calls_fun (package, module, func, tx_sequence_number) { tx_sequence_number -> Int8, @@ -375,6 +383,7 @@ diesel::allow_tables_to_appear_in_same_query!( protocol_configs, pruner_cp_watermark, transactions, + tx_affected, tx_calls_fun, tx_calls_mod, tx_calls_pkg, diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index 564dfca502a92..ec3d3f8c55c3b 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -50,8 +50,8 @@ use crate::schema::{ event_senders, event_struct_instantiation, event_struct_module, event_struct_name, event_struct_package, events, feature_flags, full_objects_history, objects, objects_history, objects_snapshot, objects_version, packages, protocol_configs, pruner_cp_watermark, - transactions, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests, - tx_input_objects, tx_kinds, tx_recipients, tx_senders, + transactions, tx_affected, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, + tx_digests, tx_input_objects, tx_kinds, tx_recipients, tx_senders, }; use crate::store::transaction_with_retry; use crate::types::EventIndex; @@ -1025,56 +1025,76 @@ impl PgIndexerStore { .checkpoint_db_commit_latency_tx_indices_chunks .start_timer(); let len = indices.len(); - let (senders, recipients, input_objects, changed_objects, pkgs, mods, funs, digests, kinds) = - indices.into_iter().map(|i| i.split()).fold( + let ( + affected, + senders, + recipients, + input_objects, + changed_objects, + pkgs, + mods, + funs, + digests, + kinds, + ) = indices.into_iter().map(|i| i.split()).fold( + ( + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + ), + |( + mut tx_affected, + mut tx_senders, + mut tx_recipients, + mut tx_input_objects, + mut tx_changed_objects, + mut tx_pkgs, + mut tx_mods, + mut tx_funs, + mut tx_digests, + mut tx_kinds, + ), + index| { + tx_affected.extend(index.0); + tx_senders.extend(index.1); + tx_recipients.extend(index.2); + tx_input_objects.extend(index.3); + tx_changed_objects.extend(index.4); + tx_pkgs.extend(index.5); + tx_mods.extend(index.6); + tx_funs.extend(index.7); + tx_digests.extend(index.8); + tx_kinds.extend(index.9); ( - Vec::new(), - Vec::new(), - Vec::new(), - Vec::new(), - Vec::new(), - Vec::new(), - Vec::new(), - Vec::new(), - Vec::new(), - ), - |( - mut tx_senders, - mut tx_recipients, - mut tx_input_objects, - mut tx_changed_objects, - mut tx_pkgs, - mut tx_mods, - mut tx_funs, - mut tx_digests, - mut tx_kinds, - ), - index| { - tx_senders.extend(index.0); - tx_recipients.extend(index.1); - tx_input_objects.extend(index.2); - tx_changed_objects.extend(index.3); - tx_pkgs.extend(index.4); - tx_mods.extend(index.5); - tx_funs.extend(index.6); - tx_digests.extend(index.7); - tx_kinds.extend(index.8); - ( - tx_senders, - tx_recipients, - tx_input_objects, - tx_changed_objects, - tx_pkgs, - tx_mods, - tx_funs, - tx_digests, - tx_kinds, - ) - }, - ); + tx_affected, + tx_senders, + tx_recipients, + tx_input_objects, + tx_changed_objects, + tx_pkgs, + tx_mods, + tx_funs, + tx_digests, + tx_kinds, + ) + }, + ); transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| { async { + diesel::insert_into(tx_affected::table) + .values(&affected) + .on_conflict_do_nothing() + .execute(conn) + .await?; + diesel::insert_into(tx_senders::table) .values(&senders) .on_conflict_do_nothing() @@ -1383,6 +1403,13 @@ impl PgIndexerStore { let (min_tx, max_tx) = (min_tx as i64, max_tx as i64); transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| { async { + diesel::delete( + tx_affected::table + .filter(tx_affected::tx_sequence_number.between(min_tx, max_tx)), + ) + .execute(conn) + .await?; + diesel::delete( tx_senders::table .filter(tx_senders::tx_sequence_number.between(min_tx, max_tx)), From 78042af98da127caa2c69ea213ea43f14ac0db6a Mon Sep 17 00:00:00 2001 From: Ashok Menon Date: Fri, 13 Sep 2024 12:59:56 +0100 Subject: [PATCH 2/3] fixup: clarify that these are the affected *addresses* Later on, we will also track affected objects, instead of input and changed objects. --- .../pg/2024-09-12-150939_tx_affected/down.sql | 2 +- .../pg/2024-09-12-150939_tx_affected/up.sql | 7 ++++--- crates/sui-indexer/src/models/tx_indices.rs | 10 +++++----- crates/sui-indexer/src/schema.rs | 4 ++-- .../sui-indexer/src/store/pg_indexer_store.rs | 20 +++++++++---------- 5 files changed, 22 insertions(+), 21 deletions(-) diff --git a/crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/down.sql b/crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/down.sql index 22d3eb6155a1a..98cc9c0a36ce9 100644 --- a/crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/down.sql +++ b/crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/down.sql @@ -1 +1 @@ -DROP TABLE IF EXISTS tx_affected; +DROP TABLE IF EXISTS tx_affected_addresses; diff --git a/crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/up.sql b/crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/up.sql index 6bde0a0c2838e..4f71554f1394a 100644 --- a/crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/up.sql +++ b/crates/sui-indexer/migrations/pg/2024-09-12-150939_tx_affected/up.sql @@ -1,8 +1,9 @@ -CREATE TABLE tx_affected ( +CREATE TABLE tx_affected_addresses ( tx_sequence_number BIGINT NOT NULL, affected BYTEA NOT NULL, sender BYTEA NOT NULL, PRIMARY KEY(affected, tx_sequence_number) ); -CREATE INDEX tx_affected_tx_sequence_number_index ON tx_affected (tx_sequence_number); -CREATE INDEX tx_affected_sender ON tx_affected (sender, affected, tx_sequence_number); + +CREATE INDEX tx_affected_addresses_tx_sequence_number_index ON tx_affected_addresses (tx_sequence_number); +CREATE INDEX tx_affected_addresses_sender ON tx_affected_addresses (sender, affected, tx_sequence_number); diff --git a/crates/sui-indexer/src/models/tx_indices.rs b/crates/sui-indexer/src/models/tx_indices.rs index 4bd7577e43d22..83e99f3c132eb 100644 --- a/crates/sui-indexer/src/models/tx_indices.rs +++ b/crates/sui-indexer/src/models/tx_indices.rs @@ -3,8 +3,8 @@ use crate::{ schema::{ - tx_affected, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests, - tx_input_objects, tx_kinds, tx_recipients, tx_senders, + tx_affected_addresses, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, + tx_digests, tx_input_objects, tx_kinds, tx_recipients, tx_senders, }, types::TxIndex, }; @@ -23,7 +23,7 @@ pub struct TxDigest { } #[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] -#[diesel(table_name = tx_affected)] +#[diesel(table_name = tx_affected_addresses)] pub struct StoredTxAffected { pub tx_sequence_number: i64, pub affected: Vec, @@ -119,7 +119,7 @@ impl TxIndex { Vec, ) { let tx_sequence_number = self.tx_sequence_number as i64; - let tx_affected = self + let tx_affected_addresses = self .recipients .iter() .map(|r| StoredTxAffected { @@ -220,7 +220,7 @@ impl TxIndex { }; ( - tx_affected, + tx_affected_addresses, vec![tx_sender], tx_recipients, tx_input_objects, diff --git a/crates/sui-indexer/src/schema.rs b/crates/sui-indexer/src/schema.rs index ca6d67a50b6e3..ebbbd4bbd7b1c 100644 --- a/crates/sui-indexer/src/schema.rs +++ b/crates/sui-indexer/src/schema.rs @@ -281,7 +281,7 @@ diesel::table! { } diesel::table! { - tx_affected (affected, tx_sequence_number) { + tx_affected_addresses (affected, tx_sequence_number) { tx_sequence_number -> Int8, affected -> Bytea, sender -> Bytea, @@ -383,7 +383,7 @@ diesel::allow_tables_to_appear_in_same_query!( protocol_configs, pruner_cp_watermark, transactions, - tx_affected, + tx_affected_addresses, tx_calls_fun, tx_calls_mod, tx_calls_pkg, diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index ec3d3f8c55c3b..480e3d34d8adf 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -50,8 +50,8 @@ use crate::schema::{ event_senders, event_struct_instantiation, event_struct_module, event_struct_name, event_struct_package, events, feature_flags, full_objects_history, objects, objects_history, objects_snapshot, objects_version, packages, protocol_configs, pruner_cp_watermark, - transactions, tx_affected, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, - tx_digests, tx_input_objects, tx_kinds, tx_recipients, tx_senders, + transactions, tx_affected_addresses, tx_calls_fun, tx_calls_mod, tx_calls_pkg, + tx_changed_objects, tx_digests, tx_input_objects, tx_kinds, tx_recipients, tx_senders, }; use crate::store::transaction_with_retry; use crate::types::EventIndex; @@ -1026,7 +1026,7 @@ impl PgIndexerStore { .start_timer(); let len = indices.len(); let ( - affected, + affected_addresses, senders, recipients, input_objects, @@ -1050,7 +1050,7 @@ impl PgIndexerStore { Vec::new(), ), |( - mut tx_affected, + mut tx_affected_addresses, mut tx_senders, mut tx_recipients, mut tx_input_objects, @@ -1062,7 +1062,7 @@ impl PgIndexerStore { mut tx_kinds, ), index| { - tx_affected.extend(index.0); + tx_affected_addresses.extend(index.0); tx_senders.extend(index.1); tx_recipients.extend(index.2); tx_input_objects.extend(index.3); @@ -1073,7 +1073,7 @@ impl PgIndexerStore { tx_digests.extend(index.8); tx_kinds.extend(index.9); ( - tx_affected, + tx_affected_addresses, tx_senders, tx_recipients, tx_input_objects, @@ -1089,8 +1089,8 @@ impl PgIndexerStore { transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| { async { - diesel::insert_into(tx_affected::table) - .values(&affected) + diesel::insert_into(tx_affected_addresses::table) + .values(&affected_addresses) .on_conflict_do_nothing() .execute(conn) .await?; @@ -1404,8 +1404,8 @@ impl PgIndexerStore { transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| { async { diesel::delete( - tx_affected::table - .filter(tx_affected::tx_sequence_number.between(min_tx, max_tx)), + tx_affected_addresses::table + .filter(tx_affected_addresses::tx_sequence_number.between(min_tx, max_tx)), ) .execute(conn) .await?; From 5c5d6ceb241f3aa058000c3608efff79930166a9 Mon Sep 17 00:00:00 2001 From: Ashok Menon Date: Fri, 13 Sep 2024 14:52:37 +0100 Subject: [PATCH 3/3] fixup: affected addresses dedup + include sponsor --- crates/sui-indexer/src/models/tx_indices.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/crates/sui-indexer/src/models/tx_indices.rs b/crates/sui-indexer/src/models/tx_indices.rs index 83e99f3c132eb..de3ccc817826f 100644 --- a/crates/sui-indexer/src/models/tx_indices.rs +++ b/crates/sui-indexer/src/models/tx_indices.rs @@ -9,6 +9,7 @@ use crate::{ types::TxIndex, }; use diesel::prelude::*; +use itertools::Itertools; #[derive(QueryableByName)] pub struct TxSequenceNumber { @@ -122,21 +123,21 @@ impl TxIndex { let tx_affected_addresses = self .recipients .iter() - .map(|r| StoredTxAffected { + .chain(self.payers.iter()) + .chain(std::iter::once(&self.sender)) + .unique() + .map(|a| StoredTxAffected { tx_sequence_number, - affected: r.to_vec(), + affected: a.to_vec(), sender: self.sender.to_vec(), }) - .chain(std::iter::once(StoredTxAffected { - tx_sequence_number, - affected: self.sender.to_vec(), - sender: self.sender.to_vec(), - })) .collect(); + let tx_sender = StoredTxSenders { tx_sequence_number, sender: self.sender.to_vec(), }; + let tx_recipients = self .recipients .iter() @@ -146,6 +147,7 @@ impl TxIndex { sender: self.sender.to_vec(), }) .collect(); + let tx_input_objects = self .input_objects .iter() @@ -155,6 +157,7 @@ impl TxIndex { sender: self.sender.to_vec(), }) .collect(); + let tx_changed_objects = self .changed_objects .iter()