diff --git a/crates/sui-core/src/rest_index.rs b/crates/sui-core/src/rest_index.rs index beb762c6d6a7f..e999cfe2abc2e 100644 --- a/crates/sui-core/src/rest_index.rs +++ b/crates/sui-core/src/rest_index.rs @@ -406,7 +406,7 @@ impl IndexStoreTables { for tx in &checkpoint.transactions { // determine changes from removed objects - for removed_object in tx.removed_objects() { + for removed_object in tx.removed_objects_pre_version() { match removed_object.owner() { Owner::AddressOwner(address) => { let owner_key = OwnerIndexKey::new(*address, removed_object.id()); diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index 7a8f8efe67fa9..a5273f6302e80 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -1,51 +1,45 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::handlers::committer::start_tx_checkpoint_commit_task; -use crate::handlers::tx_processor::IndexingPackageBuffer; -use crate::models::display::StoredDisplay; +use std::collections::{BTreeMap, HashMap}; +use std::sync::{Arc, Mutex}; + use async_trait::async_trait; +use diesel::r2d2::R2D2Connection; use itertools::Itertools; +use tap::tap::TapFallible; +use tokio::sync::watch; +use tokio_util::sync::CancellationToken; +use tracing::{info, warn}; + use move_core_types::annotated_value::{MoveStructLayout, MoveTypeLayout}; use move_core_types::language_storage::{StructTag, TypeTag}; use mysten_metrics::{get_metrics, spawn_monitored_task}; -use std::collections::{BTreeMap, HashMap}; -use std::sync::{Arc, Mutex}; +use sui_data_ingestion_core::Worker; +use sui_json_rpc_types::SuiMoveValue; use sui_package_resolver::{PackageStore, PackageStoreWithLruCache, Resolver}; use sui_rest_api::{CheckpointData, CheckpointTransaction}; -use sui_types::base_types::ObjectRef; +use sui_types::base_types::ObjectID; use sui_types::dynamic_field::DynamicFieldInfo; use sui_types::dynamic_field::DynamicFieldName; use sui_types::dynamic_field::DynamicFieldType; +use sui_types::effects::TransactionEffectsAPI; +use sui_types::event::SystemEpochInfoEvent; use sui_types::messages_checkpoint::{ CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber, }; use sui_types::object::Object; -use tokio_util::sync::CancellationToken; - -use tokio::sync::watch; - -use diesel::r2d2::R2D2Connection; -use std::collections::hash_map::Entry; -use std::collections::HashSet; -use sui_data_ingestion_core::Worker; -use sui_json_rpc_types::SuiMoveValue; -use sui_types::base_types::SequenceNumber; -use sui_types::effects::{TransactionEffects, TransactionEffectsAPI}; -use sui_types::event::SystemEpochInfoEvent; use sui_types::object::Owner; -use sui_types::transaction::TransactionDataAPI; -use tap::tap::TapFallible; -use tracing::{info, warn}; - -use sui_types::base_types::ObjectID; use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary; use sui_types::sui_system_state::{get_sui_system_state, SuiSystemStateTrait}; +use sui_types::transaction::TransactionDataAPI; +use crate::db::ConnectionPool; use crate::errors::IndexerError; +use crate::handlers::committer::start_tx_checkpoint_commit_task; +use crate::handlers::tx_processor::IndexingPackageBuffer; use crate::metrics::IndexerMetrics; - -use crate::db::ConnectionPool; +use crate::models::display::StoredDisplay; use crate::store::package_resolver::{IndexerStorePackageResolver, InterimPackageResolver}; use crate::store::{IndexerStore, PgIndexerStore}; use crate::types::{ @@ -537,71 +531,41 @@ where ) -> Result { let _timer = metrics.indexing_objects_latency.start_timer(); let checkpoint_seq = data.checkpoint_summary.sequence_number; - let deleted_objects = data - .transactions - .iter() - .flat_map(|tx| get_deleted_objects(&tx.effects)) - .collect::>(); - let deleted_object_ids = deleted_objects - .iter() - .map(|o| (o.0, o.1)) - .collect::>(); - let indexed_deleted_objects = deleted_objects + + let eventually_removed_object_refs_post_version = + data.eventually_removed_object_refs_post_version(); + let indexed_eventually_removed_objects = eventually_removed_object_refs_post_version .into_iter() - .map(|o| IndexedDeletedObject { - object_id: o.0, - object_version: o.1.value(), + .map(|obj_ref| IndexedDeletedObject { + object_id: obj_ref.0, + object_version: obj_ref.1.into(), checkpoint_sequence_number: checkpoint_seq, }) .collect(); - let (latest_objects, intermediate_versions) = get_latest_objects(data.output_objects()); - - let live_objects: Vec = data - .transactions - .iter() - .flat_map(|tx| { - let CheckpointTransaction { - transaction: tx, - effects: fx, - .. - } = tx; - fx.all_changed_objects() - .into_iter() - .filter_map(|(oref, _owner, _kind)| { - // We don't care about objects that are deleted or updated more than once - if intermediate_versions.contains(&(oref.0, oref.1)) - || deleted_object_ids.contains(&(oref.0, oref.1)) - { - return None; - } - let object = latest_objects.get(&(oref.0)).unwrap_or_else(|| { - panic!( - "object {:?} not found in CheckpointData (tx_digest: {})", - oref.0, - tx.digest() - ) - }); - assert_eq!(oref.1, object.version()); - Some(object.clone()) - }) - .collect::>() - }) - .collect(); - + let latest_live_output_objects = data.latest_live_output_objects(); + let latest_live_output_object_map = latest_live_output_objects + .clone() + .into_iter() + .map(|o| (o.id(), o.clone())) + .collect::>(); let move_struct_layout_map = - get_move_struct_layout_map(&live_objects, package_resolver).await?; - let changed_objects = live_objects + get_move_struct_layout_map(latest_live_output_objects.clone(), package_resolver) + .await?; + let changed_objects = latest_live_output_objects .into_iter() .map(|o| { - let df_info = - try_create_dynamic_field_info(&o, &move_struct_layout_map, &latest_objects); - df_info.map(|info| IndexedObject::from_object(checkpoint_seq, o, info)) + let df_info = try_create_dynamic_field_info( + o, + &move_struct_layout_map, + &latest_live_output_object_map, + ); + df_info.map(|info| IndexedObject::from_object(checkpoint_seq, o.clone(), info)) }) .collect::, _>>()?; Ok(TransactionObjectChangesToCommit { changed_objects, - deleted_objects: indexed_deleted_objects, + deleted_objects: indexed_eventually_removed_objects, }) } @@ -614,59 +578,42 @@ where let deleted_objects = data .transactions .iter() - .flat_map(|tx| get_deleted_objects(&tx.effects)) + .flat_map(|tx| tx.removed_object_refs_post_version()) .collect::>(); let indexed_deleted_objects: Vec = deleted_objects .into_iter() - .map(|o| IndexedDeletedObject { - object_id: o.0, - object_version: o.1.value(), + .map(|obj_ref| IndexedDeletedObject { + object_id: obj_ref.0, + object_version: obj_ref.1.into(), checkpoint_sequence_number: checkpoint_seq, }) .collect(); - let (latest_objects, _) = get_latest_objects(data.output_objects()); - let history_object_map = data - .output_objects() + let latest_live_output_objects = data.latest_live_output_objects(); + let latest_live_output_object_map = latest_live_output_objects + .clone() .into_iter() - .map(|o| ((o.id(), o.version()), o.clone())) + .map(|o| (o.id(), o.clone())) .collect::>(); - let history_objects: Vec = data + let output_objects = data .transactions .iter() - .flat_map(|tx| { - let CheckpointTransaction { - transaction: tx, - effects: fx, - .. - } = tx; - fx.all_changed_objects() - .into_iter() - .map(|(oref, _owner, _kind)| { - let history_object = history_object_map.get(&(oref.0, oref.1)).unwrap_or_else(|| { - panic!( - "object {:?} version {:?} not found in CheckpointData (tx_digest: {})", - oref.0, - oref.1, - tx.digest() - ) - }); - assert_eq!(oref.2, history_object.digest()); - history_object.clone() - }) - .collect::>() - }) - .collect(); - + .flat_map(|tx| &tx.output_objects) + .collect::>(); + // TODO(gegaowp): the current df_info implementation is not correct, + // but we have decided remove all df_* except df_kind. let move_struct_layout_map = - get_move_struct_layout_map(&history_objects, package_resolver).await?; - let changed_objects = history_objects + get_move_struct_layout_map(output_objects.clone(), package_resolver).await?; + let changed_objects = output_objects .into_iter() .map(|o| { - let df_info = - try_create_dynamic_field_info(&o, &move_struct_layout_map, &latest_objects); - df_info.map(|info| IndexedObject::from_object(checkpoint_seq, o, info)) + let df_info = try_create_dynamic_field_info( + o, + &move_struct_layout_map, + &latest_live_output_object_map, + ); + df_info.map(|info| IndexedObject::from_object(checkpoint_seq, o.clone(), info)) }) .collect::, _>>()?; @@ -685,8 +632,9 @@ where .iter() .flat_map(|data| { let checkpoint_sequence_number = data.checkpoint_summary.sequence_number; - data.output_objects() + data.transactions .iter() + .flat_map(|tx| &tx.output_objects) .filter_map(|o| { if let sui_types::object::Data::Package(p) = &o.data { Some(IndexedPackage { @@ -710,8 +658,9 @@ where .iter() .flat_map(|data| { let checkpoint_sequence_number = data.checkpoint_summary.sequence_number; - data.output_objects() + data.transactions .iter() + .flat_map(|tx| &tx.output_objects) .filter_map(|o| { if let sui_types::object::Data::Package(p) = &o.data { let indexed_pkg = IndexedPackage { @@ -719,7 +668,7 @@ where move_package: p.clone(), checkpoint_sequence_number, }; - Some((indexed_pkg, (**o).clone())) + Some((indexed_pkg, o.clone())) } else { None } @@ -741,11 +690,11 @@ where } async fn get_move_struct_layout_map( - objects: &[Object], + objects: Vec<&Object>, package_resolver: Arc>, ) -> Result, IndexerError> { let struct_tags = objects - .iter() + .into_iter() .filter_map(|o| { let move_object = o.data.try_as_move().cloned(); move_object.map(|move_object| { @@ -796,40 +745,6 @@ async fn get_move_struct_layout_map( Ok(move_struct_layout_map) } -pub fn get_deleted_objects(effects: &TransactionEffects) -> Vec { - let deleted = effects.deleted().into_iter(); - let wrapped = effects.wrapped().into_iter(); - let unwrapped_then_deleted = effects.unwrapped_then_deleted().into_iter(); - deleted - .chain(wrapped) - .chain(unwrapped_then_deleted) - .collect::>() -} - -pub fn get_latest_objects( - objects: Vec<&Object>, -) -> ( - HashMap, - HashSet<(ObjectID, SequenceNumber)>, -) { - let mut latest_objects = HashMap::new(); - let mut discarded_versions = HashSet::new(); - for object in objects { - match latest_objects.entry(object.id()) { - Entry::Vacant(e) => { - e.insert(object.clone()); - } - Entry::Occupied(mut e) => { - if object.version() > e.get().version() { - discarded_versions.insert((e.get().id(), e.get().version())); - e.insert(object.clone()); - } - } - } - } - (latest_objects, discarded_versions) -} - fn try_create_dynamic_field_info( o: &Object, struct_tag_to_move_struct_layout: &HashMap, diff --git a/crates/sui-indexer/src/handlers/tx_processor.rs b/crates/sui-indexer/src/handlers/tx_processor.rs index 04d96d6eafd04..2ee43f5880ead 100644 --- a/crates/sui-indexer/src/handlers/tx_processor.rs +++ b/crates/sui-indexer/src/handlers/tx_processor.rs @@ -30,7 +30,6 @@ use sui_types::messages_checkpoint::CheckpointSequenceNumber; use crate::errors::IndexerError; use crate::metrics::IndexerMetrics; - use crate::types::IndexedPackage; use crate::types::{IndexedObjectChange, IndexerResult}; @@ -289,9 +288,8 @@ pub(crate) struct EpochEndIndexingObjectStore<'a> { impl<'a> EpochEndIndexingObjectStore<'a> { pub fn new(data: &'a CheckpointData) -> Self { - // We only care about output objects for end-of-epoch indexing Self { - objects: data.output_objects(), + objects: data.latest_live_output_objects(), } } } diff --git a/crates/sui-types/src/full_checkpoint_content.rs b/crates/sui-types/src/full_checkpoint_content.rs index 02b7a897c3bed..4d0cb95217bbc 100644 --- a/crates/sui-types/src/full_checkpoint_content.rs +++ b/crates/sui-types/src/full_checkpoint_content.rs @@ -1,7 +1,12 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::effects::{IDOperation, ObjectIn, ObjectOut, TransactionEffects, TransactionEvents}; +use std::collections::BTreeMap; + +use crate::base_types::ObjectRef; +use crate::effects::{ + IDOperation, ObjectIn, ObjectOut, TransactionEffects, TransactionEffectsAPI, TransactionEvents, +}; use crate::messages_checkpoint::{CertifiedCheckpointSummary, CheckpointContents}; use crate::object::Object; use crate::storage::BackingPackageStore; @@ -18,11 +23,32 @@ pub struct CheckpointData { } impl CheckpointData { - pub fn output_objects(&self) -> Vec<&Object> { - self.transactions - .iter() - .flat_map(|tx| &tx.output_objects) - .collect() + // returns the latest versions of the output objects that still exist at the end of the checkpoint + pub fn latest_live_output_objects(&self) -> Vec<&Object> { + let mut latest_live_objects = BTreeMap::new(); + for tx in self.transactions.iter() { + for obj in tx.output_objects.iter() { + latest_live_objects.insert(obj.id(), obj); + } + for obj_ref in tx.removed_object_refs_post_version() { + latest_live_objects.remove(&(obj_ref.0)); + } + } + latest_live_objects.into_values().collect() + } + + // returns the object refs that are eventually deleted or wrapped in the current checkpoint + pub fn eventually_removed_object_refs_post_version(&self) -> Vec { + let mut eventually_removed_object_refs = BTreeMap::new(); + for tx in self.transactions.iter() { + for obj_ref in tx.removed_object_refs_post_version() { + eventually_removed_object_refs.insert(obj_ref.0, obj_ref); + } + for obj in tx.output_objects.iter() { + eventually_removed_object_refs.remove(&(obj.id())); + } + } + eventually_removed_object_refs.into_values().collect() } pub fn input_objects(&self) -> Vec<&Object> { @@ -51,19 +77,21 @@ pub struct CheckpointTransaction { pub events: Option, /// The state of all inputs to this transaction as they were prior to execution. pub input_objects: Vec, - /// The state of all output objects created or mutated by this transaction. + /// The state of all output objects created or mutated or unwrapped by this transaction. pub output_objects: Vec, } impl CheckpointTransaction { // provide an iterator over all deleted or wrapped objects in this transaction - pub fn removed_objects(&self) -> impl Iterator { + pub fn removed_objects_pre_version(&self) -> impl Iterator { // Iterator over id and versions for all deleted or wrapped objects match &self.effects { TransactionEffects::V1(v1) => Either::Left( // Effects v1 has delted and wrapped objects versions as the "new" version, not the // old one that was actually removed. So we need to take these and then look them // up in the `modified_at_versions`. + // No need to chain unwrapped_then_deleted because these objects must have been wrapped + // before the transaction, hence they will not be in modified_at_versions / input_objects. v1.deleted().iter().chain(v1.wrapped()).map(|(id, _, _)| { // lookup the old version for mutated objects let (_, old_version) = v1 @@ -108,6 +136,13 @@ impl CheckpointTransaction { }) } + pub fn removed_object_refs_post_version(&self) -> impl Iterator { + let deleted = self.effects.deleted().into_iter(); + let wrapped = self.effects.wrapped().into_iter(); + let unwrapped_then_deleted = self.effects.unwrapped_then_deleted().into_iter(); + deleted.chain(wrapped).chain(unwrapped_then_deleted) + } + pub fn changed_objects(&self) -> impl Iterator)> { // Iterator over ((ObjectId, new version), Option) match &self.effects {