Skip to content

Commit

Permalink
feat: waku_filter_subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Oct 27, 2023
1 parent 1f5fd80 commit d2d7ba6
Show file tree
Hide file tree
Showing 16 changed files with 208 additions and 125 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions examples/toy-chat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use tui::{
};
use unicode_width::UnicodeWidthStr;
use waku_bindings::{
waku_new, waku_set_event_callback, ContentFilter, Multiaddr, PagingOptions, ProtocolId,
Running, StoreQuery, WakuMessage, WakuNodeHandle,
waku_new, waku_set_event_callback, Multiaddr, PagingOptions, ProtocolId, Running,
StoreQuery, WakuMessage, WakuNodeHandle, waku_default_pubsub_topic, ContentFilter,
};

enum InputMode {
Expand Down Expand Up @@ -76,7 +76,7 @@ fn retrieve_history(
let result = node_handle.store_query(
&StoreQuery {
pubsub_topic: None,
content_filters: vec![ContentFilter::new(TOY_CHAT_CONTENT_TOPIC.clone())],
content_topics: vec![TOY_CHAT_CONTENT_TOPIC.clone()],
start_time: Some(
(Duration::from_secs(Utc::now().timestamp() as u64)
- Duration::from_secs(60 * 60 * 24))
Expand Down Expand Up @@ -110,7 +110,9 @@ fn setup_node_handle() -> std::result::Result<WakuNodeHandle<Running>, Box<dyn E
let peerid = node_handle.add_peer(&address, ProtocolId::Relay)?;
node_handle.connect_peer_with_id(&peerid, None)?;
}
node_handle.relay_subscribe(None)?;

let content_filter = ContentFilter::new(Some(waku_default_pubsub_topic()), vec![]);
node_handle.relay_subscribe(&content_filter)?;
Ok(node_handle)
}

Expand Down Expand Up @@ -209,7 +211,7 @@ fn run_app<B: Backend>(
);
if let Err(e) =
app.node_handle
.relay_publish_message(&waku_message, None, None)
.relay_publish_message(&waku_message, Some(waku_default_pubsub_topic()), None)
{
let mut out = std::io::stderr();
write!(out, "{e:?}").unwrap();
Expand Down
4 changes: 2 additions & 2 deletions waku-bindings/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "waku-bindings"
version = "0.3.0"
version = "0.4.0"
edition = "2021"
authors = [
"Daniel Sanchez Quiros <danielsq@status.im>"
Expand All @@ -26,7 +26,7 @@ serde_json = "1.0"
sscanf = "0.4"
smart-default = "0.6"
url = "2.3"
waku-sys = { version = "0.3.0", path = "../waku-sys" }
waku-sys = { version = "0.4.0", path = "../waku-sys" }
libc = "0.2"

[dev-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions waku-bindings/src/encrypt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub fn waku_encode_asymmetric(
let pk = hex::encode(public_key.serialize_uncompressed());
let sk = signing_key
.map(|signing_key| hex::encode(signing_key.secret_bytes()))
.unwrap_or_else(String::new);
.unwrap_or_default();
let message_ptr = CString::new(
serde_json::to_string(&message)
.expect("WakuMessages should always be able to success serializing"),
Expand Down Expand Up @@ -63,7 +63,7 @@ pub fn waku_encode_symmetric(
let symk = hex::encode(symmetric_key.as_slice());
let sk = signing_key
.map(|signing_key| hex::encode(signing_key.secret_bytes()))
.unwrap_or_else(String::new);
.unwrap_or_default();
let message_ptr = CString::new(
serde_json::to_string(&message)
.expect("WakuMessages should always be able to success serializing"),
Expand Down
4 changes: 2 additions & 2 deletions waku-bindings/src/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! When an event is emitted, this callback will be triggered receiving a [`Signal`]
// std
use std::ffi::{c_char, c_void, CStr};
use std::ffi::{c_char, c_int, c_void, CStr};
use std::ops::Deref;
use std::sync::Mutex;
// crates
Expand Down Expand Up @@ -79,7 +79,7 @@ fn set_callback<F: FnMut(Signal) + Send + Sync + 'static>(f: F) {

/// Wrapper callback, it transformst the `*const c_char` into a [`Signal`]
/// and executes the [`CALLBACK`] funtion with it
extern "C" fn callback(data: *const c_char, _user_data: *mut c_void) {
extern "C" fn callback(_ret_code: c_int, data: *const c_char, _user_data: *mut c_void) {
let raw_response = unsafe { CStr::from_ptr(data) }
.to_str()
.expect("Not null ptr");
Expand Down
79 changes: 68 additions & 11 deletions waku-bindings/src/general/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,12 @@ impl DecodedPayload {
/// as per the [specification](https://rfc.vac.dev/spec/36/#contentfilter-type)
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ContentFilter {
pub struct LegacyContentFilter {
/// The content topic of a Waku message
content_topic: WakuContentTopic,
}

impl ContentFilter {
impl LegacyContentFilter {
pub fn new(content_topic: WakuContentTopic) -> Self {
Self { content_topic }
}
Expand All @@ -217,7 +217,6 @@ impl ContentFilter {
/// The criteria to create subscription to a light node in JSON Format
/// as per the [specification](https://rfc.vac.dev/spec/36/#filtersubscription-type)
#[derive(Clone, Serialize, Deserialize, Debug)]
#[deprecated]
#[serde(rename_all = "camelCase")]
pub struct LegacyFilterSubscription {
/// Array of [`ContentFilter`] being subscribed to / unsubscribed from
Expand All @@ -244,20 +243,19 @@ impl LegacyFilterSubscription {
}

/// The criteria to create subscription to a filter full node matching a content filter.
/// as per the [specification](https://rfc.vac.dev/spec/36/#filtersubscription-type)
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct FilterSubscription {
/// mandatory, at least one required, with a max of 10
content_topics: Vec<WakuContentTopic>,
pub struct ContentFilter {
/// optional if using autosharding, mandatory if using static or named sharding.
pubsub_topic: Option<WakuPubSubTopic>,
/// mandatory, at least one required, with a max of 10
content_topics: Vec<WakuContentTopic>,
}

impl FilterSubscription {
impl ContentFilter {
pub fn new(
content_topics: Vec<WakuContentTopic>,
pubsub_topic: Option<WakuPubSubTopic>,
content_topics: Vec<WakuContentTopic>,
) -> Self {
Self {
content_topics,
Expand All @@ -274,14 +272,73 @@ impl FilterSubscription {
}
}

#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct FilterSubscriptionDetail {
#[serde(rename = "peerID")]
peer_id: PeerId,
content_topics: Vec<WakuContentTopic>,
pubsub_topic: WakuPubSubTopic,
}

impl FilterSubscriptionDetail {
pub fn new(
peer_id: PeerId,
content_topics: Vec<WakuContentTopic>,
pubsub_topic: WakuPubSubTopic,
) -> Self {
Self {
peer_id,
content_topics,
pubsub_topic,
}
}

pub fn peer_id(&self) -> &PeerId {
&self.peer_id
}

pub fn content_topics(&self) -> &[WakuContentTopic] {
&self.content_topics
}

pub fn pubsub_topic(&self) -> &WakuPubSubTopic {
&self.pubsub_topic
}
}

#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct FilterSubscriptionResult {
subscriptions: Vec<FilterSubscriptionDetail>,
error: Option<String>,
}

impl FilterSubscriptionResult {
pub fn new(subscriptions: Vec<FilterSubscriptionDetail>, error: Option<String>) -> Self {
Self {
subscriptions,
error,
}
}

pub fn subscriptions(&self) -> &[FilterSubscriptionDetail] {
&self.subscriptions
}

pub fn error(&self) -> &Option<String> {
&self.error
}
}

/// Criteria used to retrieve historical messages
#[derive(Clone, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct StoreQuery {
/// The pubsub topic on which messages are published
pub pubsub_topic: Option<WakuPubSubTopic>,
/// Array of [`ContentFilter`] to query for historical messages
pub content_filters: Vec<ContentFilter>,
/// Array of [`WakuContentTopic`] to query for historical messages
pub content_topics: Vec<WakuContentTopic>,
/// The inclusive lower bound on the timestamp of queried messages.
/// This field holds the Unix epoch time in nanoseconds
pub start_time: Option<usize>,
Expand Down
7 changes: 4 additions & 3 deletions waku-bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ pub use node::{
};

pub use general::{
ContentFilter, DecodedPayload, Encoding, FilterSubscription, LegacyFilterSubscription,
MessageId, MessageIndex, PagingOptions, PeerId, ProtocolId, Result, StoreQuery, StoreResponse,
WakuContentTopic, WakuMessage, WakuMessageVersion, WakuPubSubTopic,
ContentFilter, DecodedPayload, Encoding, FilterSubscriptionDetail, FilterSubscriptionResult,
LegacyContentFilter, LegacyFilterSubscription, MessageId, MessageIndex, PagingOptions, PeerId,
ProtocolId, Result, StoreQuery, StoreResponse, WakuContentTopic, WakuMessage,
WakuMessageVersion, WakuPubSubTopic,
};

pub use events::{waku_set_event_callback, Event, Signal, WakuMessageEvent};
10 changes: 3 additions & 7 deletions waku-bindings/src/node/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,9 @@ pub fn waku_dns_discovery(
let url = CString::new(url.to_string())
.expect("CString should build properly from a valid Url")
.into_raw();
let server = CString::new(
server
.map(|host| host.to_string())
.unwrap_or_else(|| "".to_string()),
)
.expect("CString should build properly from a String nameserver")
.into_raw();
let server = CString::new(server.map(|host| host.to_string()).unwrap_or_default())
.expect("CString should build properly from a String nameserver")
.into_raw();

let mut result: String = Default::default();
let result_cb = |v: &str| result = v.to_string();
Expand Down
Loading

0 comments on commit d2d7ba6

Please sign in to comment.