Skip to content

Commit

Permalink
core, graph, runtime: Add store.get_in_block
Browse files Browse the repository at this point in the history
  • Loading branch information
lutter committed Apr 21, 2023
1 parent 640d15d commit c99229c
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 73 deletions.
17 changes: 8 additions & 9 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::subgraph::stream::new_block_stream;
use atomic_refcell::AtomicRefCell;
use graph::blockchain::block_stream::{BlockStreamEvent, BlockWithTriggers, FirehoseCursor};
use graph::blockchain::{Block, Blockchain, DataSource as _, TriggerFilter as _};
use graph::components::store::{EmptyStore, EntityKey, StoredDynamicDataSource};
use graph::components::store::{EmptyStore, EntityKey, GetScope, StoredDynamicDataSource};
use graph::components::{
store::ModificationsAndCache,
subgraph::{MappingError, PoICausalityRegion, ProofOfIndexing, SharedProofOfIndexing},
Expand Down Expand Up @@ -1034,14 +1034,13 @@ async fn update_proof_of_indexing(
};

// Grab the current digest attribute on this entity
let prev_poi =
entity_cache
.get(&entity_key)
.map_err(Error::from)?
.map(|entity| match entity.get("digest") {
Some(Value::Bytes(b)) => b.clone(),
_ => panic!("Expected POI entity to have a digest and for it to be bytes"),
});
let prev_poi = entity_cache
.get(&entity_key, GetScope::Store)
.map_err(Error::from)?
.map(|entity| match entity.get("digest") {
Some(Value::Bytes(b)) => b.clone(),
_ => panic!("Expected POI entity to have a digest and for it to be bytes"),
});

// Finish the POI stream, getting the new POI value.
let updated_proof_of_indexing = stream.pause(prev_poi.as_deref());
Expand Down
33 changes: 26 additions & 7 deletions graph/src/components/store/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ use crate::util::lfu_cache::LfuCache;

use super::{DerivedEntityQuery, EntityType, LoadRelatedRequest};

/// The scope in which the `EntityCache` should perform a `get` operation
pub enum GetScope {
/// Get from all previously stored entities in the store
Store,
/// Get from the entities that have been stored during this block
InBlock,
}

/// A cache for entities from the store that provides the basic functionality
/// needed for the store interactions in the host exports. This struct tracks
/// how entities are modified, and caches all entities looked up from the
Expand Down Expand Up @@ -98,18 +106,29 @@ impl EntityCache {
self.handler_updates.clear();
}

pub fn get(&mut self, eref: &EntityKey) -> Result<Option<Entity>, s::QueryExecutionError> {
pub fn get(
&mut self,
key: &EntityKey,
scope: GetScope,
) -> Result<Option<Entity>, s::QueryExecutionError> {
// Get the current entity, apply any updates from `updates`, then
// from `handler_updates`.
let mut entity = self.current.get_entity(&*self.store, eref)?;
let mut entity = match scope {
GetScope::Store => self.current.get_entity(&*self.store, key)?,
GetScope::InBlock => None,
};

// Always test the cache consistency in debug mode.
debug_assert!(entity == self.store.get(eref).unwrap());
// Always test the cache consistency in debug mode. The test only
// makes sense when we were actually asked to read from the store
debug_assert!(match scope {
GetScope::Store => entity == self.store.get(key).unwrap(),
GetScope::InBlock => true,
});

if let Some(op) = self.updates.get(eref).cloned() {
if let Some(op) = self.updates.get(key).cloned() {
entity = op.apply_to(entity)
}
if let Some(op) = self.handler_updates.get(eref).cloned() {
if let Some(op) = self.handler_updates.get(key).cloned() {
entity = op.apply_to(entity)
}
Ok(entity)
Expand Down Expand Up @@ -183,7 +202,7 @@ impl EntityCache {
// lookup in the database and check again with an entity that merges
// the existing entity with the changes
if !is_valid {
let entity = self.get(&key)?.ok_or_else(|| {
let entity = self.get(&key, GetScope::Store)?.ok_or_else(|| {
anyhow!(
"Failed to read entity {}[{}] back from cache",
key.entity_type,
Expand Down
2 changes: 1 addition & 1 deletion graph/src/components/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod entity_cache;
mod err;
mod traits;

pub use entity_cache::{EntityCache, ModificationsAndCache};
pub use entity_cache::{EntityCache, GetScope, ModificationsAndCache};

use diesel::types::{FromSql, ToSql};
pub use err::StoreError;
Expand Down
5 changes: 3 additions & 2 deletions runtime/wasm/src/host_exports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use wasmtime::Trap;
use web3::types::H160;

use graph::blockchain::Blockchain;
use graph::components::store::{EnsLookup, LoadRelatedRequest};
use graph::components::store::{EnsLookup, GetScope, LoadRelatedRequest};
use graph::components::store::{EntityKey, EntityType};
use graph::components::subgraph::{
PoICausalityRegion, ProofOfIndexingEvent, SharedProofOfIndexing,
Expand Down Expand Up @@ -225,6 +225,7 @@ impl<C: Blockchain> HostExports<C> {
entity_type: String,
entity_id: String,
gas: &GasCounter,
scope: GetScope,
) -> Result<Option<Entity>, anyhow::Error> {
let store_key = EntityKey {
entity_type: EntityType::new(entity_type),
Expand All @@ -233,7 +234,7 @@ impl<C: Blockchain> HostExports<C> {
};
self.check_entity_type_access(&store_key.entity_type)?;

let result = state.entity_cache.get(&store_key)?;
let result = state.entity_cache.get(&store_key, scope)?;
gas.consume_host_fn(gas::STORE_GET.with_args(complexity::Linear, (&store_key, &result)))?;

Ok(result)
Expand Down
135 changes: 83 additions & 52 deletions runtime/wasm/src/module/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::time::Instant;

use anyhow::anyhow;
use anyhow::Error;
use graph::components::store::GetScope;
use graph::slog::SendSyncRefUnwindSafeKV;
use never::Never;
use semver::Version;
Expand Down Expand Up @@ -535,6 +536,13 @@ impl<C: Blockchain> WasmInstance<C> {
id,
field
);
link!(
"store.get_in_block",
store_get_in_block,
"host_export_store_get_in_block",
entity,
id
);
link!(
"store.set",
store_set,
Expand Down Expand Up @@ -910,6 +918,71 @@ impl<C: Blockchain> WasmInstanceContext<C> {
experimental_features,
})
}

fn store_get_scoped(
&mut self,
gas: &GasCounter,
entity_ptr: AscPtr<AscString>,
id_ptr: AscPtr<AscString>,
scope: GetScope,
) -> Result<AscPtr<AscEntity>, HostExportError> {
let _timer = self
.host_metrics
.cheap_clone()
.time_host_fn_execution_region("store_get");

let entity_type: String = asc_get(self, entity_ptr, gas)?;
let id: String = asc_get(self, id_ptr, gas)?;
let entity_option = self.ctx.host_exports.store_get(
&mut self.ctx.state,
entity_type.clone(),
id.clone(),
gas,
scope,
)?;

if self.ctx.instrument {
debug!(self.ctx.logger, "store_get";
"type" => &entity_type,
"id" => &id,
"found" => entity_option.is_some());
}

let ret = match entity_option {
Some(entity) => {
let _section = self
.host_metrics
.stopwatch
.start_section("store_get_asc_new");
asc_new(self, &entity.sorted(), gas)?
}
None => match &self.ctx.debug_fork {
Some(fork) => {
let entity_option = fork.fetch(entity_type, id).map_err(|e| {
HostExportError::Unknown(anyhow!(
"store_get: failed to fetch entity from the debug fork: {}",
e
))
})?;
match entity_option {
Some(entity) => {
let _section = self
.host_metrics
.stopwatch
.start_section("store_get_asc_new");
let entity = asc_new(self, &entity.sorted(), gas)?;
self.store_set(gas, entity_ptr, id_ptr, entity)?;
entity
}
None => AscPtr::null(),
}
}
None => AscPtr::null(),
},
};

Ok(ret)
}
}

// Implementation of externals.
Expand Down Expand Up @@ -1012,59 +1085,17 @@ impl<C: Blockchain> WasmInstanceContext<C> {
entity_ptr: AscPtr<AscString>,
id_ptr: AscPtr<AscString>,
) -> Result<AscPtr<AscEntity>, HostExportError> {
let _timer = self
.host_metrics
.cheap_clone()
.time_host_fn_execution_region("store_get");

let entity_type: String = asc_get(self, entity_ptr, gas)?;
let id: String = asc_get(self, id_ptr, gas)?;
let entity_option = self.ctx.host_exports.store_get(
&mut self.ctx.state,
entity_type.clone(),
id.clone(),
gas,
)?;
if self.ctx.instrument {
debug!(self.ctx.logger, "store_get";
"type" => &entity_type,
"id" => &id,
"found" => entity_option.is_some());
}
let ret = match entity_option {
Some(entity) => {
let _section = self
.host_metrics
.stopwatch
.start_section("store_get_asc_new");
asc_new(self, &entity.sorted(), gas)?
}
None => match &self.ctx.debug_fork {
Some(fork) => {
let entity_option = fork.fetch(entity_type, id).map_err(|e| {
HostExportError::Unknown(anyhow!(
"store_get: failed to fetch entity from the debug fork: {}",
e
))
})?;
match entity_option {
Some(entity) => {
let _section = self
.host_metrics
.stopwatch
.start_section("store_get_asc_new");
let entity = asc_new(self, &entity.sorted(), gas)?;
self.store_set(gas, entity_ptr, id_ptr, entity)?;
entity
}
None => AscPtr::null(),
}
}
None => AscPtr::null(),
},
};
self.store_get_scoped(gas, entity_ptr, id_ptr, GetScope::Store)
}

Ok(ret)
/// function store.get_in_block(entity: string, id: string): Entity | null
pub fn store_get_in_block(
&mut self,
gas: &GasCounter,
entity_ptr: AscPtr<AscString>,
id_ptr: AscPtr<AscString>,
) -> Result<AscPtr<AscEntity>, HostExportError> {
self.store_get_scoped(gas, entity_ptr, id_ptr, GetScope::InBlock)
}

/// function store.loadRelated(entity_type: string, id: string, field: string): Array<Entity>
Expand Down
43 changes: 41 additions & 2 deletions store/test-store/tests/graph/entity_cache.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use graph::blockchain::block_stream::FirehoseCursor;
use graph::components::store::{
DeploymentCursorTracker, DerivedEntityQuery, EntityKey, EntityType, LoadRelatedRequest,
ReadStore, StoredDynamicDataSource, WritableStore,
DeploymentCursorTracker, DerivedEntityQuery, EntityKey, EntityType, GetScope,
LoadRelatedRequest, ReadStore, StoredDynamicDataSource, WritableStore,
};
use graph::data::subgraph::schema::{DeploymentCreate, SubgraphError, SubgraphHealth};
use graph::data_source::CausalityRegion;
Expand Down Expand Up @@ -752,3 +752,42 @@ fn check_for_delete_async_related() {
assert_eq!(result, expeted_vec);
});
}

#[test]
fn scoped_get() {
run_store_test(|mut cache, _store, _deployment, _writable| async move {
// Key for an existing entity that is in the store
let key1 = EntityKey::data(WALLET.to_owned(), "1".to_owned());
let wallet1 = create_wallet_entity("1", "1", 67);

// Create a new entity that is not in the store
let wallet5 = create_wallet_entity("5", "5", 100);
let key5 = EntityKey::data(WALLET.to_owned(), "5".to_owned());
cache.set(key5.clone(), wallet5.clone()).unwrap();

// For the new entity, we can retrieve it with either scope
let act5 = cache.get(&key5, GetScope::InBlock).unwrap();
assert_eq!(Some(&wallet5), act5.as_ref());
let act5 = cache.get(&key5, GetScope::Store).unwrap();
assert_eq!(Some(&wallet5), act5.as_ref());

// For an entity in the store, we can not get it `InBlock` but with
// `Store`
let act1 = cache.get(&key1, GetScope::InBlock).unwrap();
assert_eq!(None, act1);
let act1 = cache.get(&key1, GetScope::Store).unwrap();
assert_eq!(Some(&wallet1), act1.as_ref());
// Even after reading from the store, the entity is not visible with
// `InBlock`
let act1 = cache.get(&key1, GetScope::InBlock).unwrap();
assert_eq!(None, act1);
// But if it gets updated, it becomes visible with either scope
let mut wallet1 = wallet1;
wallet1.set("balance", 70);
cache.set(key1.clone(), wallet1.clone()).unwrap();
let act1 = cache.get(&key1, GetScope::InBlock).unwrap();
assert_eq!(Some(&wallet1), act1.as_ref());
let act1 = cache.get(&key1, GetScope::Store).unwrap();
assert_eq!(Some(&wallet1), act1.as_ref());
});
}

0 comments on commit c99229c

Please sign in to comment.