Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: Index subscriptions by their parsed query instead of its string representation #1433

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ walkdir = { version = "2.3", default-features = false }
flex-error = { version = "0.4.4", default-features = false }
subtle = { version = "2", default-features = false }
semver = { version = "1.0", default-features = false }
ordered-float = { version = "4.0", default-features = false }

# Optional dependencies
async-tungstenite = { version = "0.24", default-features = false, features = ["tokio-runtime", "tokio-rustls-native-certs"], optional = true }
Expand Down
4 changes: 2 additions & 2 deletions rpc/src/client/transport/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl MockClientDriver {
self.subscribe(id, query, subscription_tx, result_tx);
}
DriverCommand::Unsubscribe { query, result_tx } => {
self.unsubscribe(query, result_tx);
self.unsubscribe(&query, result_tx);
}
DriverCommand::Publish(event) => self.publish(*event),
DriverCommand::Terminate => return Ok(()),
Expand All @@ -184,7 +184,7 @@ impl MockClientDriver {
result_tx.send(Ok(())).unwrap();
}

fn unsubscribe(&mut self, query: Query, result_tx: ChannelTx<Result<(), Error>>) {
fn unsubscribe(&mut self, query: &Query, result_tx: ChannelTx<Result<(), Error>>) {
self.router.remove_by_query(query);
result_tx.send(Ok(())).unwrap();
}
Expand Down
238 changes: 160 additions & 78 deletions rpc/src/client/transport/router.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
//! Event routing for subscriptions.

use core::str::FromStr;

use alloc::collections::{BTreeMap as HashMap, BTreeSet as HashSet};

use tracing::debug;

use crate::{client::subscription::SubscriptionTx, error::Error, event::Event, prelude::*};
use crate::client::subscription::SubscriptionTx;
use crate::error::Error;
use crate::event::Event;
use crate::prelude::*;
use crate::query::Query;

pub type SubscriptionQuery = String;
pub type SubscriptionQuery = Query;
pub type SubscriptionId = String;

#[cfg_attr(not(feature = "websocket"), allow(dead_code))]
Expand Down Expand Up @@ -53,7 +59,17 @@ impl SubscriptionRouter {
/// event is relevant, based on the associated query.
#[cfg_attr(not(feature = "websocket"), allow(dead_code))]
pub fn publish_event(&mut self, ev: Event) -> PublishResult {
self.publish(ev.query.clone(), Ok(ev))
let query = match Query::from_str(&ev.query) {
Ok(query) => query,
Err(e) => {
return PublishResult::Error(format!(
"Failed to parse query from event: {:?}, reason: {e}",
ev.query
));
},
};

self.publish(query, Ok(ev))
}

/// Publishes the given event/error to all of the subscriptions to which the
Expand Down Expand Up @@ -91,23 +107,15 @@ impl SubscriptionRouter {

/// Immediately add a new subscription to the router without waiting for
/// confirmation.
pub fn add(&mut self, id: impl ToString, query: impl ToString, tx: SubscriptionTx) {
let query = query.to_string();
let subs_for_query = match self.subscriptions.get_mut(&query) {
Some(s) => s,
None => {
self.subscriptions.insert(query.clone(), HashMap::new());
self.subscriptions.get_mut(&query).unwrap()
},
};

pub fn add(&mut self, id: impl ToString, query: SubscriptionQuery, tx: SubscriptionTx) {
let subs_for_query = self.subscriptions.entry(query).or_default();
subs_for_query.insert(id.to_string(), tx);
}

/// Removes all the subscriptions relating to the given query.
pub fn remove_by_query(&mut self, query: impl ToString) -> usize {
pub fn remove_by_query(&mut self, query: &SubscriptionQuery) -> usize {
self.subscriptions
.remove(&query.to_string())
.remove(query)
.map(|subs_for_query| subs_for_query.len())
.unwrap_or(0)
}
Expand All @@ -116,9 +124,9 @@ impl SubscriptionRouter {
#[cfg(feature = "websocket-client")]
impl SubscriptionRouter {
/// Returns the number of active subscriptions for the given query.
pub fn num_subscriptions_for_query(&self, query: impl ToString) -> usize {
pub fn num_subscriptions_for_query(&self, query: &SubscriptionQuery) -> usize {
self.subscriptions
.get(&query.to_string())
.get(query)
.map(|subs_for_query| subs_for_query.len())
.unwrap_or(0)
}
Expand All @@ -129,7 +137,8 @@ pub enum PublishResult {
Success,
NoSubscribers,
// All subscriptions for the given query have disconnected.
AllDisconnected(String),
AllDisconnected(SubscriptionQuery),
Error(String),
}

#[cfg(test)]
Expand Down Expand Up @@ -178,6 +187,107 @@ mod test {
}
}

async fn test_router_basic_pub_sub(mut ev: Event) {
let mut router = SubscriptionRouter::default();

let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str());
let (subs1_event_tx, mut subs1_event_rx) = unbounded();
let (subs2_event_tx, mut subs2_event_rx) = unbounded();
let (subs3_event_tx, mut subs3_event_rx) = unbounded();

let query1: Query = "tm.event = 'Tx'".parse().unwrap();
let query2: Query = "tm.event = 'NewBlock'".parse().unwrap();

// Two subscriptions with the same query
router.add(subs1_id, query1.clone(), subs1_event_tx);
router.add(subs2_id, query1.clone(), subs2_event_tx);
// Another subscription with a different query
router.add(subs3_id, query2.clone(), subs3_event_tx);

ev.query = query1.to_string();
router.publish_event(ev.clone());

let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap();
let subs2_ev = must_recv(&mut subs2_event_rx, 500).await.unwrap();
must_not_recv(&mut subs3_event_rx, 50).await;
assert_eq!(ev, subs1_ev);
assert_eq!(ev, subs2_ev);

ev.query = query2.to_string();
router.publish_event(ev.clone());

must_not_recv(&mut subs1_event_rx, 50).await;
must_not_recv(&mut subs2_event_rx, 50).await;
let subs3_ev = must_recv(&mut subs3_event_rx, 500).await.unwrap();
assert_eq!(ev, subs3_ev);
}

async fn test_router_pub_sub_diff_event_type_format(mut ev: Event) {
let mut router = SubscriptionRouter::default();

let subs1_id = uuid_str();
let (subs1_event_tx, mut subs1_event_rx) = unbounded();

let query1: Query = "tm.event = 'Tx'".parse().unwrap();
router.add(subs1_id, query1.clone(), subs1_event_tx);

// Query is equivalent but formatted slightly differently
ev.query = "tm.event='Tx'".to_string();
router.publish_event(ev.clone());

let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap();
assert_eq!(ev, subs1_ev);
}

async fn test_router_pub_sub_two_eq_queries_diff_format(mut ev1: Event, mut ev2: Event) {
let mut router = SubscriptionRouter::default();

let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str());
let (subs1_event_tx, mut subs1_event_rx) = unbounded();
let (subs2_event_tx, mut subs2_event_rx) = unbounded();
let (subs3_event_tx, mut subs3_event_rx) = unbounded();

let query1: Query =
"tm.event = 'Tx' AND message.module = 'ibc_client' AND message.foo = 'bar'"
.parse()
.unwrap();
let query2: Query =
"message.module = 'ibc_client' AND message.foo = 'bar' AND tm.event = 'Tx'"
.parse()
.unwrap();

assert_eq!(query1, query2);

let query3: Query = "tm.event = 'NewBlock'".parse().unwrap();

router.add(subs1_id, query1.clone(), subs1_event_tx);
router.add(subs2_id, query2.clone(), subs2_event_tx);
router.add(subs3_id, query3.clone(), subs3_event_tx);

std::dbg!(&router);

// Queries are equivalent but formatted slightly differently
ev1.query =
"tm.event='Tx' AND message.module='ibc_client' AND message.foo='bar'".to_string();
router.publish_event(ev1.clone());

ev2.query =
"message.module='ibc_client' AND message.foo='bar' AND tm.event='Tx'".to_string();
router.publish_event(ev2.clone());

let subs1_ev1 = must_recv(&mut subs1_event_rx, 500).await.unwrap();
assert_eq!(ev1, subs1_ev1);
let subs2_ev1 = must_recv(&mut subs2_event_rx, 500).await.unwrap();
assert_eq!(ev1, subs2_ev1);

let subs1_ev2 = must_recv(&mut subs1_event_rx, 500).await.unwrap();
assert_eq!(ev2, subs1_ev2);
let subs2_ev2 = must_recv(&mut subs2_event_rx, 500).await.unwrap();
assert_eq!(ev2, subs2_ev2);

must_not_recv(&mut subs3_event_rx, 50).await;
}

mod v0_34 {
use super::*;

Expand All @@ -193,36 +303,22 @@ mod test {

#[tokio::test]
async fn router_basic_pub_sub() {
let mut router = SubscriptionRouter::default();

let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str());
let (subs1_event_tx, mut subs1_event_rx) = unbounded();
let (subs2_event_tx, mut subs2_event_rx) = unbounded();
let (subs3_event_tx, mut subs3_event_rx) = unbounded();

// Two subscriptions with the same query
router.add(subs1_id, "query1", subs1_event_tx);
router.add(subs2_id, "query1", subs2_event_tx);
// Another subscription with a different query
router.add(subs3_id, "query2", subs3_event_tx);

let mut ev = read_event("subscribe_newblock_0").await;
ev.query = "query1".into();
router.publish_event(ev.clone());

let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap();
let subs2_ev = must_recv(&mut subs2_event_rx, 500).await.unwrap();
must_not_recv(&mut subs3_event_rx, 50).await;
assert_eq!(ev, subs1_ev);
assert_eq!(ev, subs2_ev);

ev.query = "query2".into();
router.publish_event(ev.clone());

must_not_recv(&mut subs1_event_rx, 50).await;
must_not_recv(&mut subs2_event_rx, 50).await;
let subs3_ev = must_recv(&mut subs3_event_rx, 500).await.unwrap();
assert_eq!(ev, subs3_ev);
test_router_basic_pub_sub(read_event("subscribe_newblock_0").await).await
}

#[tokio::test]
async fn router_pub_sub_diff_event_type_format() {
test_router_pub_sub_diff_event_type_format(read_event("subscribe_newblock_0").await)
.await
}

#[tokio::test]
async fn router_pub_sub_two_eq_queries_diff_format() {
test_router_pub_sub_two_eq_queries_diff_format(
read_event("subscribe_newblock_0").await,
read_event("subscribe_newblock_1").await,
)
.await
}
}

Expand All @@ -241,36 +337,22 @@ mod test {

#[tokio::test]
async fn router_basic_pub_sub() {
let mut router = SubscriptionRouter::default();

let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str());
let (subs1_event_tx, mut subs1_event_rx) = unbounded();
let (subs2_event_tx, mut subs2_event_rx) = unbounded();
let (subs3_event_tx, mut subs3_event_rx) = unbounded();

// Two subscriptions with the same query
router.add(subs1_id, "query1", subs1_event_tx);
router.add(subs2_id, "query1", subs2_event_tx);
// Another subscription with a different query
router.add(subs3_id, "query2", subs3_event_tx);

let mut ev = read_event("subscribe_newblock_0").await;
ev.query = "query1".into();
router.publish_event(ev.clone());

let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap();
let subs2_ev = must_recv(&mut subs2_event_rx, 500).await.unwrap();
must_not_recv(&mut subs3_event_rx, 50).await;
assert_eq!(ev, subs1_ev);
assert_eq!(ev, subs2_ev);

ev.query = "query2".into();
router.publish_event(ev.clone());

must_not_recv(&mut subs1_event_rx, 50).await;
must_not_recv(&mut subs2_event_rx, 50).await;
let subs3_ev = must_recv(&mut subs3_event_rx, 500).await.unwrap();
assert_eq!(ev, subs3_ev);
test_router_basic_pub_sub(read_event("subscribe_newblock_0").await).await
}

#[tokio::test]
async fn router_pub_sub_diff_event_type_format() {
test_router_pub_sub_diff_event_type_format(read_event("subscribe_newblock_0").await)
.await
}

#[tokio::test]
async fn router_pub_sub_two_eq_queries_diff_format() {
test_router_pub_sub_two_eq_queries_diff_format(
read_event("subscribe_newblock_0").await,
read_event("subscribe_newblock_1").await,
)
.await
}
}
}
Loading
Loading