From 667745104e492e8a9a389bedcf4cf63c37f48118 Mon Sep 17 00:00:00 2001 From: Ge Gao <106119108+gegaowp@users.noreply.github.com> Date: Thu, 15 Aug 2024 18:02:42 -0400 Subject: [PATCH] indexer 2024-08: merge idx-breaking-change-park to main (#19005) ## Description title, this is another attempt of https://github.com/MystenLabs/sui/pull/18899 which got reverted as it triggered some CI issues, the issues have been resolved by #18993 ## Test plan How did you test the new or updated feature? --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --------- Co-authored-by: Emma Zhong Co-authored-by: Ashok Menon Co-authored-by: wlmyng <127570466+wlmyng@users.noreply.github.com> Co-authored-by: Emma Zhong --- .../src/types/transaction_block.rs | 20 +- .../mysql/2024-04-24-180249_packages/up.sql | 13 +- .../2024-05-05-155158_obj_indices/down.sql | 1 + .../2024-05-05-155158_obj_indices/up.sql | 9 + .../pg/2023-08-19-044020_events/up.sql | 5 +- .../2023-08-19-044026_transactions/down.sql | 1 + .../pg/2023-08-19-044026_transactions/up.sql | 8 +- .../pg/2023-08-19-044044_checkpoints/up.sql | 20 +- .../pg/2023-08-19-060729_packages/up.sql | 14 +- .../pg/2023-10-06-204335_tx_indices/down.sql | 5 +- .../pg/2023-10-06-204335_tx_indices/up.sql | 61 ++- .../up.sql | 6 +- .../pg/2024-05-05-155158_obj_indices/down.sql | 1 + .../pg/2024-05-05-155158_obj_indices/up.sql | 31 ++ .../2024-06-14-045801_event_indices/down.sql | 7 + .../pg/2024-06-14-045801_event_indices/up.sql | 74 +++ .../src/handlers/checkpoint_handler.rs | 66 ++- crates/sui-indexer/src/handlers/committer.rs | 8 + crates/sui-indexer/src/handlers/mod.rs | 6 +- crates/sui-indexer/src/indexer_reader.rs | 8 +- crates/sui-indexer/src/metrics.rs | 16 + crates/sui-indexer/src/models/checkpoints.rs | 4 + .../sui-indexer/src/models/event_indices.rs | 145 ++++++ crates/sui-indexer/src/models/mod.rs | 2 + crates/sui-indexer/src/models/obj_indices.rs | 40 ++ crates/sui-indexer/src/models/packages.rs | 6 + crates/sui-indexer/src/models/tx_indices.rs | 117 +++-- crates/sui-indexer/src/schema/mod.rs | 39 +- crates/sui-indexer/src/schema/mysql.rs | 128 ++++- crates/sui-indexer/src/schema/pg.rs | 149 +++++- crates/sui-indexer/src/store/indexer_store.rs | 9 +- crates/sui-indexer/src/store/mod.rs | 30 ++ .../sui-indexer/src/store/pg_indexer_store.rs | 450 ++++++++++++++++-- .../src/store/pg_partition_manager.rs | 78 ++- crates/sui-indexer/src/types.rs | 49 +- 35 files changed, 1448 insertions(+), 178 deletions(-) create mode 100644 crates/sui-indexer/migrations/mysql/2024-05-05-155158_obj_indices/down.sql create mode 100644 crates/sui-indexer/migrations/mysql/2024-05-05-155158_obj_indices/up.sql create mode 100644 crates/sui-indexer/migrations/pg/2024-05-05-155158_obj_indices/down.sql create mode 100644 crates/sui-indexer/migrations/pg/2024-05-05-155158_obj_indices/up.sql create mode 100644 crates/sui-indexer/migrations/pg/2024-06-14-045801_event_indices/down.sql create mode 100644 crates/sui-indexer/migrations/pg/2024-06-14-045801_event_indices/up.sql create mode 100644 crates/sui-indexer/src/models/event_indices.rs create mode 100644 crates/sui-indexer/src/models/obj_indices.rs diff --git a/crates/sui-graphql-rpc/src/types/transaction_block.rs b/crates/sui-graphql-rpc/src/types/transaction_block.rs index 5009d076145d8..c75a34ee2fcda 100644 --- a/crates/sui-graphql-rpc/src/types/transaction_block.rs +++ b/crates/sui-graphql-rpc/src/types/transaction_block.rs @@ -8,14 +8,14 @@ use async_graphql::{ dataloader::Loader, *, }; -use diesel::{BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper}; +use diesel::{ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper}; use fastcrypto::encoding::{Base58, Encoding}; use serde::{Deserialize, Serialize}; use sui_indexer::{ models::transactions::StoredTransaction, schema::{ - transactions, tx_calls, tx_changed_objects, tx_digests, tx_input_objects, tx_recipients, - tx_senders, + transactions, tx_calls_fun, tx_changed_objects, tx_digests, tx_input_objects, + tx_recipients, tx_senders, }, }; use sui_types::{ @@ -318,15 +318,15 @@ impl TransactionBlock { let mut query = tx::dsl::transactions.into_boxed(); if let Some(f) = &filter.function { - let sub_query = tx_calls::dsl::tx_calls - .select(tx_calls::dsl::tx_sequence_number) + let sub_query = tx_calls_fun::dsl::tx_calls_fun + .select(tx_calls_fun::dsl::tx_sequence_number) .into_boxed(); query = query.filter(tx::dsl::tx_sequence_number.eq_any(f.apply( sub_query, - tx_calls::dsl::package, - tx_calls::dsl::module, - tx_calls::dsl::func, + tx_calls_fun::dsl::package, + tx_calls_fun::dsl::module, + tx_calls_fun::dsl::func, ))); } @@ -507,9 +507,7 @@ impl Loader for Db { let transactions: Vec = self .execute(move |conn| { conn.results(move || { - let join = ds::cp_sequence_number - .eq(tx::checkpoint_sequence_number) - .and(ds::tx_sequence_number.eq(tx::tx_sequence_number)); + let join = ds::tx_sequence_number.eq(tx::tx_sequence_number); tx::transactions .inner_join(ds::tx_digests.on(join)) diff --git a/crates/sui-indexer/migrations/mysql/2024-04-24-180249_packages/up.sql b/crates/sui-indexer/migrations/mysql/2024-04-24-180249_packages/up.sql index f3fe2539038fc..7ee89206f254f 100644 --- a/crates/sui-indexer/migrations/mysql/2024-04-24-180249_packages/up.sql +++ b/crates/sui-indexer/migrations/mysql/2024-04-24-180249_packages/up.sql @@ -1,7 +1,14 @@ CREATE TABLE packages ( - package_id blob NOT NULL, + package_id BLOB NOT NULL, + original_id BLOB NOT NULL, + package_version BIGINT NOT NULL, -- bcs serialized MovePackage - move_package MEDIUMBLOB NOT NULL, - CONSTRAINT packages_pk PRIMARY KEY (package_id(255)) + move_package MEDIUMBLOB NOT NULL, + checkpoint_sequence_number BIGINT NOT NULL, + CONSTRAINT packages_pk PRIMARY KEY (package_id(32), original_id(32), package_version), + CONSTRAINT packages_unique_package_id UNIQUE (package_id(32)) ); + +CREATE INDEX packages_cp_id_version ON packages (checkpoint_sequence_number, original_id(32), package_version); +CREATE INDEX packages_id_version_cp ON packages (original_id(32), package_version, checkpoint_sequence_number); diff --git a/crates/sui-indexer/migrations/mysql/2024-05-05-155158_obj_indices/down.sql b/crates/sui-indexer/migrations/mysql/2024-05-05-155158_obj_indices/down.sql new file mode 100644 index 0000000000000..7a3a7670f24c2 --- /dev/null +++ b/crates/sui-indexer/migrations/mysql/2024-05-05-155158_obj_indices/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS objects_version; diff --git a/crates/sui-indexer/migrations/mysql/2024-05-05-155158_obj_indices/up.sql b/crates/sui-indexer/migrations/mysql/2024-05-05-155158_obj_indices/up.sql new file mode 100644 index 0000000000000..e501b71a073c0 --- /dev/null +++ b/crates/sui-indexer/migrations/mysql/2024-05-05-155158_obj_indices/up.sql @@ -0,0 +1,9 @@ +-- The Postgres version of this table is partitioned by the first byte +-- of object_id, but this kind of partition is not easily supported in +-- MySQL, so this variant is unpartitioned for now. +CREATE TABLE objects_version ( + object_id BLOB NOT NULL, + object_version BIGINT NOT NULL, + cp_sequence_number BIGINT NOT NULL, + PRIMARY KEY (object_id(32), object_version) +) diff --git a/crates/sui-indexer/migrations/pg/2023-08-19-044020_events/up.sql b/crates/sui-indexer/migrations/pg/2023-08-19-044020_events/up.sql index a6c0d70566e7b..dfbfa3ea14495 100644 --- a/crates/sui-indexer/migrations/pg/2023-08-19-044020_events/up.sql +++ b/crates/sui-indexer/migrations/pg/2023-08-19-044020_events/up.sql @@ -1,3 +1,4 @@ +-- TODO: modify queries in indexer reader to take advantage of the new indices CREATE TABLE events ( tx_sequence_number BIGINT NOT NULL, @@ -23,8 +24,8 @@ CREATE TABLE events timestamp_ms BIGINT NOT NULL, -- bcs of the Event contents (Event.contents) bcs BYTEA NOT NULL, - PRIMARY KEY(tx_sequence_number, event_sequence_number, checkpoint_sequence_number) -) PARTITION BY RANGE (checkpoint_sequence_number); + PRIMARY KEY(tx_sequence_number, event_sequence_number) +) PARTITION BY RANGE (tx_sequence_number); CREATE TABLE events_partition_0 PARTITION OF events FOR VALUES FROM (0) TO (MAXVALUE); CREATE INDEX events_package ON events (package, tx_sequence_number, event_sequence_number); CREATE INDEX events_package_module ON events (package, module, tx_sequence_number, event_sequence_number); diff --git a/crates/sui-indexer/migrations/pg/2023-08-19-044026_transactions/down.sql b/crates/sui-indexer/migrations/pg/2023-08-19-044026_transactions/down.sql index e5850457f922e..15e9dc9f1cb82 100644 --- a/crates/sui-indexer/migrations/pg/2023-08-19-044026_transactions/down.sql +++ b/crates/sui-indexer/migrations/pg/2023-08-19-044026_transactions/down.sql @@ -1,2 +1,3 @@ -- This file should undo anything in `up.sql` DROP TABLE IF EXISTS transactions; +DROP TABLE IF EXISTS transactions_partition_0; diff --git a/crates/sui-indexer/migrations/pg/2023-08-19-044026_transactions/up.sql b/crates/sui-indexer/migrations/pg/2023-08-19-044026_transactions/up.sql index ede66ad44798c..f5404e3610751 100644 --- a/crates/sui-indexer/migrations/pg/2023-08-19-044026_transactions/up.sql +++ b/crates/sui-indexer/migrations/pg/2023-08-19-044026_transactions/up.sql @@ -18,10 +18,6 @@ CREATE TABLE transactions ( -- number of successful commands in this transaction, bound by number of command -- in a programmaable transaction. success_command_count smallint NOT NULL, - PRIMARY KEY (tx_sequence_number, checkpoint_sequence_number) -) PARTITION BY RANGE (checkpoint_sequence_number); + PRIMARY KEY (tx_sequence_number) +) PARTITION BY RANGE (tx_sequence_number); CREATE TABLE transactions_partition_0 PARTITION OF transactions FOR VALUES FROM (0) TO (MAXVALUE); -CREATE INDEX transactions_transaction_digest ON transactions (transaction_digest); -CREATE INDEX transactions_checkpoint_sequence_number ON transactions (checkpoint_sequence_number); --- only create index for system transactions (0). See types.rs -CREATE INDEX transactions_transaction_kind ON transactions (transaction_kind) WHERE transaction_kind = 0; diff --git a/crates/sui-indexer/migrations/pg/2023-08-19-044044_checkpoints/up.sql b/crates/sui-indexer/migrations/pg/2023-08-19-044044_checkpoints/up.sql index 5f7281a2e1a1d..ddb63b020de70 100644 --- a/crates/sui-indexer/migrations/pg/2023-08-19-044044_checkpoints/up.sql +++ b/crates/sui-indexer/migrations/pg/2023-08-19-044044_checkpoints/up.sql @@ -1,15 +1,15 @@ CREATE TABLE checkpoints ( - sequence_number bigint PRIMARY KEY, - checkpoint_digest bytea NOT NULL, - epoch bigint NOT NULL, + sequence_number BIGINT PRIMARY KEY, + checkpoint_digest BYTEA NOT NULL, + epoch BIGINT NOT NULL, -- total transactions in the network at the end of this checkpoint (including itself) - network_total_transactions bigint NOT NULL, - previous_checkpoint_digest bytea, + network_total_transactions BIGINT NOT NULL, + previous_checkpoint_digest BYTEA, -- if this checkpoitn is the last checkpoint of an epoch end_of_epoch boolean NOT NULL, -- array of TranscationDigest in bytes included in this checkpoint - tx_digests bytea[] NOT NULL, + tx_digests BYTEA[] NOT NULL, timestamp_ms BIGINT NOT NULL, total_gas_cost BIGINT NOT NULL, computation_cost BIGINT NOT NULL, @@ -17,11 +17,13 @@ CREATE TABLE checkpoints storage_rebate BIGINT NOT NULL, non_refundable_storage_fee BIGINT NOT NULL, -- bcs serialized Vec bytes - checkpoint_commitments bytea NOT NULL, + checkpoint_commitments BYTEA NOT NULL, -- bcs serialized AggregateAuthoritySignature bytes - validator_signature bytea NOT NULL, + validator_signature BYTEA NOT NULL, -- bcs serialzied EndOfEpochData bytes, if the checkpoint marks end of an epoch - end_of_epoch_data bytea + end_of_epoch_data BYTEA, + min_tx_sequence_number BIGINT, + max_tx_sequence_number BIGINT ); CREATE INDEX checkpoints_epoch ON checkpoints (epoch, sequence_number); diff --git a/crates/sui-indexer/migrations/pg/2023-08-19-060729_packages/up.sql b/crates/sui-indexer/migrations/pg/2023-08-19-060729_packages/up.sql index a95489af4dc41..f08a5549608eb 100644 --- a/crates/sui-indexer/migrations/pg/2023-08-19-060729_packages/up.sql +++ b/crates/sui-indexer/migrations/pg/2023-08-19-060729_packages/up.sql @@ -1,6 +1,14 @@ -CREATE TABLE packages +CREATE TABLE packages ( - package_id bytea PRIMARY KEY, + package_id bytea NOT NULL, + original_id bytea NOT NULL, + package_version bigint NOT NULL, -- bcs serialized MovePackage - move_package bytea NOT NULL + move_package bytea NOT NULL, + checkpoint_sequence_number bigint NOT NULL, + CONSTRAINT packages_pkey PRIMARY KEY (package_id, original_id, package_version), + CONSTRAINT packages_unique_package_id UNIQUE (package_id) ); + +CREATE INDEX packages_cp_id_version ON packages (checkpoint_sequence_number, original_id, package_version); +CREATE INDEX packages_id_version_cp ON packages (original_id, package_version, checkpoint_sequence_number); diff --git a/crates/sui-indexer/migrations/pg/2023-10-06-204335_tx_indices/down.sql b/crates/sui-indexer/migrations/pg/2023-10-06-204335_tx_indices/down.sql index 8e4f29f981c22..f5604c0db5357 100644 --- a/crates/sui-indexer/migrations/pg/2023-10-06-204335_tx_indices/down.sql +++ b/crates/sui-indexer/migrations/pg/2023-10-06-204335_tx_indices/down.sql @@ -2,5 +2,8 @@ DROP TABLE IF EXISTS tx_senders; DROP TABLE IF EXISTS tx_recipients; DROP TABLE IF EXISTS tx_input_objects; DROP TABLE IF EXISTS tx_changed_objects; -DROP TABLE IF EXISTS tx_calls; +DROP TABLE IF EXISTS tx_calls_pkg; +DROP TABLE IF EXISTS tx_calls_mod; +DROP TABLE IF EXISTS tx_calls_fun; DROP TABLE IF EXISTS tx_digests; +DROP TABLE IF EXISTS tx_kinds; diff --git a/crates/sui-indexer/migrations/pg/2023-10-06-204335_tx_indices/up.sql b/crates/sui-indexer/migrations/pg/2023-10-06-204335_tx_indices/up.sql index ed81a281f2b0a..0bcd824e31254 100644 --- a/crates/sui-indexer/migrations/pg/2023-10-06-204335_tx_indices/up.sql +++ b/crates/sui-indexer/migrations/pg/2023-10-06-204335_tx_indices/up.sql @@ -1,57 +1,70 @@ CREATE TABLE tx_senders ( - cp_sequence_number BIGINT NOT NULL, tx_sequence_number BIGINT NOT NULL, - -- SuiAddress in bytes. sender BYTEA NOT NULL, - PRIMARY KEY(sender, tx_sequence_number, cp_sequence_number) + PRIMARY KEY(sender, tx_sequence_number) ); -CREATE INDEX tx_senders_tx_sequence_number_index ON tx_senders (tx_sequence_number, cp_sequence_number); CREATE TABLE tx_recipients ( - cp_sequence_number BIGINT NOT NULL, tx_sequence_number BIGINT NOT NULL, - -- SuiAddress in bytes. recipient BYTEA NOT NULL, - PRIMARY KEY(recipient, tx_sequence_number, cp_sequence_number) + sender BYTEA NOT NULL, + PRIMARY KEY(recipient, tx_sequence_number) ); -CREATE INDEX tx_recipients_tx_sequence_number_index ON tx_recipients (tx_sequence_number, cp_sequence_number); +CREATE INDEX tx_recipients_sender ON tx_recipients (sender, recipient, tx_sequence_number); CREATE TABLE tx_input_objects ( - cp_sequence_number BIGINT NOT NULL, tx_sequence_number BIGINT NOT NULL, - -- Object ID in bytes. object_id BYTEA NOT NULL, - PRIMARY KEY(object_id, tx_sequence_number, cp_sequence_number) + sender BYTEA NOT NULL, + PRIMARY KEY(object_id, tx_sequence_number) ); CREATE INDEX tx_input_objects_tx_sequence_number_index ON tx_input_objects (tx_sequence_number); +CREATE INDEX tx_input_objects_sender ON tx_input_objects (sender, object_id, tx_sequence_number); CREATE TABLE tx_changed_objects ( - cp_sequence_number BIGINT NOT NULL, tx_sequence_number BIGINT NOT NULL, - -- Object Id in bytes. object_id BYTEA NOT NULL, - PRIMARY KEY(object_id, tx_sequence_number, cp_sequence_number) + sender BYTEA NOT NULL, + PRIMARY KEY(object_id, tx_sequence_number) ); CREATE INDEX tx_changed_objects_tx_sequence_number_index ON tx_changed_objects (tx_sequence_number); +CREATE INDEX tx_changed_objects_sender ON tx_changed_objects (sender, object_id, tx_sequence_number); + +CREATE TABLE tx_calls_pkg ( + tx_sequence_number BIGINT NOT NULL, + package BYTEA NOT NULL, + sender BYTEA NOT NULL, + PRIMARY KEY(package, tx_sequence_number) +); +CREATE INDEX tx_calls_pkg_sender ON tx_calls_pkg (sender, package, tx_sequence_number); + +CREATE TABLE tx_calls_mod ( + tx_sequence_number BIGINT NOT NULL, + package BYTEA NOT NULL, + module TEXT NOT NULL, + sender BYTEA NOT NULL, + PRIMARY KEY(package, module, tx_sequence_number) +); +CREATE INDEX tx_calls_mod_sender ON tx_calls_mod (sender, package, module, tx_sequence_number); -CREATE TABLE tx_calls ( - cp_sequence_number BIGINT NOT NULL, +CREATE TABLE tx_calls_fun ( tx_sequence_number BIGINT NOT NULL, package BYTEA NOT NULL, module TEXT NOT NULL, func TEXT NOT NULL, - -- 1. Using Primary Key as a unique index. - -- 2. Diesel does not like tables with no primary key. - PRIMARY KEY(package, tx_sequence_number, cp_sequence_number) + sender BYTEA NOT NULL, + PRIMARY KEY(package, module, func, tx_sequence_number) ); -CREATE INDEX tx_calls_module ON tx_calls (package, module, tx_sequence_number, cp_sequence_number); -CREATE INDEX tx_calls_func ON tx_calls (package, module, func, tx_sequence_number, cp_sequence_number); -CREATE INDEX tx_calls_tx_sequence_number ON tx_calls (tx_sequence_number, cp_sequence_number); +CREATE INDEX tx_calls_fun_sender ON tx_calls_fun (sender, package, module, func, tx_sequence_number); --- un-partitioned table for tx_digest -> (cp_sequence_number, tx_sequence_number) lookup. CREATE TABLE tx_digests ( tx_digest BYTEA PRIMARY KEY, - cp_sequence_number BIGINT NOT NULL, tx_sequence_number BIGINT NOT NULL ); CREATE INDEX tx_digests_tx_sequence_number ON tx_digests (tx_sequence_number); + +CREATE TABLE tx_kinds ( + tx_sequence_number BIGINT NOT NULL, + tx_kind SMALLINT NOT NULL, + PRIMARY KEY(tx_kind, tx_sequence_number) +); diff --git a/crates/sui-indexer/migrations/pg/2023-11-29-193859_advance_partition/up.sql b/crates/sui-indexer/migrations/pg/2023-11-29-193859_advance_partition/up.sql index cb24af8e09934..8ca64b86a7081 100644 --- a/crates/sui-indexer/migrations/pg/2023-11-29-193859_advance_partition/up.sql +++ b/crates/sui-indexer/migrations/pg/2023-11-29-193859_advance_partition/up.sql @@ -1,10 +1,10 @@ -CREATE OR REPLACE PROCEDURE advance_partition(table_name TEXT, last_epoch BIGINT, new_epoch BIGINT, last_epoch_start_cp BIGINT, new_epoch_start_cp BIGINT) +CREATE OR REPLACE PROCEDURE advance_partition(table_name TEXT, last_epoch BIGINT, new_epoch BIGINT, last_epoch_start BIGINT, new_epoch_start BIGINT) LANGUAGE plpgsql AS $$ BEGIN EXECUTE format('ALTER TABLE %I DETACH PARTITION %I_partition_%s', table_name, table_name, last_epoch); - EXECUTE format('ALTER TABLE %I ATTACH PARTITION %I_partition_%s FOR VALUES FROM (%L) TO (%L)', table_name, table_name, last_epoch, last_epoch_start_cp, new_epoch_start_cp); - EXECUTE format('CREATE TABLE IF NOT EXISTS %I_partition_%s PARTITION OF %I FOR VALUES FROM (%L) TO (MAXVALUE)', table_name, new_epoch, table_name, new_epoch_start_cp); + EXECUTE format('ALTER TABLE %I ATTACH PARTITION %I_partition_%s FOR VALUES FROM (%L) TO (%L)', table_name, table_name, last_epoch, last_epoch_start, new_epoch_start); + EXECUTE format('CREATE TABLE IF NOT EXISTS %I_partition_%s PARTITION OF %I FOR VALUES FROM (%L) TO (MAXVALUE)', table_name, new_epoch, table_name, new_epoch_start); END; $$; diff --git a/crates/sui-indexer/migrations/pg/2024-05-05-155158_obj_indices/down.sql b/crates/sui-indexer/migrations/pg/2024-05-05-155158_obj_indices/down.sql new file mode 100644 index 0000000000000..7a3a7670f24c2 --- /dev/null +++ b/crates/sui-indexer/migrations/pg/2024-05-05-155158_obj_indices/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS objects_version; diff --git a/crates/sui-indexer/migrations/pg/2024-05-05-155158_obj_indices/up.sql b/crates/sui-indexer/migrations/pg/2024-05-05-155158_obj_indices/up.sql new file mode 100644 index 0000000000000..666e5a2423319 --- /dev/null +++ b/crates/sui-indexer/migrations/pg/2024-05-05-155158_obj_indices/up.sql @@ -0,0 +1,31 @@ +-- Indexing table mapping an object's ID and version to its checkpoint +-- sequence number, partitioned by the first byte of its Object ID. +CREATE TABLE objects_version ( + object_id bytea NOT NULL, + object_version bigint NOT NULL, + cp_sequence_number bigint NOT NULL, + PRIMARY KEY (object_id, object_version) +) PARTITION BY RANGE (object_id); + +-- Create a partition for each first byte value. +DO $$ +DECLARE + lo text; + hi text; +BEGIN + FOR i IN 0..254 LOOP + lo := LPAD(TO_HEX(i), 2, '0'); + hi := LPAD(TO_HEX(i + 1), 2, '0'); + EXECUTE FORMAT($F$ + CREATE TABLE objects_version_%1$s PARTITION OF objects_version FOR VALUES + FROM (E'\\x%1$s00000000000000000000000000000000000000000000000000000000000000') + TO (E'\\x%2$s00000000000000000000000000000000000000000000000000000000000000'); + $F$, lo, hi); + END LOOP; +END; +$$ LANGUAGE plpgsql; + +-- Special case for the last partition, because of the upper bound. +CREATE TABLE objects_version_ff PARTITION OF objects_version FOR VALUES +FROM (E'\\xff00000000000000000000000000000000000000000000000000000000000000') +TO (MAXVALUE); diff --git a/crates/sui-indexer/migrations/pg/2024-06-14-045801_event_indices/down.sql b/crates/sui-indexer/migrations/pg/2024-06-14-045801_event_indices/down.sql new file mode 100644 index 0000000000000..3583887435168 --- /dev/null +++ b/crates/sui-indexer/migrations/pg/2024-06-14-045801_event_indices/down.sql @@ -0,0 +1,7 @@ +DROP TABLE IF EXISTS event_emit_package; +DROP TABLE IF EXISTS event_emit_module; +DROP TABLE IF EXISTS event_struct_package; +DROP TABLE IF EXISTS event_struct_module; +DROP TABLE IF EXISTS event_struct_name; +DROP TABLE IF EXISTS event_struct_instantiation; +DROP TABLE IF EXISTS event_senders; diff --git a/crates/sui-indexer/migrations/pg/2024-06-14-045801_event_indices/up.sql b/crates/sui-indexer/migrations/pg/2024-06-14-045801_event_indices/up.sql new file mode 100644 index 0000000000000..a89625146a9fd --- /dev/null +++ b/crates/sui-indexer/migrations/pg/2024-06-14-045801_event_indices/up.sql @@ -0,0 +1,74 @@ +CREATE TABLE event_emit_package +( + package BYTEA NOT NULL, + tx_sequence_number BIGINT NOT NULL, + event_sequence_number BIGINT NOT NULL, + sender BYTEA NOT NULL, + PRIMARY KEY(package, tx_sequence_number, event_sequence_number) +); +CREATE INDEX event_emit_package_sender ON event_emit_package (sender, package, tx_sequence_number, event_sequence_number); + +CREATE TABLE event_emit_module +( + package BYTEA NOT NULL, + module TEXT NOT NULL, + tx_sequence_number BIGINT NOT NULL, + event_sequence_number BIGINT NOT NULL, + sender BYTEA NOT NULL, + PRIMARY KEY(package, module, tx_sequence_number, event_sequence_number) +); +CREATE INDEX event_emit_module_sender ON event_emit_module (sender, package, module, tx_sequence_number, event_sequence_number); + +CREATE TABLE event_struct_package +( + package BYTEA NOT NULL, + tx_sequence_number BIGINT NOT NULL, + event_sequence_number BIGINT NOT NULL, + sender BYTEA NOT NULL, + PRIMARY KEY(package, tx_sequence_number, event_sequence_number) +); +CREATE INDEX event_struct_package_sender ON event_struct_package (sender, package, tx_sequence_number, event_sequence_number); + + +CREATE TABLE event_struct_module +( + package BYTEA NOT NULL, + module TEXT NOT NULL, + tx_sequence_number BIGINT NOT NULL, + event_sequence_number BIGINT NOT NULL, + sender BYTEA NOT NULL, + PRIMARY KEY(package, module, tx_sequence_number, event_sequence_number) +); +CREATE INDEX event_struct_module_sender ON event_struct_module (sender, package, module, tx_sequence_number, event_sequence_number); + +CREATE TABLE event_struct_name +( + package BYTEA NOT NULL, + module TEXT NOT NULL, + type_name TEXT NOT NULL, + tx_sequence_number BIGINT NOT NULL, + event_sequence_number BIGINT NOT NULL, + sender BYTEA NOT NULL, + PRIMARY KEY(package, module, type_name, tx_sequence_number, event_sequence_number) +); +CREATE INDEX event_struct_name_sender ON event_struct_name (sender, package, module, type_name, tx_sequence_number, event_sequence_number); + +CREATE TABLE event_struct_instantiation +( + package BYTEA NOT NULL, + module TEXT NOT NULL, + type_instantiation TEXT NOT NULL, + tx_sequence_number BIGINT NOT NULL, + event_sequence_number BIGINT NOT NULL, + sender BYTEA NOT NULL, + PRIMARY KEY(package, module, type_instantiation, tx_sequence_number, event_sequence_number) +); +CREATE INDEX event_struct_instantiation_sender ON event_struct_instantiation (sender, package, module, type_instantiation, tx_sequence_number, event_sequence_number); + +CREATE TABLE event_senders +( + sender BYTEA NOT NULL, + tx_sequence_number BIGINT NOT NULL, + event_sequence_number BIGINT NOT NULL, + PRIMARY KEY(sender, tx_sequence_number, event_sequence_number) +); diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index b3897f2fe8935..7a8f8efe67fa9 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -49,8 +49,8 @@ use crate::db::ConnectionPool; use crate::store::package_resolver::{IndexerStorePackageResolver, InterimPackageResolver}; use crate::store::{IndexerStore, PgIndexerStore}; use crate::types::{ - IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent, IndexedObject, - IndexedPackage, IndexedTransaction, IndexerResult, TransactionKind, TxIndex, + EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent, + IndexedObject, IndexedPackage, IndexedTransaction, IndexerResult, TransactionKind, TxIndex, }; use super::tx_processor::EpochEndIndexingObjectStore; @@ -215,6 +215,7 @@ where 0, //first_checkpoint_id None, ), + network_total_transactions: 0, })); } @@ -241,7 +242,12 @@ where let event = bcs::from_bytes::(&epoch_event.contents)?; // Now we just entered epoch X, we want to calculate the diff between - // TotalTransactionsByEndOfEpoch(X-1) and TotalTransactionsByEndOfEpoch(X-2) + // TotalTransactionsByEndOfEpoch(X-1) and TotalTransactionsByEndOfEpoch(X-2). Note that on + // the indexer's chain-reading side, this is not guaranteed to have the latest data. Rather + // than impose a wait on the reading side, however, we overwrite this on the persisting + // side, where we can guarantee that the previous epoch's checkpoints have been written to + // db. + let network_tx_count_prev_epoch = match system_state.epoch { // If first epoch change, this number is 0 1 => Ok(0), @@ -265,6 +271,7 @@ where checkpoint_summary.sequence_number + 1, // first_checkpoint_id Some(&event), ), + network_total_transactions: checkpoint_summary.network_total_transactions, })) } @@ -287,20 +294,21 @@ where let object_history_changes: TransactionObjectChangesToCommit = Self::index_objects_history(data.clone(), package_resolver.clone()).await?; - let (checkpoint, db_transactions, db_events, db_indices, db_displays) = { + let (checkpoint, db_transactions, db_events, db_tx_indices, db_event_indices, db_displays) = { let CheckpointData { transactions, checkpoint_summary, checkpoint_contents, } = data; - let (db_transactions, db_events, db_indices, db_displays) = Self::index_transactions( - transactions, - &checkpoint_summary, - &checkpoint_contents, - &metrics, - ) - .await?; + let (db_transactions, db_events, db_tx_indices, db_event_indices, db_displays) = + Self::index_transactions( + transactions, + &checkpoint_summary, + &checkpoint_contents, + &metrics, + ) + .await?; let successful_tx_num: u64 = db_transactions.iter().map(|t| t.successful_tx_num).sum(); ( @@ -311,7 +319,8 @@ where ), db_transactions, db_events, - db_indices, + db_tx_indices, + db_event_indices, db_displays, ) }; @@ -334,7 +343,8 @@ where checkpoint, transactions: db_transactions, events: db_events, - tx_indices: db_indices, + tx_indices: db_tx_indices, + event_indices: db_event_indices, display_updates: db_displays, object_changes, object_history_changes, @@ -352,6 +362,7 @@ where Vec, Vec, Vec, + Vec, BTreeMap, )> { let checkpoint_seq = checkpoint_summary.sequence_number(); @@ -372,7 +383,8 @@ where let mut db_transactions = Vec::new(); let mut db_events = Vec::new(); let mut db_displays = BTreeMap::new(); - let mut db_indices = Vec::new(); + let mut db_tx_indices = Vec::new(); + let mut db_event_indices = Vec::new(); for tx in transactions { let CheckpointTransaction { @@ -390,6 +402,7 @@ where checkpoint_seq, tx_digest, sender_signed_data.digest() ))); } + let tx = sender_signed_data.transaction_data(); let events = events .as_ref() @@ -413,6 +426,12 @@ where ) })); + db_event_indices.extend( + events.iter().enumerate().map(|(idx, event)| { + EventIndex::from_event(tx_sequence_number, idx as u64, event) + }), + ); + db_displays.extend( events .iter() @@ -440,7 +459,7 @@ where object_changes, balance_change, events, - transaction_kind, + transaction_kind: transaction_kind.clone(), successful_tx_num: if fx.status().is_ok() { tx.kind().tx_count() as u64 } else { @@ -468,8 +487,8 @@ where // Payers let payers = vec![tx.gas_owner()]; - // Senders - let senders = vec![tx.sender()]; + // Sender + let sender = tx.sender(); // Recipients let recipients = fx @@ -489,19 +508,26 @@ where .map(|(p, m, f)| (*<&ObjectID>::clone(p), m.to_string(), f.to_string())) .collect(); - db_indices.push(TxIndex { + db_tx_indices.push(TxIndex { tx_sequence_number, transaction_digest: tx_digest, checkpoint_sequence_number: *checkpoint_seq, input_objects, changed_objects, - senders, + sender, payers, recipients, move_calls, + tx_kind: transaction_kind, }); } - Ok((db_transactions, db_events, db_indices, db_displays)) + Ok(( + db_transactions, + db_events, + db_tx_indices, + db_event_indices, + db_displays, + )) } pub(crate) async fn index_objects( diff --git a/crates/sui-indexer/src/handlers/committer.rs b/crates/sui-indexer/src/handlers/committer.rs index eb1dc7365f2b8..58d6fe94e1f7c 100644 --- a/crates/sui-indexer/src/handlers/committer.rs +++ b/crates/sui-indexer/src/handlers/committer.rs @@ -89,6 +89,7 @@ async fn commit_checkpoints( let mut tx_batch = vec![]; let mut events_batch = vec![]; let mut tx_indices_batch = vec![]; + let mut event_indices_batch = vec![]; let mut display_updates_batch = BTreeMap::new(); let mut object_changes_batch = vec![]; let mut object_history_changes_batch = vec![]; @@ -99,6 +100,7 @@ async fn commit_checkpoints( checkpoint, transactions, events, + event_indices, tx_indices, display_updates, object_changes, @@ -110,6 +112,7 @@ async fn commit_checkpoints( tx_batch.push(transactions); events_batch.push(events); tx_indices_batch.push(tx_indices); + event_indices_batch.push(event_indices); display_updates_batch.extend(display_updates.into_iter()); object_changes_batch.push(object_changes); object_history_changes_batch.push(object_history_changes); @@ -123,6 +126,10 @@ async fn commit_checkpoints( let tx_batch = tx_batch.into_iter().flatten().collect::>(); let tx_indices_batch = tx_indices_batch.into_iter().flatten().collect::>(); let events_batch = events_batch.into_iter().flatten().collect::>(); + let event_indices_batch = event_indices_batch + .into_iter() + .flatten() + .collect::>(); let packages_batch = packages_batch.into_iter().flatten().collect::>(); let checkpoint_num = checkpoint_batch.len(); let tx_count = tx_batch.len(); @@ -133,6 +140,7 @@ async fn commit_checkpoints( state.persist_transactions(tx_batch), state.persist_tx_indices(tx_indices_batch), state.persist_events(events_batch), + state.persist_event_indices(event_indices_batch), state.persist_displays(display_updates_batch), state.persist_packages(packages_batch), state.persist_objects(object_changes_batch.clone()), diff --git a/crates/sui-indexer/src/handlers/mod.rs b/crates/sui-indexer/src/handlers/mod.rs index ca27e92a0bf41..2a6578fc18295 100644 --- a/crates/sui-indexer/src/handlers/mod.rs +++ b/crates/sui-indexer/src/handlers/mod.rs @@ -6,8 +6,8 @@ use std::collections::BTreeMap; use crate::{ models::display::StoredDisplay, types::{ - IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent, IndexedObject, - IndexedPackage, IndexedTransaction, TxIndex, + EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent, + IndexedObject, IndexedPackage, IndexedTransaction, TxIndex, }, }; @@ -22,6 +22,7 @@ pub struct CheckpointDataToCommit { pub checkpoint: IndexedCheckpoint, pub transactions: Vec, pub events: Vec, + pub event_indices: Vec, pub tx_indices: Vec, pub display_updates: BTreeMap, pub object_changes: TransactionObjectChangesToCommit, @@ -40,4 +41,5 @@ pub struct TransactionObjectChangesToCommit { pub struct EpochToCommit { pub last_epoch: Option, pub new_epoch: IndexedEpochInfo, + pub network_total_transactions: u64, } diff --git a/crates/sui-indexer/src/indexer_reader.rs b/crates/sui-indexer/src/indexer_reader.rs index 00408945140bd..f05098a764185 100644 --- a/crates/sui-indexer/src/indexer_reader.rs +++ b/crates/sui-indexer/src/indexer_reader.rs @@ -777,14 +777,14 @@ impl IndexerReader { let package = Hex::encode(package.to_vec()); match (module, function) { (Some(module), Some(function)) => ( - "tx_calls".into(), + "tx_calls_fun".into(), format!( "package = '\\x{}'::bytea AND module = '{}' AND func = '{}'", package, module, function ), ), (Some(module), None) => ( - "tx_calls".into(), + "tx_calls_mod".into(), format!( "package = '\\x{}'::bytea AND module = '{}'", package, module @@ -792,11 +792,11 @@ impl IndexerReader { ), (None, Some(_)) => { return Err(IndexerError::InvalidArgumentError( - "Function cannot be present wihtout Module.".into(), + "Function cannot be present without Module.".into(), )); } (None, None) => ( - "tx_calls".into(), + "tx_calls_pkg".into(), format!("package = '\\x{}'::bytea", package), ), } diff --git a/crates/sui-indexer/src/metrics.rs b/crates/sui-indexer/src/metrics.rs index 36d788f5fd580..34978836db5d2 100644 --- a/crates/sui-indexer/src/metrics.rs +++ b/crates/sui-indexer/src/metrics.rs @@ -139,6 +139,8 @@ pub struct IndexerMetrics { pub checkpoint_db_commit_latency_objects_history_chunks: Histogram, pub checkpoint_db_commit_latency_events: Histogram, pub checkpoint_db_commit_latency_events_chunks: Histogram, + pub checkpoint_db_commit_latency_event_indices: Histogram, + pub checkpoint_db_commit_latency_event_indices_chunks: Histogram, pub checkpoint_db_commit_latency_packages: Histogram, pub checkpoint_db_commit_latency_tx_indices: Histogram, pub checkpoint_db_commit_latency_tx_indices_chunks: Histogram, @@ -494,6 +496,20 @@ impl IndexerMetrics { registry, ) .unwrap(), + checkpoint_db_commit_latency_event_indices: register_histogram_with_registry!( + "checkpoint_db_commit_latency_event_indices", + "Time spent commiting event indices", + DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + checkpoint_db_commit_latency_event_indices_chunks: register_histogram_with_registry!( + "checkpoint_db_commit_latency_event_indices_chunks", + "Time spent commiting event indices chunks", + DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), + registry, + ) + .unwrap(), checkpoint_db_commit_latency_packages: register_histogram_with_registry!( "checkpoint_db_commit_latency_packages", "Time spent commiting packages", diff --git a/crates/sui-indexer/src/models/checkpoints.rs b/crates/sui-indexer/src/models/checkpoints.rs index 260fcfb5944f2..69e2618c82297 100644 --- a/crates/sui-indexer/src/models/checkpoints.rs +++ b/crates/sui-indexer/src/models/checkpoints.rs @@ -42,6 +42,8 @@ pub struct StoredCheckpoint { pub checkpoint_commitments: Vec, pub validator_signature: Vec, pub end_of_epoch_data: Option>, + pub min_tx_sequence_number: Option, + pub max_tx_sequence_number: Option, } impl From<&IndexedCheckpoint> for StoredCheckpoint { @@ -83,6 +85,8 @@ impl From<&IndexedCheckpoint> for StoredCheckpoint { .as_ref() .map(|d| bcs::to_bytes(d).unwrap()), end_of_epoch: c.end_of_epoch_data.is_some(), + min_tx_sequence_number: Some(c.min_tx_sequence_number as i64), + max_tx_sequence_number: Some(c.max_tx_sequence_number as i64), } } } diff --git a/crates/sui-indexer/src/models/event_indices.rs b/crates/sui-indexer/src/models/event_indices.rs new file mode 100644 index 0000000000000..08f17cce339d5 --- /dev/null +++ b/crates/sui-indexer/src/models/event_indices.rs @@ -0,0 +1,145 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + schema::{ + event_emit_module, event_emit_package, event_senders, event_struct_instantiation, + event_struct_module, event_struct_name, event_struct_package, + }, + types::EventIndex, +}; +use diesel::prelude::*; + +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] +#[diesel(table_name = event_emit_package)] +pub struct StoredEventEmitPackage { + pub tx_sequence_number: i64, + pub event_sequence_number: i64, + pub package: Vec, + pub sender: Vec, +} + +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] +#[diesel(table_name = event_emit_module)] +pub struct StoredEventEmitModule { + pub tx_sequence_number: i64, + pub event_sequence_number: i64, + pub package: Vec, + pub module: String, + pub sender: Vec, +} + +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] +#[diesel(table_name = event_senders)] +pub struct StoredEventSenders { + pub tx_sequence_number: i64, + pub event_sequence_number: i64, + pub sender: Vec, +} + +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] +#[diesel(table_name = event_struct_package)] +pub struct StoredEventStructPackage { + pub tx_sequence_number: i64, + pub event_sequence_number: i64, + pub package: Vec, + pub sender: Vec, +} + +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] +#[diesel(table_name = event_struct_module)] +pub struct StoredEventStructModule { + pub tx_sequence_number: i64, + pub event_sequence_number: i64, + pub package: Vec, + pub module: String, + pub sender: Vec, +} + +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] +#[diesel(table_name = event_struct_name)] +pub struct StoredEventStructName { + pub tx_sequence_number: i64, + pub event_sequence_number: i64, + pub package: Vec, + pub module: String, + pub type_name: String, + pub sender: Vec, +} + +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] +#[diesel(table_name = event_struct_instantiation)] +pub struct StoredEventStructInstantiation { + pub tx_sequence_number: i64, + pub event_sequence_number: i64, + pub package: Vec, + pub module: String, + pub type_instantiation: String, + pub sender: Vec, +} + +impl EventIndex { + pub fn split( + self: EventIndex, + ) -> ( + StoredEventEmitPackage, + StoredEventEmitModule, + StoredEventSenders, + StoredEventStructPackage, + StoredEventStructModule, + StoredEventStructName, + StoredEventStructInstantiation, + ) { + let tx_sequence_number = self.tx_sequence_number as i64; + let event_sequence_number = self.event_sequence_number as i64; + ( + StoredEventEmitPackage { + tx_sequence_number, + event_sequence_number, + package: self.emit_package.to_vec(), + sender: self.sender.to_vec(), + }, + StoredEventEmitModule { + tx_sequence_number, + event_sequence_number, + package: self.emit_package.to_vec(), + module: self.emit_module.clone(), + sender: self.sender.to_vec(), + }, + StoredEventSenders { + tx_sequence_number, + event_sequence_number, + sender: self.sender.to_vec(), + }, + StoredEventStructPackage { + tx_sequence_number, + event_sequence_number, + package: self.type_package.to_vec(), + sender: self.sender.to_vec(), + }, + StoredEventStructModule { + tx_sequence_number, + event_sequence_number, + package: self.type_package.to_vec(), + module: self.type_module.clone(), + sender: self.sender.to_vec(), + }, + StoredEventStructName { + tx_sequence_number, + event_sequence_number, + package: self.type_package.to_vec(), + module: self.type_module.clone(), + type_name: self.type_name.clone(), + sender: self.sender.to_vec(), + }, + StoredEventStructInstantiation { + tx_sequence_number, + event_sequence_number, + package: self.type_package.to_vec(), + module: self.type_module.clone(), + type_instantiation: self.type_instantiation.clone(), + sender: self.sender.to_vec(), + }, + ) + } +} diff --git a/crates/sui-indexer/src/models/mod.rs b/crates/sui-indexer/src/models/mod.rs index 3b8233ec45021..b677e09f1aaad 100644 --- a/crates/sui-indexer/src/models/mod.rs +++ b/crates/sui-indexer/src/models/mod.rs @@ -4,7 +4,9 @@ pub mod checkpoints; pub mod display; pub mod epoch; +pub mod event_indices; pub mod events; +pub mod obj_indices; pub mod objects; pub mod packages; pub mod transactions; diff --git a/crates/sui-indexer/src/models/obj_indices.rs b/crates/sui-indexer/src/models/obj_indices.rs new file mode 100644 index 0000000000000..7e5298008834c --- /dev/null +++ b/crates/sui-indexer/src/models/obj_indices.rs @@ -0,0 +1,40 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use diesel::prelude::*; + +use crate::schema::objects_version; + +use super::objects::StoredDeletedObject; +use super::objects::StoredObject; + +/// Model types related to tables that support efficient execution of queries on the `objects`, +/// `objects_history` and `objects_snapshot` tables. + +#[derive(Queryable, Insertable, Debug, Identifiable, Clone, QueryableByName)] +#[diesel(table_name = objects_version, primary_key(object_id, object_version))] +pub struct StoredObjectVersion { + pub object_id: Vec, + pub object_version: i64, + pub cp_sequence_number: i64, +} + +impl From<&StoredObject> for StoredObjectVersion { + fn from(o: &StoredObject) -> Self { + Self { + object_id: o.object_id.clone(), + object_version: o.object_version, + cp_sequence_number: o.checkpoint_sequence_number, + } + } +} + +impl From<&StoredDeletedObject> for StoredObjectVersion { + fn from(o: &StoredDeletedObject) -> Self { + Self { + object_id: o.object_id.clone(), + object_version: o.object_version, + cp_sequence_number: o.checkpoint_sequence_number, + } + } +} diff --git a/crates/sui-indexer/src/models/packages.rs b/crates/sui-indexer/src/models/packages.rs index 63f61f01fc428..97c8e8fc5b459 100644 --- a/crates/sui-indexer/src/models/packages.rs +++ b/crates/sui-indexer/src/models/packages.rs @@ -10,14 +10,20 @@ use diesel::prelude::*; #[diesel(table_name = packages, primary_key(package_id))] pub struct StoredPackage { pub package_id: Vec, + pub original_id: Vec, + pub package_version: i64, pub move_package: Vec, + pub checkpoint_sequence_number: i64, } impl From for StoredPackage { fn from(p: IndexedPackage) -> Self { Self { package_id: p.package_id.to_vec(), + original_id: p.move_package.original_package_id().to_vec(), + package_version: p.move_package.version().value() as i64, move_package: bcs::to_bytes(&p.move_package).unwrap(), + checkpoint_sequence_number: p.checkpoint_sequence_number as i64, } } } diff --git a/crates/sui-indexer/src/models/tx_indices.rs b/crates/sui-indexer/src/models/tx_indices.rs index 86c4ae4c73819..4f942c2e7af0b 100644 --- a/crates/sui-indexer/src/models/tx_indices.rs +++ b/crates/sui-indexer/src/models/tx_indices.rs @@ -3,7 +3,8 @@ use crate::{ schema::{ - tx_calls, tx_changed_objects, tx_digests, tx_input_objects, tx_recipients, tx_senders, + tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests, tx_input_objects, + tx_kinds, tx_recipients, tx_senders, }, types::TxIndex, }; @@ -24,7 +25,6 @@ pub struct TxDigest { #[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] #[diesel(table_name = tx_senders)] pub struct StoredTxSenders { - pub cp_sequence_number: i64, pub tx_sequence_number: i64, pub sender: Vec, } @@ -32,35 +32,52 @@ pub struct StoredTxSenders { #[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] #[diesel(table_name = tx_recipients)] pub struct StoredTxRecipients { - pub cp_sequence_number: i64, pub tx_sequence_number: i64, pub recipient: Vec, + pub sender: Vec, } #[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] #[diesel(table_name = tx_input_objects)] pub struct StoredTxInputObject { - pub cp_sequence_number: i64, pub tx_sequence_number: i64, pub object_id: Vec, + pub sender: Vec, } #[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] #[diesel(table_name = tx_changed_objects)] pub struct StoredTxChangedObject { - pub cp_sequence_number: i64, pub tx_sequence_number: i64, pub object_id: Vec, + pub sender: Vec, +} + +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] +#[diesel(table_name = tx_calls_pkg)] +pub struct StoredTxPkg { + pub tx_sequence_number: i64, + pub package: Vec, + pub sender: Vec, +} + +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] +#[diesel(table_name = tx_calls_mod)] +pub struct StoredTxMod { + pub tx_sequence_number: i64, + pub package: Vec, + pub module: String, + pub sender: Vec, } #[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] -#[diesel(table_name = tx_calls)] -pub struct StoredTxCalls { - pub cp_sequence_number: i64, +#[diesel(table_name = tx_calls_fun)] +pub struct StoredTxFun { pub tx_sequence_number: i64, pub package: Vec, pub module: String, pub func: String, + pub sender: Vec, } #[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] @@ -68,7 +85,13 @@ pub struct StoredTxCalls { pub struct StoredTxDigest { pub tx_digest: Vec, pub tx_sequence_number: i64, - pub cp_sequence_number: i64, +} + +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] +#[diesel(table_name = tx_kinds)] +pub struct StoredTxKind { + pub tx_kind: i16, + pub tx_sequence_number: i64, } #[allow(clippy::type_complexity)] @@ -80,71 +103,109 @@ impl TxIndex { Vec, Vec, Vec, - Vec, + Vec, + Vec, + Vec, Vec, + Vec, ) { let tx_sequence_number = self.tx_sequence_number as i64; - let cp_sequence_number = self.checkpoint_sequence_number as i64; - let tx_senders = self - .senders - .iter() - .map(|s| StoredTxSenders { - cp_sequence_number, - tx_sequence_number, - sender: s.to_vec(), - }) - .collect(); + let tx_sender = StoredTxSenders { + tx_sequence_number, + sender: self.sender.to_vec(), + }; let tx_recipients = self .recipients .iter() .map(|s| StoredTxRecipients { - cp_sequence_number, tx_sequence_number, recipient: s.to_vec(), + sender: self.sender.to_vec(), }) .collect(); let tx_input_objects = self .input_objects .iter() .map(|o| StoredTxInputObject { - cp_sequence_number, tx_sequence_number, object_id: bcs::to_bytes(&o).unwrap(), + sender: self.sender.to_vec(), }) .collect(); let tx_changed_objects = self .changed_objects .iter() .map(|o| StoredTxChangedObject { - cp_sequence_number, tx_sequence_number, object_id: bcs::to_bytes(&o).unwrap(), + sender: self.sender.to_vec(), }) .collect(); - let tx_calls = self + + let mut packages = Vec::new(); + let mut packages_modules = Vec::new(); + let mut packages_modules_funcs = Vec::new(); + + for (pkg, pkg_mod, pkg_mod_func) in self .move_calls .iter() - .map(|(p, m, f)| StoredTxCalls { - cp_sequence_number, + .map(|(p, m, f)| (*p, (*p, m.clone()), (*p, m.clone(), f.clone()))) + { + packages.push(pkg); + packages_modules.push(pkg_mod); + packages_modules_funcs.push(pkg_mod_func); + } + + let tx_pkgs = packages + .iter() + .map(|p| StoredTxPkg { + tx_sequence_number, + package: p.to_vec(), + sender: self.sender.to_vec(), + }) + .collect(); + + let tx_mods = packages_modules + .iter() + .map(|(p, m)| StoredTxMod { + tx_sequence_number, + package: p.to_vec(), + module: m.to_string(), + sender: self.sender.to_vec(), + }) + .collect(); + + let tx_calls = packages_modules_funcs + .iter() + .map(|(p, m, f)| StoredTxFun { tx_sequence_number, package: p.to_vec(), module: m.to_string(), func: f.to_string(), + sender: self.sender.to_vec(), }) .collect(); + let stored_tx_digest = StoredTxDigest { tx_digest: self.transaction_digest.into_inner().to_vec(), tx_sequence_number, - cp_sequence_number, + }; + + let tx_kind = StoredTxKind { + tx_kind: self.tx_kind as i16, + tx_sequence_number, }; ( - tx_senders, + vec![tx_sender], tx_recipients, tx_input_objects, tx_changed_objects, + tx_pkgs, + tx_mods, tx_calls, vec![stored_tx_digest], + vec![tx_kind], ) } } diff --git a/crates/sui-indexer/src/schema/mod.rs b/crates/sui-indexer/src/schema/mod.rs index d1d408d76a307..8f98297ac4d80 100644 --- a/crates/sui-indexer/src/schema/mod.rs +++ b/crates/sui-indexer/src/schema/mod.rs @@ -16,17 +16,28 @@ mod inner { pub use crate::schema::pg::checkpoints; pub use crate::schema::pg::display; pub use crate::schema::pg::epochs; + pub use crate::schema::pg::event_emit_module; + pub use crate::schema::pg::event_emit_package; + pub use crate::schema::pg::event_senders; + pub use crate::schema::pg::event_struct_instantiation; + pub use crate::schema::pg::event_struct_module; + pub use crate::schema::pg::event_struct_name; + pub use crate::schema::pg::event_struct_package; pub use crate::schema::pg::events; pub use crate::schema::pg::objects; pub use crate::schema::pg::objects_history; pub use crate::schema::pg::objects_snapshot; + pub use crate::schema::pg::objects_version; pub use crate::schema::pg::packages; pub use crate::schema::pg::pruner_cp_watermark; pub use crate::schema::pg::transactions; - pub use crate::schema::pg::tx_calls; + pub use crate::schema::pg::tx_calls_fun; + pub use crate::schema::pg::tx_calls_mod; + pub use crate::schema::pg::tx_calls_pkg; pub use crate::schema::pg::tx_changed_objects; pub use crate::schema::pg::tx_digests; pub use crate::schema::pg::tx_input_objects; + pub use crate::schema::pg::tx_kinds; pub use crate::schema::pg::tx_recipients; pub use crate::schema::pg::tx_senders; } @@ -38,17 +49,28 @@ mod inner { pub use crate::schema::mysql::checkpoints; pub use crate::schema::mysql::display; pub use crate::schema::mysql::epochs; + pub use crate::schema::mysql::event_emit_module; + pub use crate::schema::mysql::event_emit_package; + pub use crate::schema::mysql::event_senders; + pub use crate::schema::mysql::event_struct_instantiation; + pub use crate::schema::mysql::event_struct_module; + pub use crate::schema::mysql::event_struct_name; + pub use crate::schema::mysql::event_struct_package; pub use crate::schema::mysql::events; pub use crate::schema::mysql::objects; pub use crate::schema::mysql::objects_history; pub use crate::schema::mysql::objects_snapshot; + pub use crate::schema::mysql::objects_version; pub use crate::schema::mysql::packages; pub use crate::schema::mysql::pruner_cp_watermark; pub use crate::schema::mysql::transactions; - pub use crate::schema::mysql::tx_calls; + pub use crate::schema::mysql::tx_calls_fun; + pub use crate::schema::mysql::tx_calls_mod; + pub use crate::schema::mysql::tx_calls_pkg; pub use crate::schema::mysql::tx_changed_objects; pub use crate::schema::mysql::tx_digests; pub use crate::schema::mysql::tx_input_objects; + pub use crate::schema::mysql::tx_kinds; pub use crate::schema::mysql::tx_recipients; pub use crate::schema::mysql::tx_senders; } @@ -57,17 +79,28 @@ pub use inner::chain_identifier; pub use inner::checkpoints; pub use inner::display; pub use inner::epochs; +pub use inner::event_emit_module; +pub use inner::event_emit_package; +pub use inner::event_senders; +pub use inner::event_struct_instantiation; +pub use inner::event_struct_module; +pub use inner::event_struct_name; +pub use inner::event_struct_package; pub use inner::events; pub use inner::objects; pub use inner::objects_history; pub use inner::objects_snapshot; +pub use inner::objects_version; pub use inner::packages; pub use inner::pruner_cp_watermark; pub use inner::transactions; -pub use inner::tx_calls; +pub use inner::tx_calls_fun; +pub use inner::tx_calls_mod; +pub use inner::tx_calls_pkg; pub use inner::tx_changed_objects; pub use inner::tx_digests; pub use inner::tx_input_objects; +pub use inner::tx_kinds; pub use inner::tx_recipients; pub use inner::tx_senders; diff --git a/crates/sui-indexer/src/schema/mysql.rs b/crates/sui-indexer/src/schema/mysql.rs index 10cdc089c8884..e7805ae562be0 100644 --- a/crates/sui-indexer/src/schema/mysql.rs +++ b/crates/sui-indexer/src/schema/mysql.rs @@ -26,6 +26,8 @@ diesel::table! { checkpoint_commitments -> Mediumblob, validator_signature -> Blob, end_of_epoch_data -> Nullable, + min_tx_sequence_number -> Nullable, + max_tx_sequence_number -> Nullable } } @@ -80,6 +82,74 @@ diesel::table! { } } +diesel::table! { + event_emit_module (package, module, tx_sequence_number, event_sequence_number) { + package -> Blob, + module -> Text, + tx_sequence_number -> Bigint, + event_sequence_number -> Bigint, + sender -> Blob, + } +} + +diesel::table! { + event_emit_package (package, tx_sequence_number, event_sequence_number) { + package -> Blob, + tx_sequence_number -> Bigint, + event_sequence_number -> Bigint, + sender -> Blob, + } +} + +diesel::table! { + event_senders (sender, tx_sequence_number, event_sequence_number) { + sender -> Blob, + tx_sequence_number -> Bigint, + event_sequence_number -> Bigint, + } +} + +diesel::table! { + event_struct_instantiation (package, module, type_instantiation, tx_sequence_number, event_sequence_number) { + package -> Blob, + module -> Text, + type_instantiation -> Text, + tx_sequence_number -> Bigint, + event_sequence_number -> Bigint, + sender -> Blob, + } +} + +diesel::table! { + event_struct_module (package, module, tx_sequence_number, event_sequence_number) { + package -> Blob, + module -> Text, + tx_sequence_number -> Bigint, + event_sequence_number -> Bigint, + sender -> Blob, + } +} + +diesel::table! { + event_struct_name (package, module, type_name, tx_sequence_number, event_sequence_number) { + package -> Blob, + module -> Text, + type_name -> Text, + tx_sequence_number -> Bigint, + event_sequence_number -> Bigint, + sender -> Blob, + } +} + +diesel::table! { + event_struct_package (package, tx_sequence_number, event_sequence_number) { + package -> Blob, + tx_sequence_number -> Bigint, + event_sequence_number -> Bigint, + sender -> Blob, + } +} + diesel::table! { objects (object_id) { object_id -> Blob, @@ -148,10 +218,21 @@ diesel::table! { } } +diesel::table! { + objects_version (object_id, object_version) { + object_id -> Blob, + object_version -> Bigint, + cp_sequence_number -> Bigint, + } +} + diesel::table! { packages (package_id) { package_id -> Blob, + original_id -> Blob, + package_version -> Bigint, move_package -> Mediumblob, + checkpoint_sequence_number -> Bigint, } } @@ -180,12 +261,29 @@ diesel::table! { } diesel::table! { - tx_calls (package, tx_sequence_number, cp_sequence_number) { - cp_sequence_number -> Bigint, + tx_calls_fun (package, module, func, tx_sequence_number) { tx_sequence_number -> Bigint, package -> Blob, module -> Text, func -> Text, + sender -> Blob, + } +} + +diesel::table! { + tx_calls_mod (package, module, tx_sequence_number) { + tx_sequence_number -> Bigint, + package -> Blob, + module -> Text, + sender -> Blob, + } +} + +diesel::table! { + tx_calls_pkg (package, tx_sequence_number) { + tx_sequence_number -> Bigint, + package -> Blob, + sender -> Blob, } } @@ -194,6 +292,7 @@ diesel::table! { cp_sequence_number -> Bigint, tx_sequence_number -> Bigint, object_id -> Blob, + sender -> Blob, } } @@ -210,6 +309,14 @@ diesel::table! { cp_sequence_number -> Bigint, tx_sequence_number -> Bigint, object_id -> Blob, + sender -> Blob, + } +} + +diesel::table! { + tx_kinds (tx_kind, tx_sequence_number) { + tx_sequence_number -> Bigint, + tx_kind -> Smallint, } } @@ -218,6 +325,7 @@ diesel::table! { cp_sequence_number -> Bigint, tx_sequence_number -> Bigint, recipient -> Blob, + sender -> Blob, } } @@ -235,18 +343,30 @@ macro_rules! for_all_tables { $action!( chain_identifier, checkpoints, + pruner_cp_watermark, + display, epochs, + event_emit_module, + event_emit_package, + event_senders, + event_struct_instantiation, + event_struct_module, + event_struct_name, + event_struct_package, events, objects, objects_history, objects_snapshot, + objects_version, packages, - pruner_cp_watermark, transactions, - tx_calls, + tx_calls_fun, + tx_calls_mod, + tx_calls_pkg, tx_changed_objects, tx_digests, tx_input_objects, + tx_kinds, tx_recipients, tx_senders ); diff --git a/crates/sui-indexer/src/schema/pg.rs b/crates/sui-indexer/src/schema/pg.rs index 564f7cd721343..1ef8e0aed8e81 100644 --- a/crates/sui-indexer/src/schema/pg.rs +++ b/crates/sui-indexer/src/schema/pg.rs @@ -26,6 +26,8 @@ diesel::table! { checkpoint_commitments -> Bytea, validator_signature -> Bytea, end_of_epoch_data -> Nullable, + min_tx_sequence_number -> Nullable, + max_tx_sequence_number -> Nullable } } @@ -71,7 +73,75 @@ diesel::table! { } diesel::table! { - events (tx_sequence_number, event_sequence_number, checkpoint_sequence_number) { + event_emit_module (package, module, tx_sequence_number, event_sequence_number) { + package -> Bytea, + module -> Text, + tx_sequence_number -> Int8, + event_sequence_number -> Int8, + sender -> Bytea, + } +} + +diesel::table! { + event_emit_package (package, tx_sequence_number, event_sequence_number) { + package -> Bytea, + tx_sequence_number -> Int8, + event_sequence_number -> Int8, + sender -> Bytea, + } +} + +diesel::table! { + event_senders (sender, tx_sequence_number, event_sequence_number) { + sender -> Bytea, + tx_sequence_number -> Int8, + event_sequence_number -> Int8, + } +} + +diesel::table! { + event_struct_instantiation (package, module, type_instantiation, tx_sequence_number, event_sequence_number) { + package -> Bytea, + module -> Text, + type_instantiation -> Text, + tx_sequence_number -> Int8, + event_sequence_number -> Int8, + sender -> Bytea, + } +} + +diesel::table! { + event_struct_module (package, module, tx_sequence_number, event_sequence_number) { + package -> Bytea, + module -> Text, + tx_sequence_number -> Int8, + event_sequence_number -> Int8, + sender -> Bytea, + } +} + +diesel::table! { + event_struct_name (package, module, type_name, tx_sequence_number, event_sequence_number) { + package -> Bytea, + module -> Text, + type_name -> Text, + tx_sequence_number -> Int8, + event_sequence_number -> Int8, + sender -> Bytea, + } +} + +diesel::table! { + event_struct_package (package, tx_sequence_number, event_sequence_number) { + package -> Bytea, + tx_sequence_number -> Int8, + event_sequence_number -> Int8, + sender -> Bytea, + } +} + +diesel::table! { + events (tx_sequence_number, event_sequence_number) { tx_sequence_number -> Int8, event_sequence_number -> Int8, transaction_digest -> Bytea, @@ -89,7 +159,7 @@ diesel::table! { } diesel::table! { - events_partition_0 (tx_sequence_number, event_sequence_number, checkpoint_sequence_number) { + events_partition_0 (tx_sequence_number, event_sequence_number) { tx_sequence_number -> Int8, event_sequence_number -> Int8, transaction_digest -> Bytea, @@ -198,14 +268,25 @@ diesel::table! { } diesel::table! { - packages (package_id) { + objects_version (object_id, object_version) { + object_id -> Bytea, + object_version -> Int8, + cp_sequence_number -> Int8, + } +} + +diesel::table! { + packages (package_id, original_id, package_version) { package_id -> Bytea, + original_id -> Bytea, + package_version -> Int8, move_package -> Bytea, + checkpoint_sequence_number -> Int8, } } diesel::table! { - transactions (tx_sequence_number, checkpoint_sequence_number) { + transactions (tx_sequence_number) { tx_sequence_number -> Int8, transaction_digest -> Bytea, raw_transaction -> Bytea, @@ -221,7 +302,7 @@ diesel::table! { } diesel::table! { - transactions_partition_0 (tx_sequence_number, checkpoint_sequence_number) { + transactions_partition_0 (tx_sequence_number) { tx_sequence_number -> Int8, transaction_digest -> Bytea, raw_transaction -> Bytea, @@ -237,50 +318,72 @@ diesel::table! { } diesel::table! { - tx_calls (package, tx_sequence_number, cp_sequence_number) { - cp_sequence_number -> Int8, + tx_calls_fun (package, module, func, tx_sequence_number) { tx_sequence_number -> Int8, package -> Bytea, module -> Text, func -> Text, + sender -> Bytea, } } diesel::table! { - tx_changed_objects (object_id, tx_sequence_number, cp_sequence_number) { - cp_sequence_number -> Int8, + tx_calls_mod (package, module, tx_sequence_number) { + tx_sequence_number -> Int8, + package -> Bytea, + module -> Text, + sender -> Bytea, + } +} + +diesel::table! { + tx_calls_pkg (package, tx_sequence_number) { + tx_sequence_number -> Int8, + package -> Bytea, + sender -> Bytea, + } +} + +diesel::table! { + tx_changed_objects (object_id, tx_sequence_number) { tx_sequence_number -> Int8, object_id -> Bytea, + sender -> Bytea, } } diesel::table! { tx_digests (tx_digest) { tx_digest -> Bytea, - cp_sequence_number -> Int8, tx_sequence_number -> Int8, } } diesel::table! { - tx_input_objects (object_id, tx_sequence_number, cp_sequence_number) { - cp_sequence_number -> Int8, + tx_input_objects (object_id, tx_sequence_number) { tx_sequence_number -> Int8, object_id -> Bytea, + sender -> Bytea, } } diesel::table! { - tx_recipients (recipient, tx_sequence_number, cp_sequence_number) { - cp_sequence_number -> Int8, + tx_kinds (tx_kind, tx_sequence_number) { + tx_sequence_number -> Int8, + tx_kind -> Int2, + } +} + +diesel::table! { + tx_recipients (recipient, tx_sequence_number) { tx_sequence_number -> Int8, recipient -> Bytea, + sender -> Bytea, } } diesel::table! { - tx_senders (sender, tx_sequence_number, cp_sequence_number) { - cp_sequence_number -> Int8, + tx_senders (sender, tx_sequence_number) { tx_sequence_number -> Int8, sender -> Bytea, } @@ -295,21 +398,33 @@ macro_rules! for_all_tables { pruner_cp_watermark, display, epochs, + event_emit_module, + event_emit_package, + event_senders, + event_struct_instantiation, + event_struct_module, + event_struct_name, + event_struct_package, events, objects, objects_history, objects_snapshot, + objects_version, packages, transactions, - tx_calls, + tx_calls_fun, + tx_calls_mod, + tx_calls_pkg, tx_changed_objects, tx_digests, tx_input_objects, + tx_kinds, tx_recipients, tx_senders ); }; } + pub use for_all_tables; for_all_tables!(diesel::allow_tables_to_appear_in_same_query); diff --git a/crates/sui-indexer/src/store/indexer_store.rs b/crates/sui-indexer/src/store/indexer_store.rs index 868fe31416a6c..97516929ffe24 100644 --- a/crates/sui-indexer/src/store/indexer_store.rs +++ b/crates/sui-indexer/src/store/indexer_store.rs @@ -10,7 +10,9 @@ use crate::errors::IndexerError; use crate::handlers::{EpochToCommit, TransactionObjectChangesToCommit}; use crate::models::display::StoredDisplay; use crate::models::objects::{StoredDeletedObject, StoredObject}; -use crate::types::{IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTransaction, TxIndex}; +use crate::types::{ + EventIndex, IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTransaction, TxIndex, +}; #[allow(clippy::large_enum_variant)] pub enum ObjectChangeToCommit { @@ -63,6 +65,11 @@ pub trait IndexerStore: Any + Clone + Sync + Send + 'static { async fn persist_tx_indices(&self, indices: Vec) -> Result<(), IndexerError>; async fn persist_events(&self, events: Vec) -> Result<(), IndexerError>; + async fn persist_event_indices( + &self, + event_indices: Vec, + ) -> Result<(), IndexerError>; + async fn persist_displays( &self, display_updates: BTreeMap, diff --git a/crates/sui-indexer/src/store/mod.rs b/crates/sui-indexer/src/store/mod.rs index ae04c2ea1c1c2..920cc5817499c 100644 --- a/crates/sui-indexer/src/store/mod.rs +++ b/crates/sui-indexer/src/store/mod.rs @@ -312,6 +312,36 @@ pub mod diesel_macro { }}; } + #[macro_export] + macro_rules! persist_chunk_into_table { + ($table:expr, $chunk:expr, $pool:expr) => {{ + let now = std::time::Instant::now(); + let chunk_len = $chunk.len(); + transactional_blocking_with_retry!( + $pool, + |conn| { + for chunk in $chunk.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + insert_or_ignore_into!($table, chunk, conn); + } + Ok::<(), IndexerError>(()) + }, + PG_DB_COMMIT_SLEEP_DURATION + ) + .tap_ok(|_| { + let elapsed = now.elapsed().as_secs_f64(); + info!( + elapsed, + "Persisted {} rows to {}", + chunk_len, + stringify!($table), + ); + }) + .tap_err(|e| { + tracing::error!("Failed to persist {} with error: {}", stringify!($table), e); + }) + }}; + } + pub use blocking_call_is_ok_or_panic; pub use read_only_blocking; pub use read_only_repeatable_blocking; diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index e7fe8ce61d4f9..59e6ba3dc07ff 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -33,6 +33,7 @@ use crate::models::checkpoints::StoredCpTx; use crate::models::display::StoredDisplay; use crate::models::epoch::StoredEpochInfo; use crate::models::events::StoredEvent; +use crate::models::obj_indices::StoredObjectVersion; use crate::models::objects::{ StoredDeletedHistoryObject, StoredDeletedObject, StoredHistoryObject, StoredObject, StoredObjectSnapshot, @@ -40,13 +41,16 @@ use crate::models::objects::{ use crate::models::packages::StoredPackage; use crate::models::transactions::StoredTransaction; use crate::schema::{ - chain_identifier, checkpoints, display, epochs, events, objects, objects_history, - objects_snapshot, packages, pruner_cp_watermark, transactions, tx_calls, tx_changed_objects, - tx_digests, tx_input_objects, tx_recipients, tx_senders, + chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package, + event_senders, event_struct_instantiation, event_struct_module, event_struct_name, + event_struct_package, events, objects, objects_history, objects_snapshot, objects_version, + packages, pruner_cp_watermark, transactions, tx_calls_fun, tx_calls_mod, tx_calls_pkg, + tx_changed_objects, tx_digests, tx_input_objects, tx_kinds, tx_recipients, tx_senders, }; +use crate::types::EventIndex; use crate::types::{IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTransaction, TxIndex}; use crate::{ - insert_or_ignore_into, on_conflict_do_update, read_only_blocking, + insert_or_ignore_into, on_conflict_do_update, persist_chunk_into_table, read_only_blocking, transactional_blocking_with_retry, }; @@ -69,7 +73,7 @@ macro_rules! chunk { }}; } -macro_rules! prune_tx_indice_table { +macro_rules! prune_tx_or_event_indice_table { ($table:ident, $conn:expr, $min_tx:expr, $max_tx:expr, $context_msg:expr) => { diesel::delete($table::table.filter($table::tx_sequence_number.between($min_tx, $max_tx))) .execute($conn) @@ -462,6 +466,12 @@ impl PgIndexerStore { .eq(excluded(objects_snapshot::checkpoint_sequence_number)), objects_snapshot::owner_type.eq(excluded(objects_snapshot::owner_type)), objects_snapshot::owner_id.eq(excluded(objects_snapshot::owner_id)), + objects_snapshot::object_type_package + .eq(excluded(objects_snapshot::object_type_package)), + objects_snapshot::object_type_module + .eq(excluded(objects_snapshot::object_type_module)), + objects_snapshot::object_type_name + .eq(excluded(objects_snapshot::object_type_name)), objects_snapshot::object_type .eq(excluded(objects_snapshot::object_type)), objects_snapshot::serialized_object @@ -484,6 +494,9 @@ impl PgIndexerStore { .eq(excluded.checkpoint_sequence_number), objects_snapshot::owner_type.eq(excluded.owner_type), objects_snapshot::owner_id.eq(excluded.owner_id), + objects_snapshot::object_type_package.eq(excluded.object_type_package), + objects_snapshot::object_type_module.eq(excluded.object_type_module), + objects_snapshot::object_type_name.eq(excluded.object_type_name), objects_snapshot::object_type.eq(excluded.object_type), objects_snapshot::serialized_object.eq(excluded.serialized_object), objects_snapshot::coin_type.eq(excluded.coin_type), @@ -522,13 +535,16 @@ impl PgIndexerStore { .checkpoint_db_commit_latency_objects_history_chunks .start_timer(); let mut mutated_objects: Vec = vec![]; + let mut object_versions: Vec = vec![]; let mut deleted_object_ids: Vec = vec![]; for object in objects { match object { ObjectChangeToCommit::MutatedObject(stored_object) => { + object_versions.push(StoredObjectVersion::from(&stored_object)); mutated_objects.push(stored_object.into()); } ObjectChangeToCommit::DeletedObject(stored_deleted_object) => { + object_versions.push(StoredObjectVersion::from(&stored_deleted_object)); deleted_object_ids.push(stored_deleted_object.into()); } } @@ -547,6 +563,11 @@ impl PgIndexerStore { ); } + for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + insert_or_ignore_into!(objects_version::table, object_version_chunk, conn); + } + for deleted_objects_chunk in deleted_object_ids.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { @@ -806,13 +827,144 @@ impl PgIndexerStore { }) } + async fn persist_event_indices_chunk( + &self, + indices: Vec, + ) -> Result<(), IndexerError> { + let guard = self + .metrics + .checkpoint_db_commit_latency_event_indices_chunks + .start_timer(); + let len = indices.len(); + let ( + event_emit_packages, + event_emit_modules, + event_senders, + event_struct_packages, + event_struct_modules, + event_struct_names, + event_struct_instantiations, + ) = indices.into_iter().map(|i| i.split()).fold( + ( + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + ), + |( + mut event_emit_packages, + mut event_emit_modules, + mut event_senders, + mut event_struct_packages, + mut event_struct_modules, + mut event_struct_names, + mut event_struct_instantiations, + ), + index| { + event_emit_packages.push(index.0); + event_emit_modules.push(index.1); + event_senders.push(index.2); + event_struct_packages.push(index.3); + event_struct_modules.push(index.4); + event_struct_names.push(index.5); + event_struct_instantiations.push(index.6); + ( + event_emit_packages, + event_emit_modules, + event_senders, + event_struct_packages, + event_struct_modules, + event_struct_names, + event_struct_instantiations, + ) + }, + ); + + // Now persist all the event indices in parallel into their tables. + let mut futures = vec![]; + futures.push(self.spawn_blocking_task(move |this| { + persist_chunk_into_table!( + event_emit_package::table, + event_emit_packages, + &this.blocking_cp + ) + })); + + futures.push(self.spawn_blocking_task(move |this| { + persist_chunk_into_table!( + event_emit_module::table, + event_emit_modules, + &this.blocking_cp + ) + })); + + futures.push(self.spawn_blocking_task(move |this| { + persist_chunk_into_table!(event_senders::table, event_senders, &this.blocking_cp) + })); + + futures.push(self.spawn_blocking_task(move |this| { + persist_chunk_into_table!( + event_struct_package::table, + event_struct_packages, + &this.blocking_cp + ) + })); + + futures.push(self.spawn_blocking_task(move |this| { + persist_chunk_into_table!( + event_struct_module::table, + event_struct_modules, + &this.blocking_cp + ) + })); + + futures.push(self.spawn_blocking_task(move |this| { + persist_chunk_into_table!( + event_struct_name::table, + event_struct_names, + &this.blocking_cp + ) + })); + + futures.push(self.spawn_blocking_task(move |this| { + persist_chunk_into_table!( + event_struct_instantiation::table, + event_struct_instantiations, + &this.blocking_cp + ) + })); + + futures::future::join_all(futures) + .await + .into_iter() + .collect::, _>>() + .map_err(|e| { + tracing::error!("Failed to join event indices futures in a chunk: {}", e); + IndexerError::from(e) + })? + .into_iter() + .collect::, _>>() + .map_err(|e| { + IndexerError::PostgresWriteError(format!( + "Failed to persist all event indices in a chunk: {:?}", + e + )) + })?; + let elapsed = guard.stop_and_record(); + info!(elapsed, "Persisted {} chunked event indices", len); + Ok(()) + } + async fn persist_tx_indices_chunk(&self, indices: Vec) -> Result<(), IndexerError> { let guard = self .metrics .checkpoint_db_commit_latency_tx_indices_chunks .start_timer(); let len = indices.len(); - let (senders, recipients, input_objects, changed_objects, calls, digests) = + let (senders, recipients, input_objects, changed_objects, pkgs, mods, funs, digests, kinds) = indices.into_iter().map(|i| i.split()).fold( ( Vec::new(), @@ -821,29 +973,41 @@ impl PgIndexerStore { Vec::new(), Vec::new(), Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), ), |( mut tx_senders, mut tx_recipients, mut tx_input_objects, mut tx_changed_objects, - mut tx_calls, + mut tx_pkgs, + mut tx_mods, + mut tx_funs, mut tx_digests, + mut tx_kinds, ), index| { tx_senders.extend(index.0); tx_recipients.extend(index.1); tx_input_objects.extend(index.2); tx_changed_objects.extend(index.3); - tx_calls.extend(index.4); - tx_digests.extend(index.5); + tx_pkgs.extend(index.4); + tx_mods.extend(index.5); + tx_funs.extend(index.6); + tx_digests.extend(index.7); + tx_kinds.extend(index.8); ( tx_senders, tx_recipients, tx_input_objects, tx_changed_objects, - tx_calls, + tx_pkgs, + tx_mods, + tx_funs, tx_digests, + tx_kinds, ) }, ); @@ -932,14 +1096,15 @@ impl PgIndexerStore { tracing::error!("Failed to persist tx_changed_objects with error: {}", e); }) })); + futures.push(self.spawn_blocking_task(move |this| { let now = Instant::now(); - let calls_len = calls.len(); + let rows_len = pkgs.len(); transactional_blocking_with_retry!( &this.blocking_cp, |conn| { - for chunk in calls.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { - insert_or_ignore_into!(tx_calls::table, chunk, conn); + for chunk in pkgs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + insert_or_ignore_into!(tx_calls_pkg::table, chunk, conn); } Ok::<(), IndexerError>(()) }, @@ -947,12 +1112,60 @@ impl PgIndexerStore { ) .tap_ok(|_| { let elapsed = now.elapsed().as_secs_f64(); - info!(elapsed, "Persisted {} rows to tx_calls tables", calls_len); + info!( + elapsed, + "Persisted {} rows to tx_calls_pkg tables", rows_len + ); }) .tap_err(|e| { - tracing::error!("Failed to persist tx_calls with error: {}", e); + tracing::error!("Failed to persist tx_calls_pkg with error: {}", e); }) })); + + futures.push(self.spawn_blocking_task(move |this| { + let now = Instant::now(); + let rows_len = mods.len(); + transactional_blocking_with_retry!( + &this.blocking_cp, + |conn| { + for chunk in mods.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + insert_or_ignore_into!(tx_calls_mod::table, chunk, conn); + } + Ok::<(), IndexerError>(()) + }, + PG_DB_COMMIT_SLEEP_DURATION + ) + .tap_ok(|_| { + let elapsed = now.elapsed().as_secs_f64(); + info!(elapsed, "Persisted {} rows to tx_calls_mod table", rows_len); + }) + .tap_err(|e| { + tracing::error!("Failed to persist tx_calls_mod with error: {}", e); + }) + })); + + futures.push(self.spawn_blocking_task(move |this| { + let now = Instant::now(); + let rows_len = funs.len(); + transactional_blocking_with_retry!( + &this.blocking_cp, + |conn| { + for chunk in funs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + insert_or_ignore_into!(tx_calls_fun::table, chunk, conn); + } + Ok::<(), IndexerError>(()) + }, + PG_DB_COMMIT_SLEEP_DURATION + ) + .tap_ok(|_| { + let elapsed = now.elapsed().as_secs_f64(); + info!(elapsed, "Persisted {} rows to tx_calls_fun table", rows_len); + }) + .tap_err(|e| { + tracing::error!("Failed to persist tx_calls_fun with error: {}", e); + }) + })); + futures.push(self.spawn_blocking_task(move |this| { let now = Instant::now(); let calls_len = digests.len(); @@ -960,12 +1173,7 @@ impl PgIndexerStore { &this.blocking_cp, |conn| { for chunk in digests.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { - diesel::insert_into(tx_digests::table) - .values(chunk) - .on_conflict_do_nothing() - .execute(conn) - .map_err(IndexerError::from) - .context("Failed to write tx_digests chunk to PostgresDB")?; + insert_or_ignore_into!(tx_digests::table, chunk, conn); } Ok::<(), IndexerError>(()) }, @@ -980,6 +1188,28 @@ impl PgIndexerStore { }) })); + futures.push(self.spawn_blocking_task(move |this| { + let now = Instant::now(); + let rows_len = kinds.len(); + transactional_blocking_with_retry!( + &this.blocking_cp, + |conn| { + for chunk in kinds.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + insert_or_ignore_into!(tx_kinds::table, chunk, conn); + } + Ok::<(), IndexerError>(()) + }, + Duration::from_secs(60) + ) + .tap_ok(|_| { + let elapsed = now.elapsed().as_secs_f64(); + info!(elapsed, "Persisted {} rows to tx_kinds tables", rows_len); + }) + .tap_err(|e| { + tracing::error!("Failed to persist tx_kinds with error: {}", e); + }) + })); + futures::future::join_all(futures) .await .into_iter() @@ -1007,12 +1237,35 @@ impl PgIndexerStore { .checkpoint_db_commit_latency_epoch .start_timer(); let epoch_id = epoch.new_epoch.epoch; + transactional_blocking_with_retry!( &self.blocking_cp, |conn| { if let Some(last_epoch) = &epoch.last_epoch { let last_epoch_id = last_epoch.epoch; - let last_epoch = StoredEpochInfo::from_epoch_end_info(last_epoch); + // Overwrites the `epoch_total_transactions` field on `epoch.last_epoch` because + // we are not guaranteed to have the latest data in db when this is set on + // indexer's chain-reading side. However, when we `persist_epoch`, the + // checkpoints from an epoch ago must have been indexed. + let previous_epoch_network_total_transactions = match epoch_id { + 0 | 1 => 0, + _ => { + let prev_epoch_id = epoch_id - 2; + let result = checkpoints::table + .filter(checkpoints::epoch.eq(prev_epoch_id as i64)) + .select(max(checkpoints::network_total_transactions)) + .first::>(conn) + .map(|o| o.unwrap_or(0))?; + + result as u64 + } + }; + + let epoch_total_transactions = epoch.network_total_transactions + - previous_epoch_network_total_transactions; + + let mut last_epoch = StoredEpochInfo::from_epoch_end_info(last_epoch); + last_epoch.epoch_total_transactions = Some(epoch_total_transactions as i64); info!(last_epoch_id, "Persisting epoch end data."); on_conflict_do_update!( epochs::table, @@ -1094,6 +1347,14 @@ impl PgIndexerStore { EpochPartitionData::compose_data(epoch_to_commit, last_epoch); let table_partitions = self.partition_manager.get_table_partitions()?; for (table, (_, last_partition)) in table_partitions { + // Only advance epoch partition for epoch partitioned tables. + if !self + .partition_manager + .get_strategy(&table) + .is_epoch_partitioned() + { + continue; + } let guard = self.metrics.advance_epoch_latency.start_timer(); self.partition_manager.advance_epoch( table.clone(), @@ -1147,47 +1408,121 @@ impl PgIndexerStore { ) } + fn prune_event_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> { + let (min_tx, max_tx) = (min_tx as i64, max_tx as i64); + transactional_blocking_with_retry!( + &self.blocking_cp, + |conn| { + prune_tx_or_event_indice_table!( + event_emit_module, + conn, + min_tx, + max_tx, + "Failed to prune event_emit_module table" + ); + prune_tx_or_event_indice_table!( + event_emit_package, + conn, + min_tx, + max_tx, + "Failed to prune event_emit_package table" + ); + prune_tx_or_event_indice_table![ + event_senders, + conn, + min_tx, + max_tx, + "Failed to prune event_senders table" + ]; + prune_tx_or_event_indice_table![ + event_struct_instantiation, + conn, + min_tx, + max_tx, + "Failed to prune event_struct_instantiation table" + ]; + prune_tx_or_event_indice_table![ + event_struct_module, + conn, + min_tx, + max_tx, + "Failed to prune event_struct_module table" + ]; + prune_tx_or_event_indice_table![ + event_struct_name, + conn, + min_tx, + max_tx, + "Failed to prune event_struct_name table" + ]; + prune_tx_or_event_indice_table![ + event_struct_package, + conn, + min_tx, + max_tx, + "Failed to prune event_struct_package table" + ]; + Ok::<(), IndexerError>(()) + }, + PG_DB_COMMIT_SLEEP_DURATION + ) + } + fn prune_tx_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> { let (min_tx, max_tx) = (min_tx as i64, max_tx as i64); transactional_blocking_with_retry!( &self.blocking_cp, |conn| { - prune_tx_indice_table!( + prune_tx_or_event_indice_table!( tx_senders, conn, min_tx, max_tx, "Failed to prune tx_senders table" ); - prune_tx_indice_table!( + prune_tx_or_event_indice_table!( tx_recipients, conn, min_tx, max_tx, "Failed to prune tx_recipients table" ); - prune_tx_indice_table![ + prune_tx_or_event_indice_table![ tx_input_objects, conn, min_tx, max_tx, "Failed to prune tx_input_objects table" ]; - prune_tx_indice_table![ + prune_tx_or_event_indice_table![ tx_changed_objects, conn, min_tx, max_tx, "Failed to prune tx_changed_objects table" ]; - prune_tx_indice_table![ - tx_calls, + prune_tx_or_event_indice_table![ + tx_calls_pkg, conn, min_tx, max_tx, - "Failed to prune tx_calls table" + "Failed to prune tx_calls_pkg table" ]; - prune_tx_indice_table![ + prune_tx_or_event_indice_table![ + tx_calls_mod, + conn, + min_tx, + max_tx, + "Failed to prune tx_calls_mod table" + ]; + prune_tx_or_event_indice_table![ + tx_calls_fun, + conn, + min_tx, + max_tx, + "Failed to prune tx_calls_fun table" + ]; + prune_tx_or_event_indice_table![ tx_digests, conn, min_tx, @@ -1621,6 +1956,46 @@ impl IndexerStore for PgIndexerStore { .await } + async fn persist_event_indices(&self, indices: Vec) -> Result<(), IndexerError> { + if indices.is_empty() { + return Ok(()); + } + let len = indices.len(); + let guard = self + .metrics + .checkpoint_db_commit_latency_event_indices + .start_timer(); + let chunks = chunk!(indices, self.config.parallel_chunk_size); + + let futures = chunks + .into_iter() + .map(|chunk| { + self.spawn_task(move |this: Self| async move { + this.persist_event_indices_chunk(chunk).await + }) + }) + .collect::>(); + futures::future::join_all(futures) + .await + .into_iter() + .collect::, _>>() + .map_err(|e| { + tracing::error!("Failed to join persist_event_indices_chunk futures: {}", e); + IndexerError::from(e) + })? + .into_iter() + .collect::, _>>() + .map_err(|e| { + IndexerError::PostgresWriteError(format!( + "Failed to persist all event_indices chunks: {:?}", + e + )) + })?; + let elapsed = guard.stop_and_record(); + info!(elapsed, "Persisted {} event_indices chunks", len); + Ok(()) + } + async fn persist_tx_indices(&self, indices: Vec) -> Result<(), IndexerError> { if indices.is_empty() { return Ok(()); @@ -1716,6 +2091,21 @@ impl IndexerStore for PgIndexerStore { "Pruned transactions for checkpoint {} from tx {} to tx {}", cp, min_tx, max_tx ); + self.execute_in_blocking_worker(move |this| { + this.prune_event_indices_table(min_tx, max_tx) + }) + .await + .unwrap_or_else(|e| { + tracing::error!( + "Failed to prune events of transactions for cp {}: {}", + cp, + e + ); + }); + info!( + "Pruned events of transactions for checkpoint {} from tx {} to tx {}", + cp, min_tx, max_tx + ); self.metrics.last_pruned_transaction.set(max_tx as i64); self.execute_in_blocking_worker(move |this| this.prune_cp_tx_table(cp)) diff --git a/crates/sui-indexer/src/store/pg_partition_manager.rs b/crates/sui-indexer/src/store/pg_partition_manager.rs index f67f1ed6b4041..f27078ca47048 100644 --- a/crates/sui-indexer/src/store/pg_partition_manager.rs +++ b/crates/sui-indexer/src/store/pg_partition_manager.rs @@ -4,7 +4,7 @@ use diesel::r2d2::R2D2Connection; use diesel::sql_types::{BigInt, VarChar}; use diesel::{QueryableByName, RunQueryDsl}; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::time::Duration; use tracing::{error, info}; @@ -44,22 +44,42 @@ GROUP BY table_name; pub struct PgPartitionManager { cp: ConnectionPool, + partition_strategies: HashMap<&'static str, PgPartitionStrategy>, } impl Clone for PgPartitionManager { fn clone(&self) -> PgPartitionManager { Self { cp: self.cp.clone(), + partition_strategies: self.partition_strategies.clone(), } } } +#[derive(Clone, Copy)] +pub enum PgPartitionStrategy { + CheckpointSequenceNumber, + TxSequenceNumber, + ObjectId, +} + +impl PgPartitionStrategy { + pub fn is_epoch_partitioned(&self) -> bool { + matches!( + self, + Self::CheckpointSequenceNumber | Self::TxSequenceNumber + ) + } +} + #[derive(Clone, Debug)] pub struct EpochPartitionData { last_epoch: u64, next_epoch: u64, last_epoch_start_cp: u64, next_epoch_start_cp: u64, + last_epoch_start_tx: u64, + next_epoch_start_tx: u64, } impl EpochPartitionData { @@ -68,18 +88,35 @@ impl EpochPartitionData { let last_epoch_start_cp = last_db_epoch.first_checkpoint_id as u64; let next_epoch = epoch.new_epoch.epoch; let next_epoch_start_cp = epoch.new_epoch.first_checkpoint_id; + + // Determining the tx_sequence_number range for the epoch partition differs from the + // checkpoint_sequence_number range, because the former is a sum of total transactions - + // this sum already addresses the off-by-one. + let next_epoch_start_tx = epoch.network_total_transactions; + let last_epoch_start_tx = + next_epoch_start_tx - last_db_epoch.epoch_total_transactions.unwrap() as u64; + Self { last_epoch, next_epoch, last_epoch_start_cp, next_epoch_start_cp, + last_epoch_start_tx, + next_epoch_start_tx, } } } impl PgPartitionManager { pub fn new(cp: ConnectionPool) -> Result { - let manager = Self { cp }; + let mut partition_strategies = HashMap::new(); + partition_strategies.insert("events", PgPartitionStrategy::TxSequenceNumber); + partition_strategies.insert("transactions", PgPartitionStrategy::TxSequenceNumber); + partition_strategies.insert("objects_version", PgPartitionStrategy::ObjectId); + let manager = Self { + cp, + partition_strategies, + }; let tables = manager.get_table_partitions()?; info!( "Found {} tables with partitions : [{:?}]", @@ -116,12 +153,41 @@ impl PgPartitionManager { ) } + /// Tries to fetch the partitioning strategy for the given partitioned table. Defaults to + /// `CheckpointSequenceNumber` as the majority of our tables are partitioned on an epoch's + /// checkpoints today. + pub fn get_strategy(&self, table_name: &str) -> PgPartitionStrategy { + self.partition_strategies + .get(table_name) + .copied() + .unwrap_or(PgPartitionStrategy::CheckpointSequenceNumber) + } + + pub fn determine_epoch_partition_range( + &self, + table_name: &str, + data: &EpochPartitionData, + ) -> Option<(u64, u64)> { + match self.get_strategy(table_name) { + PgPartitionStrategy::CheckpointSequenceNumber => { + Some((data.last_epoch_start_cp, data.next_epoch_start_cp)) + } + PgPartitionStrategy::TxSequenceNumber => { + Some((data.last_epoch_start_tx, data.next_epoch_start_tx)) + } + PgPartitionStrategy::ObjectId => None, + } + } + pub fn advance_epoch( &self, table: String, last_partition: u64, data: &EpochPartitionData, ) -> Result<(), IndexerError> { + let Some(partition_range) = self.determine_epoch_partition_range(&table, data) else { + return Ok(()); + }; if data.next_epoch == 0 { tracing::info!("Epoch 0 partition has been created in the initial setup."); return Ok(()); @@ -136,8 +202,8 @@ impl PgPartitionManager { .bind::(table.clone()) .bind::(data.last_epoch as i64) .bind::(data.next_epoch as i64) - .bind::(data.last_epoch_start_cp as i64) - .bind::(data.next_epoch_start_cp as i64), + .bind::(partition_range.0 as i64) + .bind::(partition_range.1 as i64), conn, ) }, @@ -148,14 +214,14 @@ impl PgPartitionManager { transactional_blocking_with_retry!( &self.cp, |conn| { - RunQueryDsl::execute(diesel::sql_query(format!("ALTER TABLE {table_name} REORGANIZE PARTITION {table_name}_partition_{last_epoch} INTO (PARTITION {table_name}_partition_{last_epoch} VALUES LESS THAN ({next_epoch_start_cp}), PARTITION {table_name}_partition_{next_epoch} VALUES LESS THAN MAXVALUE)", table_name = table.clone(), last_epoch = data.last_epoch as i64, next_epoch_start_cp = data.next_epoch_start_cp as i64, next_epoch = data.next_epoch as i64)), conn) + RunQueryDsl::execute(diesel::sql_query(format!("ALTER TABLE {table_name} REORGANIZE PARTITION {table_name}_partition_{last_epoch} INTO (PARTITION {table_name}_partition_{last_epoch} VALUES LESS THAN ({next_epoch_start}), PARTITION {table_name}_partition_{next_epoch} VALUES LESS THAN MAXVALUE)", table_name = table.clone(), last_epoch = data.last_epoch as i64, next_epoch_start = partition_range.1 as i64, next_epoch = data.next_epoch as i64)), conn) }, Duration::from_secs(10) )?; info!( "Advanced epoch partition for table {} from {} to {}, prev partition upper bound {}", - table, last_partition, data.next_epoch, data.last_epoch_start_cp + table, last_partition, data.next_epoch, partition_range.0 ); } else if last_partition != data.next_epoch { // skip when the partition is already advanced once, which is possible when indexer diff --git a/crates/sui-indexer/src/types.rs b/crates/sui-indexer/src/types.rs index 0d313250e2359..04409ecb53757 100644 --- a/crates/sui-indexer/src/types.rs +++ b/crates/sui-indexer/src/types.rs @@ -90,6 +90,8 @@ impl IndexedCheckpoint { } } +/// Represents system state and summary info at the start and end of an epoch. Optional fields are +/// populated at epoch boundary, since they cannot be determined at the start of the epoch. #[derive(Clone, Debug, Default)] pub struct IndexedEpochInfo { pub epoch: u64, @@ -135,6 +137,9 @@ impl IndexedEpochInfo { } } + /// Creates `IndexedEpochInfo` for epoch X-1 at the boundary of epoch X-1 to X. + /// `network_total_tx_num_at_last_epoch_end` is needed to determine the number of transactions + /// that occurred in the epoch X-1. pub fn from_end_of_epoch_data( system_state_summary: &SuiSystemStateSummary, last_checkpoint_summary: &CertifiedCheckpointSummary, @@ -218,6 +223,47 @@ impl IndexedEvent { } } +#[derive(Debug, Clone)] +pub struct EventIndex { + pub tx_sequence_number: u64, + pub event_sequence_number: u64, + pub sender: SuiAddress, + pub emit_package: ObjectID, + pub emit_module: String, + pub type_package: ObjectID, + pub type_module: String, + /// Struct name of the event, without type parameters. + pub type_name: String, + /// Type instantiation of the event, with type name and type parameters, if any. + pub type_instantiation: String, +} + +impl EventIndex { + pub fn from_event( + tx_sequence_number: u64, + event_sequence_number: u64, + event: &sui_types::event::Event, + ) -> Self { + let type_instantiation = event + .type_ + .to_canonical_string(/* with_prefix */ true) + .splitn(3, "::") + .collect::>()[2] + .to_string(); + Self { + tx_sequence_number, + event_sequence_number, + sender: event.sender, + emit_package: event.package_id, + emit_module: event.transaction_module.to_string(), + type_package: event.type_.address.into(), + type_module: event.type_.module.to_string(), + type_name: event.type_.name.to_string(), + type_instantiation, + } + } +} + #[derive(Debug, Copy, Clone)] pub enum OwnerType { Immutable = 0, @@ -340,12 +386,13 @@ pub struct IndexedTransaction { #[derive(Debug, Clone)] pub struct TxIndex { pub tx_sequence_number: u64, + pub tx_kind: TransactionKind, pub transaction_digest: TransactionDigest, pub checkpoint_sequence_number: u64, pub input_objects: Vec, pub changed_objects: Vec, pub payers: Vec, - pub senders: Vec, + pub sender: SuiAddress, pub recipients: Vec, pub move_calls: Vec<(ObjectID, String, String)>, }