Skip to content

Commit

Permalink
Add migrations for mysql (#17421)
Browse files Browse the repository at this point in the history
## Description 

This PR adds migrations for the MySQL database. After this, we should be able
to use the indexer-writer for data ingestion into a MySQL database.
## Test plan 

Tested by running locally with:
```
cargo run  --bin sui-indexer --features mysql-feature --no-default-features -- --db-url <> --rpc-client-url <> --fullnode-sync-worker --reset-db

2024-04-30T18:16:06.843439Z  INFO sui_data_ingestion_core::worker_pool: received checkpoint for processing 1420 for workflow workflow
2024-04-30T18:16:06.843456Z  INFO sui_indexer::handlers::checkpoint_handler: Indexing checkpoint data blob checkpoint_seq=1420
2024-04-30T18:16:06.843477Z  INFO sui_indexer::handlers::checkpoint_handler: Resolving Move struct layouts for struct tags of size 1.
2024-04-30T18:16:06.845713Z  INFO sui_indexer::handlers::checkpoint_handler: Indexed one checkpoint. checkpoint_seq=1388
2024-04-30T18:16:06.845739Z  INFO sui_data_ingestion_core::worker_pool: finished checkpoint processing 1388 for workflow workflow in 33.194375ms
2024-04-30T18:16:06.845768Z  INFO sui_data_ingestion_core::worker_pool: received checkpoint for processing 1396 for workflow workflow
2024-04-30T18:16:06.845784Z  INFO sui_indexer::handlers::checkpoint_handler: Indexing checkpoint data blob checkpoint_seq=1396
2024-04-30T18:16:06.845804Z  INFO sui_indexer::handlers::checkpoint_handler: Resolving Move struct layouts for struct tags of size 1.
2024-04-30T18:16:06.848785Z  INFO sui_indexer::handlers::checkpoint_handler: Indexed one checkpoint. checkpoint_seq=1385
2024-04-30T18:16:06.848819Z  INFO sui_data_ingestion_core::worker_pool: finished checkpoint processing 1385 for workflow workflow in 56.440792ms
2024-04-30T18:16:06.848861Z  INFO sui_data_ingestion_core::worker_pool: received checkpoint for processing 1382 for workflow workflow
2024-04-30T18:16:06.848883Z  INFO sui_indexer::handlers::checkpoint_handler: Indexing checkpoint data blob checkpoint_seq=1382
2024-04-30T18:16:06.848912Z  INFO sui_indexer::handlers::checkpoint_handler: Resolving Move struct layouts for struct tags of size 1.
2024-04-30T18:16:06.849557Z  INFO sui_indexer::handlers::checkpoint_handler: Indexed one checkpoint. checkpoint_seq=1383
2024-04-30T18:16:06.849573Z  INFO sui_data_ingestion_core::worker_pool: finished checkpoint processing 1383 for workflow workflow in 39.8655ms

```
  • Loading branch information
sadhansood committed May 4, 2024
1 parent 3a062ec commit c1dfdcd
Show file tree
Hide file tree
Showing 55 changed files with 1,173 additions and 216 deletions.
25 changes: 22 additions & 3 deletions crates/sui-graphql-rpc/src/types/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ impl Event {
idx: usize,
checkpoint_viewed_at: u64,
) -> Result<Self, Error> {
let Some(Some(serialized_event)) = stored_tx.events.get(idx) else {
let Some(serialized_event) = &stored_tx.get_event_at_idx(idx) else {
return Err(Error::Internal(format!(
"Could not find event with event_sequence_number {} at transaction {}",
idx, stored_tx.tx_sequence_number
Expand All @@ -259,7 +259,11 @@ impl Event {
event_sequence_number: idx as i64,
transaction_digest: stored_tx.transaction_digest.clone(),
checkpoint_sequence_number: stored_tx.checkpoint_sequence_number,
#[cfg(feature = "postgres-feature")]
senders: vec![Some(native_event.sender.to_vec())],
#[cfg(feature = "mysql-feature")]
#[cfg(not(feature = "postgres-feature"))]
senders: serde_json::to_value(vec![native_event.sender.to_vec()]).unwrap(),
package: native_event.package_id.to_vec(),
module: native_event.transaction_module.to_string(),
event_type: native_event
Expand All @@ -283,12 +287,27 @@ impl Event {
stored: StoredEvent,
checkpoint_viewed_at: u64,
) -> Result<Self, Error> {
let Some(Some(sender_bytes)) = stored.senders.first() else {
let Some(Some(sender_bytes)) = ({
#[cfg(feature = "postgres-feature")]
{
stored.senders.first()
}
#[cfg(feature = "mysql-feature")]
#[cfg(not(feature = "postgres-feature"))]
{
stored
.senders
.as_array()
.ok_or_else(|| {
Error::Internal("Failed to parse event senders as array".to_string())
})?
.first()
}
}) else {
return Err(Error::Internal("No senders found for event".to_string()));
};
let sender = NativeSuiAddress::from_bytes(sender_bytes)
.map_err(|e| Error::Internal(e.to_string()))?;

let package_id =
ObjectID::from_bytes(&stored.package).map_err(|e| Error::Internal(e.to_string()))?;
let type_ =
Expand Down
10 changes: 4 additions & 6 deletions crates/sui-graphql-rpc/src/types/transaction_block_effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,10 +357,8 @@ impl TransactionBlockEffects {
return Ok(connection);
};

let Some((prev, next, _, cs)) = page.paginate_consistent_indices(
stored_tx.balance_changes.len(),
self.checkpoint_viewed_at,
)?
let Some((prev, next, _, cs)) = page
.paginate_consistent_indices(stored_tx.get_balance_len(), self.checkpoint_viewed_at)?
else {
return Ok(connection);
};
Expand All @@ -369,7 +367,7 @@ impl TransactionBlockEffects {
connection.has_next_page = next;

for c in cs {
let Some(serialized) = &stored_tx.balance_changes[c.ix] else {
let Some(serialized) = &stored_tx.get_balance_at_idx(c.ix) else {
continue;
};

Expand All @@ -394,7 +392,7 @@ impl TransactionBlockEffects {
let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
let mut connection = Connection::new(false, false);
let len = match &self.kind {
TransactionBlockEffectsKind::Stored { stored_tx, .. } => stored_tx.events.len(),
TransactionBlockEffectsKind::Stored { stored_tx, .. } => stored_tx.get_event_len(),
TransactionBlockEffectsKind::Executed { events, .. }
| TransactionBlockEffectsKind::DryRun { events, .. } => events.len(),
};
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer/diesel.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ file = "src/schema.rs"
patch_file = "src/schema.patch"

[migrations_directory]
dir = "migrations"
dir = "migrations/mysql"
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-- Events table: rows are keyed by (tx_sequence_number, event_sequence_number,
-- checkpoint_sequence_number) and range-partitioned by checkpoint_sequence_number.
-- Only a single catch-all partition (p0) is created here.
CREATE TABLE events
(
tx_sequence_number BIGINT NOT NULL,
event_sequence_number BIGINT NOT NULL,
transaction_digest blob NOT NULL,
checkpoint_sequence_number bigint NOT NULL,
-- array of SuiAddress in bytes. All signers of the transaction.
senders json NOT NULL,
-- bytes of the entry package ID. Notice that the package and module here
-- are the package and module of the function that emitted the event, different
-- from the package and module of the event type.
package blob NOT NULL,
-- entry module name
module text NOT NULL,
-- StructTag in Display format, fully qualified including type parameters
event_type text NOT NULL,
-- Components of the StructTag of the event type: package, module,
-- name (name of the struct, without type parameters)
event_type_package blob NOT NULL,
event_type_module text NOT NULL,
event_type_name text NOT NULL,
-- timestamp of the checkpoint when the event was emitted
timestamp_ms BIGINT NOT NULL,
-- bcs of the Event contents (Event.contents)
bcs blob NOT NULL,
-- checkpoint_sequence_number is part of the key: MySQL requires the
-- partitioning column to appear in every unique key of a partitioned table.
PRIMARY KEY(tx_sequence_number, event_sequence_number, checkpoint_sequence_number)
) PARTITION BY RANGE (checkpoint_sequence_number) (
PARTITION p0 VALUES LESS THAN MAXVALUE
);
-- blob/text columns are indexed by prefix; the number in parentheses is the
-- prefix length.
CREATE INDEX events_package ON events (package(255), tx_sequence_number, event_sequence_number);
CREATE INDEX events_package_module ON events (package(255), module(255), tx_sequence_number, event_sequence_number);
CREATE INDEX events_event_type ON events (event_type(255), tx_sequence_number, event_sequence_number);
CREATE INDEX events_type_package_module_name ON events (event_type_package(128), event_type_module(128), event_type_name(128), tx_sequence_number, event_sequence_number);
CREATE INDEX events_checkpoint_sequence_number ON events (checkpoint_sequence_number);
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- This file should undo anything in `up.sql`.
-- Drops the objects, objects_history and objects_snapshot tables if they exist.
DROP TABLE IF EXISTS objects;
DROP TABLE IF EXISTS objects_history;
DROP TABLE IF EXISTS objects_snapshot;

109 changes: 109 additions & 0 deletions crates/sui-indexer/migrations/mysql/2024-03-25-203621_objects/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
-- objects table: one row per object, keyed by a 128-byte prefix of object_id.
CREATE TABLE objects (
object_id blob NOT NULL,
object_version bigint NOT NULL,
object_digest blob NOT NULL,
checkpoint_sequence_number bigint NOT NULL,
-- Immutable/Address/Object/Shared, see types.rs
owner_type smallint NOT NULL,
-- bytes of SuiAddress/ObjectID of the owner ID.
-- Non-null for objects with an owner: Address or Object
owner_id blob,
-- Object type
object_type text,
-- Components of the StructTag: package, module, name (name of the struct, without type parameters)
object_type_package blob,
object_type_module text,
object_type_name text,
-- bcs serialized Object
serialized_object mediumblob NOT NULL,
-- Non-null when the object is a coin.
-- e.g. `0x2::sui::SUI`
coin_type text,
-- Non-null when the object is a coin.
coin_balance bigint,
-- DynamicField/DynamicObject, see types.rs
-- Non-null when the object is a dynamic field
df_kind smallint,
-- bcs serialized DynamicFieldName
-- Non-null when the object is a dynamic field
df_name blob,
-- object_type in DynamicFieldInfo.
df_object_type text,
-- object_id in DynamicFieldInfo.
df_object_id blob,
-- blob columns cannot be keyed in full in MySQL, hence the explicit prefix length.
CONSTRAINT objects_pk PRIMARY KEY (object_id(128))
);

-- OwnerType: 1: Address, 2: Object, see types.rs
-- blob/text columns below are indexed by their first 128 bytes/characters.
CREATE INDEX objects_owner ON objects (owner_type, owner_id(128));
CREATE INDEX objects_coin ON objects (owner_id(128), coin_type(128));
CREATE INDEX objects_checkpoint_sequence_number ON objects (checkpoint_sequence_number);
CREATE INDEX objects_package_module_name_full_type ON objects (object_type_package(128), object_type_module(128), object_type_name(128), object_type(128));
CREATE INDEX objects_owner_package_module_name_full_type ON objects (owner_id(128), object_type_package(128), object_type_module(128), object_type_name(128), object_type(128));

-- similar to objects table, except that
-- 1. the primary key includes object_version and checkpoint_sequence_number, so multiple
--    versions of an object can be stored, and the table is partitioned by checkpoint_sequence_number
-- 2. allow null values in some columns for deleted / wrapped objects
-- 3. object_status to mark the status of the object, which is either Active or WrappedOrDeleted
CREATE TABLE objects_history (
object_id blob NOT NULL,
object_version bigint NOT NULL,
object_status smallint NOT NULL,
object_digest blob,
checkpoint_sequence_number bigint NOT NULL,
owner_type smallint,
owner_id blob,
object_type text,
-- Components of the StructTag: package, module, name (name of the struct, without type parameters)
object_type_package blob,
object_type_module text,
object_type_name text,
serialized_object mediumblob,
coin_type text,
coin_balance bigint,
df_kind smallint,
df_name blob,
df_object_type text,
df_object_id blob,
CONSTRAINT objects_history_pk PRIMARY KEY (checkpoint_sequence_number, object_id(128), object_version)
) PARTITION BY RANGE (checkpoint_sequence_number) (
-- single catch-all partition; no finer-grained partitions are defined in this script
PARTITION p0 VALUES LESS THAN MAXVALUE
);
-- Apart from objects_history_id_version, every index leads with the partitioning
-- column checkpoint_sequence_number.
CREATE INDEX objects_history_id_version ON objects_history (object_id(128), object_version, checkpoint_sequence_number);
CREATE INDEX objects_history_owner ON objects_history (checkpoint_sequence_number, owner_type, owner_id(128));
CREATE INDEX objects_history_coin ON objects_history (checkpoint_sequence_number, owner_id(128), coin_type(128));
CREATE INDEX objects_history_type ON objects_history (checkpoint_sequence_number, object_type(128));
CREATE INDEX objects_history_package_module_name_full_type ON objects_history (checkpoint_sequence_number, object_type_package(128), object_type_module(128), object_type_name(128), object_type(128));
CREATE INDEX objects_history_owner_package_module_name_full_type ON objects_history (checkpoint_sequence_number, owner_id(128), object_type_package(128), object_type_module(128), object_type_name(128), object_type(128));

-- snapshot table built by folding the objects_history table up to a certain checkpoint,
-- effectively the snapshot of objects at that checkpoint,
-- except that it also includes deleted or wrapped objects with the corresponding object_status.
-- Not partitioned; keyed by a 128-byte prefix of object_id (one row per object).
CREATE TABLE objects_snapshot (
object_id BLOB NOT NULL,
object_version bigint NOT NULL,
object_status smallint NOT NULL,
object_digest BLOB,
checkpoint_sequence_number bigint NOT NULL,
owner_type smallint,
owner_id BLOB,
object_type text,
-- Components of the StructTag: package, module, name (name of the struct, without type parameters)
object_type_package blob,
object_type_module text,
object_type_name text,
serialized_object mediumblob,
coin_type text,
coin_balance bigint,
df_kind smallint,
df_name BLOB,
df_object_type text,
df_object_id BLOB,
CONSTRAINT objects_snapshot_pk PRIMARY KEY (object_id(128))
);
CREATE INDEX objects_snapshot_checkpoint_sequence_number ON objects_snapshot (checkpoint_sequence_number);
CREATE INDEX objects_snapshot_owner ON objects_snapshot (owner_type, owner_id(128), object_id(128));
CREATE INDEX objects_snapshot_coin ON objects_snapshot (owner_id(128), coin_type(128), object_id(128));
CREATE INDEX objects_snapshot_package_module_name_full_type ON objects_snapshot (object_type_package(128), object_type_module(128), object_type_name(128), object_type(128));
CREATE INDEX objects_snapshot_owner_package_module_name_full_type ON objects_snapshot (owner_id(128), object_type_package(128), object_type_module(128), object_type_name(128), object_type(128));

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- This file should undo anything in `up.sql`.
-- Drops the transactions table so that reverting this migration does not
-- leave the table (and its indexes) behind.
DROP TABLE IF EXISTS transactions;




Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- transactions table: keyed by (tx_sequence_number, checkpoint_sequence_number)
-- and range-partitioned by checkpoint_sequence_number with a single catch-all
-- partition (p0).
CREATE TABLE transactions (
tx_sequence_number BIGINT NOT NULL,
transaction_digest BLOB NOT NULL,
-- bcs serialized SenderSignedData bytes
-- NOTE(review): a MySQL BLOB holds at most 65,535 bytes; confirm that
-- raw_transaction and raw_effects always fit, or consider MEDIUMBLOB.
raw_transaction BLOB NOT NULL,
-- bcs serialized TransactionEffects bytes
raw_effects BLOB NOT NULL,
checkpoint_sequence_number BIGINT NOT NULL,
timestamp_ms BIGINT NOT NULL,
-- array of bcs serialized IndexedObjectChange bytes
object_changes JSON NOT NULL,
-- array of bcs serialized BalanceChange bytes
balance_changes JSON NOT NULL,
-- array of bcs serialized StoredEvent bytes
events JSON NOT NULL,
-- SystemTransaction/ProgrammableTransaction. See types.rs
transaction_kind smallint NOT NULL,
-- number of successful commands in this transaction, bound by the number of commands
-- in a programmable transaction.
success_command_count smallint NOT NULL,
-- checkpoint_sequence_number is part of the key: MySQL requires the
-- partitioning column to appear in every unique key of a partitioned table.
CONSTRAINT transactions_pkey PRIMARY KEY (tx_sequence_number, checkpoint_sequence_number)
) PARTITION BY RANGE (checkpoint_sequence_number) (
PARTITION p0 VALUES LESS THAN MAXVALUE
);

CREATE INDEX transactions_transaction_digest ON transactions (transaction_digest(255));
CREATE INDEX transactions_checkpoint_sequence_number ON transactions (checkpoint_sequence_number);
-- NOTE: this index has no WHERE clause, so it covers every row rather than
-- only system transactions (0). See types.rs for the transaction_kind values.
CREATE INDEX transactions_transaction_kind ON transactions (transaction_kind);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Drops the checkpoints table if it exists.
DROP TABLE IF EXISTS checkpoints;
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-- checkpoints table: one row per checkpoint, keyed by sequence_number.
CREATE TABLE checkpoints
(
sequence_number bigint PRIMARY KEY,
checkpoint_digest blob NOT NULL,
epoch bigint NOT NULL,
-- total transactions in the network at the end of this checkpoint (including itself)
network_total_transactions bigint NOT NULL,
previous_checkpoint_digest blob,
-- whether this checkpoint is the last checkpoint of an epoch
end_of_epoch boolean NOT NULL,
-- array of TransactionDigest in bytes included in this checkpoint
tx_digests JSON NOT NULL,
timestamp_ms BIGINT NOT NULL,
total_gas_cost BIGINT NOT NULL,
computation_cost BIGINT NOT NULL,
storage_cost BIGINT NOT NULL,
storage_rebate BIGINT NOT NULL,
non_refundable_storage_fee BIGINT NOT NULL,
-- bcs serialized Vec<CheckpointCommitment> bytes
checkpoint_commitments blob NOT NULL,
-- bcs serialized AggregateAuthoritySignature bytes
validator_signature blob NOT NULL,
-- bcs serialized EndOfEpochData bytes, if the checkpoint marks end of an epoch
end_of_epoch_data blob
);

CREATE INDEX checkpoints_epoch ON checkpoints (epoch, sequence_number);
-- checkpoint_digest is a blob, so it is indexed by its first 255 bytes.
CREATE INDEX checkpoints_digest ON checkpoints (checkpoint_digest(255));
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Drops the epochs table if it exists.
DROP TABLE IF EXISTS epochs;
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-- epochs table: one row per epoch, keyed by the epoch number.
CREATE TABLE epochs
(
epoch BIGINT PRIMARY KEY,
first_checkpoint_id BIGINT NOT NULL,
epoch_start_timestamp BIGINT NOT NULL,
reference_gas_price BIGINT NOT NULL,
protocol_version BIGINT NOT NULL,
total_stake BIGINT NOT NULL,
storage_fund_balance BIGINT NOT NULL,
-- NOTE(review): a MySQL BLOB holds at most 65,535 bytes; confirm that the
-- stored system state always fits, or consider MEDIUMBLOB.
system_state BLOB NOT NULL,
-- The following fields are nullable because they are filled in
-- only at the end of an epoch.
epoch_total_transactions BIGINT,
last_checkpoint_id BIGINT,
epoch_end_timestamp BIGINT,
-- The following fields are from SystemEpochInfoEvent emitted
-- **after** advancing to the next epoch
storage_fund_reinvestment BIGINT,
storage_charge BIGINT,
storage_rebate BIGINT,
stake_subsidy_amount BIGINT,
total_gas_fees BIGINT,
total_stake_rewards_distributed BIGINT,
leftover_storage_fund_inflow BIGINT,
-- bcs serialized Vec<EpochCommitment> bytes, found in last CheckpointSummary
-- of the epoch
epoch_commitments BLOB
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- This file should undo anything in `up.sql`.
-- Drops the packages table if it exists.
DROP TABLE IF EXISTS packages;


Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- packages table: one row per package, keyed by a 255-byte prefix of package_id.
CREATE TABLE packages
(
package_id blob NOT NULL,
-- bcs serialized MovePackage
-- NOTE(review): a MySQL BLOB holds at most 65,535 bytes; confirm that
-- serialized packages always fit, or consider MEDIUMBLOB.
move_package blob NOT NULL,
-- blob columns cannot be keyed in full in MySQL, hence the explicit prefix length.
CONSTRAINT packages_pk PRIMARY KEY (package_id(255))
);
Loading

0 comments on commit c1dfdcd

Please sign in to comment.