Skip to content

Commit

Permalink
expose timestamp on meta block
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed Aug 17, 2022
1 parent 649c61a commit e3c50b0
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 24 deletions.
8 changes: 8 additions & 0 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,14 @@ pub trait QueryStore: Send + Sync {

fn block_number(&self, block_hash: &BlockHash) -> Result<Option<BlockNumber>, StoreError>;

/// Returns the blocknumber as well as the timestamp. Timestamp depends on the chain block type
/// and can have multiple formats, it can also not be prevent. For now this is only available
/// for EVM chains both firehose and rpc.
fn block_number_with_timestamp(
&self,
block_hash: &BlockHash,
) -> Result<Option<(BlockNumber, Option<String>)>, StoreError>;

fn wait_stats(&self) -> Result<PoolWaitStats, StoreError>;

async fn has_deterministic_errors(&self, block: BlockNumber) -> Result<bool, StoreError>;
Expand Down
2 changes: 1 addition & 1 deletion graphql/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ where
let query_res = execute_query(
query.clone(),
Some(selection_set),
resolver.block_ptr.clone(),
resolver.block_ptr.as_ref().map(Into::into).clone(),
QueryExecutionOptions {
resolver,
deadline: ENV_VARS.graphql.query_timeout.map(|t| Instant::now() + t),
Expand Down
84 changes: 70 additions & 14 deletions graphql/src/store/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,34 @@ pub struct StoreResolver {
logger: Logger,
pub(crate) store: Arc<dyn QueryStore>,
subscription_manager: Arc<dyn SubscriptionManager>,
pub(crate) block_ptr: Option<BlockPtr>,
pub(crate) block_ptr: Option<BlockPtrTs>,
deployment: DeploymentHash,
has_non_fatal_errors: bool,
error_policy: ErrorPolicy,
graphql_metrics: Arc<GraphQLMetrics>,
}

#[derive(Clone, Debug)]
pub(crate) struct BlockPtrTs {
pub ptr: BlockPtr,
pub timestamp: Option<String>,
}

impl From<BlockPtr> for BlockPtrTs {
fn from(ptr: BlockPtr) -> Self {
Self {
ptr,
timestamp: None,
}
}
}

impl From<&BlockPtrTs> for BlockPtr {
fn from(ptr: &BlockPtrTs) -> Self {
ptr.ptr.cheap_clone()
}
}

impl CheapClone for StoreResolver {}

impl StoreResolver {
Expand Down Expand Up @@ -79,7 +100,7 @@ impl StoreResolver {
let block_ptr = Self::locate_block(store_clone.as_ref(), bc, state).await?;

let has_non_fatal_errors = store
.has_deterministic_errors(block_ptr.block_number())
.has_deterministic_errors(block_ptr.ptr.block_number())
.await?;

let resolver = StoreResolver {
Expand All @@ -98,15 +119,16 @@ impl StoreResolver {
pub fn block_number(&self) -> BlockNumber {
self.block_ptr
.as_ref()
.map(|ptr| ptr.number as BlockNumber)
.map(|ptr| ptr.ptr.number as BlockNumber)
.unwrap_or(BLOCK_NUMBER_MAX)
}

/// locate_block returns the block pointer and it's timestamp when available.
async fn locate_block(
store: &dyn QueryStore,
bc: BlockConstraint,
state: &DeploymentState,
) -> Result<BlockPtr, QueryExecutionError> {
) -> Result<BlockPtrTs, QueryExecutionError> {
fn block_queryable(
state: &DeploymentState,
block: BlockNumber,
Expand All @@ -116,23 +138,39 @@ impl StoreResolver {
.map_err(|msg| QueryExecutionError::ValueParseError("block.number".to_owned(), msg))
}

fn get_block_ts(
store: &dyn QueryStore,
ptr: &BlockPtr,
) -> Result<Option<String>, QueryExecutionError> {
match store
.block_number_with_timestamp(&ptr.hash)
.map_err(Into::<QueryExecutionError>::into)?
{
Some((_, Some(ts))) => Ok(Some(ts)),
_ => Ok(None),
}
}

match bc {
BlockConstraint::Hash(hash) => {
let ptr = store
.block_number(&hash)
.block_number_with_timestamp(&hash)
.map_err(Into::into)
.and_then(|number| {
number
.and_then(|result| {
result
.ok_or_else(|| {
QueryExecutionError::ValueParseError(
"block.hash".to_owned(),
"no block with that hash found".to_owned(),
)
})
.map(|number| BlockPtr::new(hash, number))
.map(|(number, ts)| BlockPtrTs {
ptr: BlockPtr::new(hash, number),
timestamp: ts,
})
})?;

block_queryable(state, ptr.number)?;
block_queryable(state, ptr.ptr.number)?;
Ok(ptr)
}
BlockConstraint::Number(number) => {
Expand All @@ -143,7 +181,7 @@ impl StoreResolver {
// always return an all zeroes hash when users specify
// a block number
// See 7a7b9708-adb7-4fc2-acec-88680cb07ec1
Ok(BlockPtr::from((web3::types::H256::zero(), number as u64)))
Ok(BlockPtr::from((web3::types::H256::zero(), number as u64)).into())
}
BlockConstraint::Min(min) => {
let ptr = state.latest_block.cheap_clone();
Expand All @@ -157,9 +195,18 @@ impl StoreResolver {
),
));
}
Ok(ptr)
let timestamp = get_block_ts(store, &state.latest_block)?;

Ok(BlockPtrTs { ptr, timestamp })
}
BlockConstraint::Latest => {
let timestamp = get_block_ts(store, &state.latest_block)?;

Ok(BlockPtrTs {
ptr: state.latest_block.cheap_clone(),
timestamp,
})
}
BlockConstraint::Latest => Ok(state.latest_block.cheap_clone()),
}
}

Expand All @@ -180,7 +227,7 @@ impl StoreResolver {
// locate_block indicates that we do not have a block hash
// by setting the hash to `zero`
// See 7a7b9708-adb7-4fc2-acec-88680cb07ec1
let hash_h256 = ptr.hash_as_h256();
let hash_h256 = ptr.ptr.hash_as_h256();
if hash_h256 == web3::types::H256::zero() {
None
} else {
Expand All @@ -191,12 +238,21 @@ impl StoreResolver {
let number = self
.block_ptr
.as_ref()
.map(|ptr| r::Value::Int((ptr.number as i32).into()))
.map(|ptr| r::Value::Int((ptr.ptr.number as i32).into()))
.unwrap_or(r::Value::Null);

let timestamp = self.block_ptr.as_ref().map(|ptr| {
ptr.timestamp
.clone()
.map(|ts| r::Value::String(ts))
.unwrap_or(r::Value::Null)
});

let mut map = BTreeMap::new();
let block = object! {
hash: hash,
number: number,
timestamp: timestamp,
__typename: BLOCK_FIELD_TYPE
};
map.insert("prefetch:block".into(), r::Value::List(vec![block]));
Expand Down
2 changes: 1 addition & 1 deletion graphql/src/subscription/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ async fn execute_subscription_event(
Err(e) => return Arc::new(e.into()),
};

let block_ptr = resolver.block_ptr.clone();
let block_ptr = resolver.block_ptr.as_ref().map(Into::into);

// Create a fresh execution context with deadline.
let ctx = Arc::new(ExecutionContext {
Expand Down
6 changes: 3 additions & 3 deletions store/postgres/src/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,14 +606,14 @@ mod data {
b::table
.select((b::number, sql(TIMESTAMP_QUERY)))
.filter(b::hash.eq(format!("{:x}", hash)))
.first::<(i64, String)>(conn)
.first::<(i64, Option<String>)>(conn)
.optional()?
}
Storage::Private(Schema { blocks, .. }) => blocks
.table()
.select((blocks.number(), sql(TIMESTAMP_QUERY)))
.filter(blocks.hash().eq(hash.as_slice()))
.first::<(i64, String)>(conn)
.first::<(i64, Option<String>)>(conn)
.optional()?,
};

Expand All @@ -622,7 +622,7 @@ mod data {
Some((number, ts)) => {
let number = BlockNumber::try_from(number)
.map_err(|e| StoreError::QueryExecutionError(e.to_string()))?;
Ok(Some((number, Some(ts))))
Ok(Some((number, ts)))
}
}
}
Expand Down
15 changes: 11 additions & 4 deletions store/postgres/src/query_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ impl QueryStoreTrait for QueryStore {
async fn block_ptr(&self) -> Result<Option<BlockPtr>, StoreError> {
self.store.block_ptr(self.site.cheap_clone()).await
}

fn block_number(&self, block_hash: &BlockHash) -> Result<Option<BlockNumber>, StoreError> {
fn block_number_with_timestamp(
&self,
block_hash: &BlockHash,
) -> Result<Option<(BlockNumber, Option<String>)>, StoreError> {
// We should also really check that the block with the given hash is
// on the chain starting at the subgraph's current head. That check is
// very expensive though with the data structures we have currently
Expand All @@ -68,9 +70,9 @@ impl QueryStoreTrait for QueryStore {
let subgraph_network = self.network_name();
self.chain_store
.block_number(block_hash)?
.map(|(network_name, number, _timestamp)| {
.map(|(network_name, number, timestamp)| {
if network_name == subgraph_network {
Ok(number)
Ok((number, timestamp))
} else {
Err(StoreError::QueryExecutionError(format!(
"subgraph {} belongs to network {} but block {:x} belongs to network {}",
Expand All @@ -81,6 +83,11 @@ impl QueryStoreTrait for QueryStore {
.transpose()
}

fn block_number(&self, block_hash: &BlockHash) -> Result<Option<BlockNumber>, StoreError> {
self.block_number_with_timestamp(block_hash)
.map(|opt| opt.map(|(number, _)| number))
}

fn wait_stats(&self) -> Result<PoolWaitStats, StoreError> {
self.store.wait_stats(self.replica_id)
}
Expand Down
31 changes: 31 additions & 0 deletions store/postgres/tests/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2027,6 +2027,37 @@ fn parse_timestamp() {
})
}

#[test]
/// checks if retrieving the timestamp from the data blob works.
/// on ethereum, the block has timestamp as U256 so it will always have a value
fn parse_null_timestamp() {
run_test(|store, _, _| async move {
use block_store::*;
// The test subgraph is at block 2. Since we don't ever delete
// the genesis block, the only block eligible for cleanup is BLOCK_ONE
// and the first retained block is block 2.
block_store::set_chain(
vec![
&*GENESIS_BLOCK,
&*BLOCK_ONE,
&*BLOCK_TWO,
&*BLOCK_THREE_NO_TIMESTAMP,
],
NETWORK_NAME,
);
let chain_store = store
.block_store()
.chain_store(NETWORK_NAME)
.expect("fake chain store");

let (_network, number, timestamp) = chain_store
.block_number(&BLOCK_THREE_NO_TIMESTAMP.block_hash())
.expect("block_number to return correct number and timestamp")
.unwrap();
assert_eq!(number, 3);
assert_eq!(true, timestamp.is_none());
})
}
#[test]
fn reorg_tracking() {
async fn update_john(
Expand Down
21 changes: 20 additions & 1 deletion store/test-store/src/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ lazy_static! {
pub static ref BLOCK_THREE: FakeBlock = BLOCK_TWO.make_child("7347afe69254df06729e123610b00b8b11f15cfae3241f9366fb113aec07489c", None);
pub static ref BLOCK_THREE_NO_PARENT: FakeBlock = FakeBlock::make_no_parent(3, "fa9ebe3f74de4c56908b49f5c4044e85825f7350f3fa08a19151de82a82a7313");
pub static ref BLOCK_THREE_TIMESTAMP: FakeBlock = BLOCK_TWO.make_child("6b834521bb753c132fdcf0e1034803ed9068e324112f8750ba93580b393a986b", Some(U256::from(1657712166)));
// This block is special and serializes in a slightly different way, this is needed to simulate non-ethereum behaviour at the store level. If you're not sure
// what you are doing, don't use this block for other tests.
pub static ref BLOCK_THREE_NO_TIMESTAMP: FakeBlock = BLOCK_TWO.make_child("6b834521bb753c132fdcf0e1034803ed9068e324112f8750ba93580b393a986b", None);
pub static ref BLOCK_FOUR: FakeBlock = BLOCK_THREE.make_child("7cce080f5a49c2997a6cc65fc1cee9910fd8fc3721b7010c0b5d0873e2ac785e", None);
pub static ref BLOCK_FIVE: FakeBlock = BLOCK_FOUR.make_child("7b0ea919e258eb2b119eb32de56b85d12d50ac6a9f7c5909f843d6172c8ba196", None);
pub static ref BLOCK_SIX_NO_PARENT: FakeBlock = FakeBlock::make_no_parent(6, "6b834521bb753c132fdcf0e1034803ed9068e324112f8750ba93580b393a986b");
Expand Down Expand Up @@ -112,7 +115,23 @@ impl Block for FakeBlock {
}

fn data(&self) -> Result<serde_json::Value, serde_json::Error> {
serde_json::to_value(self.as_ethereum_block())
let mut value: serde_json::Value = serde_json::to_value(self.as_ethereum_block())?;
if !self.eq(&BLOCK_THREE_NO_TIMESTAMP) {
return Ok(value);
};

// Remove the timestamp for block BLOCK_THREE_NO_TIMESTAMP in order to simulate the non EVM behaviour
// In these cases timestamp is not there at all but LightEthereumBlock uses U256 as timestamp so it
// can never be null and therefore impossible to test without manipulating the JSON blob directly.
if let serde_json::Value::Object(ref mut map) = value {
map.entry("block").and_modify(|ref mut block| {
if let serde_json::Value::Object(ref mut block) = block {
block.remove_entry("timestamp");
}
});
};

Ok(value)
}
}

Expand Down

0 comments on commit e3c50b0

Please sign in to comment.