Skip to content

Commit

Permalink
[Indexer] Add full_objects_history table (#19227)
Browse files Browse the repository at this point in the history
## Description 

This is a redo of #18994, but
instead of merging to parking branch, merge it to main.

## Test plan 

CI

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
lxfind committed Sep 6, 2024
1 parent 5928a0d commit 91aaa88
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS full_objects_history;
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- This table will store every history version of each object, and never get pruned.
-- Since it can grow indefinitely, we keep minimum amount of information in this table for the purpose
-- of point lookups.
CREATE TABLE full_objects_history
(
object_id bytea NOT NULL,
object_version bigint NOT NULL,
serialized_object bytea,
PRIMARY KEY (object_id, object_version)
);
5 changes: 5 additions & 0 deletions crates/sui-indexer/src/handlers/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,13 @@ async fn commit_checkpoints<S>(
state.persist_event_indices(event_indices_batch),
state.persist_displays(display_updates_batch),
state.persist_packages(packages_batch),
// TODO: There are a few ways we could make the following more memory efficient.
// 1. persist_objects and persist_object_history both call another function to make the final
// committed object list. We could call it early and share the result.
// 2. We could avoid clone by using Arc.
state.persist_objects(object_changes_batch.clone()),
state.persist_object_history(object_history_changes_batch.clone()),
state.persist_full_objects_history(object_history_changes_batch.clone()),
];
if let Some(epoch_data) = epoch.clone() {
persist_tasks.push(state.persist_epoch(epoch_data));
Expand Down
69 changes: 42 additions & 27 deletions crates/sui-indexer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ const DATA_INGESTION_LATENCY_SEC_BUCKETS: &[f64] = &[
0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0,
];
/// NOTE: for objects_snapshot update and advance_epoch, which are expected to be within [0.1, 100] seconds,
/// and can go up to high hundres of seconds when things go wrong.
/// and can go up to high hundreds of seconds when things go wrong.
const DB_UPDATE_QUERY_LATENCY_SEC_BUCKETS: &[f64] = &[
0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0,
10000.0,
Expand Down Expand Up @@ -111,9 +111,11 @@ pub struct IndexerMetrics {
pub checkpoint_db_commit_latency_objects: Histogram,
pub checkpoint_db_commit_latency_objects_snapshot: Histogram,
pub checkpoint_db_commit_latency_objects_history: Histogram,
pub checkpoint_db_commit_latency_full_objects_history: Histogram,
pub checkpoint_db_commit_latency_objects_chunks: Histogram,
pub checkpoint_db_commit_latency_objects_snapshot_chunks: Histogram,
pub checkpoint_db_commit_latency_objects_history_chunks: Histogram,
pub checkpoint_db_commit_latency_full_objects_history_chunks: Histogram,
pub checkpoint_db_commit_latency_events: Histogram,
pub checkpoint_db_commit_latency_events_chunks: Histogram,
pub checkpoint_db_commit_latency_event_indices: Histogram,
Expand Down Expand Up @@ -199,7 +201,7 @@ impl IndexerMetrics {
)
.unwrap(),
total_transaction_chunk_committed: register_int_counter_with_registry!(
"total_transaction_chunk_commited",
"total_transaction_chunk_committed",
"Total number of transaction chunk committed",
registry,
)
Expand Down Expand Up @@ -291,7 +293,7 @@ impl IndexerMetrics {
).unwrap(),
fullnode_checkpoint_data_download_latency: register_histogram_with_registry!(
"fullnode_checkpoint_data_download_latency",
"Time spent in downloading checkpoint and transation for a new checkpoint from the Full Node",
"Time spent in downloading checkpoint and transaction for a new checkpoint from the Full Node",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
Expand Down Expand Up @@ -384,29 +386,29 @@ impl IndexerMetrics {
.unwrap(),
checkpoint_db_commit_latency: register_histogram_with_registry!(
"checkpoint_db_commit_latency",
"Time spent commiting a checkpoint to the db",
"Time spent committing a checkpoint to the db",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),

checkpoint_db_commit_latency_step_1: register_histogram_with_registry!(
"checkpoint_db_commit_latency_step_1",
"Time spent commiting a checkpoint to the db, step 1",
"Time spent committing a checkpoint to the db, step 1",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
checkpoint_db_commit_latency_transactions: register_histogram_with_registry!(
"checkpoint_db_commit_latency_transactions",
"Time spent commiting transactions",
"Time spent committing transactions",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
checkpoint_db_commit_latency_transactions_chunks: register_histogram_with_registry!(
"checkpoint_db_commit_latency_transactions_chunks",
"Time spent commiting transactions chunks",
"Time spent committing transactions chunks",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
Expand All @@ -420,103 +422,116 @@ impl IndexerMetrics {
.unwrap(),
checkpoint_db_commit_latency_objects: register_histogram_with_registry!(
"checkpoint_db_commit_latency_objects",
"Time spent commiting objects",
"Time spent committing objects",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
checkpoint_db_commit_latency_objects_snapshot: register_histogram_with_registry!(
"checkpoint_db_commit_latency_objects_snapshot",
"Time spent commiting objects snapshots",
"Time spent committing objects snapshots",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
checkpoint_db_commit_latency_objects_history: register_histogram_with_registry!(
"checkpoint_db_commit_latency_objects_history",
"Time spent commiting objects history",
"Time spent committing objects history",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
).unwrap(),
checkpoint_db_commit_latency_full_objects_history: register_histogram_with_registry!(
"checkpoint_db_commit_latency_full_objects_history",
"Time spent committing full objects history",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
).unwrap(),
checkpoint_db_commit_latency_objects_chunks: register_histogram_with_registry!(
"checkpoint_db_commit_latency_objects_chunks",
"Time spent commiting objects chunks",
"Time spent committing objects chunks",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
checkpoint_db_commit_latency_objects_snapshot_chunks: register_histogram_with_registry!(
"checkpoint_db_commit_latency_objects_snapshot_chunks",
"Time spent commiting objects snapshot chunks",
"Time spent committing objects snapshot chunks",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
checkpoint_db_commit_latency_objects_history_chunks: register_histogram_with_registry!(
"checkpoint_db_commit_latency_objects_history_chunks",
"Time spent commiting objects history chunks",
"Time spent committing objects history chunks",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
).unwrap(),
checkpoint_db_commit_latency_full_objects_history_chunks: register_histogram_with_registry!(
"checkpoint_db_commit_latency_full_objects_history_chunks",
"Time spent committing full objects history chunks",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
checkpoint_db_commit_latency_events: register_histogram_with_registry!(
"checkpoint_db_commit_latency_events",
"Time spent commiting events",
"Time spent committing events",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
checkpoint_db_commit_latency_events_chunks: register_histogram_with_registry!(
"checkpoint_db_commit_latency_events_chunks",
"Time spent commiting events chunks",
"Time spent committing events chunks",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
checkpoint_db_commit_latency_event_indices: register_histogram_with_registry!(
"checkpoint_db_commit_latency_event_indices",
"Time spent commiting event indices",
"Time spent committing event indices",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
checkpoint_db_commit_latency_event_indices_chunks: register_histogram_with_registry!(
"checkpoint_db_commit_latency_event_indices_chunks",
"Time spent commiting event indices chunks",
"Time spent committing event indices chunks",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
checkpoint_db_commit_latency_packages: register_histogram_with_registry!(
"checkpoint_db_commit_latency_packages",
"Time spent commiting packages",
"Time spent committing packages",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
checkpoint_db_commit_latency_tx_indices: register_histogram_with_registry!(
"checkpoint_db_commit_latency_tx_indices",
"Time spent commiting tx indices",
"Time spent committing tx indices",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
checkpoint_db_commit_latency_tx_indices_chunks: register_histogram_with_registry!(
"checkpoint_db_commit_latency_tx_indices_chunks",
"Time spent commiting tx_indices chunks",
"Time spent committing tx_indices chunks",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
checkpoint_db_commit_latency_checkpoints: register_histogram_with_registry!(
"checkpoint_db_commit_latency_checkpoints",
"Time spent commiting checkpoints",
"Time spent committing checkpoints",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
checkpoint_db_commit_latency_epoch: register_histogram_with_registry!(
"checkpoint_db_commit_latency_epochs",
"Time spent commiting epochs",
"Time spent committing epochs",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
Expand All @@ -529,35 +544,35 @@ impl IndexerMetrics {
).unwrap(),
thousand_transaction_avg_db_commit_latency: register_histogram_with_registry!(
"transaction_db_commit_latency",
"Average time spent commiting 1000 transactions to the db",
"Average time spent committing 1000 transactions to the db",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
object_db_commit_latency: register_histogram_with_registry!(
"object_db_commit_latency",
"Time spent commiting a object to the db",
"Time spent committing a object to the db",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
object_mutation_db_commit_latency: register_histogram_with_registry!(
"object_mutation_db_commit_latency",
"Time spent commiting a object mutation to the db",
"Time spent committing a object mutation to the db",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
object_deletion_db_commit_latency: register_histogram_with_registry!(
"object_deletion_db_commit_latency",
"Time spent commiting a object deletion to the db",
"Time spent committing a object deletion to the db",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
epoch_db_commit_latency: register_histogram_with_registry!(
"epoch_db_commit_latency",
"Time spent commiting a epoch to the db",
"Time spent committing a epoch to the db",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
Expand Down
31 changes: 30 additions & 1 deletion crates/sui-indexer/src/models/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use sui_types::object::Object;
use sui_types::object::ObjectRead;

use crate::errors::IndexerError;
use crate::schema::{objects, objects_history, objects_snapshot};
use crate::schema::{full_objects_history, objects, objects_history, objects_snapshot};
use crate::types::{owner_to_owner_info, IndexedDeletedObject, IndexedObject, ObjectStatus};

#[derive(Queryable)]
Expand Down Expand Up @@ -544,6 +544,35 @@ impl TryFrom<CoinBalance> for Balance {
}
}

#[derive(Queryable, Insertable, Debug, Identifiable, Clone, QueryableByName)]
#[diesel(table_name = full_objects_history, primary_key(object_id, object_version))]
pub struct StoredFullHistoryObject {
pub object_id: Vec<u8>,
pub object_version: i64,
pub serialized_object: Option<Vec<u8>>,
}

impl From<IndexedObject> for StoredFullHistoryObject {
fn from(o: IndexedObject) -> Self {
let object = o.object;
Self {
object_id: object.id().to_vec(),
object_version: object.version().value() as i64,
serialized_object: Some(bcs::to_bytes(&object).unwrap()),
}
}
}

impl From<IndexedDeletedObject> for StoredFullHistoryObject {
fn from(o: IndexedDeletedObject) -> Self {
Self {
object_id: o.object_id.to_vec(),
object_version: o.object_version as i64,
serialized_object: None,
}
}
}

#[cfg(test)]
mod tests {
use move_core_types::{account_address::AccountAddress, language_storage::StructTag};
Expand Down
1 change: 1 addition & 0 deletions crates/sui-indexer/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub use pg::event_struct_name;
pub use pg::event_struct_package;
pub use pg::events;
pub use pg::feature_flags;
pub use pg::full_objects_history;
pub use pg::objects;
pub use pg::objects_history;
pub use pg::objects_snapshot;
Expand Down
8 changes: 8 additions & 0 deletions crates/sui-indexer/src/schema/pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,14 @@ diesel::table! {
}
}

diesel::table! {
full_objects_history (object_id, object_version) {
object_id -> Bytea,
object_version -> Int8,
serialized_object -> Nullable<Bytea>,
}
}

diesel::table! {
objects (object_id) {
object_id -> Bytea,
Expand Down
5 changes: 5 additions & 0 deletions crates/sui-indexer/src/store/indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ pub trait IndexerStore: Clone + Sync + Send + 'static {
object_changes: Vec<TransactionObjectChangesToCommit>,
) -> Result<(), IndexerError>;

async fn persist_full_objects_history(
&self,
object_changes: Vec<TransactionObjectChangesToCommit>,
) -> Result<(), IndexerError>;

async fn persist_objects_snapshot(
&self,
object_changes: Vec<TransactionObjectChangesToCommit>,
Expand Down
Loading

0 comments on commit 91aaa88

Please sign in to comment.