Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core, graph, runtime: Add store.get_in_block #4540

Merged
merged 1 commit into from
Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::subgraph::stream::new_block_stream;
use atomic_refcell::AtomicRefCell;
use graph::blockchain::block_stream::{BlockStreamEvent, BlockWithTriggers, FirehoseCursor};
use graph::blockchain::{Block, Blockchain, DataSource as _, TriggerFilter as _};
use graph::components::store::{EmptyStore, EntityKey, StoredDynamicDataSource};
use graph::components::store::{EmptyStore, EntityKey, GetScope, StoredDynamicDataSource};
use graph::components::{
store::ModificationsAndCache,
subgraph::{MappingError, PoICausalityRegion, ProofOfIndexing, SharedProofOfIndexing},
Expand Down Expand Up @@ -1034,14 +1034,13 @@ async fn update_proof_of_indexing(
};

// Grab the current digest attribute on this entity
let prev_poi =
entity_cache
.get(&entity_key)
.map_err(Error::from)?
.map(|entity| match entity.get("digest") {
Some(Value::Bytes(b)) => b.clone(),
_ => panic!("Expected POI entity to have a digest and for it to be bytes"),
});
let prev_poi = entity_cache
.get(&entity_key, GetScope::Store)
.map_err(Error::from)?
.map(|entity| match entity.get("digest") {
Some(Value::Bytes(b)) => b.clone(),
_ => panic!("Expected POI entity to have a digest and for it to be bytes"),
});

// Finish the POI stream, getting the new POI value.
let updated_proof_of_indexing = stream.pause(prev_poi.as_deref());
Expand Down
33 changes: 26 additions & 7 deletions graph/src/components/store/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ use crate::util::lfu_cache::LfuCache;

use super::{DerivedEntityQuery, EntityType, LoadRelatedRequest};

/// The scope in which the `EntityCache` should perform a `get` operation
pub enum GetScope {
/// Get from all previously stored entities in the store
Store,
/// Get from the entities that have been stored during this block
InBlock,
}

/// A cache for entities from the store that provides the basic functionality
/// needed for the store interactions in the host exports. This struct tracks
/// how entities are modified, and caches all entities looked up from the
Expand Down Expand Up @@ -98,18 +106,29 @@ impl EntityCache {
self.handler_updates.clear();
}

pub fn get(&mut self, eref: &EntityKey) -> Result<Option<Entity>, s::QueryExecutionError> {
pub fn get(
&mut self,
key: &EntityKey,
scope: GetScope,
) -> Result<Option<Entity>, s::QueryExecutionError> {
// Get the current entity, apply any updates from `updates`, then
// from `handler_updates`.
let mut entity = self.current.get_entity(&*self.store, eref)?;
let mut entity = match scope {
GetScope::Store => self.current.get_entity(&*self.store, key)?,
GetScope::InBlock => None,
};

// Always test the cache consistency in debug mode.
debug_assert!(entity == self.store.get(eref).unwrap());
// Always test the cache consistency in debug mode. The test only
// makes sense when we were actually asked to read from the store
debug_assert!(match scope {
GetScope::Store => entity == self.store.get(key).unwrap(),
GetScope::InBlock => true,
});

if let Some(op) = self.updates.get(eref).cloned() {
if let Some(op) = self.updates.get(key).cloned() {
entity = op.apply_to(entity)
}
if let Some(op) = self.handler_updates.get(eref).cloned() {
if let Some(op) = self.handler_updates.get(key).cloned() {
entity = op.apply_to(entity)
}
Ok(entity)
Expand Down Expand Up @@ -183,7 +202,7 @@ impl EntityCache {
// lookup in the database and check again with an entity that merges
// the existing entity with the changes
if !is_valid {
let entity = self.get(&key)?.ok_or_else(|| {
let entity = self.get(&key, GetScope::Store)?.ok_or_else(|| {
anyhow!(
"Failed to read entity {}[{}] back from cache",
key.entity_type,
Expand Down
2 changes: 1 addition & 1 deletion graph/src/components/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod entity_cache;
mod err;
mod traits;

pub use entity_cache::{EntityCache, ModificationsAndCache};
pub use entity_cache::{EntityCache, GetScope, ModificationsAndCache};

use diesel::types::{FromSql, ToSql};
pub use err::StoreError;
Expand Down
5 changes: 3 additions & 2 deletions runtime/wasm/src/host_exports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use wasmtime::Trap;
use web3::types::H160;

use graph::blockchain::Blockchain;
use graph::components::store::{EnsLookup, LoadRelatedRequest};
use graph::components::store::{EnsLookup, GetScope, LoadRelatedRequest};
use graph::components::store::{EntityKey, EntityType};
use graph::components::subgraph::{
PoICausalityRegion, ProofOfIndexingEvent, SharedProofOfIndexing,
Expand Down Expand Up @@ -225,6 +225,7 @@ impl<C: Blockchain> HostExports<C> {
entity_type: String,
entity_id: String,
gas: &GasCounter,
scope: GetScope,
) -> Result<Option<Entity>, anyhow::Error> {
let store_key = EntityKey {
entity_type: EntityType::new(entity_type),
Expand All @@ -233,7 +234,7 @@ impl<C: Blockchain> HostExports<C> {
};
self.check_entity_type_access(&store_key.entity_type)?;

let result = state.entity_cache.get(&store_key)?;
let result = state.entity_cache.get(&store_key, scope)?;
gas.consume_host_fn(gas::STORE_GET.with_args(complexity::Linear, (&store_key, &result)))?;

Ok(result)
Expand Down
135 changes: 83 additions & 52 deletions runtime/wasm/src/module/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::time::Instant;

use anyhow::anyhow;
use anyhow::Error;
use graph::components::store::GetScope;
use graph::slog::SendSyncRefUnwindSafeKV;
use never::Never;
use semver::Version;
Expand Down Expand Up @@ -535,6 +536,13 @@ impl<C: Blockchain> WasmInstance<C> {
id,
field
);
link!(
"store.get_in_block",
store_get_in_block,
"host_export_store_get_in_block",
entity,
id
);
link!(
"store.set",
store_set,
Expand Down Expand Up @@ -910,6 +918,71 @@ impl<C: Blockchain> WasmInstanceContext<C> {
experimental_features,
})
}

fn store_get_scoped(
&mut self,
gas: &GasCounter,
entity_ptr: AscPtr<AscString>,
id_ptr: AscPtr<AscString>,
scope: GetScope,
) -> Result<AscPtr<AscEntity>, HostExportError> {
let _timer = self
.host_metrics
.cheap_clone()
.time_host_fn_execution_region("store_get");

let entity_type: String = asc_get(self, entity_ptr, gas)?;
let id: String = asc_get(self, id_ptr, gas)?;
let entity_option = self.ctx.host_exports.store_get(
&mut self.ctx.state,
entity_type.clone(),
id.clone(),
gas,
scope,
)?;

if self.ctx.instrument {
debug!(self.ctx.logger, "store_get";
"type" => &entity_type,
"id" => &id,
"found" => entity_option.is_some());
}

let ret = match entity_option {
Some(entity) => {
let _section = self
.host_metrics
.stopwatch
.start_section("store_get_asc_new");
asc_new(self, &entity.sorted(), gas)?
}
None => match &self.ctx.debug_fork {
Some(fork) => {
let entity_option = fork.fetch(entity_type, id).map_err(|e| {
HostExportError::Unknown(anyhow!(
"store_get: failed to fetch entity from the debug fork: {}",
e
))
})?;
match entity_option {
Some(entity) => {
let _section = self
.host_metrics
.stopwatch
.start_section("store_get_asc_new");
let entity = asc_new(self, &entity.sorted(), gas)?;
self.store_set(gas, entity_ptr, id_ptr, entity)?;
entity
}
None => AscPtr::null(),
}
}
None => AscPtr::null(),
},
};

Ok(ret)
}
}

// Implementation of externals.
Expand Down Expand Up @@ -1012,59 +1085,17 @@ impl<C: Blockchain> WasmInstanceContext<C> {
entity_ptr: AscPtr<AscString>,
id_ptr: AscPtr<AscString>,
) -> Result<AscPtr<AscEntity>, HostExportError> {
let _timer = self
.host_metrics
.cheap_clone()
.time_host_fn_execution_region("store_get");

let entity_type: String = asc_get(self, entity_ptr, gas)?;
let id: String = asc_get(self, id_ptr, gas)?;
let entity_option = self.ctx.host_exports.store_get(
&mut self.ctx.state,
entity_type.clone(),
id.clone(),
gas,
)?;
if self.ctx.instrument {
debug!(self.ctx.logger, "store_get";
"type" => &entity_type,
"id" => &id,
"found" => entity_option.is_some());
}
let ret = match entity_option {
Some(entity) => {
let _section = self
.host_metrics
.stopwatch
.start_section("store_get_asc_new");
asc_new(self, &entity.sorted(), gas)?
}
None => match &self.ctx.debug_fork {
Some(fork) => {
let entity_option = fork.fetch(entity_type, id).map_err(|e| {
HostExportError::Unknown(anyhow!(
"store_get: failed to fetch entity from the debug fork: {}",
e
))
})?;
match entity_option {
Some(entity) => {
let _section = self
.host_metrics
.stopwatch
.start_section("store_get_asc_new");
let entity = asc_new(self, &entity.sorted(), gas)?;
self.store_set(gas, entity_ptr, id_ptr, entity)?;
entity
}
None => AscPtr::null(),
}
}
None => AscPtr::null(),
},
};
self.store_get_scoped(gas, entity_ptr, id_ptr, GetScope::Store)
}

Ok(ret)
/// function store.get_in_block(entity: string, id: string): Entity | null
pub fn store_get_in_block(
&mut self,
gas: &GasCounter,
entity_ptr: AscPtr<AscString>,
id_ptr: AscPtr<AscString>,
) -> Result<AscPtr<AscEntity>, HostExportError> {
self.store_get_scoped(gas, entity_ptr, id_ptr, GetScope::InBlock)
}

/// function store.loadRelated(entity_type: string, id: string, field: string): Array<Entity>
Expand Down
43 changes: 41 additions & 2 deletions store/test-store/tests/graph/entity_cache.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use graph::blockchain::block_stream::FirehoseCursor;
use graph::components::store::{
DeploymentCursorTracker, DerivedEntityQuery, EntityKey, EntityType, LoadRelatedRequest,
ReadStore, StoredDynamicDataSource, WritableStore,
DeploymentCursorTracker, DerivedEntityQuery, EntityKey, EntityType, GetScope,
LoadRelatedRequest, ReadStore, StoredDynamicDataSource, WritableStore,
};
use graph::data::subgraph::schema::{DeploymentCreate, SubgraphError, SubgraphHealth};
use graph::data_source::CausalityRegion;
Expand Down Expand Up @@ -752,3 +752,42 @@ fn check_for_delete_async_related() {
assert_eq!(result, expeted_vec);
});
}

#[test]
fn scoped_get() {
run_store_test(|mut cache, _store, _deployment, _writable| async move {
// Key for an existing entity that is in the store
let key1 = EntityKey::data(WALLET.to_owned(), "1".to_owned());
let wallet1 = create_wallet_entity("1", "1", 67);

// Create a new entity that is not in the store
let wallet5 = create_wallet_entity("5", "5", 100);
let key5 = EntityKey::data(WALLET.to_owned(), "5".to_owned());
cache.set(key5.clone(), wallet5.clone()).unwrap();

// For the new entity, we can retrieve it with either scope
let act5 = cache.get(&key5, GetScope::InBlock).unwrap();
assert_eq!(Some(&wallet5), act5.as_ref());
let act5 = cache.get(&key5, GetScope::Store).unwrap();
assert_eq!(Some(&wallet5), act5.as_ref());

// For an entity in the store, we can not get it `InBlock` but with
// `Store`
let act1 = cache.get(&key1, GetScope::InBlock).unwrap();
assert_eq!(None, act1);
let act1 = cache.get(&key1, GetScope::Store).unwrap();
assert_eq!(Some(&wallet1), act1.as_ref());
// Even after reading from the store, the entity is not visible with
// `InBlock`
let act1 = cache.get(&key1, GetScope::InBlock).unwrap();
assert_eq!(None, act1);
// But if it gets updated, it becomes visible with either scope
let mut wallet1 = wallet1;
wallet1.set("balance", 70);
cache.set(key1.clone(), wallet1.clone()).unwrap();
let act1 = cache.get(&key1, GetScope::InBlock).unwrap();
assert_eq!(Some(&wallet1), act1.as_ref());
let act1 = cache.get(&key1, GetScope::Store).unwrap();
assert_eq!(Some(&wallet1), act1.as_ref());
});
}