Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

indexer: index tx_affected_addresses #19355

Merged
merged 3 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS tx_affected_addresses;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE tx_affected_addresses (
tx_sequence_number BIGINT NOT NULL,
affected BYTEA NOT NULL,
sender BYTEA NOT NULL,
PRIMARY KEY(affected, tx_sequence_number)
);

CREATE INDEX tx_affected_addresses_tx_sequence_number_index ON tx_affected_addresses (tx_sequence_number);
CREATE INDEX tx_affected_addresses_sender ON tx_affected_addresses (sender, affected, tx_sequence_number);
31 changes: 29 additions & 2 deletions crates/sui-indexer/src/models/tx_indices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@

use crate::{
schema::{
tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests, tx_input_objects,
tx_kinds, tx_recipients, tx_senders,
tx_affected_addresses, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects,
tx_digests, tx_input_objects, tx_kinds, tx_recipients, tx_senders,
},
types::TxIndex,
};
use diesel::prelude::*;
use itertools::Itertools;

#[derive(QueryableByName)]
pub struct TxSequenceNumber {
Expand All @@ -22,6 +23,14 @@ pub struct TxDigest {
pub transaction_digest: Vec<u8>,
}

#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)]
#[diesel(table_name = tx_affected_addresses)]
pub struct StoredTxAffected {
pub tx_sequence_number: i64,
pub affected: Vec<u8>,
pub sender: Vec<u8>,
}

#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)]
#[diesel(table_name = tx_senders)]
pub struct StoredTxSenders {
Expand Down Expand Up @@ -99,6 +108,7 @@ impl TxIndex {
pub fn split(
self: TxIndex,
) -> (
Vec<StoredTxAffected>,
Vec<StoredTxSenders>,
Vec<StoredTxRecipients>,
Vec<StoredTxInputObject>,
Expand All @@ -110,10 +120,24 @@ impl TxIndex {
Vec<StoredTxKind>,
) {
let tx_sequence_number = self.tx_sequence_number as i64;
let tx_affected_addresses = self
.recipients
.iter()
.chain(self.payers.iter())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wish there are more comments in the TxIndex data structure but my understanding is that recipients should naturally include payers?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is true, for the same reason that a sender (in the non-sponsorship case) is not necessarily in the recipients list: If the transaction transfers the gas coin to some other address.

.chain(std::iter::once(&self.sender))
.unique()
.map(|a| StoredTxAffected {
tx_sequence_number,
affected: a.to_vec(),
sender: self.sender.to_vec(),
})
.collect();

let tx_sender = StoredTxSenders {
tx_sequence_number,
sender: self.sender.to_vec(),
};

let tx_recipients = self
.recipients
.iter()
Expand All @@ -123,6 +147,7 @@ impl TxIndex {
sender: self.sender.to_vec(),
})
.collect();

let tx_input_objects = self
.input_objects
.iter()
Expand All @@ -132,6 +157,7 @@ impl TxIndex {
sender: self.sender.to_vec(),
})
.collect();

let tx_changed_objects = self
.changed_objects
.iter()
Expand Down Expand Up @@ -197,6 +223,7 @@ impl TxIndex {
};

(
tx_affected_addresses,
vec![tx_sender],
tx_recipients,
tx_input_objects,
Expand Down
9 changes: 9 additions & 0 deletions crates/sui-indexer/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,14 @@ diesel::table! {
}
}

diesel::table! {
tx_affected_addresses (affected, tx_sequence_number) {
tx_sequence_number -> Int8,
affected -> Bytea,
sender -> Bytea,
}
}

diesel::table! {
tx_calls_fun (package, module, func, tx_sequence_number) {
tx_sequence_number -> Int8,
Expand Down Expand Up @@ -375,6 +383,7 @@ diesel::allow_tables_to_appear_in_same_query!(
protocol_configs,
pruner_cp_watermark,
transactions,
tx_affected_addresses,
tx_calls_fun,
tx_calls_mod,
tx_calls_pkg,
Expand Down
123 changes: 75 additions & 48 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ use crate::schema::{
event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
event_struct_package, events, feature_flags, full_objects_history, objects, objects_history,
objects_snapshot, objects_version, packages, protocol_configs, pruner_cp_watermark,
transactions, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests,
tx_input_objects, tx_kinds, tx_recipients, tx_senders,
transactions, tx_affected_addresses, tx_calls_fun, tx_calls_mod, tx_calls_pkg,
tx_changed_objects, tx_digests, tx_input_objects, tx_kinds, tx_recipients, tx_senders,
};
use crate::store::transaction_with_retry;
use crate::types::EventIndex;
Expand Down Expand Up @@ -1025,56 +1025,76 @@ impl PgIndexerStore {
.checkpoint_db_commit_latency_tx_indices_chunks
.start_timer();
let len = indices.len();
let (senders, recipients, input_objects, changed_objects, pkgs, mods, funs, digests, kinds) =
indices.into_iter().map(|i| i.split()).fold(
let (
affected_addresses,
senders,
recipients,
input_objects,
changed_objects,
pkgs,
mods,
funs,
digests,
kinds,
) = indices.into_iter().map(|i| i.split()).fold(
(
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
),
|(
mut tx_affected_addresses,
mut tx_senders,
mut tx_recipients,
mut tx_input_objects,
mut tx_changed_objects,
mut tx_pkgs,
mut tx_mods,
mut tx_funs,
mut tx_digests,
mut tx_kinds,
),
index| {
tx_affected_addresses.extend(index.0);
tx_senders.extend(index.1);
tx_recipients.extend(index.2);
tx_input_objects.extend(index.3);
tx_changed_objects.extend(index.4);
tx_pkgs.extend(index.5);
tx_mods.extend(index.6);
tx_funs.extend(index.7);
tx_digests.extend(index.8);
tx_kinds.extend(index.9);
(
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
),
|(
mut tx_senders,
mut tx_recipients,
mut tx_input_objects,
mut tx_changed_objects,
mut tx_pkgs,
mut tx_mods,
mut tx_funs,
mut tx_digests,
mut tx_kinds,
),
index| {
tx_senders.extend(index.0);
tx_recipients.extend(index.1);
tx_input_objects.extend(index.2);
tx_changed_objects.extend(index.3);
tx_pkgs.extend(index.4);
tx_mods.extend(index.5);
tx_funs.extend(index.6);
tx_digests.extend(index.7);
tx_kinds.extend(index.8);
(
tx_senders,
tx_recipients,
tx_input_objects,
tx_changed_objects,
tx_pkgs,
tx_mods,
tx_funs,
tx_digests,
tx_kinds,
)
},
);
tx_affected_addresses,
tx_senders,
tx_recipients,
tx_input_objects,
tx_changed_objects,
tx_pkgs,
tx_mods,
tx_funs,
tx_digests,
tx_kinds,
)
},
);

transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
async {
diesel::insert_into(tx_affected_addresses::table)
.values(&affected_addresses)
.on_conflict_do_nothing()
.execute(conn)
.await?;

diesel::insert_into(tx_senders::table)
.values(&senders)
.on_conflict_do_nothing()
Expand Down Expand Up @@ -1383,6 +1403,13 @@ impl PgIndexerStore {
let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
async {
diesel::delete(
tx_affected_addresses::table
.filter(tx_affected_addresses::tx_sequence_number.between(min_tx, max_tx)),
)
.execute(conn)
.await?;

diesel::delete(
tx_senders::table
.filter(tx_senders::tx_sequence_number.between(min_tx, max_tx)),
Expand Down
Loading