Skip to content

Commit

Permalink
feat: add filterv2 functions
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Oct 1, 2023
1 parent 4e7ee9f commit 1f5fd80
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 20 deletions.
36 changes: 34 additions & 2 deletions waku-bindings/src/general/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,15 +217,16 @@ impl ContentFilter {
/// The criteria to create subscription to a light node in JSON Format
/// as per the [specification](https://rfc.vac.dev/spec/36/#filtersubscription-type)
#[derive(Clone, Serialize, Deserialize, Debug)]
#[deprecated]
#[serde(rename_all = "camelCase")]
pub struct FilterSubscription {
pub struct LegacyFilterSubscription {

Check warning on line 222 in waku-bindings/src/general/mod.rs

View workflow job for this annotation

GitHub Actions / Test Suite (ubuntu-latest, stable-x86_64-unknown-linux-gnu)

use of deprecated struct `general::LegacyFilterSubscription`

Check warning on line 222 in waku-bindings/src/general/mod.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, stable-x86_64-unknown-linux-gnu)

use of deprecated struct `general::LegacyFilterSubscription`

Check warning on line 222 in waku-bindings/src/general/mod.rs

View workflow job for this annotation

GitHub Actions / Test

use of deprecated struct `general::LegacyFilterSubscription`
/// Array of [`ContentFilter`] being subscribed to / unsubscribed from
content_filters: Vec<ContentFilter>,
/// Optional pubsub topic
pubsub_topic: Option<WakuPubSubTopic>,
}

impl FilterSubscription {
impl LegacyFilterSubscription {

Check warning on line 229 in waku-bindings/src/general/mod.rs

View workflow job for this annotation

GitHub Actions / Test Suite (ubuntu-latest, stable-x86_64-unknown-linux-gnu)

use of deprecated struct `general::LegacyFilterSubscription`

Check warning on line 229 in waku-bindings/src/general/mod.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, stable-x86_64-unknown-linux-gnu)

use of deprecated struct `general::LegacyFilterSubscription`

Check warning on line 229 in waku-bindings/src/general/mod.rs

View workflow job for this annotation

GitHub Actions / Test

use of deprecated struct `general::LegacyFilterSubscription`
pub fn new(content_filters: Vec<ContentFilter>, pubsub_topic: Option<WakuPubSubTopic>) -> Self {
Self {
content_filters,
Expand All @@ -242,6 +243,37 @@ impl FilterSubscription {
}
}

/// The criteria to create subscription to a filter full node matching a content filter.
/// as per the [specification](https://rfc.vac.dev/spec/36/#filtersubscription-type)
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct FilterSubscription {
/// mandatory, at least one required, with a max of 10
content_topics: Vec<WakuContentTopic>,
/// optional if using autosharding, mandatory if using static or named sharding.
pubsub_topic: Option<WakuPubSubTopic>,
}

impl FilterSubscription {
pub fn new(
content_topics: Vec<WakuContentTopic>,
pubsub_topic: Option<WakuPubSubTopic>,
) -> Self {
Self {
content_topics,
pubsub_topic,
}
}

pub fn content_topics(&self) -> &[WakuContentTopic] {
&self.content_topics
}

pub fn pubsub_topic(&self) -> Option<&WakuPubSubTopic> {
self.pubsub_topic.as_ref()
}
}

/// Criteria used to retrieve historical messages
#[derive(Clone, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
Expand Down
6 changes: 3 additions & 3 deletions waku-bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ pub use node::{
};

pub use general::{
ContentFilter, DecodedPayload, Encoding, FilterSubscription, MessageId, MessageIndex,
PagingOptions, PeerId, ProtocolId, Result, StoreQuery, StoreResponse, WakuContentTopic,
WakuMessage, WakuMessageVersion, WakuPubSubTopic,
ContentFilter, DecodedPayload, Encoding, FilterSubscription, LegacyFilterSubscription,

Check warning on line 19 in waku-bindings/src/lib.rs

View workflow job for this annotation

GitHub Actions / Test Suite (ubuntu-latest, stable-x86_64-unknown-linux-gnu)

use of deprecated struct `general::LegacyFilterSubscription`

Check warning on line 19 in waku-bindings/src/lib.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, stable-x86_64-unknown-linux-gnu)

use of deprecated struct `general::LegacyFilterSubscription`

Check warning on line 19 in waku-bindings/src/lib.rs

View workflow job for this annotation

GitHub Actions / Test

use of deprecated struct `general::LegacyFilterSubscription`
MessageId, MessageIndex, PagingOptions, PeerId, ProtocolId, Result, StoreQuery, StoreResponse,
WakuContentTopic, WakuMessage, WakuMessageVersion, WakuPubSubTopic,
};

pub use events::{waku_set_event_callback, Event, Signal, WakuMessageEvent};
93 changes: 84 additions & 9 deletions waku-bindings/src/node/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,57 @@ use libc::*;
// internal
use crate::general::Result;
use crate::general::{FilterSubscription, PeerId};
use crate::utils::{get_trampoline, handle_no_response};
use crate::utils::{get_trampoline, handle_response, handle_no_response};

Check warning on line 11 in waku-bindings/src/node/filter.rs

View workflow job for this annotation

GitHub Actions / Test Suite (ubuntu-latest, stable-x86_64-unknown-linux-gnu)

unused import: `handle_response`

Check warning on line 11 in waku-bindings/src/node/filter.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, stable-x86_64-unknown-linux-gnu)

unused import: `handle_response`

Check warning on line 11 in waku-bindings/src/node/filter.rs

View workflow job for this annotation

GitHub Actions / Test

unused import: `handle_response`

/// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_subscribechar-filterjson-char-peerid-int-timeoutms)
pub fn waku_filter_subscribe(
filter_subscription: &FilterSubscription,
peer_id: PeerId,
peer_id: Option<PeerId>,
timeout: Duration,
) -> Result<()> {
) -> Result<PeerId> {
let filter_subscription_ptr = CString::new(
serde_json::to_string(filter_subscription)
.expect("FilterSubscription should always succeed to serialize"),
)
.expect("FilterSubscription should always be able to be serialized")
.into_raw();
let peer_id_ptr = match peer_id {
None => CString::new(""),
Some(t) => CString::new(t),
}
.expect("CString should build properly from peer id")
.into_raw();

let mut response: String = Default::default();
let response_cb = |v: &str| response = v.to_string();
let code = unsafe {
let mut closure = response_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_filter_subscribe(
filter_subscription_ptr,
peer_id_ptr,
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
cb,
&mut closure as *mut _ as *mut c_void,
);

drop(CString::from_raw(filter_subscription_ptr));
drop(CString::from_raw(peer_id_ptr));

out
};

TODO: extract the peerID from here?

Check failure on line 55 in waku-bindings/src/node/filter.rs

View workflow job for this annotation

GitHub Actions / Rust lints

expected identifier, found `:`

Check failure on line 55 in waku-bindings/src/node/filter.rs

View workflow job for this annotation

GitHub Actions / Test Suite (ubuntu-latest, stable-x86_64-unknown-linux-gnu)

expected identifier, found `:`

Check failure on line 55 in waku-bindings/src/node/filter.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, stable-x86_64-unknown-linux-gnu)

expected identifier, found `:`

Check failure on line 55 in waku-bindings/src/node/filter.rs

View workflow job for this annotation

GitHub Actions / Test

expected identifier, found `:`
handle_response(code, &response)
}

/// Used to know if a service node has an active subscription for this client
/// peerID should contain the ID of a peer we are subscribed to, supporting the filter protocol
pub fn waku_filter_ping(peer_id: PeerId, timeout: Duration) -> Result<()> {
let peer_id_ptr = CString::new(peer_id)
.expect("PeerId should always be able to be serialized")
.into_raw();
Expand All @@ -32,8 +68,7 @@ pub fn waku_filter_subscribe(
let code = unsafe {
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_legacy_filter_subscribe(
filter_subscription_ptr,
let out = waku_sys::waku_filter_ping(
peer_id_ptr,
timeout
.as_millis()
Expand All @@ -43,7 +78,6 @@ pub fn waku_filter_subscribe(
&mut closure as *mut _ as *mut c_void,
);

drop(CString::from_raw(filter_subscription_ptr));
drop(CString::from_raw(peer_id_ptr));

out
Expand All @@ -52,10 +86,12 @@ pub fn waku_filter_subscribe(
handle_no_response(code, &error)
}

/// Removes subscriptions in a light node matching a content filter and, optionally, a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_unsubscribechar-filterjson-int-timeoutms)
/// Sends a requests to a service node to stop pushing messages matching this filter to this client.
/// It might be used to modify an existing subscription by providing a subset of the original filter
/// criteria
pub fn waku_filter_unsubscribe(
filter_subscription: &FilterSubscription,
peer_id: PeerId,
timeout: Duration,
) -> Result<()> {
let filter_subscription_ptr = CString::new(
Expand All @@ -64,14 +100,18 @@ pub fn waku_filter_unsubscribe(
)
.expect("CString should build properly from the serialized filter subscription")
.into_raw();
let peer_id_ptr = CString::new(peer_id)
.expect("PeerId should always be able to be serialized")
.into_raw();

let mut error: String = Default::default();
let error_cb = |v: &str| error = v.to_string();
let code = unsafe {
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_legacy_filter_unsubscribe(
let out = waku_sys::waku_filter_unsubscribe(
filter_subscription_ptr,
peer_id_ptr,
timeout
.as_millis()
.try_into()
Expand All @@ -81,6 +121,41 @@ pub fn waku_filter_unsubscribe(
);

drop(CString::from_raw(filter_subscription_ptr));
drop(CString::from_raw(peer_id_ptr));

out
};

handle_no_response(code, &error)
}

/// Sends a requests to a service node (or all service nodes) to stop pushing messages
/// peerID should contain the ID of a peer this client is subscribed to, or can be None
/// to stop all active subscriptions
pub fn waku_filter_unsubscribe_all(peer_id: Option<PeerId>, timeout: Duration) -> Result<()> {
let peer_id_ptr = match peer_id {
None => CString::new(""),
Some(t) => CString::new(t),
}
.expect("CString should build properly from peer id")
.into_raw();

let mut error: String = Default::default();
let error_cb = |v: &str| error = v.to_string();
let code = unsafe {
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_filter_unsubscribe_all(
peer_id_ptr,
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
cb,
&mut closure as *mut _ as *mut c_void,
);

drop(CString::from_raw(peer_id_ptr));

out
};
Expand Down
91 changes: 91 additions & 0 deletions waku-bindings/src/node/legacyfilter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
//! Waku [filter](https://rfc.vac.dev/spec/36/#waku-filter) protocol related methods
// std
use std::ffi::CString;
use std::time::Duration;
// crates
use libc::*;
// internal
use crate::general::Result;
use crate::general::{LegacyFilterSubscription, PeerId};
use crate::utils::{get_trampoline, handle_no_response};

/// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_legacy_filter_subscribechar-filterjson-char-peerid-int-timeoutms)
#[deprecated]
pub fn waku_legacy_filter_subscribe(
filter_subscription: &LegacyFilterSubscription,
peer_id: PeerId,
timeout: Duration,
) -> Result<()> {
let filter_subscription_ptr = CString::new(
serde_json::to_string(filter_subscription)
.expect("FilterSubscription should always succeed to serialize"),
)
.expect("FilterSubscription should always be able to be serialized")
.into_raw();
let peer_id_ptr = CString::new(peer_id)
.expect("PeerId should always be able to be serialized")
.into_raw();

let mut error: String = Default::default();
let error_cb = |v: &str| error = v.to_string();
let code = unsafe {
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_legacy_filter_subscribe(
filter_subscription_ptr,
peer_id_ptr,
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
cb,
&mut closure as *mut _ as *mut c_void,
);

drop(CString::from_raw(filter_subscription_ptr));
drop(CString::from_raw(peer_id_ptr));

out
};

handle_no_response(code, &error)
}

/// Removes subscriptions in a light node matching a content filter and, optionally, a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_unsubscribechar-filterjson-int-timeoutms)
#[deprecated]
pub fn waku_legacy_filter_unsubscribe(
filter_subscription: &LegacyFilterSubscription,
timeout: Duration,
) -> Result<()> {
let filter_subscription_ptr = CString::new(
serde_json::to_string(filter_subscription)
.expect("FilterSubscription should always succeed to serialize"),
)
.expect("CString should build properly from the serialized filter subscription")
.into_raw();

let mut error: String = Default::default();
let error_cb = |v: &str| error = v.to_string();
let code = unsafe {
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_legacy_filter_unsubscribe(
filter_subscription_ptr,
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
cb,
&mut closure as *mut _ as *mut c_void,
);

drop(CString::from_raw(filter_subscription_ptr));

out
};

handle_no_response(code, &error)
}
Loading

0 comments on commit 1f5fd80

Please sign in to comment.