From 07fb4fcf566c41a04fae9db31313e684f5a20887 Mon Sep 17 00:00:00 2001 From: Filipe Azevedo <filipe@azevedo.io> Date: Tue, 19 Jul 2022 12:55:50 +0100 Subject: [PATCH] expose timestamp on meta block --- graph/src/components/store/traits.rs | 8 +++ graphql/src/runner.rs | 2 +- graphql/src/store/resolver.rs | 84 +++++++++++++++++++++++----- graphql/src/subscription/mod.rs | 2 +- store/postgres/src/chain_store.rs | 6 +- store/postgres/src/query_store.rs | 15 +++-- store/postgres/tests/store.rs | 31 ++++++++++ store/test-store/src/block_store.rs | 21 ++++++- 8 files changed, 145 insertions(+), 24 deletions(-) diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index f3939bfa624..99cebdd45a2 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -415,6 +415,14 @@ pub trait QueryStore: Send + Sync { fn block_number(&self, block_hash: &BlockHash) -> Result<Option<BlockNumber>, StoreError>; + /// Returns the blocknumber as well as the timestamp. Timestamp depends on the chain block type + /// and can have multiple formats, it can also not be prevent. For now this is only available + /// for EVM chains both firehose and rpc. + fn block_number_with_timestamp( + &self, + block_hash: &BlockHash, + ) -> Result<Option<(BlockNumber, Option<String>)>, StoreError>; + fn wait_stats(&self) -> Result<PoolWaitStats, StoreError>; async fn has_deterministic_errors(&self, block: BlockNumber) -> Result<bool, StoreError>; diff --git a/graphql/src/runner.rs b/graphql/src/runner.rs index 6dce3b74c60..968c4263d00 100644 --- a/graphql/src/runner.rs +++ b/graphql/src/runner.rs @@ -161,7 +161,7 @@ where let query_res = execute_query( query.clone(), Some(selection_set), - resolver.block_ptr.clone(), + resolver.block_ptr.as_ref().map(Into::into).clone(), QueryExecutionOptions { resolver, deadline: ENV_VARS.graphql.query_timeout.map(|t| Instant::now() + t), diff --git a/graphql/src/store/resolver.rs b/graphql/src/store/resolver.rs index 6769a9212e4..69576cd9641 100644 --- a/graphql/src/store/resolver.rs +++ b/graphql/src/store/resolver.rs @@ -26,13 +26,34 @@ pub struct StoreResolver { logger: Logger, pub(crate) store: Arc<dyn QueryStore>, subscription_manager: Arc<dyn SubscriptionManager>, - pub(crate) block_ptr: Option<BlockPtr>, + pub(crate) block_ptr: Option<BlockPtrTs>, deployment: DeploymentHash, has_non_fatal_errors: bool, error_policy: ErrorPolicy, graphql_metrics: Arc<GraphQLMetrics>, } +#[derive(Clone, Debug)] +pub(crate) struct BlockPtrTs { + pub ptr: BlockPtr, + pub timestamp: Option<String>, +} + +impl From<BlockPtr> for BlockPtrTs { + fn from(ptr: BlockPtr) -> Self { + Self { + ptr, + timestamp: None, + } + } +} + +impl From<&BlockPtrTs> for BlockPtr { + fn from(ptr: &BlockPtrTs) -> Self { + ptr.ptr.cheap_clone() + } +} + impl CheapClone for StoreResolver {} impl StoreResolver { @@ -80,7 +101,7 @@ impl StoreResolver { let block_ptr = Self::locate_block(store_clone.as_ref(), bc, state).await?; let has_non_fatal_errors = store - .has_deterministic_errors(block_ptr.block_number()) + .has_deterministic_errors(block_ptr.ptr.block_number()) .await?; let resolver = StoreResolver { @@ -99,15 +120,16 @@ impl StoreResolver { pub fn block_number(&self) -> BlockNumber { self.block_ptr .as_ref() - .map(|ptr| ptr.number as BlockNumber) + .map(|ptr| ptr.ptr.number as BlockNumber) .unwrap_or(BLOCK_NUMBER_MAX) } + /// locate_block returns the block pointer and it's timestamp when available. async fn locate_block( store: &dyn QueryStore, bc: BlockConstraint, state: &DeploymentState, - ) -> Result<BlockPtr, QueryExecutionError> { + ) -> Result<BlockPtrTs, QueryExecutionError> { fn block_queryable( state: &DeploymentState, block: BlockNumber, @@ -117,23 +139,39 @@ impl StoreResolver { .map_err(|msg| QueryExecutionError::ValueParseError("block.number".to_owned(), msg)) } + fn get_block_ts( + store: &dyn QueryStore, + ptr: &BlockPtr, + ) -> Result<Option<String>, QueryExecutionError> { + match store + .block_number_with_timestamp(&ptr.hash) + .map_err(Into::<QueryExecutionError>::into)? + { + Some((_, Some(ts))) => Ok(Some(ts)), + _ => Ok(None), + } + } + match bc { BlockConstraint::Hash(hash) => { let ptr = store - .block_number(&hash) + .block_number_with_timestamp(&hash) .map_err(Into::into) - .and_then(|number| { - number + .and_then(|result| { + result .ok_or_else(|| { QueryExecutionError::ValueParseError( "block.hash".to_owned(), "no block with that hash found".to_owned(), ) }) - .map(|number| BlockPtr::new(hash, number)) + .map(|(number, ts)| BlockPtrTs { + ptr: BlockPtr::new(hash, number), + timestamp: ts, + }) })?; - block_queryable(state, ptr.number)?; + block_queryable(state, ptr.ptr.number)?; Ok(ptr) } BlockConstraint::Number(number) => { @@ -144,7 +182,7 @@ impl StoreResolver { // always return an all zeroes hash when users specify // a block number // See 7a7b9708-adb7-4fc2-acec-88680cb07ec1 - Ok(BlockPtr::from((web3::types::H256::zero(), number as u64))) + Ok(BlockPtr::from((web3::types::H256::zero(), number as u64)).into()) } BlockConstraint::Min(min) => { let ptr = state.latest_block.cheap_clone(); @@ -158,9 +196,18 @@ impl StoreResolver { ), )); } - Ok(ptr) + let timestamp = get_block_ts(store, &state.latest_block)?; + + Ok(BlockPtrTs { ptr, timestamp }) + } + BlockConstraint::Latest => { + let timestamp = get_block_ts(store, &state.latest_block)?; + + Ok(BlockPtrTs { + ptr: state.latest_block.cheap_clone(), + timestamp, + }) } - BlockConstraint::Latest => Ok(state.latest_block.cheap_clone()), } } @@ -181,7 +228,7 @@ impl StoreResolver { // locate_block indicates that we do not have a block hash // by setting the hash to `zero` // See 7a7b9708-adb7-4fc2-acec-88680cb07ec1 - let hash_h256 = ptr.hash_as_h256(); + let hash_h256 = ptr.ptr.hash_as_h256(); if hash_h256 == web3::types::H256::zero() { None } else { @@ -192,12 +239,21 @@ impl StoreResolver { let number = self .block_ptr .as_ref() - .map(|ptr| r::Value::Int((ptr.number as i32).into())) + .map(|ptr| r::Value::Int((ptr.ptr.number as i32).into())) .unwrap_or(r::Value::Null); + + let timestamp = self.block_ptr.as_ref().map(|ptr| { + ptr.timestamp + .clone() + .map(|ts| r::Value::String(ts)) + .unwrap_or(r::Value::Null) + }); + let mut map = BTreeMap::new(); let block = object! { hash: hash, number: number, + timestamp: timestamp, __typename: BLOCK_FIELD_TYPE }; map.insert("prefetch:block".into(), r::Value::List(vec![block])); diff --git a/graphql/src/subscription/mod.rs b/graphql/src/subscription/mod.rs index a6d405dbc0c..c07bf27d209 100644 --- a/graphql/src/subscription/mod.rs +++ b/graphql/src/subscription/mod.rs @@ -211,7 +211,7 @@ async fn execute_subscription_event( Err(e) => return Arc::new(e.into()), }; - let block_ptr = resolver.block_ptr.clone(); + let block_ptr = resolver.block_ptr.as_ref().map(Into::into); // Create a fresh execution context with deadline. let ctx = Arc::new(ExecutionContext { diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index a41eaf85da3..b14a1983f3d 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -606,14 +606,14 @@ mod data { b::table .select((b::number, sql(TIMESTAMP_QUERY))) .filter(b::hash.eq(format!("{:x}", hash))) - .first::<(i64, String)>(conn) + .first::<(i64, Option<String>)>(conn) .optional()? } Storage::Private(Schema { blocks, .. }) => blocks .table() .select((blocks.number(), sql(TIMESTAMP_QUERY))) .filter(blocks.hash().eq(hash.as_slice())) - .first::<(i64, String)>(conn) + .first::<(i64, Option<String>)>(conn) .optional()?, }; @@ -622,7 +622,7 @@ mod data { Some((number, ts)) => { let number = BlockNumber::try_from(number) .map_err(|e| StoreError::QueryExecutionError(e.to_string()))?; - Ok(Some((number, Some(ts)))) + Ok(Some((number, ts))) } } } diff --git a/store/postgres/src/query_store.rs b/store/postgres/src/query_store.rs index 42a0bb855dd..82d79cca5a7 100644 --- a/store/postgres/src/query_store.rs +++ b/store/postgres/src/query_store.rs @@ -60,8 +60,10 @@ impl QueryStoreTrait for QueryStore { async fn block_ptr(&self) -> Result<Option<BlockPtr>, StoreError> { self.store.block_ptr(self.site.cheap_clone()).await } - - fn block_number(&self, block_hash: &BlockHash) -> Result<Option<BlockNumber>, StoreError> { + fn block_number_with_timestamp( + &self, + block_hash: &BlockHash, + ) -> Result<Option<(BlockNumber, Option<String>)>, StoreError> { // We should also really check that the block with the given hash is // on the chain starting at the subgraph's current head. That check is // very expensive though with the data structures we have currently @@ -71,9 +73,9 @@ impl QueryStoreTrait for QueryStore { let subgraph_network = self.network_name(); self.chain_store .block_number(block_hash)? - .map(|(network_name, number, _timestamp)| { + .map(|(network_name, number, timestamp)| { if network_name == subgraph_network { - Ok(number) + Ok((number, timestamp)) } else { Err(StoreError::QueryExecutionError(format!( "subgraph {} belongs to network {} but block {:x} belongs to network {}", @@ -84,6 +86,11 @@ impl QueryStoreTrait for QueryStore { .transpose() } + fn block_number(&self, block_hash: &BlockHash) -> Result<Option<BlockNumber>, StoreError> { + self.block_number_with_timestamp(block_hash) + .map(|opt| opt.map(|(number, _)| number)) + } + fn wait_stats(&self) -> Result<PoolWaitStats, StoreError> { self.store.wait_stats(self.replica_id) } diff --git a/store/postgres/tests/store.rs b/store/postgres/tests/store.rs index e30a1055976..ba27bf24fd8 100644 --- a/store/postgres/tests/store.rs +++ b/store/postgres/tests/store.rs @@ -2026,6 +2026,37 @@ fn parse_timestamp() { }) } +#[test] +/// checks if retrieving the timestamp from the data blob works. +/// on ethereum, the block has timestamp as U256 so it will always have a value +fn parse_null_timestamp() { + run_test(|store, _, _| async move { + use block_store::*; + // The test subgraph is at block 2. Since we don't ever delete + // the genesis block, the only block eligible for cleanup is BLOCK_ONE + // and the first retained block is block 2. + block_store::set_chain( + vec![ + &*GENESIS_BLOCK, + &*BLOCK_ONE, + &*BLOCK_TWO, + &*BLOCK_THREE_NO_TIMESTAMP, + ], + NETWORK_NAME, + ); + let chain_store = store + .block_store() + .chain_store(NETWORK_NAME) + .expect("fake chain store"); + + let (_network, number, timestamp) = chain_store + .block_number(&BLOCK_THREE_NO_TIMESTAMP.block_hash()) + .expect("block_number to return correct number and timestamp") + .unwrap(); + assert_eq!(number, 3); + assert_eq!(true, timestamp.is_none()); + }) +} #[test] fn reorg_tracking() { async fn update_john( diff --git a/store/test-store/src/block_store.rs b/store/test-store/src/block_store.rs index 54c613535bc..dafb2aebed5 100644 --- a/store/test-store/src/block_store.rs +++ b/store/test-store/src/block_store.rs @@ -33,6 +33,9 @@ lazy_static! { pub static ref BLOCK_THREE: FakeBlock = BLOCK_TWO.make_child("7347afe69254df06729e123610b00b8b11f15cfae3241f9366fb113aec07489c", None); pub static ref BLOCK_THREE_NO_PARENT: FakeBlock = FakeBlock::make_no_parent(3, "fa9ebe3f74de4c56908b49f5c4044e85825f7350f3fa08a19151de82a82a7313"); pub static ref BLOCK_THREE_TIMESTAMP: FakeBlock = BLOCK_TWO.make_child("6b834521bb753c132fdcf0e1034803ed9068e324112f8750ba93580b393a986b", Some(U256::from(1657712166))); + // This block is special and serializes in a slightly different way, this is needed to simulate non-ethereum behaviour at the store level. If you're not sure + // what you are doing, don't use this block for other tests. + pub static ref BLOCK_THREE_NO_TIMESTAMP: FakeBlock = BLOCK_TWO.make_child("6b834521bb753c132fdcf0e1034803ed9068e324112f8750ba93580b393a986b", None); pub static ref BLOCK_FOUR: FakeBlock = BLOCK_THREE.make_child("7cce080f5a49c2997a6cc65fc1cee9910fd8fc3721b7010c0b5d0873e2ac785e", None); pub static ref BLOCK_FIVE: FakeBlock = BLOCK_FOUR.make_child("7b0ea919e258eb2b119eb32de56b85d12d50ac6a9f7c5909f843d6172c8ba196", None); pub static ref BLOCK_SIX_NO_PARENT: FakeBlock = FakeBlock::make_no_parent(6, "6b834521bb753c132fdcf0e1034803ed9068e324112f8750ba93580b393a986b"); @@ -112,7 +115,23 @@ impl Block for FakeBlock { } fn data(&self) -> Result<serde_json::Value, serde_json::Error> { - serde_json::to_value(self.as_ethereum_block()) + let mut value: serde_json::Value = serde_json::to_value(self.as_ethereum_block())?; + if !self.eq(&BLOCK_THREE_NO_TIMESTAMP) { + return Ok(value); + }; + + // Remove the timestamp for block BLOCK_THREE_NO_TIMESTAMP in order to simulate the non EVM behaviour + // In these cases timestamp is not there at all but LightEthereumBlock uses U256 as timestamp so it + // can never be null and therefore impossible to test without manipulating the JSON blob directly. + if let serde_json::Value::Object(ref mut map) = value { + map.entry("block").and_modify(|ref mut block| { + if let serde_json::Value::Object(ref mut block) = block { + block.remove_entry("timestamp"); + } + }); + }; + + Ok(value) } }