From d2d7ba68f09784ece94778d1b124731b7acc5344 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Fri, 20 Oct 2023 15:50:28 -0400 Subject: [PATCH] feat: waku_filter_subscribe --- Cargo.lock | 4 +- examples/Cargo.lock | 5 +- examples/toy-chat/src/main.rs | 12 ++-- waku-bindings/Cargo.toml | 4 +- waku-bindings/src/encrypt.rs | 4 +- waku-bindings/src/events/mod.rs | 4 +- waku-bindings/src/general/mod.rs | 79 +++++++++++++++++++---- waku-bindings/src/lib.rs | 7 ++- waku-bindings/src/node/discovery.rs | 10 +-- waku-bindings/src/node/filter.rs | 86 ++++++++++++++++---------- waku-bindings/src/node/legacyfilter.rs | 2 - waku-bindings/src/node/mod.rs | 46 ++++++++------ waku-bindings/src/node/relay.rs | 42 ++++++------- waku-bindings/src/utils.rs | 24 ++++--- waku-sys/Cargo.toml | 2 +- waku-sys/vendor | 2 +- 16 files changed, 208 insertions(+), 125 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 77b0b01..3a6fb9f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1674,7 +1674,7 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "waku-bindings" -version = "0.3.0" +version = "0.4.0" dependencies = [ "aes-gcm", "base64 0.21.0", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "waku-sys" -version = "0.3.0" +version = "0.4.0" dependencies = [ "bindgen", ] diff --git a/examples/Cargo.lock b/examples/Cargo.lock index 7b78dab..8195d8a 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -1497,12 +1497,13 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "waku-bindings" -version = "0.1.1" +version = "0.4.0" dependencies = [ "aes-gcm", "base64 0.21.0", "enr", "hex", + "libc", "multiaddr", "once_cell", "rand", @@ -1517,7 +1518,7 @@ dependencies = [ [[package]] name = "waku-sys" -version = "0.1.0" +version = "0.3.0" dependencies = [ "bindgen", ] diff --git a/examples/toy-chat/src/main.rs b/examples/toy-chat/src/main.rs index 2e5f3fe..dbbc5b2 100644 --- a/examples/toy-chat/src/main.rs +++ b/examples/toy-chat/src/main.rs @@ -23,8 +23,8 @@ use tui::{ }; use unicode_width::UnicodeWidthStr; use waku_bindings::{ - waku_new, waku_set_event_callback, ContentFilter, Multiaddr, PagingOptions, ProtocolId, - Running, StoreQuery, WakuMessage, WakuNodeHandle, + waku_new, waku_set_event_callback, Multiaddr, PagingOptions, ProtocolId, Running, + StoreQuery, WakuMessage, WakuNodeHandle, waku_default_pubsub_topic, ContentFilter, }; enum InputMode { @@ -76,7 +76,7 @@ fn retrieve_history( let result = node_handle.store_query( &StoreQuery { pubsub_topic: None, - content_filters: vec![ContentFilter::new(TOY_CHAT_CONTENT_TOPIC.clone())], + content_topics: vec![TOY_CHAT_CONTENT_TOPIC.clone()], start_time: Some( (Duration::from_secs(Utc::now().timestamp() as u64) - Duration::from_secs(60 * 60 * 24)) @@ -110,7 +110,9 @@ fn setup_node_handle() -> std::result::Result, Box( ); if let Err(e) = app.node_handle - .relay_publish_message(&waku_message, None, None) + .relay_publish_message(&waku_message, Some(waku_default_pubsub_topic()), None) { let mut out = std::io::stderr(); write!(out, "{e:?}").unwrap(); diff --git a/waku-bindings/Cargo.toml b/waku-bindings/Cargo.toml index 3814e1c..1ac48c7 100644 --- a/waku-bindings/Cargo.toml +++ b/waku-bindings/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "waku-bindings" -version = "0.3.0" +version = "0.4.0" edition = "2021" authors = [ "Daniel Sanchez Quiros " @@ -26,7 +26,7 @@ serde_json = "1.0" sscanf = "0.4" smart-default = "0.6" url = "2.3" -waku-sys = { version = "0.3.0", path = "../waku-sys" } +waku-sys = { version = "0.4.0", path = "../waku-sys" } libc = "0.2" [dev-dependencies] diff --git a/waku-bindings/src/encrypt.rs b/waku-bindings/src/encrypt.rs index 3d09417..0933c86 100644 --- a/waku-bindings/src/encrypt.rs +++ b/waku-bindings/src/encrypt.rs @@ -17,7 +17,7 @@ pub fn waku_encode_asymmetric( let pk = hex::encode(public_key.serialize_uncompressed()); let sk = signing_key .map(|signing_key| hex::encode(signing_key.secret_bytes())) - .unwrap_or_else(String::new); + .unwrap_or_default(); let message_ptr = CString::new( serde_json::to_string(&message) .expect("WakuMessages should always be able to success serializing"), @@ -63,7 +63,7 @@ pub fn waku_encode_symmetric( let symk = hex::encode(symmetric_key.as_slice()); let sk = signing_key .map(|signing_key| hex::encode(signing_key.secret_bytes())) - .unwrap_or_else(String::new); + .unwrap_or_default(); let message_ptr = CString::new( serde_json::to_string(&message) .expect("WakuMessages should always be able to success serializing"), diff --git a/waku-bindings/src/events/mod.rs b/waku-bindings/src/events/mod.rs index 445fc49..4a0ed0a 100644 --- a/waku-bindings/src/events/mod.rs +++ b/waku-bindings/src/events/mod.rs @@ -5,7 +5,7 @@ //! When an event is emitted, this callback will be triggered receiving a [`Signal`] // std -use std::ffi::{c_char, c_void, CStr}; +use std::ffi::{c_char, c_int, c_void, CStr}; use std::ops::Deref; use std::sync::Mutex; // crates @@ -79,7 +79,7 @@ fn set_callback(f: F) { /// Wrapper callback, it transformst the `*const c_char` into a [`Signal`] /// and executes the [`CALLBACK`] funtion with it -extern "C" fn callback(data: *const c_char, _user_data: *mut c_void) { +extern "C" fn callback(_ret_code: c_int, data: *const c_char, _user_data: *mut c_void) { let raw_response = unsafe { CStr::from_ptr(data) } .to_str() .expect("Not null ptr"); diff --git a/waku-bindings/src/general/mod.rs b/waku-bindings/src/general/mod.rs index 7ab0e2c..cfa1659 100644 --- a/waku-bindings/src/general/mod.rs +++ b/waku-bindings/src/general/mod.rs @@ -199,12 +199,12 @@ impl DecodedPayload { /// as per the [specification](https://rfc.vac.dev/spec/36/#contentfilter-type) #[derive(Clone, Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] -pub struct ContentFilter { +pub struct LegacyContentFilter { /// The content topic of a Waku message content_topic: WakuContentTopic, } -impl ContentFilter { +impl LegacyContentFilter { pub fn new(content_topic: WakuContentTopic) -> Self { Self { content_topic } } @@ -217,7 +217,6 @@ impl ContentFilter { /// The criteria to create subscription to a light node in JSON Format /// as per the [specification](https://rfc.vac.dev/spec/36/#filtersubscription-type) #[derive(Clone, Serialize, Deserialize, Debug)] -#[deprecated] #[serde(rename_all = "camelCase")] pub struct LegacyFilterSubscription { /// Array of [`ContentFilter`] being subscribed to / unsubscribed from @@ -244,20 +243,19 @@ impl LegacyFilterSubscription { } /// The criteria to create subscription to a filter full node matching a content filter. -/// as per the [specification](https://rfc.vac.dev/spec/36/#filtersubscription-type) #[derive(Clone, Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] -pub struct FilterSubscription { - /// mandatory, at least one required, with a max of 10 - content_topics: Vec, +pub struct ContentFilter { /// optional if using autosharding, mandatory if using static or named sharding. pubsub_topic: Option, + /// mandatory, at least one required, with a max of 10 + content_topics: Vec, } -impl FilterSubscription { +impl ContentFilter { pub fn new( - content_topics: Vec, pubsub_topic: Option, + content_topics: Vec, ) -> Self { Self { content_topics, @@ -274,14 +272,73 @@ impl FilterSubscription { } } +#[derive(Clone, Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct FilterSubscriptionDetail { + #[serde(rename = "peerID")] + peer_id: PeerId, + content_topics: Vec, + pubsub_topic: WakuPubSubTopic, +} + +impl FilterSubscriptionDetail { + pub fn new( + peer_id: PeerId, + content_topics: Vec, + pubsub_topic: WakuPubSubTopic, + ) -> Self { + Self { + peer_id, + content_topics, + pubsub_topic, + } + } + + pub fn peer_id(&self) -> &PeerId { + &self.peer_id + } + + pub fn content_topics(&self) -> &[WakuContentTopic] { + &self.content_topics + } + + pub fn pubsub_topic(&self) -> &WakuPubSubTopic { + &self.pubsub_topic + } +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct FilterSubscriptionResult { + subscriptions: Vec, + error: Option, +} + +impl FilterSubscriptionResult { + pub fn new(subscriptions: Vec, error: Option) -> Self { + Self { + subscriptions, + error, + } + } + + pub fn subscriptions(&self) -> &[FilterSubscriptionDetail] { + &self.subscriptions + } + + pub fn error(&self) -> &Option { + &self.error + } +} + /// Criteria used to retrieve historical messages #[derive(Clone, Serialize, Debug)] #[serde(rename_all = "camelCase")] pub struct StoreQuery { /// The pubsub topic on which messages are published pub pubsub_topic: Option, - /// Array of [`ContentFilter`] to query for historical messages - pub content_filters: Vec, + /// Array of [`WakuContentTopic`] to query for historical messages + pub content_topics: Vec, /// The inclusive lower bound on the timestamp of queried messages. /// This field holds the Unix epoch time in nanoseconds pub start_time: Option, diff --git a/waku-bindings/src/lib.rs b/waku-bindings/src/lib.rs index be7f049..0589b7d 100644 --- a/waku-bindings/src/lib.rs +++ b/waku-bindings/src/lib.rs @@ -16,9 +16,10 @@ pub use node::{ }; pub use general::{ - ContentFilter, DecodedPayload, Encoding, FilterSubscription, LegacyFilterSubscription, - MessageId, MessageIndex, PagingOptions, PeerId, ProtocolId, Result, StoreQuery, StoreResponse, - WakuContentTopic, WakuMessage, WakuMessageVersion, WakuPubSubTopic, + ContentFilter, DecodedPayload, Encoding, FilterSubscriptionDetail, FilterSubscriptionResult, + LegacyContentFilter, LegacyFilterSubscription, MessageId, MessageIndex, PagingOptions, PeerId, + ProtocolId, Result, StoreQuery, StoreResponse, WakuContentTopic, WakuMessage, + WakuMessageVersion, WakuPubSubTopic, }; pub use events::{waku_set_event_callback, Event, Signal, WakuMessageEvent}; diff --git a/waku-bindings/src/node/discovery.rs b/waku-bindings/src/node/discovery.rs index a650a73..5a38af5 100644 --- a/waku-bindings/src/node/discovery.rs +++ b/waku-bindings/src/node/discovery.rs @@ -31,13 +31,9 @@ pub fn waku_dns_discovery( let url = CString::new(url.to_string()) .expect("CString should build properly from a valid Url") .into_raw(); - let server = CString::new( - server - .map(|host| host.to_string()) - .unwrap_or_else(|| "".to_string()), - ) - .expect("CString should build properly from a String nameserver") - .into_raw(); + let server = CString::new(server.map(|host| host.to_string()).unwrap_or_default()) + .expect("CString should build properly from a String nameserver") + .into_raw(); let mut result: String = Default::default(); let result_cb = |v: &str| result = v.to_string(); diff --git a/waku-bindings/src/node/filter.rs b/waku-bindings/src/node/filter.rs index 40cb21c..70336f2 100644 --- a/waku-bindings/src/node/filter.rs +++ b/waku-bindings/src/node/filter.rs @@ -7,21 +7,21 @@ use std::time::Duration; use libc::*; // internal use crate::general::Result; -use crate::general::{FilterSubscription, PeerId}; -use crate::utils::{get_trampoline, handle_response, handle_no_response}; +use crate::general::{ContentFilter, FilterSubscriptionResult, PeerId}; +use crate::utils::{get_trampoline, handle_json_response, handle_no_response}; /// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_subscribechar-filterjson-char-peerid-int-timeoutms) pub fn waku_filter_subscribe( - filter_subscription: &FilterSubscription, + content_filter: &ContentFilter, peer_id: Option, - timeout: Duration, -) -> Result { - let filter_subscription_ptr = CString::new( - serde_json::to_string(filter_subscription) - .expect("FilterSubscription should always succeed to serialize"), + timeout: Option, +) -> Result { + let content_filter_ptr = CString::new( + serde_json::to_string(content_filter) + .expect("ContentFilter should always succeed to serialize"), ) - .expect("FilterSubscription should always be able to be serialized") + .expect("ContentFilter should always be able to be serialized") .into_raw(); let peer_id_ptr = match peer_id { None => CString::new(""), @@ -36,29 +36,32 @@ pub fn waku_filter_subscribe( let mut closure = response_cb; let cb = get_trampoline(&closure); let out = waku_sys::waku_filter_subscribe( - filter_subscription_ptr, + content_filter_ptr, peer_id_ptr, timeout - .as_millis() - .try_into() - .expect("Duration as milliseconds should fit in a i32"), + .map(|timeout| { + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32") + }) + .unwrap_or(0), cb, &mut closure as *mut _ as *mut c_void, ); - drop(CString::from_raw(filter_subscription_ptr)); + drop(CString::from_raw(content_filter_ptr)); drop(CString::from_raw(peer_id_ptr)); out }; - TODO: extract the peerID from here? - handle_response(code, &response) + handle_json_response(code, &response) } /// Used to know if a service node has an active subscription for this client /// peerID should contain the ID of a peer we are subscribed to, supporting the filter protocol -pub fn waku_filter_ping(peer_id: PeerId, timeout: Duration) -> Result<()> { +pub fn waku_filter_ping(peer_id: PeerId, timeout: Option) -> Result<()> { let peer_id_ptr = CString::new(peer_id) .expect("PeerId should always be able to be serialized") .into_raw(); @@ -71,9 +74,13 @@ pub fn waku_filter_ping(peer_id: PeerId, timeout: Duration) -> Result<()> { let out = waku_sys::waku_filter_ping( peer_id_ptr, timeout - .as_millis() - .try_into() - .expect("Duration as milliseconds should fit in a i32"), + .map(|timeout| { + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32") + }) + .unwrap_or(0), cb, &mut closure as *mut _ as *mut c_void, ); @@ -90,13 +97,13 @@ pub fn waku_filter_ping(peer_id: PeerId, timeout: Duration) -> Result<()> { /// It might be used to modify an existing subscription by providing a subset of the original filter /// criteria pub fn waku_filter_unsubscribe( - filter_subscription: &FilterSubscription, + content_filter: &ContentFilter, peer_id: PeerId, - timeout: Duration, + timeout: Option, ) -> Result<()> { - let filter_subscription_ptr = CString::new( - serde_json::to_string(filter_subscription) - .expect("FilterSubscription should always succeed to serialize"), + let content_filter_ptr = CString::new( + serde_json::to_string(content_filter) + .expect("ContentFilter should always succeed to serialize"), ) .expect("CString should build properly from the serialized filter subscription") .into_raw(); @@ -110,17 +117,21 @@ pub fn waku_filter_unsubscribe( let mut closure = error_cb; let cb = get_trampoline(&closure); let out = waku_sys::waku_filter_unsubscribe( - filter_subscription_ptr, + content_filter_ptr, peer_id_ptr, timeout - .as_millis() - .try_into() - .expect("Duration as milliseconds should fit in a i32"), + .map(|timeout| { + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32") + }) + .unwrap_or(0), cb, &mut closure as *mut _ as *mut c_void, ); - drop(CString::from_raw(filter_subscription_ptr)); + drop(CString::from_raw(content_filter_ptr)); drop(CString::from_raw(peer_id_ptr)); out @@ -132,7 +143,10 @@ pub fn waku_filter_unsubscribe( /// Sends a requests to a service node (or all service nodes) to stop pushing messages /// peerID should contain the ID of a peer this client is subscribed to, or can be None /// to stop all active subscriptions -pub fn waku_filter_unsubscribe_all(peer_id: Option, timeout: Duration) -> Result<()> { +pub fn waku_filter_unsubscribe_all( + peer_id: Option, + timeout: Option, +) -> Result<()> { let peer_id_ptr = match peer_id { None => CString::new(""), Some(t) => CString::new(t), @@ -148,9 +162,13 @@ pub fn waku_filter_unsubscribe_all(peer_id: Option, timeout: Duration) - let out = waku_sys::waku_filter_unsubscribe_all( peer_id_ptr, timeout - .as_millis() - .try_into() - .expect("Duration as milliseconds should fit in a i32"), + .map(|timeout| { + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32") + }) + .unwrap_or(0), cb, &mut closure as *mut _ as *mut c_void, ); diff --git a/waku-bindings/src/node/legacyfilter.rs b/waku-bindings/src/node/legacyfilter.rs index 4803daf..35ef177 100644 --- a/waku-bindings/src/node/legacyfilter.rs +++ b/waku-bindings/src/node/legacyfilter.rs @@ -12,7 +12,6 @@ use crate::utils::{get_trampoline, handle_no_response}; /// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_legacy_filter_subscribechar-filterjson-char-peerid-int-timeoutms) -#[deprecated] pub fn waku_legacy_filter_subscribe( filter_subscription: &LegacyFilterSubscription, peer_id: PeerId, @@ -55,7 +54,6 @@ pub fn waku_legacy_filter_subscribe( /// Removes subscriptions in a light node matching a content filter and, optionally, a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_unsubscribechar-filterjson-int-timeoutms) -#[deprecated] pub fn waku_legacy_filter_unsubscribe( filter_subscription: &LegacyFilterSubscription, timeout: Duration, diff --git a/waku-bindings/src/node/mod.rs b/waku-bindings/src/node/mod.rs index 22f4a3f..e991dba 100644 --- a/waku-bindings/src/node/mod.rs +++ b/waku-bindings/src/node/mod.rs @@ -21,8 +21,8 @@ use std::time::Duration; // internal use crate::general::{ - FilterSubscription, LegacyFilterSubscription, MessageId, PeerId, ProtocolId, Result, - StoreQuery, StoreResponse, WakuMessage, WakuPubSubTopic, + ContentFilter, FilterSubscriptionResult, LegacyFilterSubscription, MessageId, PeerId, + ProtocolId, Result, StoreQuery, StoreResponse, WakuMessage, WakuPubSubTopic, }; pub use config::{GossipSubParams, WakuLogLevel, WakuNodeConfig, WebsocketParams}; @@ -149,8 +149,9 @@ impl WakuNodeHandle { peers::waku_peers() } - /// Publish a message using Waku Relay + /// Publish a message using Waku Relay. /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms) + /// The pubsub_topic parameter is optional and if not specified it will be derived from the contentTopic. pub fn relay_publish_message( &self, message: &WakuMessage, @@ -165,14 +166,20 @@ impl WakuNodeHandle { relay::waku_enough_peers(pubsub_topic) } - /// Subscribe to a Waku Relay pubsub topic to receive messages - pub fn relay_subscribe(&self, pubsub_topic: Option) -> Result<()> { - relay::waku_relay_subscribe(pubsub_topic) + /// Subscribe to WakuRelay to receive messages matching a content filter. + pub fn relay_subscribe( + &self, + content_filter: &ContentFilter, + ) -> Result<()> { + relay::waku_relay_subscribe(content_filter) } - /// Closes the pubsub subscription to a pubsub topic. No more messages will be received from this pubsub topic - pub fn relay_unsubscribe(&self, pubsub_topic: Option) -> Result<()> { - relay::waku_relay_unsubscribe(pubsub_topic) + /// Closes the pubsub subscription to stop receiving messages matching a content filter. No more messages will be received from this pubsub topic + pub fn relay_unsubscribe( + &self, + content_filter: &ContentFilter, + ) -> Result<()> { + relay::waku_relay_unsubscribe(content_filter) } /// Returns the list of pubsub topics the node is subscribed to in Waku Relay @@ -201,8 +208,9 @@ impl WakuNodeHandle { store::waku_local_store_query(query) } - /// Publish a message using Waku Lightpush + /// Publish a message using Waku Lightpush. /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_lightpush_publishchar-messagejson-char-topic-char-peerid-int-timeoutms) + /// The pubsub_topic parameter is optional and if not specified it will be derived from the contentTopic. pub fn lightpush_publish( &self, message: &WakuMessage, @@ -240,15 +248,15 @@ impl WakuNodeHandle { /// Returns the PeerId on which the filter subscription was created pub fn filter_subscribe( &self, - filter_subscription: &FilterSubscription, + content_filter: &ContentFilter, peer_id: Option, - timeout: Duration, - ) -> Result { - filter::waku_filter_subscribe(filter_subscription, peer_id, timeout) + timeout: Option, + ) -> Result { + filter::waku_filter_subscribe(content_filter, peer_id, timeout) } /// Used to know if a service node has an active subscription for this client - pub fn filter_ping(&self, peer_id: PeerId, timeout: Duration) -> Result<()> { + pub fn filter_ping(&self, peer_id: PeerId, timeout: Option) -> Result<()> { filter::waku_filter_ping(peer_id, timeout) } @@ -257,18 +265,18 @@ impl WakuNodeHandle { /// criteria pub fn filter_unsubscribe( &self, - filter_subscription: &FilterSubscription, + content_filter: &ContentFilter, peer_id: PeerId, - timeout: Duration, + timeout: Option, ) -> Result<()> { - filter::waku_filter_unsubscribe(filter_subscription, peer_id, timeout) + filter::waku_filter_unsubscribe(content_filter, peer_id, timeout) } /// Sends a requests to a service node (or all service nodes) to stop pushing messages pub fn filter_unsubscribe_all( &self, peer_id: Option, - timeout: Duration, + timeout: Option, ) -> Result<()> { filter::waku_filter_unsubscribe_all(peer_id, timeout) } diff --git a/waku-bindings/src/node/relay.rs b/waku-bindings/src/node/relay.rs index bce3483..7bf600c 100644 --- a/waku-bindings/src/node/relay.rs +++ b/waku-bindings/src/node/relay.rs @@ -6,7 +6,7 @@ use std::time::Duration; // crates use libc::*; // internal -use crate::general::{Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuPubSubTopic}; +use crate::general::{ContentFilter, Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuPubSubTopic}; use crate::utils::{get_trampoline, handle_json_response, handle_no_response, handle_response}; /// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/) @@ -191,27 +191,25 @@ pub fn waku_enough_peers(pubsub_topic: Option) -> Result handle_response(code, &result) } -pub fn waku_relay_subscribe(pubsub_topic: Option) -> Result<()> { - let pubsub_topic = pubsub_topic - .unwrap_or_else(waku_default_pubsub_topic) - .to_string(); - - let pubsub_topic_ptr = CString::new(pubsub_topic) - .expect("CString should build properly from pubsub topic") - .into_raw(); - +pub fn waku_relay_subscribe(content_filter: &ContentFilter) -> Result<()> { + let content_filter_ptr = CString::new( + serde_json::to_string(content_filter) + .expect("ContentFilter should always succeed to serialize"), + ) + .expect("ContentFilter should always be able to be serialized") + .into_raw(); let mut error: String = Default::default(); let error_cb = |v: &str| error = v.to_string(); let code = unsafe { let mut closure = error_cb; let cb = get_trampoline(&closure); let out = waku_sys::waku_relay_subscribe( - pubsub_topic_ptr, + content_filter_ptr, cb, &mut closure as *mut _ as *mut c_void, ); - drop(CString::from_raw(pubsub_topic_ptr)); + drop(CString::from_raw(content_filter_ptr)); out }; @@ -219,27 +217,25 @@ pub fn waku_relay_subscribe(pubsub_topic: Option) -> Result<()> handle_no_response(code, &error) } -pub fn waku_relay_unsubscribe(pubsub_topic: Option) -> Result<()> { - let pubsub_topic = pubsub_topic - .unwrap_or_else(waku_default_pubsub_topic) - .to_string(); - - let pubsub_topic_ptr = CString::new(pubsub_topic) - .expect("CString should build properly from pubsub topic") - .into_raw(); - +pub fn waku_relay_unsubscribe(content_filter: &ContentFilter) -> Result<()> { + let content_filter_ptr = CString::new( + serde_json::to_string(content_filter) + .expect("ContentFilter should always succeed to serialize"), + ) + .expect("ContentFilter should always be able to be serialized") + .into_raw(); let mut error: String = Default::default(); let error_cb = |v: &str| error = v.to_string(); let code = unsafe { let mut closure = error_cb; let cb = get_trampoline(&closure); let out = waku_sys::waku_relay_subscribe( - pubsub_topic_ptr, + content_filter_ptr, cb, &mut closure as *mut _ as *mut c_void, ); - drop(CString::from_raw(pubsub_topic_ptr)); + drop(CString::from_raw(content_filter_ptr)); out }; diff --git a/waku-bindings/src/utils.rs b/waku-bindings/src/utils.rs index 07be7a4..e945e1e 100644 --- a/waku-bindings/src/utils.rs +++ b/waku-bindings/src/utils.rs @@ -10,21 +10,27 @@ pub fn decode(input: &str) -> Result { } unsafe extern "C" fn trampoline( + _ret_code: ::std::os::raw::c_int, data: *const ::std::os::raw::c_char, user_data: *mut ::std::os::raw::c_void, ) where F: FnMut(&str), { let user_data = &mut *(user_data as *mut F); - let response = unsafe { CStr::from_ptr(data) } - .to_str() - .map_err(|err| { - format!( - "could not retrieve response from pointer returned by waku: {}", - err - ) - }) - .expect("could not retrieve response"); + + let response = if data.is_null() { + "" + } else { + unsafe { CStr::from_ptr(data) } + .to_str() + .map_err(|err| { + format!( + "could not retrieve response from pointer returned by waku: {}", + err + ) + }) + .expect("could not retrieve response") + }; user_data(response); } diff --git a/waku-sys/Cargo.toml b/waku-sys/Cargo.toml index 71d43dc..c4fb72a 100644 --- a/waku-sys/Cargo.toml +++ b/waku-sys/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "waku-sys" -version = "0.3.0" +version = "0.4.0" edition = "2021" authors = [ "Daniel Sanchez Quiros " diff --git a/waku-sys/vendor b/waku-sys/vendor index b3bd45f..f29008f 160000 --- a/waku-sys/vendor +++ b/waku-sys/vendor @@ -1 +1 @@ -Subproject commit b3bd45f01f1211cb18fb44ced5277758ab38eee7 +Subproject commit f29008f7278720a34ac281200075e7d5ecaa1e4b