Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(torii-grpc): event subscription with multiple clauses #2555

Merged
merged 2 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
use torii_grpc::client::{EntityUpdateStreaming, EventUpdateStreaming, IndexerUpdateStreaming};
use torii_grpc::proto::world::{RetrieveEntitiesResponse, RetrieveEventsResponse};
use torii_grpc::types::schema::Entity;
use torii_grpc::types::{EntityKeysClause, Event, EventQuery, KeysClause, Query};
use torii_grpc::types::{EntityKeysClause, Event, EventQuery, Query};
use torii_relay::client::EventLoop;
use torii_relay::types::Message;

Expand Down Expand Up @@ -159,7 +159,7 @@
/// A direct stream to grpc subscribe starknet events
pub async fn on_starknet_event(
&self,
keys: Option<KeysClause>,
keys: Vec<EntityKeysClause>,

Check warning on line 162 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L162

Added line #L162 was not covered by tests
) -> Result<EventUpdateStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_events(keys).await?;
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ message RetrieveEventsResponse {
}

message SubscribeEventsRequest {
types.KeysClause keys = 1;
repeated types.EntityKeysClause keys = 1;
}

message SubscribeEventsResponse {
Expand Down
8 changes: 3 additions & 5 deletions crates/torii/grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
UpdateEntitiesSubscriptionRequest, WorldMetadataRequest,
};
use crate::types::schema::{Entity, SchemaError};
use crate::types::{
EntityKeysClause, Event, EventQuery, IndexerUpdate, KeysClause, ModelKeysClause, Query,
};
use crate::types::{EntityKeysClause, Event, EventQuery, IndexerUpdate, ModelKeysClause, Query};

#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand Down Expand Up @@ -206,9 +204,9 @@
/// Subscribe to the events of a World.
pub async fn subscribe_events(
&mut self,
keys: Option<KeysClause>,
keys: Vec<EntityKeysClause>,

Check warning on line 207 in crates/torii/grpc/src/client.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/client.rs#L207

Added line #L207 was not covered by tests
) -> Result<EventUpdateStreaming, Error> {
let keys = keys.map(|c| c.into());
let keys = keys.into_iter().map(|c| c.into()).collect();

Check warning on line 209 in crates/torii/grpc/src/client.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/client.rs#L209

Added line #L209 was not covered by tests

let stream = self
.inner
Expand Down
9 changes: 5 additions & 4 deletions crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -834,9 +834,11 @@

async fn subscribe_events(
&self,
clause: proto::types::KeysClause,
clause: Vec<proto::types::EntityKeysClause>,

Check warning on line 837 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L837

Added line #L837 was not covered by tests
Larkooo marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<Receiver<Result<proto::world::SubscribeEventsResponse, tonic::Status>>, Error> {
self.event_manager.add_subscriber(clause.into()).await
self.event_manager
.add_subscriber(clause.into_iter().map(|keys| keys.into()).collect())
.await

Check warning on line 841 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L839-L841

Added lines #L839 - L841 were not covered by tests
}
}

Expand Down Expand Up @@ -1188,8 +1190,7 @@
&self,
request: Request<proto::world::SubscribeEventsRequest>,
) -> ServiceResult<Self::SubscribeEventsStream> {
let keys = request.into_inner().keys.unwrap_or_default();

let keys = request.into_inner().keys;

Check warning on line 1193 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L1193

Added line #L1193 was not covered by tests
let rx = self.subscribe_events(keys).await.map_err(|e| Status::internal(e.to_string()))?;

Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeEventsStream))
Expand Down
65 changes: 3 additions & 62 deletions crates/torii/grpc/src/server/subscriptions/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
use torii_core::types::OptimisticEntity;
use tracing::{error, trace};

use super::match_entity_keys;
use crate::proto;
use crate::proto::world::SubscribeEntityResponse;
use crate::types::{EntityKeysClause, PatternMatching};
use crate::types::EntityKeysClause;

pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::entity";

Expand Down Expand Up @@ -128,67 +129,7 @@

// If we have a clause of keys, then check that the key pattern of the entity
// matches the key pattern of the subscriber.
if !sub.clauses.is_empty()
&& !sub.clauses.iter().any(|clause| match clause {
EntityKeysClause::HashedKeys(hashed_keys) => {
hashed_keys.is_empty() || hashed_keys.contains(&hashed)
}
EntityKeysClause::Keys(clause) => {
// if we have a model clause, then we need to check that the entity
// has an updated model and that the model name matches the clause
if let Some(updated_model) = &entity.updated_model {
let name = updated_model.name();
let (namespace, name) = name.split_once('-').unwrap();

if !clause.models.is_empty()
&& !clause.models.iter().any(|clause_model| {
let (clause_namespace, clause_model) =
clause_model.split_once('-').unwrap();
// if both namespace and model are empty, we should match all.
// if namespace is specified and model is empty or * we should
// match all models in the
// namespace if namespace
// and model are specified, we should match the
// specific model
(clause_namespace.is_empty()
|| clause_namespace == namespace
|| clause_namespace == "*")
&& (clause_model.is_empty()
|| clause_model == name
|| clause_model == "*")
})
{
return false;
}
}

// if the key pattern doesnt match our subscribers key pattern, skip
// ["", "0x0"] would match with keys ["0x...", "0x0", ...]
if clause.pattern_matching == PatternMatching::FixedLen
&& keys.len() != clause.keys.len()
{
return false;
}

return keys.iter().enumerate().all(|(idx, key)| {
// this is going to be None if our key pattern overflows the subscriber
// key pattern in this case we should skip
let sub_key = clause.keys.get(idx);

match sub_key {
// the key in the subscriber must match the key of the entity
// athis index
Some(Some(sub_key)) => key == sub_key,
// otherwise, if we have no key we should automatically match.
// or.. we overflowed the subscriber key pattern
// but we're in VariableLen pattern matching
// so we should match all next keys
_ => true,
}
});
}
})
{
if !match_entity_keys(hashed, &keys, &entity.updated_model, &sub.clauses) {

Check warning on line 132 in crates/torii/grpc/src/server/subscriptions/entity.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/entity.rs#L132

Added line #L132 was not covered by tests
continue;
}

Expand Down
35 changes: 5 additions & 30 deletions crates/torii/grpc/src/server/subscriptions/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@
use torii_core::types::Event;
use tracing::{error, trace};

use super::match_keys;
use crate::proto;
use crate::proto::world::SubscribeEventsResponse;
use crate::types::{KeysClause, PatternMatching};
use crate::types::EntityKeysClause;

pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::event";

#[derive(Debug)]
pub struct EventSubscriber {
/// Event keys that the subscriber is interested in
keys: KeysClause,
keys: Vec<EntityKeysClause>,
/// The channel to send the response back to the subscriber.
sender: Sender<Result<proto::world::SubscribeEventsResponse, tonic::Status>>,
}
Expand All @@ -41,7 +42,7 @@
impl EventManager {
pub async fn add_subscriber(
&self,
keys: KeysClause,
keys: Vec<EntityKeysClause>,

Check warning on line 45 in crates/torii/grpc/src/server/subscriptions/event.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/event.rs#L45

Added line #L45 was not covered by tests
) -> Result<Receiver<Result<proto::world::SubscribeEventsResponse, tonic::Status>>, Error> {
let id = rand::thread_rng().gen::<usize>();
let (sender, receiver) = channel(1);
Expand Down Expand Up @@ -108,33 +109,7 @@
.map_err(ParseError::from)?;

for (idx, sub) in subs.subscribers.read().await.iter() {
// if the key pattern doesnt match our subscribers key pattern, skip
// ["", "0x0"] would match with keys ["0x...", "0x0", ...]
if sub.keys.pattern_matching == PatternMatching::FixedLen
&& keys.len() != sub.keys.keys.len()
{
continue;
}

if !keys.iter().enumerate().all(|(idx, key)| {
// this is going to be None if our key pattern overflows the subscriber key pattern
// in this case we might want to list all events with the same
// key selector so we can match them all
let sub_key = sub.keys.keys.get(idx);

// if we have a key in the subscriber, it must match the key in the event
// unless its empty, which is a wildcard
match sub_key {
// the key in the subscriber must match the key of the entity
// athis index
Some(Some(sub_key)) => key == sub_key,
// otherwise, if we have no key we should automatically match.
// or.. we overflowed the subscriber key pattern
// but we're in VariableLen pattern matching
// so we should match all next keys
_ => true,
}
}) {
if !match_keys(&keys, &sub.keys) {

Check warning on line 112 in crates/torii/grpc/src/server/subscriptions/event.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/event.rs#L112

Added line #L112 was not covered by tests
continue;
}

Expand Down
65 changes: 3 additions & 62 deletions crates/torii/grpc/src/server/subscriptions/event_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
use tracing::{error, trace};

use super::entity::EntitiesSubscriber;
use super::match_entity_keys;
use crate::proto;
use crate::proto::world::SubscribeEntityResponse;
use crate::types::{EntityKeysClause, PatternMatching};
use crate::types::EntityKeysClause;

pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::event_message";

Expand Down Expand Up @@ -120,67 +121,7 @@

// If we have a clause of keys, then check that the key pattern of the entity
// matches the key pattern of the subscriber.
if !sub.clauses.is_empty()
&& !sub.clauses.iter().any(|clause| match clause {
EntityKeysClause::HashedKeys(hashed_keys) => {
hashed_keys.is_empty() || hashed_keys.contains(&hashed)
}
EntityKeysClause::Keys(clause) => {
// if we have a model clause, then we need to check that the entity
// has an updated model and that the model name matches the clause
if let Some(updated_model) = &entity.updated_model {
let name = updated_model.name();
let (namespace, name) = name.split_once('-').unwrap();

if !clause.models.is_empty()
&& !clause.models.iter().any(|clause_model| {
let (clause_namespace, clause_model) =
clause_model.split_once('-').unwrap();
// if both namespace and model are empty, we should match all.
// if namespace is specified and model is empty or * we should
// match all models in the
// namespace if namespace
// and model are specified, we should match the
// specific model
(clause_namespace.is_empty()
|| clause_namespace == namespace
|| clause_namespace == "*")
&& (clause_model.is_empty()
|| clause_model == name
|| clause_model == "*")
})
{
return false;
}
}

// if the key pattern doesnt match our subscribers key pattern, skip
// ["", "0x0"] would match with keys ["0x...", "0x0", ...]
if clause.pattern_matching == PatternMatching::FixedLen
&& keys.len() != clause.keys.len()
{
return false;
}

return keys.iter().enumerate().all(|(idx, key)| {
// this is going to be None if our key pattern overflows the subscriber
// key pattern in this case we should skip
let sub_key = clause.keys.get(idx);

match sub_key {
// the key in the subscriber must match the key of the entity
// athis index
Some(Some(sub_key)) => key == sub_key,
// otherwise, if we have no key we should automatically match.
// or.. we overflowed the subscriber key pattern
// but we're in VariableLen pattern matching
// so we should match all next keys
_ => true,
}
});
}
})
{
if !match_entity_keys(hashed, &keys, &entity.updated_model, &sub.clauses) {

Check warning on line 124 in crates/torii/grpc/src/server/subscriptions/event_message.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/event_message.rs#L124

Added line #L124 was not covered by tests
continue;
}

Expand Down
Loading
Loading