[GraphQL] Leverage objects_version table. (#17543)
## Description

Use the `objects_version` table to speed up point look-ups (via data
loaders) for historical objects (ID + version), and for dynamic fields
(object look-ups whose version is bounded above by the parent object's
version).

With this change, the restriction that dynamic fields can only be accessed
within the available range is dropped.
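
Concretely, the point look-up goes through a join from the narrow `objects_version` table (keyed by object ID and version) back to `objects_history` for the full object row. The following is a minimal sketch of that query, adapted from the `Loader<HistoricalKey>` implementation in this change; it uses plain diesel connection handling rather than the service's own `Db` wrapper, and the function name and parameters are illustrative.

```rust
use diesel::pg::PgConnection;
use diesel::prelude::*;
use sui_indexer::models::objects::StoredHistoryObject;
use sui_indexer::schema::{objects_history, objects_version};

/// Illustrative helper: fetch the stored rows for a batch of exact
/// (object ID, version) pairs, as a data loader would.
fn load_objects_at_versions(
    conn: &mut PgConnection,
    id_versions: &[(Vec<u8>, i64)],
) -> QueryResult<Vec<StoredHistoryObject>> {
    use objects_history::dsl as h;
    use objects_version::dsl as v;

    // Join the narrow `objects_version` table (indexed by object ID and
    // version) back to `objects_history` to recover the full object row.
    let mut query = h::objects_history
        .inner_join(
            v::objects_version.on(v::cp_sequence_number
                .eq(h::checkpoint_sequence_number)
                .and(v::object_id.eq(h::object_id))
                .and(v::object_version.eq(h::object_version))),
        )
        .select(StoredHistoryObject::as_select())
        .into_boxed();

    // One OR-ed predicate per (ID, version) pair in the batch.
    for (id, version) in id_versions {
        query = query
            .or_filter(v::object_id.eq(id.clone()).and(v::object_version.eq(*version)));
    }

    query.load(conn)
}
```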

## Test plan

```
sui$ cargo nextest run -p sui-graphql-rpc
sui$ cargo nextest run -p sui-graphql-e2e-tests --features pg_integration
```

Perform a query that fetches a large number of dynamic fields, which should
now be fast. The following example, which fetches dynamic fields on a
DeepBook pool, loads 50 dynamic fields in about 5s from cold (which also
requires loading packages for resolution), and in about 2s on subsequent
runs. A sketch of the underlying child-object look-up follows the query.

```
query {
  owner(
    address: "0x029170bfa0a1677054263424fe4f9960c7cf05d359f6241333994c8830772bdb"
  ) {
    dynamicFields {
      pageInfo {
        hasNextPage
        endCursor
      }
      nodes {
        name {
          type {
            repr
          }
          json
        }
        value {
          ... on MoveValue {
            type {
              repr
            }
            json
          }
          ... on MoveObject {
            contents {
              json
              type {
                repr
              }
            }
          }
        }
      }
    }
  }
}
```
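
The dynamic-field path above resolves each child object by finding its newest version at or below the parent object's version. The sketch below is adapted from the `Loader<ParentVersionKey>` implementation in this change; it uses plain diesel connection handling rather than the service's `Db` wrapper, and the function name and parameters are illustrative.

```rust
use diesel::pg::PgConnection;
use diesel::prelude::*;
use sui_indexer::models::objects::StoredHistoryObject;
use sui_indexer::schema::{objects_history, objects_version};

/// Illustrative helper: for each candidate child object ID, pick the newest
/// version that is at or below the parent object's version.
fn load_children_under_parent(
    conn: &mut PgConnection,
    child_ids: Vec<Vec<u8>>,
    parent_version: i64,
) -> QueryResult<Vec<StoredHistoryObject>> {
    use objects_history::dsl as h;
    use objects_version::dsl as v;

    h::objects_history
        .inner_join(
            v::objects_version.on(v::cp_sequence_number
                .eq(h::checkpoint_sequence_number)
                .and(v::object_id.eq(h::object_id))
                .and(v::object_version.eq(h::object_version))),
        )
        .select(StoredHistoryObject::as_select())
        .filter(v::object_id.eq_any(child_ids))
        .filter(v::object_version.le(parent_version))
        // `DISTINCT ON (object_id) ... ORDER BY object_id, object_version DESC`
        // keeps only the newest qualifying version per child object.
        .distinct_on(v::object_id)
        .order_by(v::object_id)
        .then_order_by(v::object_version.desc())
        .load(conn)
}
```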

## Stack

- #17686
- #17687
- #17688
- #17689
- #17691
- #17694
- #17695
- #17542
- #17726

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol:
- [ ] Nodes (Validators and Full nodes):
- [ ] Indexer:
- [ ] JSON-RPC:
- [x] GraphQL: Dynamic fields can now be looked up on any historical
object (not just objects in the available range).
- [ ] CLI:
- [ ] Rust SDK:
amnn committed Aug 20, 2024
1 parent c26c121 commit c3a35f4
Showing 4 changed files with 186 additions and 68 deletions.
@@ -1166,7 +1166,17 @@ task 34, lines 497-528:
Response: {
"data": {
"parent_version_4": {
"dfAtParentVersion4_outside_range": null
"dfAtParentVersion4_outside_range": {
"name": {
"bcs": "A2RmMQ==",
"type": {
"repr": "0x0000000000000000000000000000000000000000000000000000000000000001::string::String"
}
},
"value": {
"json": "df1"
}
}
},
"parent_version_6": {
"dfAtParentVersion6": null
9 changes: 5 additions & 4 deletions crates/sui-graphql-rpc/src/types/dynamic_field.rs
@@ -10,7 +10,7 @@ use sui_types::dynamic_field::{derive_dynamic_field_id, DynamicFieldInfo, Dynami

use super::available_range::AvailableRange;
use super::cursor::{Page, Target};
use super::object::{self, deserialize_move_struct, Object, ObjectKind, ObjectLookup};
use super::object::{self, deserialize_move_struct, Object, ObjectKind};
use super::type_filter::ExactTypeFilter;
use super::{
base64::Base64, move_object::MoveObject, move_value::MoveValue, sui_address::SuiAddress,
@@ -170,9 +170,10 @@ impl DynamicField {
let super_ = MoveObject::query(
ctx,
SuiAddress::from(field_id),
ObjectLookup::LatestAt {
parent_version,
checkpoint_viewed_at,
if let Some(parent_version) = parent_version {
Object::under_parent(parent_version, checkpoint_viewed_at)
} else {
Object::latest_at(checkpoint_viewed_at)
},
)
.await?;
226 changes: 166 additions & 60 deletions crates/sui-graphql-rpc/src/types/object.rs
@@ -34,12 +34,12 @@ use crate::{filter, or_filter};
use async_graphql::connection::{CursorType, Edge};
use async_graphql::dataloader::Loader;
use async_graphql::{connection::Connection, *};
use diesel::{BoolExpressionMethods, ExpressionMethods, QueryDsl};
use diesel::{BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper};
use move_core_types::annotated_value::{MoveStruct, MoveTypeLayout};
use move_core_types::language_storage::StructTag;
use serde::{Deserialize, Serialize};
use sui_indexer::models::objects::{StoredDeletedHistoryObject, StoredHistoryObject};
use sui_indexer::schema::objects_history;
use sui_indexer::schema::{objects_history, objects_version};
use sui_indexer::types::ObjectStatus as NativeObjectStatus;
use sui_indexer::types::OwnerType;
use sui_types::object::bounded_visitor::BoundedVisitor;
@@ -183,9 +183,14 @@ pub(crate) struct AddressOwner {

pub(crate) enum ObjectLookup {
LatestAt {
/// The parent version to be used as an optional upper bound for the query. Look for the
/// latest version of a child object that is less than or equal to this upper bound.
parent_version: Option<u64>,
/// The checkpoint sequence number at which this was viewed
checkpoint_viewed_at: u64,
},

UnderParent {
/// The parent version to be used as an upper bound for the query. Look for the latest
/// version of a child object whose version is less than or equal to this upper bound.
parent_version: u64,
/// The checkpoint sequence number at which this was viewed
checkpoint_viewed_at: u64,
},
@@ -283,13 +288,21 @@ struct HistoricalKey {
checkpoint_viewed_at: u64,
}

/// DataLoader key for fetching the latest version of an `Object` as of a consistency cursor. The
/// query can optionally be bounded by a `parent_version` which imposes an additional requirement
/// that the object's version is bounded above by the parent version.
/// DataLoader key for fetching the latest version of an object whose parent object has version
/// `parent_version`, as of `checkpoint_viewed_at`. This look-up can fail to find a valid object if
/// the key is not self-consistent, for example if the `parent_version` is set to a higher version
/// than the object's actual parent as of `checkpoint_viewed_at`.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct ParentVersionKey {
id: SuiAddress,
parent_version: u64,
checkpoint_viewed_at: u64,
}

/// DataLoader key for fetching the latest version of an `Object` as of a consistency cursor.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct LatestAtKey {
id: SuiAddress,
parent_version: Option<u64>,
checkpoint_viewed_at: u64,
}

@@ -807,16 +820,15 @@ impl Object {
/// Look-up the latest version of the object as of a given checkpoint.
pub(crate) fn latest_at(checkpoint_viewed_at: u64) -> ObjectLookup {
ObjectLookup::LatestAt {
parent_version: None,
checkpoint_viewed_at,
}
}

/// Look-up the latest version of an object whose version is less than or equal to its parent's
/// version, as of a given checkpoint.
pub(crate) fn under_parent(parent_version: u64, checkpoint_viewed_at: u64) -> ObjectLookup {
ObjectLookup::LatestAt {
parent_version: Some(parent_version),
ObjectLookup::UnderParent {
parent_version,
checkpoint_viewed_at,
}
}
@@ -849,18 +861,30 @@ impl Object {
})
.await
}
ObjectLookup::LatestAt {

ObjectLookup::UnderParent {
parent_version,
checkpoint_viewed_at,
} => {
loader
.load_one(LatestAtKey {
.load_one(ParentVersionKey {
id,
parent_version,
checkpoint_viewed_at,
})
.await
}

ObjectLookup::LatestAt {
checkpoint_viewed_at,
} => {
loader
.load_one(LatestAtKey {
id,
checkpoint_viewed_at,
})
.await
}
}
}

@@ -1177,7 +1201,8 @@ impl Loader<HistoricalKey> for Db {
type Error = Error;

async fn load(&self, keys: &[HistoricalKey]) -> Result<HashMap<HistoricalKey, Object>, Error> {
use objects_history::dsl;
use objects_history::dsl as h;
use objects_version::dsl as v;

let id_versions: BTreeSet<_> = keys
.iter()
@@ -1187,12 +1212,19 @@ impl Loader<HistoricalKey> for Db {
let objects: Vec<StoredHistoryObject> = self
.execute(move |conn| {
conn.results(move || {
let mut query = dsl::objects_history.into_boxed();
let mut query = h::objects_history
.inner_join(
v::objects_version.on(v::cp_sequence_number
.eq(h::checkpoint_sequence_number)
.and(v::object_id.eq(h::object_id))
.and(v::object_version.eq(h::object_version))),
)
.select(StoredHistoryObject::as_select())
.into_boxed();

// TODO: Speed up using an `obj_version` table.
for (id, version) in id_versions.iter().cloned() {
query = query
.or_filter(dsl::object_id.eq(id).and(dsl::object_version.eq(version)));
query =
query.or_filter(v::object_id.eq(id).and(v::object_version.eq(version)));
}

query
@@ -1234,17 +1266,20 @@ impl Loader<HistoricalKey> for Db {
}

#[async_trait::async_trait]
impl Loader<LatestAtKey> for Db {
impl Loader<ParentVersionKey> for Db {
type Value = Object;
type Error = Error;

async fn load(&self, keys: &[LatestAtKey]) -> Result<HashMap<LatestAtKey, Object>, Error> {
async fn load(
&self,
keys: &[ParentVersionKey],
) -> Result<HashMap<ParentVersionKey, Object>, Error> {
// Group keys by checkpoint viewed at and parent version -- we'll issue a separate query for
// each group.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)]
struct GroupKey {
checkpoint_viewed_at: u64,
parent_version: Option<u64>,
parent_version: u64,
}

let mut keys_by_cursor_and_parent_version: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
@@ -1257,50 +1292,40 @@ impl Loader<LatestAtKey> for Db {
keys_by_cursor_and_parent_version
.entry(group_key)
.or_default()
.insert(key.id);
.insert(key.id.into_vec());
}

// Issue concurrent reads for each group of keys.
let futures = keys_by_cursor_and_parent_version
.into_iter()
.map(|(group_key, ids)| {
self.execute_repeatable(move |conn| {
let Some(range) = AvailableRange::result(conn, group_key.checkpoint_viewed_at)?
else {
return Ok::<Vec<(GroupKey, StoredHistoryObject)>, diesel::result::Error>(
vec![],
);
};

let filter = ObjectFilter {
object_ids: Some(ids.iter().cloned().collect()),
..Default::default()
};

// TODO: Implement queries that use a parent version bound using an
// `obj_version` table.
let apply_parent_bound = |q: RawQuery| {
if let Some(parent_version) = group_key.parent_version {
filter!(q, format!("object_version <= {parent_version}"))
} else {
q
}
};

Ok(conn
.results(move || {
build_objects_query(
View::Consistent,
range,
&Page::bounded(ids.len() as u64),
|q| apply_parent_bound(filter.apply(q)),
apply_parent_bound,
self.execute(move |conn| {
let stored: Vec<StoredHistoryObject> = conn.results(move || {
use objects_history::dsl as h;
use objects_version::dsl as v;

h::objects_history
.inner_join(
v::objects_version.on(v::cp_sequence_number
.eq(h::checkpoint_sequence_number)
.and(v::object_id.eq(h::object_id))
.and(v::object_version.eq(h::object_version))),
)
.select(StoredHistoryObject::as_select())
.filter(v::object_id.eq_any(ids.iter().cloned()))
.filter(v::object_version.le(group_key.parent_version as i64))
.distinct_on(v::object_id)
.order_by(v::object_id)
.then_order_by(v::object_version.desc())
.into_boxed()
})?
.into_iter()
.map(|r| (group_key, r))
.collect())
})?;

Ok::<_, diesel::result::Error>(
stored
.into_iter()
.map(|stored| (group_key, stored))
.collect::<Vec<_>>(),
)
})
});

@@ -1312,15 +1337,21 @@ impl Loader<LatestAtKey> for Db {
for (group_key, stored) in
group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?
{
// This particular object is invalid -- it didn't exist at the checkpoint we are
// viewing at.
if group_key.checkpoint_viewed_at < stored.checkpoint_sequence_number as u64 {
continue;
}

let object = Object::try_from_stored_history_object(
stored,
group_key.checkpoint_viewed_at,
// If `LatestAtKey::parent_version` is set, it must have been correctly
// propagated from the `Object::root_version` of some object.
group_key.parent_version,
Some(group_key.parent_version),
)?;

let key = LatestAtKey {
let key = ParentVersionKey {
id: object.address,
checkpoint_viewed_at: group_key.checkpoint_viewed_at,
parent_version: group_key.parent_version,
Expand All @@ -1334,6 +1365,81 @@ impl Loader<LatestAtKey> for Db {
}
}

#[async_trait::async_trait]
impl Loader<LatestAtKey> for Db {
type Value = Object;
type Error = Error;

async fn load(&self, keys: &[LatestAtKey]) -> Result<HashMap<LatestAtKey, Object>, Error> {
// Group keys by checkpoint viewed at -- we'll issue a separate query for each group.
let mut keys_by_cursor_and_parent_version: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();

for key in keys {
keys_by_cursor_and_parent_version
.entry(key.checkpoint_viewed_at)
.or_default()
.insert(key.id);
}

// Issue concurrent reads for each group of keys.
let futures =
keys_by_cursor_and_parent_version
.into_iter()
.map(|(checkpoint_viewed_at, ids)| {
self.execute_repeatable(move |conn| {
let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)?
else {
return Ok::<Vec<(u64, StoredHistoryObject)>, diesel::result::Error>(
vec![],
);
};

let filter = ObjectFilter {
object_ids: Some(ids.iter().cloned().collect()),
..Default::default()
};

Ok(conn
.results(move || {
build_objects_query(
View::Consistent,
range,
&Page::bounded(ids.len() as u64),
|q| filter.apply(q),
|q| q,
)
.into_boxed()
})?
.into_iter()
.map(|r| (checkpoint_viewed_at, r))
.collect())
})
});

// Wait for the reads to all finish, and gather them into the result map.
let groups = futures::future::join_all(futures).await;

let mut results = HashMap::new();
for group in groups {
for (checkpoint_viewed_at, stored) in
group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?
{
let object =
Object::try_from_stored_history_object(stored, checkpoint_viewed_at, None)?;

let key = LatestAtKey {
id: object.address,
checkpoint_viewed_at,
};

results.insert(key, object);
}
}

Ok(results)
}
}

impl From<&ObjectKind> for ObjectStatus {
fn from(kind: &ObjectKind) -> Self {
match kind {