From c1dfdcdb3d8a80aa7706701920bb6307fb62a1dc Mon Sep 17 00:00:00 2001
From: Sadhan Sood <106645797+sadhansood@users.noreply.github.com>
Date: Sat, 4 May 2024 09:42:25 -0700
Subject: [PATCH] Add migrations for mysql (#17421)

## Description

This PR adds migrations for the MySQL database. After this, we should be able to use the indexer writer for data ingestion into a MySQL database.

## Test plan

Tested by running locally with:

```
cargo run --bin sui-indexer --features mysql-feature --no-default-features -- --db-url <> --rpc-client-url <> --fullnode-sync-worker --reset-db
2024-04-30T18:16:06.843439Z INFO sui_data_ingestion_core::worker_pool: received checkpoint for processing 1420 for workflow workflow
2024-04-30T18:16:06.843456Z INFO sui_indexer::handlers::checkpoint_handler: Indexing checkpoint data blob checkpoint_seq=1420
2024-04-30T18:16:06.843477Z INFO sui_indexer::handlers::checkpoint_handler: Resolving Move struct layouts for struct tags of size 1.
2024-04-30T18:16:06.845713Z INFO sui_indexer::handlers::checkpoint_handler: Indexed one checkpoint. checkpoint_seq=1388
2024-04-30T18:16:06.845739Z INFO sui_data_ingestion_core::worker_pool: finished checkpoint processing 1388 for workflow workflow in 33.194375ms
2024-04-30T18:16:06.845768Z INFO sui_data_ingestion_core::worker_pool: received checkpoint for processing 1396 for workflow workflow
2024-04-30T18:16:06.845784Z INFO sui_indexer::handlers::checkpoint_handler: Indexing checkpoint data blob checkpoint_seq=1396
2024-04-30T18:16:06.845804Z INFO sui_indexer::handlers::checkpoint_handler: Resolving Move struct layouts for struct tags of size 1.
2024-04-30T18:16:06.848785Z INFO sui_indexer::handlers::checkpoint_handler: Indexed one checkpoint. checkpoint_seq=1385
2024-04-30T18:16:06.848819Z INFO sui_data_ingestion_core::worker_pool: finished checkpoint processing 1385 for workflow workflow in 56.440792ms
2024-04-30T18:16:06.848861Z INFO sui_data_ingestion_core::worker_pool: received checkpoint for processing 1382 for workflow workflow
2024-04-30T18:16:06.848883Z INFO sui_indexer::handlers::checkpoint_handler: Indexing checkpoint data blob checkpoint_seq=1382
2024-04-30T18:16:06.848912Z INFO sui_indexer::handlers::checkpoint_handler: Resolving Move struct layouts for struct tags of size 1.
2024-04-30T18:16:06.849557Z INFO sui_indexer::handlers::checkpoint_handler: Indexed one checkpoint. checkpoint_seq=1383
2024-04-30T18:16:06.849573Z INFO sui_data_ingestion_core::worker_pool: finished checkpoint processing 1383 for workflow workflow in 39.8655ms
```
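As an aside for anyone applying the new migrations by hand rather than through `--reset-db`: the Diesel CLI can run them straight from the new directory. A hypothetical invocation (the path matches this PR, but the database URL is a placeholder and `diesel_cli` must be built with MySQL support):

```
diesel migration run \
  --migration-dir crates/sui-indexer/migrations/mysql \
  --database-url mysql://user:password@localhost:3306/sui_indexer
```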
---
 crates/sui-graphql-rpc/src/types/event.rs | 25 +-
 .../src/types/transaction_block_effects.rs | 10 +-
 crates/sui-indexer/diesel.toml | 2 +-
 .../2024-03-22-052019_events}/down.sql | 0
 .../mysql/2024-03-22-052019_events/up.sql | 34 ++
 .../mysql/2024-03-25-203621_objects/down.sql | 5 +
 .../mysql/2024-03-25-203621_objects/up.sql | 109 ++++++
 .../2024-03-26-004025_transactions/down.sql | 5 +
 .../2024-03-26-004025_transactions/up.sql | 29 ++
 .../2024-04-24-180008_checkpoints/down.sql | 1 +
 .../2024-04-24-180008_checkpoints/up.sql | 28 ++
 .../mysql/2024-04-24-180207_epochs/down.sql | 1 +
 .../mysql/2024-04-24-180207_epochs/up.sql | 28 ++
 .../mysql/2024-04-24-180249_packages/down.sql | 4 +
 .../mysql/2024-04-24-180249_packages/up.sql | 7 +
 .../2024-04-24-180418_tx_indices}/down.sql | 0
 .../mysql/2024-04-24-180418_tx_indices/up.sql | 57 +++
 .../2024-04-24-180727_display}/down.sql | 0
 .../mysql/2024-04-24-180727_display/up.sql | 9 +
 .../down.sql | 3 +
 .../up.sql | 10 +
 .../down.sql | 0
 .../up.sql | 24 ++
 .../down.sql | 0
 .../up.sql | 0
 .../pg/2023-08-19-044020_events/down.sql | 2 +
 .../{ => pg}/2023-08-19-044020_events/up.sql | 0
 .../2023-08-19-044023_objects/down.sql | 0
 .../{ => pg}/2023-08-19-044023_objects/up.sql | 0
 .../2023-08-19-044026_transactions/down.sql | 0
 .../2023-08-19-044026_transactions/up.sql | 0
 .../2023-08-19-044044_checkpoints/down.sql | 0
 .../2023-08-19-044044_checkpoints/up.sql | 0
 .../2023-08-19-044052_epochs/down.sql | 0
 .../{ => pg}/2023-08-19-044052_epochs/up.sql | 0
 .../2023-08-19-060729_packages/down.sql | 0
 .../2023-08-19-060729_packages/up.sql | 0
 .../pg/2023-10-06-204335_tx_indices/down.sql | 6 +
 .../2023-10-06-204335_tx_indices/up.sql | 0
 .../pg/2023-10-07-160139_display/down.sql | 2 +
 .../{ => pg}/2023-10-07-160139_display/up.sql | 0
 .../down.sql | 0
 .../up.sql | 0
 .../down.sql | 1 +
 .../up.sql | 0
 crates/sui-indexer/src/db.rs | 153 +++++++-
 crates/sui-indexer/src/indexer_reader.rs | 9 +-
 crates/sui-indexer/src/models/checkpoints.rs | 84 +++-
 crates/sui-indexer/src/models/events.rs | 59 ++-
 crates/sui-indexer/src/models/objects.rs | 4 +-
 crates/sui-indexer/src/models/transactions.rs | 365 ++++++++++++++++--
 crates/sui-indexer/src/models/tx_indices.rs | 2 +-
 crates/sui-indexer/src/store/mod.rs | 59 +++
 .../sui-indexer/src/store/pg_indexer_store.rs | 235 +++++------
 .../src/store/pg_partition_manager.rs | 17 +-
 55 files changed, 1173 insertions(+), 216 deletions(-)
 rename crates/sui-indexer/migrations/{2023-08-19-044020_events => mysql/2024-03-22-052019_events}/down.sql (100%)
 create mode 100644 crates/sui-indexer/migrations/mysql/2024-03-22-052019_events/up.sql
 create mode 100644 crates/sui-indexer/migrations/mysql/2024-03-25-203621_objects/down.sql
 create mode 100644 crates/sui-indexer/migrations/mysql/2024-03-25-203621_objects/up.sql
 create mode 100644 crates/sui-indexer/migrations/mysql/2024-03-26-004025_transactions/down.sql
 create mode 100644 crates/sui-indexer/migrations/mysql/2024-03-26-004025_transactions/up.sql
 create mode 100644 crates/sui-indexer/migrations/mysql/2024-04-24-180008_checkpoints/down.sql
 create mode 100644 crates/sui-indexer/migrations/mysql/2024-04-24-180008_checkpoints/up.sql
 create mode 100644 crates/sui-indexer/migrations/mysql/2024-04-24-180207_epochs/down.sql
 create mode 100644 crates/sui-indexer/migrations/mysql/2024-04-24-180207_epochs/up.sql
 create mode 100644
crates/sui-indexer/migrations/mysql/2024-04-24-180249_packages/down.sql create mode 100644 crates/sui-indexer/migrations/mysql/2024-04-24-180249_packages/up.sql rename crates/sui-indexer/migrations/{2023-10-06-204335_tx_indices => mysql/2024-04-24-180418_tx_indices}/down.sql (100%) create mode 100644 crates/sui-indexer/migrations/mysql/2024-04-24-180418_tx_indices/up.sql rename crates/sui-indexer/migrations/{2023-10-07-160139_display => mysql/2024-04-24-180727_display}/down.sql (100%) create mode 100644 crates/sui-indexer/migrations/mysql/2024-04-24-180727_display/up.sql create mode 100644 crates/sui-indexer/migrations/mysql/2024-04-24-180814_query_cost_function/down.sql create mode 100644 crates/sui-indexer/migrations/mysql/2024-04-24-180814_query_cost_function/up.sql rename crates/sui-indexer/migrations/{2023-11-29-193859_advance_partition => mysql/2024-04-24-182830_advance_partition}/down.sql (100%) create mode 100644 crates/sui-indexer/migrations/mysql/2024-04-24-182830_advance_partition/up.sql rename crates/sui-indexer/migrations/{ => pg}/00000000000000_diesel_initial_setup/down.sql (100%) rename crates/sui-indexer/migrations/{ => pg}/00000000000000_diesel_initial_setup/up.sql (100%) create mode 100644 crates/sui-indexer/migrations/pg/2023-08-19-044020_events/down.sql rename crates/sui-indexer/migrations/{ => pg}/2023-08-19-044020_events/up.sql (100%) rename crates/sui-indexer/migrations/{ => pg}/2023-08-19-044023_objects/down.sql (100%) rename crates/sui-indexer/migrations/{ => pg}/2023-08-19-044023_objects/up.sql (100%) rename crates/sui-indexer/migrations/{ => pg}/2023-08-19-044026_transactions/down.sql (100%) rename crates/sui-indexer/migrations/{ => pg}/2023-08-19-044026_transactions/up.sql (100%) rename crates/sui-indexer/migrations/{ => pg}/2023-08-19-044044_checkpoints/down.sql (100%) rename crates/sui-indexer/migrations/{ => pg}/2023-08-19-044044_checkpoints/up.sql (100%) rename crates/sui-indexer/migrations/{ => pg}/2023-08-19-044052_epochs/down.sql (100%) rename crates/sui-indexer/migrations/{ => pg}/2023-08-19-044052_epochs/up.sql (100%) rename crates/sui-indexer/migrations/{ => pg}/2023-08-19-060729_packages/down.sql (100%) rename crates/sui-indexer/migrations/{ => pg}/2023-08-19-060729_packages/up.sql (100%) create mode 100644 crates/sui-indexer/migrations/pg/2023-10-06-204335_tx_indices/down.sql rename crates/sui-indexer/migrations/{ => pg}/2023-10-06-204335_tx_indices/up.sql (100%) create mode 100644 crates/sui-indexer/migrations/pg/2023-10-07-160139_display/down.sql rename crates/sui-indexer/migrations/{ => pg}/2023-10-07-160139_display/up.sql (100%) rename crates/sui-indexer/migrations/{ => pg}/2023-10-24-160139_query_cost_function/down.sql (100%) rename crates/sui-indexer/migrations/{ => pg}/2023-10-24-160139_query_cost_function/up.sql (100%) create mode 100644 crates/sui-indexer/migrations/pg/2023-11-29-193859_advance_partition/down.sql rename crates/sui-indexer/migrations/{ => pg}/2023-11-29-193859_advance_partition/up.sql (100%) diff --git a/crates/sui-graphql-rpc/src/types/event.rs b/crates/sui-graphql-rpc/src/types/event.rs index 4cd60b6b554e0..ea09b2c5d062e 100644 --- a/crates/sui-graphql-rpc/src/types/event.rs +++ b/crates/sui-graphql-rpc/src/types/event.rs @@ -240,7 +240,7 @@ impl Event { idx: usize, checkpoint_viewed_at: u64, ) -> Result { - let Some(Some(serialized_event)) = stored_tx.events.get(idx) else { + let Some(serialized_event) = &stored_tx.get_event_at_idx(idx) else { return Err(Error::Internal(format!( "Could not find event with 
event_sequence_number {} at transaction {}", idx, stored_tx.tx_sequence_number @@ -259,7 +259,11 @@ impl Event { event_sequence_number: idx as i64, transaction_digest: stored_tx.transaction_digest.clone(), checkpoint_sequence_number: stored_tx.checkpoint_sequence_number, + #[cfg(feature = "postgres-feature")] senders: vec![Some(native_event.sender.to_vec())], + #[cfg(feature = "mysql-feature")] + #[cfg(not(feature = "postgres-feature"))] + senders: serde_json::to_value(vec![native_event.sender.to_vec()]).unwrap(), package: native_event.package_id.to_vec(), module: native_event.transaction_module.to_string(), event_type: native_event @@ -283,12 +287,27 @@ impl Event { stored: StoredEvent, checkpoint_viewed_at: u64, ) -> Result { - let Some(Some(sender_bytes)) = stored.senders.first() else { + let Some(Some(sender_bytes)) = ({ + #[cfg(feature = "postgres-feature")] + { + stored.senders.first() + } + #[cfg(feature = "mysql-feature")] + #[cfg(not(feature = "postgres-feature"))] + { + stored + .senders + .as_array() + .ok_or_else(|| { + Error::Internal("Failed to parse event senders as array".to_string()) + })? + .first() + } + }) else { return Err(Error::Internal("No senders found for event".to_string())); }; let sender = NativeSuiAddress::from_bytes(sender_bytes) .map_err(|e| Error::Internal(e.to_string()))?; - let package_id = ObjectID::from_bytes(&stored.package).map_err(|e| Error::Internal(e.to_string()))?; let type_ = diff --git a/crates/sui-graphql-rpc/src/types/transaction_block_effects.rs b/crates/sui-graphql-rpc/src/types/transaction_block_effects.rs index 20e64fd033a2a..11ff0103c5abd 100644 --- a/crates/sui-graphql-rpc/src/types/transaction_block_effects.rs +++ b/crates/sui-graphql-rpc/src/types/transaction_block_effects.rs @@ -357,10 +357,8 @@ impl TransactionBlockEffects { return Ok(connection); }; - let Some((prev, next, _, cs)) = page.paginate_consistent_indices( - stored_tx.balance_changes.len(), - self.checkpoint_viewed_at, - )? + let Some((prev, next, _, cs)) = page + .paginate_consistent_indices(stored_tx.get_balance_len(), self.checkpoint_viewed_at)? else { return Ok(connection); }; @@ -369,7 +367,7 @@ impl TransactionBlockEffects { connection.has_next_page = next; for c in cs { - let Some(serialized) = &stored_tx.balance_changes[c.ix] else { + let Some(serialized) = &stored_tx.get_balance_at_idx(c.ix) else { continue; }; @@ -394,7 +392,7 @@ impl TransactionBlockEffects { let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?; let mut connection = Connection::new(false, false); let len = match &self.kind { - TransactionBlockEffectsKind::Stored { stored_tx, .. } => stored_tx.events.len(), + TransactionBlockEffectsKind::Stored { stored_tx, .. } => stored_tx.get_event_len(), TransactionBlockEffectsKind::Executed { events, .. } | TransactionBlockEffectsKind::DryRun { events, .. 
            } => events.len(),
        };
diff --git a/crates/sui-indexer/diesel.toml b/crates/sui-indexer/diesel.toml
index 76b4591f871fd..e02a6684f1b13 100644
--- a/crates/sui-indexer/diesel.toml
+++ b/crates/sui-indexer/diesel.toml
@@ -6,4 +6,4 @@ file = "src/schema.rs"
 patch_file = "src/schema.patch"
 
 [migrations_directory]
-dir = "migrations"
+dir = "migrations/mysql"
diff --git a/crates/sui-indexer/migrations/2023-08-19-044020_events/down.sql b/crates/sui-indexer/migrations/mysql/2024-03-22-052019_events/down.sql
similarity index 100%
rename from crates/sui-indexer/migrations/2023-08-19-044020_events/down.sql
rename to crates/sui-indexer/migrations/mysql/2024-03-22-052019_events/down.sql
diff --git a/crates/sui-indexer/migrations/mysql/2024-03-22-052019_events/up.sql b/crates/sui-indexer/migrations/mysql/2024-03-22-052019_events/up.sql
new file mode 100644
index 0000000000000..e68bf743d025b
--- /dev/null
+++ b/crates/sui-indexer/migrations/mysql/2024-03-22-052019_events/up.sql
@@ -0,0 +1,34 @@
+CREATE TABLE events
+(
+    tx_sequence_number BIGINT NOT NULL,
+    event_sequence_number BIGINT NOT NULL,
+    transaction_digest blob NOT NULL,
+    checkpoint_sequence_number bigint NOT NULL,
+    -- array of SuiAddress in bytes. All signers of the transaction.
+    senders json NOT NULL,
+    -- bytes of the entry package ID. Notice that the package and module here
+    -- are the package and module of the function that emitted the event, different
+    -- from the package and module of the event type.
+    package blob NOT NULL,
+    -- entry module name
+    module text NOT NULL,
+    -- StructTag in Display format, fully qualified including type parameters
+    event_type text NOT NULL,
+    -- Components of the StructTag of the event type: package, module,
+    -- name (name of the struct, without type parameters)
+    event_type_package blob NOT NULL,
+    event_type_module text NOT NULL,
+    event_type_name text NOT NULL,
+    -- timestamp of the checkpoint when the event was emitted
+    timestamp_ms BIGINT NOT NULL,
+    -- bcs of the Event contents (Event.contents)
+    bcs blob NOT NULL,
+    PRIMARY KEY(tx_sequence_number, event_sequence_number, checkpoint_sequence_number)
+) PARTITION BY RANGE (checkpoint_sequence_number) (
+    PARTITION p0 VALUES LESS THAN MAXVALUE
+);
+CREATE INDEX events_package ON events (package(255), tx_sequence_number, event_sequence_number);
+CREATE INDEX events_package_module ON events (package(255), module(255), tx_sequence_number, event_sequence_number);
+CREATE INDEX events_event_type ON events (event_type(255), tx_sequence_number, event_sequence_number);
+CREATE INDEX events_type_package_module_name ON events (event_type_package(128), event_type_module(128), event_type_name(128), tx_sequence_number, event_sequence_number);
+CREATE INDEX events_checkpoint_sequence_number ON events (checkpoint_sequence_number);
\ No newline at end of file
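A note for reviewers coming from the Postgres schema: MySQL can only index BLOB/TEXT columns by prefix, which is why every index above carries an explicit length such as (255) or (128). An illustrative query that such a prefix index can serve (the event type string is made up; not part of this migration):

```sql
-- events_event_type(255) narrows the equality match on event_type; rows whose
-- first 255 bytes collide are re-checked against the full column value.
SELECT tx_sequence_number, event_sequence_number
FROM events
WHERE event_type = '0x2::coin::CoinMetadata<0x2::sui::SUI>'
ORDER BY tx_sequence_number, event_sequence_number;
```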
diff --git a/crates/sui-indexer/migrations/mysql/2024-03-25-203621_objects/down.sql b/crates/sui-indexer/migrations/mysql/2024-03-25-203621_objects/down.sql
new file mode 100644
index 0000000000000..f536fa94e1cd7
--- /dev/null
+++ b/crates/sui-indexer/migrations/mysql/2024-03-25-203621_objects/down.sql
@@ -0,0 +1,5 @@
+-- This file should undo anything in `up.sql`
+DROP TABLE IF EXISTS objects;
+DROP TABLE IF EXISTS objects_history;
+DROP TABLE IF EXISTS objects_snapshot;
+
diff --git a/crates/sui-indexer/migrations/mysql/2024-03-25-203621_objects/up.sql b/crates/sui-indexer/migrations/mysql/2024-03-25-203621_objects/up.sql
new file mode 100644
index 0000000000000..573389c5084a7
--- /dev/null
+++ b/crates/sui-indexer/migrations/mysql/2024-03-25-203621_objects/up.sql
@@ -0,0 +1,109 @@
+-- Your SQL goes here
+CREATE TABLE objects (
+    object_id blob NOT NULL,
+    object_version bigint NOT NULL,
+    object_digest blob NOT NULL,
+    checkpoint_sequence_number bigint NOT NULL,
+    -- Immutable/Address/Object/Shared, see types.rs
+    owner_type smallint NOT NULL,
+    -- bytes of SuiAddress/ObjectID of the owner ID.
+    -- Non-null for objects with an owner: Address or Object
+    owner_id blob,
+    -- Object type
+    object_type text,
+    -- Components of the StructTag: package, module, name (name of the struct, without type parameters)
+    object_type_package blob,
+    object_type_module text,
+    object_type_name text,
+    -- bcs serialized Object
+    serialized_object mediumblob NOT NULL,
+    -- Non-null when the object is a coin.
+    -- e.g. `0x2::sui::SUI`
+    coin_type text,
+    -- Non-null when the object is a coin.
+    coin_balance bigint,
+    -- DynamicField/DynamicObject, see types.rs
+    -- Non-null when the object is a dynamic field
+    df_kind smallint,
+    -- bcs serialized DynamicFieldName
+    -- Non-null when the object is a dynamic field
+    df_name blob,
+    -- object_type in DynamicFieldInfo.
+    df_object_type text,
+    -- object_id in DynamicFieldInfo.
+    df_object_id blob,
+    CONSTRAINT objects_pk PRIMARY KEY (object_id(128))
+);
+
+-- OwnerType: 1: Address, 2: Object, see types.rs
+CREATE INDEX objects_owner ON objects (owner_type, owner_id(128));
+CREATE INDEX objects_coin ON objects (owner_id(128), coin_type(128));
+CREATE INDEX objects_checkpoint_sequence_number ON objects (checkpoint_sequence_number);
+CREATE INDEX objects_package_module_name_full_type ON objects (object_type_package(128), object_type_module(128), object_type_name(128), object_type(128));
+CREATE INDEX objects_owner_package_module_name_full_type ON objects (owner_id(128), object_type_package(128), object_type_module(128), object_type_name(128), object_type(128));
+
+-- similar to the objects table, except that
+-- 1. the primary key includes object_version, to store multiple versions of an object, and the table is partitioned by checkpoint_sequence_number
+-- 2. null values are allowed in some columns for deleted / wrapped objects
+-- 3. object_status marks the status of the object, which is either Active or WrappedOrDeleted
+CREATE TABLE objects_history (
+    object_id blob NOT NULL,
+    object_version bigint NOT NULL,
+    object_status smallint NOT NULL,
+    object_digest blob,
+    checkpoint_sequence_number bigint NOT NULL,
+    owner_type smallint,
+    owner_id blob,
+    object_type text,
+    -- Components of the StructTag: package, module, name (name of the struct, without type parameters)
+    object_type_package blob,
+    object_type_module text,
+    object_type_name text,
+    serialized_object mediumblob,
+    coin_type text,
+    coin_balance bigint,
+    df_kind smallint,
+    df_name blob,
+    df_object_type text,
+    df_object_id blob,
+    CONSTRAINT objects_history_pk PRIMARY KEY (checkpoint_sequence_number, object_id(128), object_version)
+) PARTITION BY RANGE (checkpoint_sequence_number) (
+    PARTITION p0 VALUES LESS THAN MAXVALUE
+);
+CREATE INDEX objects_history_id_version ON objects_history (object_id(128), object_version, checkpoint_sequence_number);
+CREATE INDEX objects_history_owner ON objects_history (checkpoint_sequence_number, owner_type, owner_id(128));
+CREATE INDEX objects_history_coin ON objects_history (checkpoint_sequence_number, owner_id(128), coin_type(128));
+CREATE INDEX objects_history_type ON objects_history (checkpoint_sequence_number, object_type(128));
+CREATE INDEX objects_history_package_module_name_full_type ON objects_history (checkpoint_sequence_number, object_type_package(128), object_type_module(128), object_type_name(128), object_type(128));
+CREATE INDEX objects_history_owner_package_module_name_full_type ON objects_history (checkpoint_sequence_number, owner_id(128), object_type_package(128), object_type_module(128), object_type_name(128), object_type(128));
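To make the index layout above concrete: the history table exists to answer point-in-time lookups. A hypothetical query of that shape (illustrative only, not part of this migration):

```sql
-- Latest version of one object at or before a given checkpoint; the
-- objects_history_id_version index (object_id, object_version, checkpoint)
-- narrows the scan to that object's version chain.
SELECT *
FROM objects_history
WHERE object_id = ?
  AND checkpoint_sequence_number <= ?
ORDER BY object_version DESC
LIMIT 1;
```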
+-- snapshot table built by folding the objects_history table up to a certain checkpoint,
+-- effectively the snapshot of objects at that checkpoint,
+-- except that it also includes deleted or wrapped objects with the corresponding object_status.
+CREATE TABLE objects_snapshot (
+    object_id BLOB NOT NULL,
+    object_version bigint NOT NULL,
+    object_status smallint NOT NULL,
+    object_digest BLOB,
+    checkpoint_sequence_number bigint NOT NULL,
+    owner_type smallint,
+    owner_id BLOB,
+    object_type text,
+    object_type_package blob,
+    object_type_module text,
+    object_type_name text,
+    serialized_object mediumblob,
+    coin_type text,
+    coin_balance bigint,
+    df_kind smallint,
+    df_name BLOB,
+    df_object_type text,
+    df_object_id BLOB,
+    CONSTRAINT objects_snapshot_pk PRIMARY KEY (object_id(128))
+);
+CREATE INDEX objects_snapshot_checkpoint_sequence_number ON objects_snapshot (checkpoint_sequence_number);
+CREATE INDEX objects_snapshot_owner ON objects_snapshot (owner_type, owner_id(128), object_id(128));
+CREATE INDEX objects_snapshot_coin ON objects_snapshot (owner_id(128), coin_type(128), object_id(128));
+CREATE INDEX objects_snapshot_package_module_name_full_type ON objects_snapshot (object_type_package(128), object_type_module(128), object_type_name(128), object_type(128));
+CREATE INDEX objects_snapshot_owner_package_module_name_full_type ON objects_snapshot (owner_id(128), object_type_package(128), object_type_module(128), object_type_name(128), object_type(128));
+
diff --git a/crates/sui-indexer/migrations/mysql/2024-03-26-004025_transactions/down.sql b/crates/sui-indexer/migrations/mysql/2024-03-26-004025_transactions/down.sql
new file mode 100644
index 0000000000000..8623d59ea9b36
--- /dev/null
+++ b/crates/sui-indexer/migrations/mysql/2024-03-26-004025_transactions/down.sql
@@ -0,0 +1,5 @@
+-- This file should undo anything in `up.sql`
+
+
+
+
diff --git a/crates/sui-indexer/migrations/mysql/2024-03-26-004025_transactions/up.sql b/crates/sui-indexer/migrations/mysql/2024-03-26-004025_transactions/up.sql
new file mode 100644
index 0000000000000..f362527d55491
--- /dev/null
+++ b/crates/sui-indexer/migrations/mysql/2024-03-26-004025_transactions/up.sql
@@ -0,0 +1,29 @@
+CREATE TABLE transactions (
+    tx_sequence_number BIGINT NOT NULL,
+    transaction_digest BLOB NOT NULL,
+    -- bcs serialized SenderSignedData bytes
+    raw_transaction BLOB NOT NULL,
+    -- bcs serialized TransactionEffects bytes
+    raw_effects BLOB NOT NULL,
+    checkpoint_sequence_number BIGINT NOT NULL,
+    timestamp_ms BIGINT NOT NULL,
+    -- array of bcs serialized IndexedObjectChange bytes
+    object_changes JSON NOT NULL,
+    -- array of bcs serialized BalanceChange bytes
+    balance_changes JSON NOT NULL,
+    -- array of bcs serialized StoredEvent bytes
+    events JSON NOT NULL,
+    -- SystemTransaction/ProgrammableTransaction. See types.rs
+    transaction_kind smallint NOT NULL,
+    -- number of successful commands in this transaction, bounded by the number of commands
+    -- in a programmable transaction.
+    success_command_count smallint NOT NULL,
+    CONSTRAINT transactions_pkey PRIMARY KEY (tx_sequence_number, checkpoint_sequence_number)
+) PARTITION BY RANGE (checkpoint_sequence_number) (
+    PARTITION p0 VALUES LESS THAN MAXVALUE
+);
+
+CREATE INDEX transactions_transaction_digest ON transactions (transaction_digest(255));
+CREATE INDEX transactions_checkpoint_sequence_number ON transactions (checkpoint_sequence_number);
+-- only create index for system transactions (0). See types.rs
+CREATE INDEX transactions_transaction_kind ON transactions (transaction_kind);
diff --git a/crates/sui-indexer/migrations/mysql/2024-04-24-180008_checkpoints/down.sql b/crates/sui-indexer/migrations/mysql/2024-04-24-180008_checkpoints/down.sql
new file mode 100644
index 0000000000000..6a9018e3c881a
--- /dev/null
+++ b/crates/sui-indexer/migrations/mysql/2024-04-24-180008_checkpoints/down.sql
@@ -0,0 +1 @@
+DROP TABLE IF EXISTS checkpoints;
diff --git a/crates/sui-indexer/migrations/mysql/2024-04-24-180008_checkpoints/up.sql b/crates/sui-indexer/migrations/mysql/2024-04-24-180008_checkpoints/up.sql
new file mode 100644
index 0000000000000..61d6388eb3a21
--- /dev/null
+++ b/crates/sui-indexer/migrations/mysql/2024-04-24-180008_checkpoints/up.sql
@@ -0,0 +1,28 @@
+CREATE TABLE checkpoints
+(
+    sequence_number bigint PRIMARY KEY,
+    checkpoint_digest blob NOT NULL,
+    epoch bigint NOT NULL,
+    -- total transactions in the network at the end of this checkpoint (including itself)
+    network_total_transactions bigint NOT NULL,
+    previous_checkpoint_digest blob,
+    -- if this checkpoint is the last checkpoint of an epoch
+    end_of_epoch boolean NOT NULL,
+    -- array of TransactionDigest in bytes included in this checkpoint
+    tx_digests JSON NOT NULL,
+    timestamp_ms BIGINT NOT NULL,
+    total_gas_cost BIGINT NOT NULL,
+    computation_cost BIGINT NOT NULL,
+    storage_cost BIGINT NOT NULL,
+    storage_rebate BIGINT NOT NULL,
+    non_refundable_storage_fee BIGINT NOT NULL,
+    -- bcs serialized Vec<CheckpointCommitment> bytes
+    checkpoint_commitments blob NOT NULL,
+    -- bcs serialized AggregateAuthoritySignature bytes
+    validator_signature blob NOT NULL,
+    -- bcs serialized EndOfEpochData bytes, if the checkpoint marks end of an epoch
+    end_of_epoch_data blob
+);
+
+CREATE INDEX checkpoints_epoch ON checkpoints (epoch, sequence_number);
+CREATE INDEX checkpoints_digest ON checkpoints (checkpoint_digest(255));
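For context on the JSON columns above (tx_digests here; object_changes, balance_changes, and events in the transactions table): on the MySQL path the indexer stores arrays of byte vectors as JSON rather than Postgres array types. A minimal sketch of the assumed encoding (serde_json is the crate used later in this patch; the values are illustrative):

```rust
// serde_json encodes Vec<u8> as a JSON array of integers, so a list of
// digests becomes a nested array like [[1,2,3],[4,5,6]].
fn main() {
    let tx_digests: Vec<Vec<u8>> = vec![vec![1, 2, 3], vec![4, 5, 6]];
    let as_json: serde_json::Value = serde_json::to_value(&tx_digests).unwrap();
    let restored: Vec<Vec<u8>> = serde_json::from_value(as_json).unwrap();
    assert_eq!(tx_digests, restored);
}
```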
diff --git a/crates/sui-indexer/migrations/mysql/2024-04-24-180207_epochs/down.sql b/crates/sui-indexer/migrations/mysql/2024-04-24-180207_epochs/down.sql
new file mode 100644
index 0000000000000..60ae1f98e64da
--- /dev/null
+++ b/crates/sui-indexer/migrations/mysql/2024-04-24-180207_epochs/down.sql
@@ -0,0 +1 @@
+DROP TABLE IF EXISTS epochs;
diff --git a/crates/sui-indexer/migrations/mysql/2024-04-24-180207_epochs/up.sql b/crates/sui-indexer/migrations/mysql/2024-04-24-180207_epochs/up.sql
new file mode 100644
index 0000000000000..89332d238bfe0
--- /dev/null
+++ b/crates/sui-indexer/migrations/mysql/2024-04-24-180207_epochs/up.sql
@@ -0,0 +1,28 @@
+CREATE TABLE epochs
+(
+    epoch BIGINT PRIMARY KEY,
+    first_checkpoint_id BIGINT NOT NULL,
+    epoch_start_timestamp BIGINT NOT NULL,
+    reference_gas_price BIGINT NOT NULL,
+    protocol_version BIGINT NOT NULL,
+    total_stake BIGINT NOT NULL,
+    storage_fund_balance BIGINT NOT NULL,
+    system_state BLOB NOT NULL,
+    -- The following fields are nullable because they are filled in
+    -- only at the end of an epoch.
+    epoch_total_transactions BIGINT,
+    last_checkpoint_id BIGINT,
+    epoch_end_timestamp BIGINT,
+    -- The following fields are from SystemEpochInfoEvent emitted
+    -- **after** advancing to the next epoch
+    storage_fund_reinvestment BIGINT,
+    storage_charge BIGINT,
+    storage_rebate BIGINT,
+    stake_subsidy_amount BIGINT,
+    total_gas_fees BIGINT,
+    total_stake_rewards_distributed BIGINT,
+    leftover_storage_fund_inflow BIGINT,
+    -- bcs serialized Vec<CheckpointCommitment> bytes, found in last CheckpointSummary
+    -- of the epoch
+    epoch_commitments BLOB
+);
diff --git a/crates/sui-indexer/migrations/mysql/2024-04-24-180249_packages/down.sql b/crates/sui-indexer/migrations/mysql/2024-04-24-180249_packages/down.sql
new file mode 100644
index 0000000000000..33ccb5a09f55f
--- /dev/null
+++ b/crates/sui-indexer/migrations/mysql/2024-04-24-180249_packages/down.sql
@@ -0,0 +1,4 @@
+-- This file should undo anything in `up.sql`
+DROP TABLE IF EXISTS packages;
+
+
diff --git a/crates/sui-indexer/migrations/mysql/2024-04-24-180249_packages/up.sql b/crates/sui-indexer/migrations/mysql/2024-04-24-180249_packages/up.sql
new file mode 100644
index 0000000000000..4c858d30c583a
--- /dev/null
+++ b/crates/sui-indexer/migrations/mysql/2024-04-24-180249_packages/up.sql
@@ -0,0 +1,7 @@
+CREATE TABLE packages
+(
+    package_id blob NOT NULL,
+    -- bcs serialized MovePackage
+    move_package blob NOT NULL,
+    CONSTRAINT packages_pk PRIMARY KEY (package_id(255))
+);
diff --git a/crates/sui-indexer/migrations/2023-10-06-204335_tx_indices/down.sql b/crates/sui-indexer/migrations/mysql/2024-04-24-180418_tx_indices/down.sql
similarity index 100%
rename from crates/sui-indexer/migrations/2023-10-06-204335_tx_indices/down.sql
rename to crates/sui-indexer/migrations/mysql/2024-04-24-180418_tx_indices/down.sql
diff --git a/crates/sui-indexer/migrations/mysql/2024-04-24-180418_tx_indices/up.sql b/crates/sui-indexer/migrations/mysql/2024-04-24-180418_tx_indices/up.sql
new file mode 100644
index 0000000000000..b84d4e637d507
--- /dev/null
+++ b/crates/sui-indexer/migrations/mysql/2024-04-24-180418_tx_indices/up.sql
@@ -0,0 +1,57 @@
+-- Your SQL goes here
+CREATE TABLE tx_senders (
+    cp_sequence_number BIGINT NOT NULL,
+    tx_sequence_number BIGINT NOT NULL,
+    -- SuiAddress in bytes.
+    sender BLOB NOT NULL,
+    PRIMARY KEY(sender(255), tx_sequence_number, cp_sequence_number)
+);
+CREATE INDEX tx_senders_tx_sequence_number_index ON tx_senders (tx_sequence_number, cp_sequence_number);
+
+-- Your SQL goes here
+CREATE TABLE tx_recipients (
+    cp_sequence_number BIGINT NOT NULL,
+    tx_sequence_number BIGINT NOT NULL,
+    -- SuiAddress in bytes.
+    recipient BLOB NOT NULL,
+    PRIMARY KEY(recipient(255), tx_sequence_number)
+);
+CREATE INDEX tx_recipients_tx_sequence_number_index ON tx_recipients (tx_sequence_number, cp_sequence_number);
+
+CREATE TABLE tx_input_objects (
+    cp_sequence_number BIGINT NOT NULL,
+    tx_sequence_number BIGINT NOT NULL,
+    -- Object ID in bytes.
+    object_id BLOB NOT NULL,
+    PRIMARY KEY(object_id(255), tx_sequence_number, cp_sequence_number)
+);
+
+CREATE TABLE tx_changed_objects (
+    cp_sequence_number BIGINT NOT NULL,
+    tx_sequence_number BIGINT NOT NULL,
+    -- Object ID in bytes.
+    object_id BLOB NOT NULL,
+    PRIMARY KEY(object_id(255), tx_sequence_number)
+);
+
+CREATE TABLE tx_calls (
+    cp_sequence_number BIGINT NOT NULL,
+    tx_sequence_number BIGINT NOT NULL,
+    package BLOB NOT NULL,
+    module TEXT NOT NULL,
+    func TEXT NOT NULL,
+    -- 1. Using Primary Key as a unique index.
+    -- 2. Diesel does not like tables with no primary key.
+    PRIMARY KEY(package(255), tx_sequence_number, cp_sequence_number)
+);
+
+CREATE INDEX tx_calls_module ON tx_calls (package(255), module(255), tx_sequence_number, cp_sequence_number);
+CREATE INDEX tx_calls_func ON tx_calls (package(255), module(255), func(255), tx_sequence_number, cp_sequence_number);
+CREATE INDEX tx_calls_tx_sequence_number ON tx_calls (tx_sequence_number, cp_sequence_number);
+
+CREATE TABLE tx_digests (
+    tx_digest BLOB NOT NULL,
+    cp_sequence_number BIGINT NOT NULL,
+    tx_sequence_number BIGINT NOT NULL,
+    PRIMARY KEY(tx_digest(255))
+);
\ No newline at end of file
diff --git a/crates/sui-indexer/migrations/2023-10-07-160139_display/down.sql b/crates/sui-indexer/migrations/mysql/2024-04-24-180727_display/down.sql
similarity index 100%
rename from crates/sui-indexer/migrations/2023-10-07-160139_display/down.sql
rename to crates/sui-indexer/migrations/mysql/2024-04-24-180727_display/down.sql
diff --git a/crates/sui-indexer/migrations/mysql/2024-04-24-180727_display/up.sql b/crates/sui-indexer/migrations/mysql/2024-04-24-180727_display/up.sql
new file mode 100644
index 0000000000000..c57e0597a5ba5
--- /dev/null
+++ b/crates/sui-indexer/migrations/mysql/2024-04-24-180727_display/up.sql
@@ -0,0 +1,9 @@
+-- Your SQL goes here
+CREATE TABLE display
+(
+    object_type text NOT NULL,
+    id BLOB NOT NULL,
+    version SMALLINT NOT NULL,
+    bcs BLOB NOT NULL,
+    primary key (object_type(255))
+);
diff --git a/crates/sui-indexer/migrations/mysql/2024-04-24-180814_query_cost_function/down.sql b/crates/sui-indexer/migrations/mysql/2024-04-24-180814_query_cost_function/down.sql
new file mode 100644
index 0000000000000..e35c2cddd70c4
--- /dev/null
+++ b/crates/sui-indexer/migrations/mysql/2024-04-24-180814_query_cost_function/down.sql
@@ -0,0 +1,3 @@
+-- This file should undo anything in `up.sql`
+DROP FUNCTION IF EXISTS query_cost;
+
diff --git a/crates/sui-indexer/migrations/mysql/2024-04-24-180814_query_cost_function/up.sql b/crates/sui-indexer/migrations/mysql/2024-04-24-180814_query_cost_function/up.sql
new file mode 100644
index 0000000000000..26eaf89143a16
--- /dev/null
+++ b/crates/sui-indexer/migrations/mysql/2024-04-24-180814_query_cost_function/up.sql
@@ -0,0 +1,10 @@
+-- Your SQL goes here
+DROP FUNCTION IF EXISTS query_cost;
+CREATE FUNCTION query_cost(query_in TEXT)
+RETURNS FLOAT
+DETERMINISTIC
+BEGIN
+    DECLARE cost FLOAT;
+    SET cost = 1.0;
+    RETURN cost;
+END;
\ No newline at end of file
diff --git a/crates/sui-indexer/migrations/2023-11-29-193859_advance_partition/down.sql b/crates/sui-indexer/migrations/mysql/2024-04-24-182830_advance_partition/down.sql
similarity index 100%
rename from crates/sui-indexer/migrations/2023-11-29-193859_advance_partition/down.sql
rename to crates/sui-indexer/migrations/mysql/2024-04-24-182830_advance_partition/down.sql
diff --git a/crates/sui-indexer/migrations/mysql/2024-04-24-182830_advance_partition/up.sql b/crates/sui-indexer/migrations/mysql/2024-04-24-182830_advance_partition/up.sql
new file mode 100644
index 0000000000000..9d920a2160553
--- /dev/null
+++ b/crates/sui-indexer/migrations/mysql/2024-04-24-182830_advance_partition/up.sql
@@ -0,0 +1,24 @@
+DROP PROCEDURE IF EXISTS advance_partition;
+CREATE PROCEDURE advance_partition(
+    IN table_name TEXT,
+    IN last_epoch BIGINT,
+    IN new_epoch BIGINT,
+    IN last_epoch_start_cp BIGINT,
+    IN new_epoch_start_cp BIGINT
+)
+BEGIN
+-- MySQL has no `CREATE TABLE ... PARTITION OF` (that is Postgres syntax), and
+-- `ADD PARTITION` cannot target a table after `REMOVE PARTITIONING`, so split
+-- the open-ended partition p0 in place instead: one closed partition for
+-- checkpoints below the new epoch's first checkpoint, and a fresh open-ended p0.
+SET @sql = CONCAT('ALTER TABLE ', table_name,
+    ' REORGANIZE PARTITION p0 INTO (',
+    'PARTITION ', table_name, '_partition_', new_epoch,
+    ' VALUES LESS THAN (', new_epoch_start_cp, '), ',
+    'PARTITION p0 VALUES LESS THAN MAXVALUE)');
+PREPARE stmt FROM @sql;
+EXECUTE stmt;
+DEALLOCATE PREPARE stmt;
+END;
\ No newline at end of file
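A hypothetical invocation, splitting off the partition that holds the checkpoints written before epoch 2 begins at checkpoint 200 (all values illustrative):

```sql
CALL advance_partition('transactions', 1, 2, 100, 200);
```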
diff --git a/crates/sui-indexer/migrations/00000000000000_diesel_initial_setup/down.sql b/crates/sui-indexer/migrations/pg/00000000000000_diesel_initial_setup/down.sql
similarity index 100%
rename from crates/sui-indexer/migrations/00000000000000_diesel_initial_setup/down.sql
rename to crates/sui-indexer/migrations/pg/00000000000000_diesel_initial_setup/down.sql
diff --git a/crates/sui-indexer/migrations/00000000000000_diesel_initial_setup/up.sql b/crates/sui-indexer/migrations/pg/00000000000000_diesel_initial_setup/up.sql
similarity index 100%
rename from crates/sui-indexer/migrations/00000000000000_diesel_initial_setup/up.sql
rename to crates/sui-indexer/migrations/pg/00000000000000_diesel_initial_setup/up.sql
diff --git a/crates/sui-indexer/migrations/pg/2023-08-19-044020_events/down.sql b/crates/sui-indexer/migrations/pg/2023-08-19-044020_events/down.sql
new file mode 100644
index 0000000000000..6cd33fe82b338
--- /dev/null
+++ b/crates/sui-indexer/migrations/pg/2023-08-19-044020_events/down.sql
@@ -0,0 +1,2 @@
+-- This file should undo anything in `up.sql`
+DROP TABLE IF EXISTS events;
diff --git a/crates/sui-indexer/migrations/2023-08-19-044020_events/up.sql b/crates/sui-indexer/migrations/pg/2023-08-19-044020_events/up.sql
similarity index 100%
rename from crates/sui-indexer/migrations/2023-08-19-044020_events/up.sql
rename to crates/sui-indexer/migrations/pg/2023-08-19-044020_events/up.sql
diff --git a/crates/sui-indexer/migrations/2023-08-19-044023_objects/down.sql b/crates/sui-indexer/migrations/pg/2023-08-19-044023_objects/down.sql
similarity index 100%
rename from crates/sui-indexer/migrations/2023-08-19-044023_objects/down.sql
rename to crates/sui-indexer/migrations/pg/2023-08-19-044023_objects/down.sql
diff --git a/crates/sui-indexer/migrations/2023-08-19-044023_objects/up.sql b/crates/sui-indexer/migrations/pg/2023-08-19-044023_objects/up.sql
similarity index 100%
rename from crates/sui-indexer/migrations/2023-08-19-044023_objects/up.sql
rename to crates/sui-indexer/migrations/pg/2023-08-19-044023_objects/up.sql
diff --git a/crates/sui-indexer/migrations/2023-08-19-044026_transactions/down.sql b/crates/sui-indexer/migrations/pg/2023-08-19-044026_transactions/down.sql
similarity index 100%
rename from crates/sui-indexer/migrations/2023-08-19-044026_transactions/down.sql
rename to crates/sui-indexer/migrations/pg/2023-08-19-044026_transactions/down.sql
diff --git a/crates/sui-indexer/migrations/2023-08-19-044026_transactions/up.sql b/crates/sui-indexer/migrations/pg/2023-08-19-044026_transactions/up.sql
similarity index 100%
rename from crates/sui-indexer/migrations/2023-08-19-044026_transactions/up.sql
rename to crates/sui-indexer/migrations/pg/2023-08-19-044026_transactions/up.sql
diff --git a/crates/sui-indexer/migrations/2023-08-19-044044_checkpoints/down.sql b/crates/sui-indexer/migrations/pg/2023-08-19-044044_checkpoints/down.sql
similarity index 100%
rename from crates/sui-indexer/migrations/2023-08-19-044044_checkpoints/down.sql
rename to crates/sui-indexer/migrations/pg/2023-08-19-044044_checkpoints/down.sql
diff --git a/crates/sui-indexer/migrations/2023-08-19-044044_checkpoints/up.sql b/crates/sui-indexer/migrations/pg/2023-08-19-044044_checkpoints/up.sql
similarity index 100%
rename from crates/sui-indexer/migrations/2023-08-19-044044_checkpoints/up.sql
rename to crates/sui-indexer/migrations/pg/2023-08-19-044044_checkpoints/up.sql
diff --git a/crates/sui-indexer/migrations/2023-08-19-044052_epochs/down.sql b/crates/sui-indexer/migrations/pg/2023-08-19-044052_epochs/down.sql
similarity index 100%
rename from crates/sui-indexer/migrations/2023-08-19-044052_epochs/down.sql
rename to crates/sui-indexer/migrations/pg/2023-08-19-044052_epochs/down.sql
diff --git a/crates/sui-indexer/migrations/2023-08-19-044052_epochs/up.sql b/crates/sui-indexer/migrations/pg/2023-08-19-044052_epochs/up.sql
similarity index 100%
rename from crates/sui-indexer/migrations/2023-08-19-044052_epochs/up.sql
rename to crates/sui-indexer/migrations/pg/2023-08-19-044052_epochs/up.sql
diff --git a/crates/sui-indexer/migrations/2023-08-19-060729_packages/down.sql b/crates/sui-indexer/migrations/pg/2023-08-19-060729_packages/down.sql
similarity index 100%
rename from crates/sui-indexer/migrations/2023-08-19-060729_packages/down.sql
rename to crates/sui-indexer/migrations/pg/2023-08-19-060729_packages/down.sql
diff --git a/crates/sui-indexer/migrations/2023-08-19-060729_packages/up.sql b/crates/sui-indexer/migrations/pg/2023-08-19-060729_packages/up.sql
similarity index 100%
rename from crates/sui-indexer/migrations/2023-08-19-060729_packages/up.sql
rename to crates/sui-indexer/migrations/pg/2023-08-19-060729_packages/up.sql
diff --git a/crates/sui-indexer/migrations/pg/2023-10-06-204335_tx_indices/down.sql b/crates/sui-indexer/migrations/pg/2023-10-06-204335_tx_indices/down.sql
new file mode 100644
index 0000000000000..8e4f29f981c22
--- /dev/null
+++ b/crates/sui-indexer/migrations/pg/2023-10-06-204335_tx_indices/down.sql
@@ -0,0 +1,6 @@
+DROP TABLE IF EXISTS tx_senders;
+DROP TABLE IF EXISTS tx_recipients;
+DROP TABLE IF EXISTS tx_input_objects;
+DROP TABLE IF EXISTS tx_changed_objects;
+DROP TABLE IF EXISTS tx_calls;
+DROP TABLE IF EXISTS tx_digests;
diff --git a/crates/sui-indexer/migrations/2023-10-06-204335_tx_indices/up.sql b/crates/sui-indexer/migrations/pg/2023-10-06-204335_tx_indices/up.sql
similarity index 100%
rename from crates/sui-indexer/migrations/2023-10-06-204335_tx_indices/up.sql
rename to crates/sui-indexer/migrations/pg/2023-10-06-204335_tx_indices/up.sql
diff --git a/crates/sui-indexer/migrations/pg/2023-10-07-160139_display/down.sql b/crates/sui-indexer/migrations/pg/2023-10-07-160139_display/down.sql
new file mode 100644
index 0000000000000..f73e497c406d3
--- /dev/null
+++ b/crates/sui-indexer/migrations/pg/2023-10-07-160139_display/down.sql
@@ -0,0 +1,2 @@
+-- This file should undo anything in `up.sql`
+DROP TABLE IF EXISTS display;
diff --git a/crates/sui-indexer/migrations/2023-10-07-160139_display/up.sql b/crates/sui-indexer/migrations/pg/2023-10-07-160139_display/up.sql
similarity index 100%
rename from crates/sui-indexer/migrations/2023-10-07-160139_display/up.sql
rename to crates/sui-indexer/migrations/pg/2023-10-07-160139_display/up.sql
diff --git a/crates/sui-indexer/migrations/2023-10-24-160139_query_cost_function/down.sql b/crates/sui-indexer/migrations/pg/2023-10-24-160139_query_cost_function/down.sql
similarity index 100%
rename from crates/sui-indexer/migrations/2023-10-24-160139_query_cost_function/down.sql
rename to crates/sui-indexer/migrations/pg/2023-10-24-160139_query_cost_function/down.sql
diff --git a/crates/sui-indexer/migrations/2023-10-24-160139_query_cost_function/up.sql b/crates/sui-indexer/migrations/pg/2023-10-24-160139_query_cost_function/up.sql similarity index 100% rename from crates/sui-indexer/migrations/2023-10-24-160139_query_cost_function/up.sql rename to crates/sui-indexer/migrations/pg/2023-10-24-160139_query_cost_function/up.sql diff --git a/crates/sui-indexer/migrations/pg/2023-11-29-193859_advance_partition/down.sql b/crates/sui-indexer/migrations/pg/2023-11-29-193859_advance_partition/down.sql new file mode 100644 index 0000000000000..1693f3892a5fa --- /dev/null +++ b/crates/sui-indexer/migrations/pg/2023-11-29-193859_advance_partition/down.sql @@ -0,0 +1 @@ +DROP PROCEDURE IF EXISTS advance_partition; diff --git a/crates/sui-indexer/migrations/2023-11-29-193859_advance_partition/up.sql b/crates/sui-indexer/migrations/pg/2023-11-29-193859_advance_partition/up.sql similarity index 100% rename from crates/sui-indexer/migrations/2023-11-29-193859_advance_partition/up.sql rename to crates/sui-indexer/migrations/pg/2023-11-29-193859_advance_partition/up.sql diff --git a/crates/sui-indexer/src/db.rs b/crates/sui-indexer/src/db.rs index 44a4f3652f469..a45d43bbe95f6 100644 --- a/crates/sui-indexer/src/db.rs +++ b/crates/sui-indexer/src/db.rs @@ -4,11 +4,12 @@ use anyhow::anyhow; use std::time::Duration; +use crate::errors::IndexerError; use diesel::connection::BoxableConnection; +#[cfg(feature = "postgres-feature")] +use diesel::query_dsl::RunQueryDsl; +use diesel::r2d2::ConnectionManager; use diesel::r2d2::{Pool, PooledConnection, R2D2Connection}; -use diesel::{r2d2::ConnectionManager, sql_query, RunQueryDsl}; - -use crate::errors::IndexerError; pub type ConnectionPool = Pool>; pub type PoolConnection = PooledConnection>; @@ -77,10 +78,11 @@ pub struct ConnectionConfig { impl diesel::r2d2::CustomizeConnection for ConnectionConfig { - fn on_acquire(&self, conn: &mut T) -> std::result::Result<(), diesel::r2d2::Error> { + fn on_acquire(&self, _conn: &mut T) -> std::result::Result<(), diesel::r2d2::Error> { #[cfg(feature = "postgres-feature")] { - conn.as_any_mut() + _conn + .as_any_mut() .downcast_mut::() .map_or_else( || { @@ -93,7 +95,7 @@ impl diesel::r2d2::CustomizeConnection diesel::r2d2::CustomizeConnection( .map_or_else( || Err(anyhow!("Failed to downcast connection to PgConnection")), |mysql_conn| { - setup_mysql::reset_database(mysql_conn, drop_all) - .map_err(diesel::r2d2::Error::QueryError)?; + setup_mysql::reset_database(mysql_conn, drop_all)?; Ok(()) }, )?; @@ -206,7 +207,7 @@ pub mod setup_postgres { use secrecy::ExposeSecret; use tracing::{error, info}; - const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations"); + const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations/pg"); pub fn reset_database( conn: &mut PoolConnection, @@ -333,22 +334,140 @@ pub mod setup_postgres { #[cfg(feature = "mysql-feature")] #[cfg(not(feature = "postgres-feature"))] pub mod setup_mysql { - use crate::db::PoolConnection; + use crate::db::{get_pool_connection, new_connection_pool, PoolConnection}; use crate::errors::IndexerError; + use crate::indexer::Indexer; + use crate::metrics::IndexerMetrics; + use crate::store::PgIndexerStore; use crate::IndexerConfig; - use diesel::MysqlConnection; + use anyhow::anyhow; + use diesel::migration::MigrationSource; + use diesel::{MysqlConnection, RunQueryDsl}; + use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; use prometheus::Registry; + use secrecy::ExposeSecret; + use 
tracing::{error, info};
+
+    const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations/mysql");
 
     pub fn reset_database(
-        _conn: &mut PoolConnection<MysqlConnection>,
-        _drop_all: bool,
+        conn: &mut PoolConnection<MysqlConnection>,
+        drop_all: bool,
     ) -> Result<(), anyhow::Error> {
-        todo!()
+        info!("Resetting database ...");
+        if drop_all {
+            crate::db::setup_mysql::drop_all_tables(conn)
+                .map_err(|e| anyhow!("Encountering error when dropping all tables {e}"))?;
+        } else {
+            conn.revert_all_migrations(MIGRATIONS)
+                .map_err(|e| anyhow!("Error reverting all migrations {e}"))?;
+        }
+        conn.run_migrations(&MIGRATIONS.migrations().unwrap())
+            .map_err(|e| anyhow!("Failed to run migrations {e}"))?;
+        info!("Reset database complete.");
+        Ok(())
     }
+
+    fn drop_all_tables(conn: &mut MysqlConnection) -> Result<(), diesel::result::Error> {
+        info!("Dropping all tables in the database");
+        let table_names: Vec<String> = diesel::dsl::sql::<diesel::sql_types::Text>(
+            "
+        SELECT TABLE_NAME FROM information_schema.tables WHERE table_schema = DATABASE()
+        ",
+        )
+        .load(conn)?;
+
+        for table_name in table_names {
+            let drop_table_query = format!("DROP TABLE IF EXISTS {}", table_name);
+            diesel::sql_query(drop_table_query).execute(conn)?;
+        }
+
+        // Recreate the __diesel_schema_migrations table
+        diesel::sql_query(
+            "
+        CREATE TABLE __diesel_schema_migrations (
+            version VARCHAR(50) PRIMARY KEY,
+            run_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP()
+        )
+        ",
+        )
+        .execute(conn)?;
+        info!("Dropped all tables in the database");
+        Ok(())
+    }
+
     pub async fn setup(
-        _indexer_config: IndexerConfig,
+        indexer_config: IndexerConfig,
         registry: Registry,
     ) -> Result<(), IndexerError> {
-        todo!()
+        let db_url_secret = indexer_config.get_db_url().map_err(|e| {
+            IndexerError::PgPoolConnectionError(format!(
+                "Failed parsing database url with error {:?}",
+                e
+            ))
+        })?;
+        let db_url = db_url_secret.expose_secret();
+        let blocking_cp = new_connection_pool::<MysqlConnection>(db_url, None).map_err(|e| {
+            error!("Failed creating Mysql connection pool with error {:?}", e);
+            e
+        })?;
+        if indexer_config.reset_db {
+            let mut conn = get_pool_connection(&blocking_cp).map_err(|e| {
+                error!(
+                    "Failed getting Mysql connection from connection pool with error {:?}",
+                    e
+                );
+                e
+            })?;
+            crate::db::setup_mysql::reset_database(&mut conn, /* drop_all */ true).map_err(
+                |e| {
+                    let db_err_msg = format!(
+                        "Failed resetting database with url: {:?} and error: {:?}",
+                        db_url, e
+                    );
+                    error!("{}", db_err_msg);
+                    IndexerError::PostgresResetError(db_err_msg)
+                },
+            )?;
+        }
+        let indexer_metrics = IndexerMetrics::new(&registry);
+        mysten_metrics::init_metrics(&registry);
+
+        let report_cp = blocking_cp.clone();
+        let report_metrics = indexer_metrics.clone();
+        tokio::spawn(async move {
+            loop {
+                let cp_state = report_cp.state();
+                info!(
+                    "DB connection pool size: {}, with idle conn: {}.",
+                    cp_state.connections, cp_state.idle_connections
+                );
+                report_metrics
+                    .db_conn_pool_size
+                    .set(cp_state.connections as i64);
+                report_metrics
+                    .idle_db_conn
+                    .set(cp_state.idle_connections as i64);
+                tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
+            }
+        });
+        if indexer_config.fullnode_sync_worker {
+            let store =
+                PgIndexerStore::<MysqlConnection>::new(blocking_cp, indexer_metrics.clone());
+            return Indexer::start_writer::<PgIndexerStore<MysqlConnection>, MysqlConnection>(
+                &indexer_config,
+                store,
+                indexer_metrics,
+            )
+            .await;
+        } else if indexer_config.rpc_server_worker {
+            return Indexer::start_reader::<MysqlConnection>(
+                &indexer_config,
+                &registry,
+                db_url.to_string(),
+            )
+            .await;
+        }
+        Ok(())
     }
 }
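With the setup function above, the same binary should also serve reads once data is flowing. A smoke test of the reader side might look like this (the `--rpc-server-worker` flag is inferred from the `rpc_server_worker` config field used above, so treat it as an assumption; the URLs are placeholders as in the test plan):

```
cargo run --bin sui-indexer --features mysql-feature --no-default-features -- \
  --db-url <> --rpc-client-url <> --rpc-server-worker
```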
diff --git a/crates/sui-indexer/src/indexer_reader.rs b/crates/sui-indexer/src/indexer_reader.rs
index 5254c042489eb..b3dcbabc2a938 100644
--- a/crates/sui-indexer/src/indexer_reader.rs
+++ b/crates/sui-indexer/src/indexer_reader.rs
@@ -44,6 +44,7 @@ use sui_types::{
 use sui_types::{coin::CoinMetadata, event::EventID};
 
 use crate::db::{ConnectionConfig, ConnectionPool, ConnectionPoolConfig};
+use crate::models::transactions::{stored_events_to_events, StoredTransactionEvents};
 use crate::store::diesel_macro::*;
 use crate::{
     errors::IndexerError,
@@ -982,14 +983,10 @@ impl IndexerReader {
             transactions::table
                 .filter(transactions::transaction_digest.eq(digest.into_inner().to_vec()))
                 .select((transactions::timestamp_ms, transactions::events))
-                .first::<(i64, Vec<Option<Vec<u8>>>)>(conn)
+                .first::<(i64, StoredTransactionEvents)>(conn)
         })?;
 
-        let events = serialized_events
-            .into_iter()
-            .flatten()
-            .map(|event| bcs::from_bytes::<Event>(&event))
-            .collect::<Result<Vec<_>, _>>()?;
+        let events = stored_events_to_events(serialized_events)?;
         let tx_events = TransactionEvents { data: events };
 
         let sui_tx_events = tx_events_to_sui_tx_events(
diff --git a/crates/sui-indexer/src/models/checkpoints.rs b/crates/sui-indexer/src/models/checkpoints.rs
index a4124d3fd490f..3240e21ef3d00 100644
--- a/crates/sui-indexer/src/models/checkpoints.rs
+++ b/crates/sui-indexer/src/models/checkpoints.rs
@@ -21,7 +21,12 @@ pub struct StoredCheckpoint {
     pub network_total_transactions: i64,
     pub previous_checkpoint_digest: Option<Vec<u8>>,
     pub end_of_epoch: bool,
+    #[cfg(feature = "postgres-feature")]
     pub tx_digests: Vec<Option<Vec<u8>>>,
+    #[cfg(feature = "mysql-feature")]
+    #[cfg(not(feature = "postgres-feature"))]
+    #[diesel(sql_type = diesel::sql_types::Json)]
+    pub tx_digests: serde_json::Value,
     pub timestamp_ms: i64,
     pub total_gas_cost: i64,
     pub computation_cost: i64,
@@ -39,11 +44,21 @@ impl From<&IndexedCheckpoint> for StoredCheckpoint {
             sequence_number: c.sequence_number as i64,
             checkpoint_digest: c.checkpoint_digest.into_inner().to_vec(),
             epoch: c.epoch as i64,
+            #[cfg(feature = "postgres-feature")]
             tx_digests: c
                 .tx_digests
                 .iter()
                 .map(|tx| Some(tx.into_inner().to_vec()))
                 .collect(),
+            #[cfg(feature = "mysql-feature")]
+            #[cfg(not(feature = "postgres-feature"))]
+            tx_digests: serde_json::to_value(
+                c.tx_digests
+                    .iter()
+                    .map(|tx| tx.into_inner().to_vec())
+                    .collect::<Vec<Vec<u8>>>(),
+            )
+            .unwrap(),
             network_total_transactions: c.network_total_transactions as i64,
             previous_checkpoint_digest: c
                 .previous_checkpoint_digest
@@ -89,22 +104,59 @@ impl TryFrom<StoredCheckpoint> for RpcCheckpoint {
             })
             .transpose()?;
 
-        let transactions: Vec<TransactionDigest> = checkpoint
-            .tx_digests
-            .into_iter()
-            .map(|tx_digest| match tx_digest {
-                None => Err(IndexerError::PersistentStorageDataCorruptionError(
-                    "tx_digests should not contain null elements".to_string(),
-                )),
-                Some(tx_digest) => TransactionDigest::try_from(tx_digest.as_slice()).map_err(|e| {
-                    IndexerError::PersistentStorageDataCorruptionError(format!(
-                        "Failed to decode transaction digest: {:?} with err: {:?}",
-                        tx_digest, e
-                    ))
-                }),
-            })
-            .collect::<Result<Vec<TransactionDigest>, IndexerError>>()?;
-
+        let transactions: Vec<TransactionDigest> = {
+            #[cfg(feature = "postgres-feature")]
+            {
+                checkpoint
+                    .tx_digests
+                    .into_iter()
+                    .map(|tx_digest| match tx_digest {
+                        None => Err(IndexerError::PersistentStorageDataCorruptionError(
+                            "tx_digests should not contain null elements".to_string(),
+                        )),
+                        Some(tx_digest) => TransactionDigest::try_from(tx_digest.as_slice())
+                            .map_err(|e| {
+                                IndexerError::PersistentStorageDataCorruptionError(format!(
+                                    "Failed to decode transaction digest: {:?} with err: {:?}",
+                                    tx_digest, e
+                                ))
+                            }),
+                    })
+                    .collect::<Result<Vec<TransactionDigest>, IndexerError>>()?
+            }
+            #[cfg(feature = "mysql-feature")]
+            #[cfg(not(feature = "postgres-feature"))]
+            {
+                checkpoint
+                    .tx_digests
+                    .as_array()
+                    .ok_or_else(|| {
+                        IndexerError::PersistentStorageDataCorruptionError(
+                            "Failed to parse tx_digests as array".to_string(),
+                        )
+                    })?
+                    .iter()
+                    .map(|tx_digest| match tx_digest {
+                        serde_json::Value::Null => {
+                            Err(IndexerError::PersistentStorageDataCorruptionError(
+                                "tx_digests should not contain null elements".to_string(),
+                            ))
+                        }
+                        serde_json::Value::String(tx_digest) => {
+                            TransactionDigest::try_from(tx_digest.as_bytes()).map_err(|e| {
+                                IndexerError::PersistentStorageDataCorruptionError(format!(
+                                    "Failed to decode transaction digest: {:?} with err: {:?}",
+                                    tx_digest, e
+                                ))
+                            })
+                        }
+                        _ => Err(IndexerError::PersistentStorageDataCorruptionError(
+                            "tx_digests should contain only string elements".to_string(),
+                        )),
+                    })
+                    .collect::<Result<Vec<TransactionDigest>, IndexerError>>()?
+            }
+        };
         let validator_signature =
             bcs::from_bytes(&checkpoint.validator_signature).map_err(|e| {
                 IndexerError::PersistentStorageDataCorruptionError(format!(
diff --git a/crates/sui-indexer/src/models/events.rs b/crates/sui-indexer/src/models/events.rs
index 173375d1d01cc..a35cf7c773f2e 100644
--- a/crates/sui-indexer/src/models/events.rs
+++ b/crates/sui-indexer/src/models/events.rs
@@ -29,16 +29,22 @@ pub struct StoredEvent {
     #[diesel(sql_type = diesel::sql_types::BigInt)]
     pub event_sequence_number: i64,
 
-    #[diesel(sql_type = diesel::sql_types::Bytea)]
+    #[diesel(sql_type = diesel::sql_types::Binary)]
     pub transaction_digest: Vec<u8>,
 
     #[diesel(sql_type = diesel::sql_types::BigInt)]
     pub checkpoint_sequence_number: i64,
 
+    #[cfg(feature = "postgres-feature")]
     #[diesel(sql_type = diesel::sql_types::Array<diesel::sql_types::Nullable<diesel::sql_types::Bytea>>)]
     pub senders: Vec<Option<Vec<u8>>>,
 
-    #[diesel(sql_type = diesel::sql_types::Bytea)]
+    #[cfg(feature = "mysql-feature")]
+    #[cfg(not(feature = "postgres-feature"))]
+    #[diesel(sql_type = diesel::sql_types::Json)]
+    pub senders: serde_json::Value,
+
+    #[diesel(sql_type = diesel::sql_types::Binary)]
     pub package: Vec<u8>,
 
     #[diesel(sql_type = diesel::sql_types::Text)]
@@ -59,10 +65,17 @@ pub struct StoredEvent {
     #[diesel(sql_type = diesel::sql_types::BigInt)]
     pub timestamp_ms: i64,
 
-    #[diesel(sql_type = diesel::sql_types::Bytea)]
+    #[diesel(sql_type = diesel::sql_types::Binary)]
     pub bcs: Vec<u8>,
 }
 
+#[cfg(feature = "postgres-feature")]
+pub type SendersType = Vec<Option<Vec<u8>>>;
+
+#[cfg(feature = "mysql-feature")]
+#[cfg(not(feature = "postgres-feature"))]
+pub type SendersType = serde_json::Value;
+
 impl From<IndexedEvent> for StoredEvent {
     fn from(event: IndexedEvent) -> Self {
         Self {
@@ -70,11 +83,15 @@ impl From<IndexedEvent> for StoredEvent {
             event_sequence_number: event.event_sequence_number as i64,
             transaction_digest: event.transaction_digest.into_inner().to_vec(),
             checkpoint_sequence_number: event.checkpoint_sequence_number as i64,
+            #[cfg(feature = "postgres-feature")]
             senders: event
                 .senders
                 .into_iter()
                 .map(|sender| Some(sender.to_vec()))
                 .collect(),
+            #[cfg(feature = "mysql-feature")]
+            #[cfg(not(feature = "postgres-feature"))]
+            senders: serde_json::to_value(event.senders).unwrap(),
             package: event.package.to_vec(),
             module: event.module.clone(),
             event_type: event.event_type.clone(),
@@ -99,13 +116,37 @@ impl StoredEvent {
             ))
         })?;
         // Note: SuiEvent only has one sender today, so we always use the first one.
- let sender = self.senders.first().ok_or_else(|| { - IndexerError::PersistentStorageDataCorruptionError( - "Event senders should contain at least one address".to_string(), - ) - })?; + let sender = { + #[cfg(feature = "postgres-feature")] + { + self.senders.first().ok_or_else(|| { + IndexerError::PersistentStorageDataCorruptionError( + "Event senders should contain at least one address".to_string(), + ) + })? + } + #[cfg(feature = "mysql-feature")] + #[cfg(not(feature = "postgres-feature"))] + { + self.senders + .as_array() + .ok_or_else(|| { + IndexerError::PersistentStorageDataCorruptionError( + "Failed to parse event senders as array".to_string(), + ) + })? + .first() + .ok_or_else(|| { + IndexerError::PersistentStorageDataCorruptionError( + "Event senders should contain at least one address".to_string(), + ) + })? + .as_str() + .map(|s| s.as_bytes().to_vec()) + } + }; let sender = match sender { - Some(s) => SuiAddress::from_bytes(s).map_err(|_e| { + Some(ref s) => SuiAddress::from_bytes(s).map_err(|_e| { IndexerError::PersistentStorageDataCorruptionError(format!( "Failed to parse event sender address: {:?}", sender diff --git a/crates/sui-indexer/src/models/objects.rs b/crates/sui-indexer/src/models/objects.rs index 4389510dd9d24..b7663163731d1 100644 --- a/crates/sui-indexer/src/models/objects.rs +++ b/crates/sui-indexer/src/models/objects.rs @@ -164,11 +164,11 @@ impl From for StoredHistoryObject { object_digest: Some(o.object_digest), checkpoint_sequence_number: o.checkpoint_sequence_number, owner_type: Some(o.owner_type), - owner_id: o.owner_id, - object_type: o.object_type, object_type_package: o.object_type_package, object_type_module: o.object_type_module, object_type_name: o.object_type_name, + owner_id: o.owner_id, + object_type: o.object_type, serialized_object: Some(o.serialized_object), coin_type: o.coin_type, coin_balance: o.coin_balance, diff --git a/crates/sui-indexer/src/models/transactions.rs b/crates/sui-indexer/src/models/transactions.rs index eb144da54fe77..c5c7d2448f1ce 100644 --- a/crates/sui-indexer/src/models/transactions.rs +++ b/crates/sui-indexer/src/models/transactions.rs @@ -33,13 +33,35 @@ pub struct StoredTransaction { pub raw_effects: Vec, pub checkpoint_sequence_number: i64, pub timestamp_ms: i64, + #[cfg(feature = "postgres-feature")] pub object_changes: Vec>>, + #[cfg(feature = "mysql-feature")] + #[cfg(not(feature = "postgres-feature"))] + #[diesel(sql_type = diesel::sql_types::Json)] + pub object_changes: serde_json::Value, + #[cfg(feature = "postgres-feature")] pub balance_changes: Vec>>, + #[cfg(feature = "mysql-feature")] + #[cfg(not(feature = "postgres-feature"))] + #[diesel(sql_type = diesel::sql_types::Json)] + pub balance_changes: serde_json::Value, + #[cfg(feature = "postgres-feature")] pub events: Vec>>, + #[cfg(feature = "mysql-feature")] + #[cfg(not(feature = "postgres-feature"))] + #[diesel(sql_type = diesel::sql_types::Json)] + pub events: serde_json::Value, pub transaction_kind: i16, pub success_command_count: i16, } +#[cfg(feature = "postgres-feature")] +pub type StoredTransactionEvents = Vec>>; + +#[cfg(feature = "mysql-feature")] +#[cfg(not(feature = "postgres-feature"))] +pub type StoredTransactionEvents = serde_json::Value; + #[derive(Debug, Queryable)] pub struct TxSeq { pub seq: i64, @@ -79,21 +101,51 @@ impl From<&IndexedTransaction> for StoredTransaction { raw_transaction: bcs::to_bytes(&tx.sender_signed_data).unwrap(), raw_effects: bcs::to_bytes(&tx.effects).unwrap(), checkpoint_sequence_number: 
tx.checkpoint_sequence_number as i64, + #[cfg(feature = "postgres-feature")] object_changes: tx .object_changes .iter() .map(|oc| Some(bcs::to_bytes(&oc).unwrap())) .collect(), + #[cfg(feature = "mysql-feature")] + #[cfg(not(feature = "postgres-feature"))] + object_changes: serde_json::to_value( + tx.object_changes + .iter() + .map(|oc| bcs::to_bytes(&oc).unwrap()) + .collect::<Vec<Vec<u8>>>(), + ) + .unwrap(), + #[cfg(feature = "postgres-feature")] balance_changes: tx .balance_change .iter() .map(|bc| Some(bcs::to_bytes(&bc).unwrap())) .collect(), + #[cfg(feature = "mysql-feature")] + #[cfg(not(feature = "postgres-feature"))] + balance_changes: serde_json::to_value( + tx.balance_change + .iter() + .map(|bc| bcs::to_bytes(&bc).unwrap()) + .collect::<Vec<Vec<u8>>>(), + ) + .unwrap(), + #[cfg(feature = "postgres-feature")] events: tx .events .iter() .map(|e| Some(bcs::to_bytes(&e).unwrap())) .collect(), + #[cfg(feature = "mysql-feature")] + #[cfg(not(feature = "postgres-feature"))] + events: serde_json::to_value( + tx.events + .iter() + .map(|e| bcs::to_bytes(&e).unwrap()) + .collect::<Vec<Vec<u8>>>(), + ) + .unwrap(), timestamp_ms: tx.timestamp_ms as i64, transaction_kind: tx.transaction_kind.clone() as i16, success_command_count: tx.successful_tx_num as i16, @@ -102,6 +154,84 @@ impl From<&IndexedTransaction> for StoredTransaction { } impl StoredTransaction { + pub fn get_balance_len(&self) -> usize { + #[cfg(feature = "postgres-feature")] + { + self.balance_changes.len() + } + #[cfg(feature = "mysql-feature")] + #[cfg(not(feature = "postgres-feature"))] + { + self.balance_changes.as_array().unwrap().len() + } + } + + pub fn get_balance_at_idx(&self, idx: usize) -> Option<Vec<u8>> { + #[cfg(feature = "postgres-feature")] + { + self.balance_changes.get(idx).cloned().flatten() + } + #[cfg(feature = "mysql-feature")] + #[cfg(not(feature = "postgres-feature"))] + { + self.balance_changes.as_array().unwrap()[idx] + .as_str() + .map(|s| s.as_bytes().to_vec()) + } + } + + pub fn get_object_len(&self) -> usize { + #[cfg(feature = "postgres-feature")] + { + self.object_changes.len() + } + #[cfg(feature = "mysql-feature")] + #[cfg(not(feature = "postgres-feature"))] + { + self.object_changes.as_array().unwrap().len() + } + } + + pub fn get_object_at_idx(&self, idx: usize) -> Option<Vec<u8>> { + #[cfg(feature = "postgres-feature")] + { + self.object_changes.get(idx).cloned().flatten() + } + #[cfg(feature = "mysql-feature")] + #[cfg(not(feature = "postgres-feature"))] + { + self.object_changes.as_array().unwrap()[idx] + .as_str() + .map(|s| s.as_bytes().to_vec()) + } + } + + pub fn get_event_len(&self) -> usize { + #[cfg(feature = "postgres-feature")] + { + self.events.len() + } + #[cfg(feature = "mysql-feature")] + #[cfg(not(feature = "postgres-feature"))] + { + self.events.as_array().unwrap().len() + } + } + + pub fn get_event_at_idx(&self, idx: usize) -> Option<Vec<u8>> { + #[cfg(feature = "postgres-feature")] + { + self.events.get(idx).cloned().flatten() + } + #[cfg(feature = "mysql-feature")] + #[cfg(not(feature = "postgres-feature"))] + { + self.events.as_array().unwrap()[idx] + .as_str() + .map(|s| s.as_bytes().to_vec()) + } + }
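The `get_*_at_idx` accessors above encode one lookup pattern per backend: positional `Vec` indexing on Postgres, JSON-array probing on MySQL. A standalone sketch of the MySQL-side pattern on a plain `serde_json` value (`column` is a hypothetical stored value, not a real indexer field; using `get` rather than `[idx]` also sidesteps the panic that slice indexing would allow):

```
fn get_at_idx(column: &serde_json::Value, idx: usize) -> Option<Vec<u8>> {
    column
        .as_array() // None if the column is not a JSON array
        .and_then(|arr| arr.get(idx)) // bounds-checked, unlike arr[idx]
        .and_then(|v| v.as_str()) // None for null or non-string elements
        .map(|s| s.as_bytes().to_vec())
}

fn main() {
    let column = serde_json::json!(["abc", null]);
    assert_eq!(get_at_idx(&column, 0), Some(b"abc".to_vec()));
    assert_eq!(get_at_idx(&column, 1), None); // null element
    assert_eq!(get_at_idx(&column, 9), None); // out of bounds, no panic
}
```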
+ pub async fn try_into_sui_transaction_block_response( self, options: SuiTransactionBlockResponseOptions, @@ -142,25 +272,60 @@ impl StoredTransaction { }; let events = if options.show_events { - let events = self - .events - .into_iter() - .map(|event| match event { - Some(event) => { - let event: Event = bcs::from_bytes(&event).map_err(|e| { - IndexerError::PersistentStorageDataCorruptionError(format!( - "Can't convert event bytes into Event. tx_digest={:?} Error: {e}", + let events = { + #[cfg(feature = "postgres-feature")] + { + self + .events + .into_iter() + .map(|event| match event { + Some(event) => { + let event: Event = bcs::from_bytes(&event).map_err(|e| { + IndexerError::PersistentStorageDataCorruptionError(format!( + "Can't convert event bytes into Event. tx_digest={:?} Error: {e}", + tx_digest + )) + })?; + Ok(event) + } + None => Err(IndexerError::PersistentStorageDataCorruptionError(format!( + "Event should not be null, tx_digest={:?}", tx_digest - )) - })?; - Ok(event) - } - None => Err(IndexerError::PersistentStorageDataCorruptionError(format!( - "Event should not be null, tx_digest={:?}", - tx_digest - ))), - }) - .collect::<Result<Vec<_>, IndexerError>>()?; + ))), + }) + .collect::<Result<Vec<_>, IndexerError>>()? + } + #[cfg(feature = "mysql-feature")] + #[cfg(not(feature = "postgres-feature"))] + { + self.events + .as_array() + .ok_or_else(|| { + IndexerError::PersistentStorageDataCorruptionError( + "Failed to parse events as array".to_string(), + ) + })? + .iter() + .map(|event| match event { + serde_json::Value::Null => Err(IndexerError::PersistentStorageDataCorruptionError( + "events should not contain null elements".to_string(), + )), + serde_json::Value::String(event) => { + let event: Event = bcs::from_bytes(event.as_bytes()).map_err(|e| { + IndexerError::PersistentStorageDataCorruptionError(format!( + "Can't convert event bytes into Event. tx_digest={:?} Error: {e}", + tx_digest + )) + })?; + Ok(event) + } + _ => Err(IndexerError::PersistentStorageDataCorruptionError( + "events should contain only string elements".to_string(), + )), + }) + .collect::<Result<Vec<_>, IndexerError>>()? + } + }; let timestamp = self.timestamp_ms as u64; let tx_events = TransactionEvents { data: events }; @@ -170,38 +335,102 @@ impl StoredTransaction { }; let object_changes = if options.show_object_changes { - let object_changes = self.object_changes.into_iter().map(|object_change| { match object_change { Some(object_change) => { let object_change: IndexedObjectChange = bcs::from_bytes(&object_change) .map_err(|e| IndexerError::PersistentStorageDataCorruptionError( format!("Can't convert object_change bytes into IndexedObjectChange. tx_digest={:?} Error: {e}", tx_digest) ))?; Ok(ObjectChange::from(object_change)) } None => Err(IndexerError::PersistentStorageDataCorruptionError(format!("object_change should not be null, tx_digest={:?}", tx_digest))), + let object_changes = { + #[cfg(feature = "postgres-feature")] + { + self.object_changes.into_iter().map(|object_change| { + match object_change { + Some(object_change) => { + let object_change: IndexedObjectChange = bcs::from_bytes(&object_change) + .map_err(|e| IndexerError::PersistentStorageDataCorruptionError( + format!("Can't convert object_change bytes into IndexedObjectChange. tx_digest={:?} Error: {e}", tx_digest) + ))?; + Ok(ObjectChange::from(object_change)) + } + None => Err(IndexerError::PersistentStorageDataCorruptionError(format!("object_change should not be null, tx_digest={:?}", tx_digest))), + } + }).collect::<Result<Vec<_>, IndexerError>>()? } - }).collect::<Result<Vec<_>, IndexerError>>()?; - + #[cfg(feature = "mysql-feature")] + #[cfg(not(feature = "postgres-feature"))] + { + self.object_changes + .as_array() + .ok_or_else(|| { + IndexerError::PersistentStorageDataCorruptionError( + "Failed to parse object_changes as array".to_string(), + ) + })?
+ .iter() + .map(|object_change| match object_change { + serde_json::Value::Null => Err(IndexerError::PersistentStorageDataCorruptionError( + "object_changes should not contain null elements".to_string(), + )), + serde_json::Value::String(object_change) => { + let object_change: IndexedObjectChange = bcs::from_bytes(object_change.as_bytes()) + .map_err(|e| IndexerError::PersistentStorageDataCorruptionError( + format!("Can't convert object_change bytes into IndexedObjectChange. tx_digest={:?} Error: {e}", tx_digest) + ))?; + Ok(ObjectChange::from(object_change)) + } + _ => Err(IndexerError::PersistentStorageDataCorruptionError( + "object_changes should contain only string elements".to_string(), + )), + }) + .collect::<Result<Vec<_>, IndexerError>>()? + } + }; Some(object_changes) } else { None }; let balance_changes = if options.show_balance_changes { - let balance_changes = self.balance_changes.into_iter().map(|balance_change| { match balance_change { Some(balance_change) => { let balance_change: BalanceChange = bcs::from_bytes(&balance_change) .map_err(|e| IndexerError::PersistentStorageDataCorruptionError( format!("Can't convert balance_change bytes into BalanceChange. tx_digest={:?} Error: {e}", tx_digest) ))?; Ok(balance_change) } None => Err(IndexerError::PersistentStorageDataCorruptionError(format!("object_change should not be null, tx_digest={:?}", tx_digest))), + let balance_changes = { + #[cfg(feature = "postgres-feature")] + { + self.balance_changes.into_iter().map(|balance_change| { + match balance_change { + Some(balance_change) => { + let balance_change: BalanceChange = bcs::from_bytes(&balance_change) + .map_err(|e| IndexerError::PersistentStorageDataCorruptionError( + format!("Can't convert balance_change bytes into BalanceChange. tx_digest={:?} Error: {e}", tx_digest) + ))?; + Ok(balance_change) + } + None => Err(IndexerError::PersistentStorageDataCorruptionError(format!("balance_change should not be null, tx_digest={:?}", tx_digest))), + } + }).collect::<Result<Vec<_>, IndexerError>>()? } - }).collect::<Result<Vec<_>, IndexerError>>()?; - + #[cfg(feature = "mysql-feature")] + #[cfg(not(feature = "postgres-feature"))] + { + self.balance_changes + .as_array() + .ok_or_else(|| { + IndexerError::PersistentStorageDataCorruptionError( + "Failed to parse balance_changes as array".to_string(), + ) + })? + .iter() + .map(|balance_change| match balance_change { + serde_json::Value::Null => Err(IndexerError::PersistentStorageDataCorruptionError( + "balance_changes should not contain null elements".to_string(), + )), + serde_json::Value::String(balance_change) => { + let balance_change: BalanceChange = bcs::from_bytes(balance_change.as_bytes()) + .map_err(|e| IndexerError::PersistentStorageDataCorruptionError( + format!("Can't convert balance_change bytes into BalanceChange. tx_digest={:?} Error: {e}", tx_digest) + ))?; + Ok(balance_change) + } + _ => Err(IndexerError::PersistentStorageDataCorruptionError( + "balance_changes should contain only string elements".to_string(), + )), + }) + .collect::<Result<Vec<_>, IndexerError>>()? + } + }; Some(balance_changes) } else { None @@ -245,6 +474,58 @@ impl StoredTransaction { } }
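Both branches above repeat the same per-element decode, so the `stored_events_to_events` helper introduced next centralizes it for the events column behind the `StoredTransactionEvents` alias. A hypothetical call site, for illustration only (the `use` paths assume this patch's crate layout):

```
use sui_indexer::errors::IndexerError;
use sui_indexer::models::transactions::{stored_events_to_events, StoredTransaction};
use sui_types::event::Event;

fn events_for_row(stored_tx: StoredTransaction) -> Result<Vec<Event>, IndexerError> {
    // On Postgres `events` is Vec<Option<Vec<u8>>> of BCS blobs; on MySQL it is
    // a JSON array. The StoredTransactionEvents alias hides the difference.
    stored_events_to_events(stored_tx.events)
}
```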
Error: {e}", + )) + })?; + Ok(event) + } + None => Err(IndexerError::PersistentStorageDataCorruptionError( + "Event should not be null".to_string(), + )), + }) + .collect::, IndexerError>>() + } + #[cfg(feature = "mysql-feature")] + #[cfg(not(feature = "postgres-feature"))] + { + stored_events + .as_array() + .ok_or_else(|| { + IndexerError::PersistentStorageDataCorruptionError( + "Failed to parse events as array".to_string(), + ) + })? + .iter() + .map(|event| match event { + serde_json::Value::Null => Err(IndexerError::PersistentStorageDataCorruptionError( + "events should not contain null elements".to_string(), + )), + serde_json::Value::String(event) => { + let event: Event = bcs::from_bytes(event.as_bytes()).map_err(|e| { + IndexerError::PersistentStorageDataCorruptionError(format!( + "Can't convert event bytes into Event. Error: {e}", + )) + })?; + Ok(event) + } + _ => Err(IndexerError::PersistentStorageDataCorruptionError( + "events should contain only string elements".to_string(), + )), + }) + .collect::, IndexerError>>() + } +} pub async fn tx_events_to_sui_tx_events( tx_events: TransactionEvents, package_resolver: Arc>, diff --git a/crates/sui-indexer/src/models/tx_indices.rs b/crates/sui-indexer/src/models/tx_indices.rs index acbbbfff5bbba..2bae099de41e7 100644 --- a/crates/sui-indexer/src/models/tx_indices.rs +++ b/crates/sui-indexer/src/models/tx_indices.rs @@ -17,7 +17,7 @@ pub struct TxSequenceNumber { #[derive(QueryableByName)] pub struct TxDigest { - #[diesel(sql_type = diesel::sql_types::Bytea)] + #[diesel(sql_type = diesel::sql_types::Binary)] pub transaction_digest: Vec, } diff --git a/crates/sui-indexer/src/store/mod.rs b/crates/sui-indexer/src/store/mod.rs index a263e4166caa4..87bbbda82b577 100644 --- a/crates/sui-indexer/src/store/mod.rs +++ b/crates/sui-indexer/src/store/mod.rs @@ -68,6 +68,7 @@ pub mod diesel_macro { #[cfg(feature = "mysql-feature")] #[cfg(not(feature = "postgres-feature"))] { + use diesel::Connection; let mut pool_conn = get_pool_connection($pool)?; pool_conn .as_any_mut() @@ -116,6 +117,7 @@ pub mod diesel_macro { #[cfg(feature = "mysql-feature")] #[cfg(not(feature = "postgres-feature"))] { + use diesel::Connection; let mut pool_conn = get_pool_connection($pool).map_err(|e| backoff::Error::Transient { err: IndexerError::PostgresWriteError(e.to_string()), @@ -187,6 +189,7 @@ pub mod diesel_macro { #[cfg(feature = "mysql-feature")] #[cfg(not(feature = "postgres-feature"))] { + use diesel::Connection; pool_conn .as_any_mut() .downcast_mut::>() @@ -200,6 +203,62 @@ pub mod diesel_macro { }}; } + #[macro_export] + macro_rules! insert_or_ignore_into { + ($table:expr, $values:expr, $conn:expr) => {{ + use diesel::RunQueryDsl; + let error_message = concat!("Failed to write to ", stringify!($table), " DB"); + #[cfg(feature = "postgres-feature")] + { + diesel::insert_into($table) + .values($values) + .on_conflict_do_nothing() + .execute($conn) + .map_err(IndexerError::from) + .context(error_message)?; + } + #[cfg(feature = "mysql-feature")] + #[cfg(not(feature = "postgres-feature"))] + { + diesel::insert_or_ignore_into($table) + .values($values) + .execute($conn) + .map_err(IndexerError::from) + .context(error_message)?; + } + }}; + } + + #[macro_export] + macro_rules! 
on_conflict_do_update { + ($table:expr, $values:expr, $target:expr, $pg_columns:expr, $mysql_columns:expr, $conn:expr) => {{ + use diesel::ExpressionMethods; + use diesel::RunQueryDsl; + #[cfg(feature = "postgres-feature")] + { + diesel::insert_into($table) + .values($values) + .on_conflict($target) + .do_update() + .set($pg_columns) + .execute($conn)?; + } + #[cfg(feature = "mysql-feature")] + #[cfg(not(feature = "postgres-feature"))] + { + for excluded_row in $values.iter() { + let columns = $mysql_columns; + diesel::insert_into($table) + .values(excluded_row.clone()) + .on_conflict(diesel::dsl::DuplicatedKeys) + .do_update() + .set(columns(excluded_row.clone())) + .execute($conn)?; + } + } + }}; + } + #[macro_export] macro_rules! run_query { ($pool:expr, $query:expr) => {{ diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index 7f2bc9767a1fc..16a685b0f0a92 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -12,7 +12,6 @@ use async_trait::async_trait; use core::result::Result::Ok; use diesel::dsl::max; use diesel::r2d2::R2D2Connection; -use diesel::upsert::excluded; use diesel::ExpressionMethods; use diesel::OptionalExtension; use diesel::{QueryDsl, RunQueryDsl}; @@ -44,12 +43,18 @@ use crate::schema::{ tx_senders, }; use crate::types::{IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTransaction, TxIndex}; -use crate::{read_only_blocking, transactional_blocking_with_retry}; +use crate::{ + insert_or_ignore_into, on_conflict_do_update, read_only_blocking, + transactional_blocking_with_retry, +}; use super::pg_partition_manager::{EpochPartitionData, PgPartitionManager}; use super::IndexerStore; use super::ObjectChangeToCommit; +#[cfg(feature = "postgres-feature")] +use diesel::upsert::excluded; + #[macro_export] macro_rules! 
chunk { ($data: expr, $size: expr) => {{ @@ -204,18 +209,22 @@ impl PgIndexerStore { transactional_blocking_with_retry!( &self.blocking_cp, |conn| { - diesel::insert_into(display::table) - .values(display_updates.values().collect::<Vec<_>>()) - .on_conflict(display::object_type) - .do_update() - .set(( + on_conflict_do_update!( + display::table, + display_updates.values().collect::<Vec<_>>(), + display::object_type, + ( display::id.eq(excluded(display::id)), display::version.eq(excluded(display::version)), display::bcs.eq(excluded(display::bcs)), - )) - .execute(conn) - .map_err(IndexerError::from) - .context("Failed to write display updates to PostgresDB")?; + ), + |excluded: &StoredDisplay| ( + display::id.eq(excluded.id.clone()), + display::version.eq(excluded.version), + display::bcs.eq(excluded.bcs.clone()), + ), + conn + ); Ok::<(), IndexerError>(()) }, PG_DB_COMMIT_SLEEP_DURATION @@ -236,11 +245,11 @@ impl PgIndexerStore { transactional_blocking_with_retry!( &self.blocking_cp, |conn| { - diesel::insert_into(objects::table) - .values(mutated_object_mutation_chunk.clone()) - .on_conflict(objects::object_id) - .do_update() - .set(( + on_conflict_do_update!( + objects::table, + mutated_object_mutation_chunk.clone(), + objects::object_id, + ( objects::object_id.eq(excluded(objects::object_id)), objects::object_version.eq(excluded(objects::object_version)), objects::object_digest.eq(excluded(objects::object_digest)), @@ -256,10 +265,25 @@ impl PgIndexerStore { objects::df_name.eq(excluded(objects::df_name)), objects::df_object_type.eq(excluded(objects::df_object_type)), objects::df_object_id.eq(excluded(objects::df_object_id)), - )) - .execute(conn) - .map_err(IndexerError::from) - .context("Failed to write object mutation to PostgresDB")?; + ), + |excluded: StoredObject| ( + objects::object_id.eq(excluded.object_id.clone()), + objects::object_version.eq(excluded.object_version), + objects::object_digest.eq(excluded.object_digest.clone()), + objects::checkpoint_sequence_number.eq(excluded.checkpoint_sequence_number), + objects::owner_type.eq(excluded.owner_type), + objects::owner_id.eq(excluded.owner_id.clone()), + objects::object_type.eq(excluded.object_type.clone()), + objects::serialized_object.eq(excluded.serialized_object.clone()), + objects::coin_type.eq(excluded.coin_type.clone()), + objects::coin_balance.eq(excluded.coin_balance), + objects::df_kind.eq(excluded.df_kind), + objects::df_name.eq(excluded.df_name.clone()), + objects::df_object_type.eq(excluded.df_object_type.clone()), + objects::df_object_id.eq(excluded.df_object_id.clone()), + ), + conn + ); Ok::<(), IndexerError>(()) }, PG_DB_COMMIT_SLEEP_DURATION @@ -338,11 +362,11 @@ impl PgIndexerStore { for objects_snapshot_chunk in objects_snapshot.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { - diesel::insert_into(objects_snapshot::table) - .values(objects_snapshot_chunk) - .on_conflict(objects_snapshot::object_id) - .do_update() - .set(( + on_conflict_do_update!( + objects_snapshot::table, + objects_snapshot_chunk, + objects_snapshot::object_id, + ( objects_snapshot::object_version .eq(excluded(objects_snapshot::object_version)), objects_snapshot::object_status .eq(excluded(objects_snapshot::object_status)), @@ -366,10 +390,26 @@ impl PgIndexerStore { .eq(excluded(objects_snapshot::df_object_type)), objects_snapshot::df_object_id .eq(excluded(objects_snapshot::df_object_id)), - )) - .execute(conn) - .map_err(IndexerError::from) - .context("Failed to write object mutations to objects_snapshot in DB.")?; + ), + |excluded: StoredObjectSnapshot| ( +
objects_snapshot::object_version.eq(excluded.object_version), + objects_snapshot::object_status.eq(excluded.object_status), + objects_snapshot::object_digest.eq(excluded.object_digest), + objects_snapshot::checkpoint_sequence_number + .eq(excluded.checkpoint_sequence_number), + objects_snapshot::owner_type.eq(excluded.owner_type), + objects_snapshot::owner_id.eq(excluded.owner_id), + objects_snapshot::object_type.eq(excluded.object_type), + objects_snapshot::serialized_object.eq(excluded.serialized_object), + objects_snapshot::coin_type.eq(excluded.coin_type), + objects_snapshot::coin_balance.eq(excluded.coin_balance), + objects_snapshot::df_kind.eq(excluded.df_kind), + objects_snapshot::df_name.eq(excluded.df_name), + objects_snapshot::df_object_type.eq(excluded.df_object_type), + objects_snapshot::df_object_id.eq(excluded.df_object_id), + ), + conn + ); } Ok::<(), IndexerError>(()) }, @@ -415,23 +455,17 @@ impl PgIndexerStore { for mutated_object_change_chunk in mutated_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { - diesel::insert_into(objects_history::table) - .values(mutated_object_change_chunk) - .on_conflict_do_nothing() - .execute(conn) - .map_err(IndexerError::from) - .context("Failed to write object mutations to objects_history in DB.")?; + insert_or_ignore_into!( + objects_history::table, + mutated_object_change_chunk, + conn + ); } for deleted_objects_chunk in deleted_object_ids.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { - diesel::insert_into(objects_history::table) - .values(deleted_objects_chunk) - .on_conflict_do_nothing() - .execute(conn) - .map_err(IndexerError::from) - .context("Failed to write object deletions to objects_history in DB.")?; + insert_or_ignore_into!(objects_history::table, deleted_objects_chunk, conn); } Ok::<(), IndexerError>(()) @@ -486,12 +520,7 @@ impl PgIndexerStore { for stored_checkpoint_chunk in stored_checkpoints.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { - diesel::insert_into(checkpoints::table) - .values(stored_checkpoint_chunk) - .on_conflict_do_nothing() - .execute(conn) - .map_err(IndexerError::from) - .context("Failed to write checkpoints to PostgresDB")?; + insert_or_ignore_into!(checkpoints::table, stored_checkpoint_chunk, conn); let time_now_ms = chrono::Utc::now().timestamp_millis(); for stored_checkpoint in stored_checkpoint_chunk { self.metrics @@ -538,12 +567,7 @@ impl PgIndexerStore { &self.blocking_cp, |conn| { for transaction_chunk in transactions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { - diesel::insert_into(transactions::table) - .values(transaction_chunk) - .on_conflict_do_nothing() - .execute(conn) - .map_err(IndexerError::from) - .context("Failed to write transactions to PostgresDB")?; + insert_or_ignore_into!(transactions::table, transaction_chunk, conn); } Ok::<(), IndexerError>(()) }, @@ -577,12 +601,7 @@ impl PgIndexerStore { &self.blocking_cp, |conn| { for event_chunk in events.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { - diesel::insert_into(events::table) - .values(event_chunk) - .on_conflict_do_nothing() - .execute(conn) - .map_err(IndexerError::from) - .context("Failed to write events to PostgresDB")?; + insert_or_ignore_into!(events::table, event_chunk, conn); } Ok::<(), IndexerError>(()) }, @@ -613,17 +632,20 @@ impl PgIndexerStore { &self.blocking_cp, |conn| { for packages_chunk in packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { - diesel::insert_into(packages::table) - .values(packages_chunk) - // System packages such as 0x2/0x9 will have their package_id - // unchanged during upgrades. 
In this case, we override the modules - // TODO: race condition is possible here. Figure out how to avoid/detect - .on_conflict(packages::package_id) - .do_update() - .set(packages::move_package.eq(excluded(packages::move_package))) - .execute(conn) - .map_err(IndexerError::from) - .context("Failed to write packages to PostgresDB")?; + on_conflict_do_update!( + packages::table, + packages_chunk, + packages::package_id, + ( + packages::package_id.eq(excluded(packages::package_id)), + packages::move_package.eq(excluded(packages::move_package)), + ), + |excluded: StoredPackage| ( + packages::package_id.eq(excluded.package_id.clone()), + packages::move_package.eq(excluded.move_package), + ), + conn + ); } Ok::<(), IndexerError>(()) }, @@ -689,20 +711,10 @@ impl PgIndexerStore { &this.blocking_cp, |conn| { for chunk in senders.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { - diesel::insert_into(tx_senders::table) - .values(chunk) - .on_conflict_do_nothing() - .execute(conn) - .map_err(IndexerError::from) - .context("Failed to write tx_senders to PostgresDB")?; + insert_or_ignore_into!(tx_senders::table, chunk, conn); } for chunk in recipients.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { - diesel::insert_into(tx_recipients::table) - .values(chunk) - .on_conflict_do_nothing() - .execute(conn) - .map_err(IndexerError::from) - .context("Failed to write tx_recipients to PostgresDB")?; + insert_or_ignore_into!(tx_recipients::table, chunk, conn); } Ok::<(), IndexerError>(()) }, @@ -732,12 +744,7 @@ impl PgIndexerStore { &this.blocking_cp, |conn| { for chunk in input_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { - diesel::insert_into(tx_input_objects::table) - .values(chunk) - .on_conflict_do_nothing() - .execute(conn) - .map_err(IndexerError::from) - .context("Failed to write tx_input_objects chunk to PostgresDB")?; + insert_or_ignore_into!(tx_input_objects::table, chunk, conn); } Ok::<(), IndexerError>(()) }, @@ -762,12 +769,7 @@ impl PgIndexerStore { &this.blocking_cp, |conn| { for chunk in changed_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { - diesel::insert_into(tx_changed_objects::table) - .values(chunk) - .on_conflict_do_nothing() - .execute(conn) - .map_err(IndexerError::from) - .context("Failed to write tx_changed_objects chunk to PostgresDB")?; + insert_or_ignore_into!(tx_changed_objects::table, chunk, conn); } Ok::<(), IndexerError>(()) }, @@ -791,12 +793,7 @@ impl PgIndexerStore { &this.blocking_cp, |conn| { for chunk in calls.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { - diesel::insert_into(tx_calls::table) - .values(chunk) - .on_conflict_do_nothing() - .execute(conn) - .map_err(IndexerError::from) - .context("Failed to write tx_calls chunk to PostgresDB")?; + insert_or_ignore_into!(tx_calls::table, chunk, conn); } Ok::<(), IndexerError>(()) }, @@ -871,11 +868,11 @@ impl PgIndexerStore { let last_epoch_id = last_epoch.epoch; let last_epoch = StoredEpochInfo::from_epoch_end_info(last_epoch); info!(last_epoch_id, "Persisting epoch end data: {:?}", last_epoch); - diesel::insert_into(epochs::table) - .values(last_epoch) - .on_conflict(epochs::epoch) - .do_update() - .set(( + on_conflict_do_update!( + epochs::table, + vec![last_epoch], + epochs::epoch, + ( // Note: Exclude epoch beginning info except system_state below. // This is to ensure that epoch beginning info columns are not overridden with default values, // because these columns hold default values in `last_epoch`.
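The `on_conflict_do_update!` call sites above pass both a Postgres column tuple built on `excluded(...)` and a MySQL closure because the two backends upsert differently. Roughly, the macro resolves to the following shapes (a hand-written sketch over a toy `display` table, reusing this crate's `postgres-feature`/`mysql-feature` flags as assumptions; not the macro's literal expansion):

```
use diesel::prelude::*;

diesel::table! {
    display (object_type) {
        object_type -> Text,
        bcs -> Binary,
    }
}

#[derive(Insertable, Clone)]
#[diesel(table_name = display)]
struct Row {
    object_type: String,
    bcs: Vec<u8>,
}

#[cfg(feature = "postgres-feature")]
fn upsert(conn: &mut PgConnection, rows: &[Row]) -> QueryResult<()> {
    use diesel::upsert::excluded;
    // One bulk statement: INSERT ... ON CONFLICT (object_type) DO UPDATE
    // SET bcs = excluded.bcs
    diesel::insert_into(display::table)
        .values(rows)
        .on_conflict(display::object_type)
        .do_update()
        .set(display::bcs.eq(excluded(display::bcs)))
        .execute(conn)?;
    Ok(())
}

#[cfg(all(feature = "mysql-feature", not(feature = "postgres-feature")))]
fn upsert(conn: &mut MysqlConnection, rows: &[Row]) -> QueryResult<()> {
    // MySQL has no `excluded` pseudo-table, so each row re-binds its own
    // values: INSERT ... ON DUPLICATE KEY UPDATE bcs = ?
    for row in rows {
        diesel::insert_into(display::table)
            .values(row.clone())
            .on_conflict(diesel::dsl::DuplicatedKeys)
            .do_update()
            .set(display::bcs.eq(row.bcs.clone()))
            .execute(conn)?;
    }
    Ok(())
}
```

The per-row loop on the MySQL side mirrors the macro's `$mysql_columns` closure: without `excluded`, each row's own values must be re-bound in the `ON DUPLICATE KEY UPDATE` clause, which is why the MySQL path trades the single bulk statement for row-at-a-time upserts.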
@@ -895,16 +892,32 @@ impl PgIndexerStore { epochs::leftover_storage_fund_inflow .eq(excluded(epochs::leftover_storage_fund_inflow)), epochs::epoch_commitments.eq(excluded(epochs::epoch_commitments)), - )) - .execute(conn)?; + ), + |excluded: StoredEpochInfo| ( + epochs::system_state.eq(excluded.system_state.clone()), + epochs::epoch_total_transactions.eq(excluded.epoch_total_transactions), + epochs::last_checkpoint_id.eq(excluded.last_checkpoint_id), + epochs::epoch_end_timestamp.eq(excluded.epoch_end_timestamp), + epochs::storage_fund_reinvestment + .eq(excluded.storage_fund_reinvestment), + epochs::storage_charge.eq(excluded.storage_charge), + epochs::storage_rebate.eq(excluded.storage_rebate), + epochs::stake_subsidy_amount.eq(excluded.stake_subsidy_amount), + epochs::total_gas_fees.eq(excluded.total_gas_fees), + epochs::total_stake_rewards_distributed + .eq(excluded.total_stake_rewards_distributed), + epochs::leftover_storage_fund_inflow + .eq(excluded.leftover_storage_fund_inflow), + epochs::epoch_commitments.eq(excluded.epoch_commitments) + ), + conn + ); } + let epoch_id = epoch.new_epoch.epoch; info!(epoch_id, "Persisting epoch beginning info"); let new_epoch = StoredEpochInfo::from_epoch_beginning_info(&epoch.new_epoch); - diesel::insert_into(epochs::table) - .values(new_epoch) - .on_conflict_do_nothing() - .execute(conn)?; + insert_or_ignore_into!(epochs::table, new_epoch, conn); Ok::<(), IndexerError>(()) }, PG_DB_COMMIT_SLEEP_DURATION diff --git a/crates/sui-indexer/src/store/pg_partition_manager.rs b/crates/sui-indexer/src/store/pg_partition_manager.rs index 452c2f6f3812b..4b0ac87b0b2e8 100644 --- a/crates/sui-indexer/src/store/pg_partition_manager.rs +++ b/crates/sui-indexer/src/store/pg_partition_manager.rs @@ -15,7 +15,8 @@ use crate::models::epoch::StoredEpochInfo; use crate::store::diesel_macro::*; use downcast::Any; -const GET_PARTITION_SQL: &str = r" +const GET_PARTITION_SQL: &str = if cfg!(feature = "postgres-feature") { + r" SELECT parent.relname AS table_name, MIN(CAST(SUBSTRING(child.relname FROM '\d+$') AS BIGINT)) AS first_partition, MAX(CAST(SUBSTRING(child.relname FROM '\d+$') AS BIGINT)) AS last_partition FROM pg_inherits JOIN pg_class parent ON pg_inherits.inhparent = parent.oid JOIN pg_class child ON pg_inherits.inhrelid = child.oid JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace WHERE parent.relkind = 'p' GROUP BY table_name; -"; +" +} else if cfg!(feature = "mysql-feature") && cfg!(not(feature = "postgres-feature")) { + r" + SELECT TABLE_NAME AS table_name, + MAX(CAST(SUBSTRING_INDEX(PARTITION_NAME, '_', -1) AS UNSIGNED)) AS last_partition +FROM information_schema.PARTITIONS +WHERE TABLE_SCHEMA = DATABASE() +AND PARTITION_NAME IS NOT NULL +GROUP BY table_name; + " +} else { + "" +}; pub struct PgPartitionManager { cp: ConnectionPool,