Skip to content

Commit

Permalink
indexer fix: multiple object mutations in one checkpoint (#18991)
Browse files Browse the repository at this point in the history
## Description 

per Xun's report 

https://linear.app/mysten-labs/issue/DP-43/bug-epochendindexingobjectstore-might-contain-multiple-versions-of

## Test plan 

CI
ideally in the long run we want to have an embedded DB and test it on CI
with a test, but we lack that today.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
gegaowp committed Aug 22, 2024
1 parent bacc76c commit bd0dc2f
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 167 deletions.
2 changes: 1 addition & 1 deletion crates/sui-core/src/rest_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ impl IndexStoreTables {

for tx in &checkpoint.transactions {
// determine changes from removed objects
for removed_object in tx.removed_objects() {
for removed_object in tx.removed_objects_pre_version() {
match removed_object.owner() {
Owner::AddressOwner(address) => {
let owner_key = OwnerIndexKey::new(*address, removed_object.id());
Expand Down
225 changes: 70 additions & 155 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
@@ -1,51 +1,45 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::handlers::committer::start_tx_checkpoint_commit_task;
use crate::handlers::tx_processor::IndexingPackageBuffer;
use crate::models::display::StoredDisplay;
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use diesel::r2d2::R2D2Connection;
use itertools::Itertools;
use tap::tap::TapFallible;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};

use move_core_types::annotated_value::{MoveStructLayout, MoveTypeLayout};
use move_core_types::language_storage::{StructTag, TypeTag};
use mysten_metrics::{get_metrics, spawn_monitored_task};
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Mutex};
use sui_data_ingestion_core::Worker;
use sui_json_rpc_types::SuiMoveValue;
use sui_package_resolver::{PackageStore, PackageStoreWithLruCache, Resolver};
use sui_rest_api::{CheckpointData, CheckpointTransaction};
use sui_types::base_types::ObjectRef;
use sui_types::base_types::ObjectID;
use sui_types::dynamic_field::DynamicFieldInfo;
use sui_types::dynamic_field::DynamicFieldName;
use sui_types::dynamic_field::DynamicFieldType;
use sui_types::effects::TransactionEffectsAPI;
use sui_types::event::SystemEpochInfoEvent;
use sui_types::messages_checkpoint::{
CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
};
use sui_types::object::Object;
use tokio_util::sync::CancellationToken;

use tokio::sync::watch;

use diesel::r2d2::R2D2Connection;
use std::collections::hash_map::Entry;
use std::collections::HashSet;
use sui_data_ingestion_core::Worker;
use sui_json_rpc_types::SuiMoveValue;
use sui_types::base_types::SequenceNumber;
use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
use sui_types::event::SystemEpochInfoEvent;
use sui_types::object::Owner;
use sui_types::transaction::TransactionDataAPI;
use tap::tap::TapFallible;
use tracing::{info, warn};

use sui_types::base_types::ObjectID;
use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary;
use sui_types::sui_system_state::{get_sui_system_state, SuiSystemStateTrait};
use sui_types::transaction::TransactionDataAPI;

use crate::db::ConnectionPool;
use crate::errors::IndexerError;
use crate::handlers::committer::start_tx_checkpoint_commit_task;
use crate::handlers::tx_processor::IndexingPackageBuffer;
use crate::metrics::IndexerMetrics;

use crate::db::ConnectionPool;
use crate::models::display::StoredDisplay;
use crate::store::package_resolver::{IndexerStorePackageResolver, InterimPackageResolver};
use crate::store::{IndexerStore, PgIndexerStore};
use crate::types::{
Expand Down Expand Up @@ -537,71 +531,41 @@ where
) -> Result<TransactionObjectChangesToCommit, IndexerError> {
let _timer = metrics.indexing_objects_latency.start_timer();
let checkpoint_seq = data.checkpoint_summary.sequence_number;
let deleted_objects = data
.transactions
.iter()
.flat_map(|tx| get_deleted_objects(&tx.effects))
.collect::<Vec<_>>();
let deleted_object_ids = deleted_objects
.iter()
.map(|o| (o.0, o.1))
.collect::<HashSet<_>>();
let indexed_deleted_objects = deleted_objects

let eventually_removed_object_refs_post_version =
data.eventually_removed_object_refs_post_version();
let indexed_eventually_removed_objects = eventually_removed_object_refs_post_version
.into_iter()
.map(|o| IndexedDeletedObject {
object_id: o.0,
object_version: o.1.value(),
.map(|obj_ref| IndexedDeletedObject {
object_id: obj_ref.0,
object_version: obj_ref.1.into(),
checkpoint_sequence_number: checkpoint_seq,
})
.collect();

let (latest_objects, intermediate_versions) = get_latest_objects(data.output_objects());

let live_objects: Vec<Object> = data
.transactions
.iter()
.flat_map(|tx| {
let CheckpointTransaction {
transaction: tx,
effects: fx,
..
} = tx;
fx.all_changed_objects()
.into_iter()
.filter_map(|(oref, _owner, _kind)| {
// We don't care about objects that are deleted or updated more than once
if intermediate_versions.contains(&(oref.0, oref.1))
|| deleted_object_ids.contains(&(oref.0, oref.1))
{
return None;
}
let object = latest_objects.get(&(oref.0)).unwrap_or_else(|| {
panic!(
"object {:?} not found in CheckpointData (tx_digest: {})",
oref.0,
tx.digest()
)
});
assert_eq!(oref.1, object.version());
Some(object.clone())
})
.collect::<Vec<_>>()
})
.collect();

let latest_live_output_objects = data.latest_live_output_objects();
let latest_live_output_object_map = latest_live_output_objects
.clone()
.into_iter()
.map(|o| (o.id(), o.clone()))
.collect::<HashMap<_, _>>();
let move_struct_layout_map =
get_move_struct_layout_map(&live_objects, package_resolver).await?;
let changed_objects = live_objects
get_move_struct_layout_map(latest_live_output_objects.clone(), package_resolver)
.await?;
let changed_objects = latest_live_output_objects
.into_iter()
.map(|o| {
let df_info =
try_create_dynamic_field_info(&o, &move_struct_layout_map, &latest_objects);
df_info.map(|info| IndexedObject::from_object(checkpoint_seq, o, info))
let df_info = try_create_dynamic_field_info(
o,
&move_struct_layout_map,
&latest_live_output_object_map,
);
df_info.map(|info| IndexedObject::from_object(checkpoint_seq, o.clone(), info))
})
.collect::<Result<Vec<_>, _>>()?;
Ok(TransactionObjectChangesToCommit {
changed_objects,
deleted_objects: indexed_deleted_objects,
deleted_objects: indexed_eventually_removed_objects,
})
}

Expand All @@ -614,59 +578,42 @@ where
let deleted_objects = data
.transactions
.iter()
.flat_map(|tx| get_deleted_objects(&tx.effects))
.flat_map(|tx| tx.removed_object_refs_post_version())
.collect::<Vec<_>>();
let indexed_deleted_objects: Vec<IndexedDeletedObject> = deleted_objects
.into_iter()
.map(|o| IndexedDeletedObject {
object_id: o.0,
object_version: o.1.value(),
.map(|obj_ref| IndexedDeletedObject {
object_id: obj_ref.0,
object_version: obj_ref.1.into(),
checkpoint_sequence_number: checkpoint_seq,
})
.collect();

let (latest_objects, _) = get_latest_objects(data.output_objects());
let history_object_map = data
.output_objects()
let latest_live_output_objects = data.latest_live_output_objects();
let latest_live_output_object_map = latest_live_output_objects
.clone()
.into_iter()
.map(|o| ((o.id(), o.version()), o.clone()))
.map(|o| (o.id(), o.clone()))
.collect::<HashMap<_, _>>();

let history_objects: Vec<Object> = data
let output_objects = data
.transactions
.iter()
.flat_map(|tx| {
let CheckpointTransaction {
transaction: tx,
effects: fx,
..
} = tx;
fx.all_changed_objects()
.into_iter()
.map(|(oref, _owner, _kind)| {
let history_object = history_object_map.get(&(oref.0, oref.1)).unwrap_or_else(|| {
panic!(
"object {:?} version {:?} not found in CheckpointData (tx_digest: {})",
oref.0,
oref.1,
tx.digest()
)
});
assert_eq!(oref.2, history_object.digest());
history_object.clone()
})
.collect::<Vec<_>>()
})
.collect();

.flat_map(|tx| &tx.output_objects)
.collect::<Vec<_>>();
// TODO(gegaowp): the current df_info implementation is not correct,
// but we have decided remove all df_* except df_kind.
let move_struct_layout_map =
get_move_struct_layout_map(&history_objects, package_resolver).await?;
let changed_objects = history_objects
get_move_struct_layout_map(output_objects.clone(), package_resolver).await?;
let changed_objects = output_objects
.into_iter()
.map(|o| {
let df_info =
try_create_dynamic_field_info(&o, &move_struct_layout_map, &latest_objects);
df_info.map(|info| IndexedObject::from_object(checkpoint_seq, o, info))
let df_info = try_create_dynamic_field_info(
o,
&move_struct_layout_map,
&latest_live_output_object_map,
);
df_info.map(|info| IndexedObject::from_object(checkpoint_seq, o.clone(), info))
})
.collect::<Result<Vec<_>, _>>()?;

Expand All @@ -685,8 +632,9 @@ where
.iter()
.flat_map(|data| {
let checkpoint_sequence_number = data.checkpoint_summary.sequence_number;
data.output_objects()
data.transactions
.iter()
.flat_map(|tx| &tx.output_objects)
.filter_map(|o| {
if let sui_types::object::Data::Package(p) = &o.data {
Some(IndexedPackage {
Expand All @@ -710,16 +658,17 @@ where
.iter()
.flat_map(|data| {
let checkpoint_sequence_number = data.checkpoint_summary.sequence_number;
data.output_objects()
data.transactions
.iter()
.flat_map(|tx| &tx.output_objects)
.filter_map(|o| {
if let sui_types::object::Data::Package(p) = &o.data {
let indexed_pkg = IndexedPackage {
package_id: o.id(),
move_package: p.clone(),
checkpoint_sequence_number,
};
Some((indexed_pkg, (**o).clone()))
Some((indexed_pkg, o.clone()))
} else {
None
}
Expand All @@ -741,11 +690,11 @@ where
}

async fn get_move_struct_layout_map(
objects: &[Object],
objects: Vec<&Object>,
package_resolver: Arc<Resolver<impl PackageStore>>,
) -> Result<HashMap<StructTag, MoveStructLayout>, IndexerError> {
let struct_tags = objects
.iter()
.into_iter()
.filter_map(|o| {
let move_object = o.data.try_as_move().cloned();
move_object.map(|move_object| {
Expand Down Expand Up @@ -796,40 +745,6 @@ async fn get_move_struct_layout_map(
Ok(move_struct_layout_map)
}

pub fn get_deleted_objects(effects: &TransactionEffects) -> Vec<ObjectRef> {
let deleted = effects.deleted().into_iter();
let wrapped = effects.wrapped().into_iter();
let unwrapped_then_deleted = effects.unwrapped_then_deleted().into_iter();
deleted
.chain(wrapped)
.chain(unwrapped_then_deleted)
.collect::<Vec<_>>()
}

pub fn get_latest_objects(
objects: Vec<&Object>,
) -> (
HashMap<ObjectID, Object>,
HashSet<(ObjectID, SequenceNumber)>,
) {
let mut latest_objects = HashMap::new();
let mut discarded_versions = HashSet::new();
for object in objects {
match latest_objects.entry(object.id()) {
Entry::Vacant(e) => {
e.insert(object.clone());
}
Entry::Occupied(mut e) => {
if object.version() > e.get().version() {
discarded_versions.insert((e.get().id(), e.get().version()));
e.insert(object.clone());
}
}
}
}
(latest_objects, discarded_versions)
}

fn try_create_dynamic_field_info(
o: &Object,
struct_tag_to_move_struct_layout: &HashMap<StructTag, MoveStructLayout>,
Expand Down
4 changes: 1 addition & 3 deletions crates/sui-indexer/src/handlers/tx_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use sui_types::messages_checkpoint::CheckpointSequenceNumber;

use crate::errors::IndexerError;
use crate::metrics::IndexerMetrics;

use crate::types::IndexedPackage;
use crate::types::{IndexedObjectChange, IndexerResult};

Expand Down Expand Up @@ -289,9 +288,8 @@ pub(crate) struct EpochEndIndexingObjectStore<'a> {

impl<'a> EpochEndIndexingObjectStore<'a> {
pub fn new(data: &'a CheckpointData) -> Self {
// We only care about output objects for end-of-epoch indexing
Self {
objects: data.output_objects(),
objects: data.latest_live_output_objects(),
}
}
}
Expand Down
Loading

0 comments on commit bd0dc2f

Please sign in to comment.