Skip to content

Commit

Permalink
feat: add tps metric to contracts table (#2468)
Browse files Browse the repository at this point in the history
* feat: add tps metric to contracts table

* set head

* refactor: tps set head

* wip

* tps

* fmt

* indexer updates subscription

* fmt

* naming refactor

* wild card felt zero

* cleamn

* fetch contract firsdt

* handle m,uiltiple contracts

* clean

* torii client
  • Loading branch information
Larkooo authored Oct 8, 2024
1 parent 18bddac commit 771639c
Show file tree
Hide file tree
Showing 12 changed files with 379 additions and 32 deletions.
16 changes: 15 additions & 1 deletion crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use starknet::core::types::Felt;
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::JsonRpcClient;
use tokio::sync::RwLock as AsyncRwLock;
use torii_grpc::client::{EntityUpdateStreaming, EventUpdateStreaming};
use torii_grpc::client::{EntityUpdateStreaming, EventUpdateStreaming, IndexerUpdateStreaming};
use torii_grpc::proto::world::{RetrieveEntitiesResponse, RetrieveEventsResponse};
use torii_grpc::types::schema::Entity;
use torii_grpc::types::{EntityKeysClause, Event, EventQuery, KeysClause, Query};
Expand Down Expand Up @@ -156,6 +156,7 @@ impl Client {
Ok(())
}

/// A direct stream to grpc subscribe starknet events
pub async fn on_starknet_event(
&self,
keys: Option<KeysClause>,
Expand All @@ -164,4 +165,17 @@ impl Client {
let stream = grpc_client.subscribe_events(keys).await?;
Ok(stream)
}

/// Subscribe to indexer updates for a specific contract address.
/// If no contract address is provided, it will subscribe to updates for world contract.
pub async fn on_indexer_updated(
&self,
contract_address: Option<Felt>,
) -> Result<IndexerUpdateStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client
.subscribe_indexer(contract_address.unwrap_or(self.world_reader.address))
.await?;
Ok(stream)
}
}
28 changes: 24 additions & 4 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
// use the start block provided by user if head is 0
let (head, _, _) = self.db.head().await?;
if head == 0 {
self.db.set_head(self.config.start_block)?;
self.db.set_head(self.config.start_block, 0, 0, self.world.address).await?;
} else if self.config.start_block != 0 {
warn!(target: LOG_TARGET, "Start block ignored, stored head exists and will be used instead.");
}
Expand Down Expand Up @@ -389,6 +389,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {

let timestamp = data.pending_block.timestamp;

let mut world_txns_count = 0;
for t in data.pending_block.transactions {
let transaction_hash = t.transaction.transaction_hash();
if let Some(tx) = last_pending_block_tx_cursor {
Expand All @@ -409,7 +410,14 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
// provider. So we can fail silently and try
// again in the next iteration.
warn!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Retrieving pending transaction receipt.");
self.db.set_head(data.block_number - 1)?;
self.db
.set_head(
data.block_number - 1,
timestamp,
world_txns_count,
self.world.address,
)
.await?;
if let Some(tx) = last_pending_block_tx {
self.db.set_last_pending_block_tx(Some(tx))?;
}
Expand All @@ -430,6 +438,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
}
}
Ok(true) => {
world_txns_count += 1;
last_pending_block_world_tx = Some(*transaction_hash);
last_pending_block_tx = Some(*transaction_hash);
info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending world transaction.");
Expand All @@ -446,7 +455,9 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {

// Set the head to the last processed pending transaction
// Head block number should still be latest block number
self.db.set_head(data.block_number - 1)?;
self.db
.set_head(data.block_number - 1, timestamp, world_txns_count, self.world.address)
.await?;

if let Some(tx) = last_pending_block_tx {
self.db.set_last_pending_block_tx(Some(tx))?;
Expand All @@ -466,6 +477,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
pub async fn process_range(&mut self, data: FetchRangeResult) -> Result<EngineHead> {
// Process all transactions
let mut last_block = 0;
let transactions_count = data.transactions.len();
for ((block_number, transaction_hash), events) in data.transactions {
debug!("Processing transaction hash: {:#x}", transaction_hash);
// Process transaction
Expand Down Expand Up @@ -498,7 +510,15 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
// Process parallelized events
self.process_tasks().await?;

self.db.set_head(data.latest_block_number)?;
let last_block_timestamp = self.get_block_timestamp(data.latest_block_number).await?;
self.db
.set_head(
data.latest_block_number,
last_block_timestamp,
transactions_count as u64,
self.world.address,
)
.await?;
self.db.set_last_pending_block_world_tx(None)?;
self.db.set_last_pending_block_tx(None)?;

Expand Down
44 changes: 42 additions & 2 deletions crates/torii/core/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use tracing::{debug, error};

use crate::simple_broker::SimpleBroker;
use crate::types::{
Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated,
Model as ModelRegistered,
Contract as ContractUpdated, Entity as EntityUpdated, Event as EventEmitted,
EventMessage as EventMessageUpdated, Model as ModelRegistered,
};

pub(crate) const LOG_TARGET: &str = "torii_core::executor";
Expand All @@ -31,6 +31,7 @@ pub enum Argument {

#[derive(Debug, Clone)]
pub enum BrokerMessage {
SetHead(ContractUpdated),
ModelRegistered(ModelRegistered),
EntityUpdated(EntityUpdated),
EventMessageUpdated(EventMessageUpdated),
Expand All @@ -45,8 +46,17 @@ pub struct DeleteEntityQuery {
pub ty: Ty,
}

#[derive(Debug, Clone)]
pub struct SetHeadQuery {
pub head: u64,
pub last_block_timestamp: u64,
pub txns_count: u64,
pub contract_address: Felt,
}

#[derive(Debug, Clone)]
pub enum QueryType {
SetHead(SetHeadQuery),
SetEntity(Ty),
DeleteEntity(DeleteEntityQuery),
EventMessage(Ty),
Expand Down Expand Up @@ -178,6 +188,35 @@ impl<'c> Executor<'c> {
let tx = &mut self.transaction;

match query_type {
QueryType::SetHead(set_head) => {
let previous_block_timestamp: u64 = sqlx::query_scalar::<_, i64>(
"SELECT last_block_timestamp FROM contracts WHERE id = ?",
)
.bind(format!("{:#x}", set_head.contract_address))
.fetch_one(&mut **tx)
.await?
.try_into()
.map_err(|_| anyhow::anyhow!("Last block timestamp doesn't fit in u64"))?;

let tps: u64 = if set_head.last_block_timestamp - previous_block_timestamp != 0 {
set_head.txns_count / (set_head.last_block_timestamp - previous_block_timestamp)
} else {
set_head.txns_count
};

query.execute(&mut **tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
})?;

let row = sqlx::query("UPDATE contracts SET tps = ? WHERE id = ? RETURNING *")
.bind(tps as i64)
.bind(format!("{:#x}", set_head.contract_address))
.fetch_one(&mut **tx)
.await?;

let contract = ContractUpdated::from_row(&row)?;
self.publish_queue.push(BrokerMessage::SetHead(contract));
}
QueryType::SetEntity(entity) => {
let row = query.fetch_one(&mut **tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
Expand Down Expand Up @@ -289,6 +328,7 @@ impl<'c> Executor<'c> {

fn send_broker_message(message: BrokerMessage) {
match message {
BrokerMessage::SetHead(update) => SimpleBroker::publish(update),
BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model),
BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity),
BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event),
Expand Down
33 changes: 24 additions & 9 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use starknet_crypto::poseidon_hash_many;
use tokio::sync::mpsc::UnboundedSender;

use crate::cache::{Model, ModelCache};
use crate::executor::{Argument, DeleteEntityQuery, QueryMessage, QueryType};
use crate::executor::{Argument, DeleteEntityQuery, QueryMessage, QueryType, SetHeadQuery};
use crate::utils::utc_dt_string_from_timestamp;

type IsEventMessage = bool;
Expand Down Expand Up @@ -86,17 +86,32 @@ impl Sql {
))
}

pub fn set_head(&mut self, head: u64) -> Result<()> {
let head = Argument::Int(
pub async fn set_head(
&mut self,
head: u64,
last_block_timestamp: u64,
world_txns_count: u64,
contract_address: Felt,
) -> Result<()> {
let head_arg = Argument::Int(
head.try_into().map_err(|_| anyhow!("Head value {} doesn't fit in i64", head))?,
);
let last_block_timestamp_arg =
Argument::Int(last_block_timestamp.try_into().map_err(|_| {
anyhow!("Last block timestamp value {} doesn't fit in i64", last_block_timestamp)
})?);
let id = Argument::FieldElement(self.world_address);
self.executor
.send(QueryMessage::other(
"UPDATE contracts SET head = ? WHERE id = ?".to_string(),
vec![head, id],
))
.map_err(|e| anyhow!("Failed to send set_head message: {}", e))?;

self.executor.send(QueryMessage::new(
"UPDATE contracts SET head = ?, last_block_timestamp = ? WHERE id = ?".to_string(),
vec![head_arg, last_block_timestamp_arg, id],
QueryType::SetHead(SetHeadQuery {
head,
last_block_timestamp,
txns_count: world_txns_count,
contract_address,
}),
))?;

Ok(())
}
Expand Down
9 changes: 9 additions & 0 deletions crates/torii/core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,12 @@ pub struct Event {
pub executed_at: DateTime<Utc>,
pub created_at: DateTime<Utc>,
}

#[derive(FromRow, Deserialize, Debug, Clone, Default)]
#[serde(rename_all = "camelCase")]
pub struct Contract {
pub head: i64,
pub tps: i64,
pub last_block_timestamp: i64,
pub contract_address: String,
}
21 changes: 18 additions & 3 deletions crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ import "google/protobuf/empty.proto";

// The World service provides information about the world.
service World {
// Subscribes to updates about the indexer. Like the head block number, tps, etc.
rpc SubscribeIndexer (SubscribeIndexerRequest) returns (stream SubscribeIndexerResponse);

// Retrieves metadata about the World including all the registered components and systems.
rpc WorldMetadata (MetadataRequest) returns (MetadataResponse);
rpc WorldMetadata (WorldMetadataRequest) returns (WorldMetadataResponse);

// Subscribes to models updates.
rpc SubscribeModels (SubscribeModelsRequest) returns (stream SubscribeModelsResponse);
Expand Down Expand Up @@ -38,14 +41,26 @@ service World {
rpc SubscribeEvents (SubscribeEventsRequest) returns (stream SubscribeEventsResponse);
}

// A request to subscribe to indexer updates.
message SubscribeIndexerRequest {
bytes contract_address = 1;
}

// A response containing indexer updates.
message SubscribeIndexerResponse {
int64 head = 1;
int64 tps = 2;
int64 last_block_timestamp = 3;
bytes contract_address = 4;
}

// A request to retrieve metadata for a specific world ID.
message MetadataRequest {
message WorldMetadataRequest {

}

// The metadata response contains addresses and class hashes for the world.
message MetadataResponse {
message WorldMetadataResponse {
types.WorldMetadata metadata = 1;
}

Expand Down
49 changes: 43 additions & 6 deletions crates/torii/grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ use starknet::core::types::{Felt, FromStrError, StateDiff, StateUpdate};
use tonic::transport::Endpoint;

use crate::proto::world::{
world_client, MetadataRequest, RetrieveEntitiesRequest, RetrieveEntitiesResponse,
RetrieveEventsRequest, RetrieveEventsResponse, SubscribeEntitiesRequest,
SubscribeEntityResponse, SubscribeEventsRequest, SubscribeEventsResponse,
SubscribeModelsRequest, SubscribeModelsResponse, UpdateEntitiesSubscriptionRequest,
world_client, RetrieveEntitiesRequest, RetrieveEntitiesResponse, RetrieveEventsRequest,
RetrieveEventsResponse, SubscribeEntitiesRequest, SubscribeEntityResponse,
SubscribeEventsRequest, SubscribeEventsResponse, SubscribeIndexerRequest,
SubscribeIndexerResponse, SubscribeModelsRequest, SubscribeModelsResponse,
UpdateEntitiesSubscriptionRequest, WorldMetadataRequest,
};
use crate::types::schema::{Entity, SchemaError};
use crate::types::{EntityKeysClause, Event, EventQuery, KeysClause, ModelKeysClause, Query};
use crate::types::{
EntityKeysClause, Event, EventQuery, IndexerUpdate, KeysClause, ModelKeysClause, Query,
};

#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand Down Expand Up @@ -68,7 +71,7 @@ impl WorldClient {
/// Retrieve the metadata of the World.
pub async fn metadata(&mut self) -> Result<dojo_types::WorldMetadata, Error> {
self.inner
.world_metadata(MetadataRequest {})
.world_metadata(WorldMetadataRequest {})
.await
.map_err(Error::Grpc)
.and_then(|res| {
Expand Down Expand Up @@ -107,6 +110,22 @@ impl WorldClient {
self.inner.retrieve_events(request).await.map_err(Error::Grpc).map(|res| res.into_inner())
}

/// Subscribe to indexer updates.
pub async fn subscribe_indexer(
&mut self,
contract_address: Felt,
) -> Result<IndexerUpdateStreaming, Error> {
let request =
SubscribeIndexerRequest { contract_address: contract_address.to_bytes_be().to_vec() };
let stream = self
.inner
.subscribe_indexer(request)
.await
.map_err(Error::Grpc)
.map(|res| res.into_inner())?;
Ok(IndexerUpdateStreaming(stream.map_ok(Box::new(|res| res.into()))))
}

/// Subscribe to entities updates of a World.
pub async fn subscribe_entities(
&mut self,
Expand Down Expand Up @@ -282,6 +301,24 @@ impl Stream for EventUpdateStreaming {
}
}

type IndexerMappedStream = MapOk<
tonic::Streaming<SubscribeIndexerResponse>,
Box<dyn Fn(SubscribeIndexerResponse) -> IndexerUpdate + Send>,
>;

#[derive(Debug)]
pub struct IndexerUpdateStreaming(IndexerMappedStream);

impl Stream for IndexerUpdateStreaming {
type Item = <IndexerMappedStream as Stream>::Item;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.0.poll_next_unpin(cx)
}
}

fn empty_state_update() -> StateUpdate {
StateUpdate {
block_hash: Felt::ZERO,
Expand Down
Loading

0 comments on commit 771639c

Please sign in to comment.