Skip to content

Commit

Permalink
Improve Nostr filters.
Browse files Browse the repository at this point in the history
- support multiple filters in the same REQ
- filter by `since` and `until`
- filter by `limit`
  • Loading branch information
ibz committed Dec 1, 2024
1 parent 97afe8a commit eef019c
Show file tree
Hide file tree
Showing 3 changed files with 273 additions and 120 deletions.
156 changes: 69 additions & 87 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,17 +129,19 @@ async fn handle_websocket(
mut ws: WebSocketConnection,
) -> tide::Result<()> {
while let Some(Ok(Message::Text(message))) = async_std::stream::StreamExt::next(&mut ws).await {
let parsed: nostr::Message = serde_json::from_str(&message).unwrap();
match parsed {
nostr::Message::Event(cmd) => {
log::debug!("WS RECV: {}", message);
let nostr_message = nostr::Message::from_str(&message);
if nostr_message.is_err() {
log::warn!("Cannot parse: {}", message);
continue;
}
match nostr_message.unwrap() {
nostr::Message::Event { event } => {
{
if let Some(site) = get_site(&request) {
if let Some(site_pubkey) = site.config.pubkey {
if cmd.event.pubkey != site_pubkey {
log::info!(
"Ignoring event for unknown pubkey: {}.",
cmd.event.pubkey
);
if event.pubkey != site_pubkey {
log::info!("Ignoring event for unknown pubkey: {}.", event.pubkey);
continue;
}
} else {
Expand All @@ -151,92 +153,72 @@ async fn handle_websocket(
}
}

if cmd.event.validate_sig().is_err() {
if event.validate_sig().is_err() {
log::info!("Ignoring invalid event.");
continue;
}

if cmd.event.kind == nostr::EVENT_KIND_DELETE {
let post_removed: bool;
if let Some(site) = get_site(&request) {
post_removed = site.remove_content(&cmd.event);
if let Some(site) = get_site(&request) {
if event.kind == nostr::EVENT_KIND_DELETE {
let post_removed = site.remove_content(&event);
log::info!(
"Incoming DELETE event: {}. status: {}",
event.id,
post_removed
);
ws.send_json(&json!(vec![
serde_json::Value::String("OK".to_string()),
serde_json::Value::String(event.id.to_string()),
serde_json::Value::Bool(post_removed),
serde_json::Value::String("".to_string())
]))
.await
.unwrap();
} else {
return Ok(());
site.add_content(&event);
log::info!("Incoming event: {}.", event.id);
ws.send_json(&json!(vec![
serde_json::Value::String("OK".to_string()),
serde_json::Value::String(event.id.to_string()),
serde_json::Value::Bool(true),
serde_json::Value::String("".to_string())
]))
.await
.unwrap();
}
log::info!(
"Incoming DELETE event: {}. status: {}",
cmd.event.id,
post_removed
);
ws.send_json(&json!(vec![
serde_json::Value::String("OK".to_string()),
serde_json::Value::String(cmd.event.id.to_string()),
serde_json::Value::Bool(post_removed),
serde_json::Value::String("".to_string())
]))
.await
.unwrap();
} else {
if let Some(site) = get_site(&request) {
site.add_content(&cmd.event);
} else {
return Ok(());
}
log::info!("Incoming event: {}.", cmd.event.id);
ws.send_json(&json!(vec![
serde_json::Value::String("OK".to_string()),
serde_json::Value::String(cmd.event.id.to_string()),
serde_json::Value::Bool(true),
serde_json::Value::String("".to_string())
]))
.await
.unwrap();
return Ok(());
}
}
nostr::Message::Req(cmd) => {
let mut events: Vec<nostr::Event> = vec![];
let mut filter_by_authors = false;
let mut filter_authors: Vec<String> = vec![];
let mut filter_by_kinds = false;
let mut filter_kinds: Vec<i64> = vec![];
for (filter_by, filter) in &cmd.filter.extra {
if filter_by == "authors" {
filter_by_authors = true;
filter_authors = filter
.as_array()
.unwrap()
.iter()
.map(|f| f.as_str().unwrap().to_string())
.collect();
} else if filter_by == "kinds" {
filter_by_kinds = true;
filter_kinds = filter
.as_array()
.unwrap()
.iter()
.map(|f| f.as_i64().unwrap())
.collect();
} else {
log::info!("Ignoring unknown filter: {}.", filter_by);
continue;
}
}
nostr::Message::Req { sub_id, filters } => {
let mut events: Vec<nostr::Event> = vec![]; // Hashmap? (unique)

if let Some(site) = get_site(&request) {
let matches_site_author =
!filter_by_authors || filter_authors.contains(&site.config.pubkey.unwrap());
if matches_site_author {
for event_ref in site.events.read().unwrap().values() {
let matches_kind =
!filter_by_kinds || filter_kinds.contains(&event_ref.kind);
if matches_kind {
if let Some((front_matter, content)) = event_ref.read() {
if let Some(event) = nostr::parse_event(&front_matter, &content)
{
let matches_author = !filter_by_authors
|| filter_authors.contains(&event.pubkey);
if matches_author {
events.push(event);
let site_pubkey = site.config.pubkey.unwrap();
for filter in filters.iter() {
for (k, _) in &filter.extra {
log::warn!("Ignoring unknown filter: {}.", k);
}

log::info!("Requested filter: {}", filter);

if filter.matches_author(&site_pubkey) {
for event_ref in site.events.read().unwrap().values() {
if filter.matches_kind(&event_ref.kind)
&& filter.matches_time(&event_ref.created_at)
{
if let Some((front_matter, content)) = event_ref.read() {
if let Some(event) =
nostr::parse_event(&front_matter, &content)
{
if filter.matches_author(&event.pubkey) {
events.push(event);
if let Some(limit) = filter.limit {
if events.len() >= limit {
break;
}
}
}
}
}
}
Expand All @@ -250,25 +232,25 @@ async fn handle_websocket(
for event in &events {
ws.send_json(&json!([
serde_json::Value::String("EVENT".to_string()),
serde_json::Value::String(cmd.subscription_id.to_string()),
serde_json::Value::String(sub_id.to_string()),
event.to_json(),
]))
.await
.unwrap();
}
ws.send_json(&json!(vec!["EOSE", &cmd.subscription_id.to_string()]))
ws.send_json(&json!(vec!["EOSE", &sub_id.to_string()]))
.await
.unwrap();
log::info!(
"Sent {} events back for subscription {}.",
events.len(),
cmd.subscription_id
sub_id
);
// TODO: At this point we should save the subscription and notify this client later if other posts appear.
// For that, we probably need to introduce a dispatcher thread.
// See: https://stackoverflow.com/questions/35673702/chat-using-rust-websocket/35785414#35785414
}
nostr::Message::Close(_cmd) => {
nostr::Message::Close { .. } => {
// Nothing to do here, since we don't actually store subscriptions!
}
}
Expand Down
Loading

0 comments on commit eef019c

Please sign in to comment.