Skip to content

Commit

Permalink
pool: event verification cache
Browse files Browse the repository at this point in the history
Event verification is a heavy job, so keep track of verified events to avoid double verification.

Signed-off-by: Yuki Kishimoto <yukikishimoto@protonmail.com>
  • Loading branch information
yukibtc committed Feb 8, 2025
1 parent bbf133b commit 346f5d0
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
### Added

* nostr: add `EventBuilder::allow_self_tagging` ([Yuki Kishimoto])
* pool: event verification cache ([Yuki Kishimoto])

### Fixed

Expand Down
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/nostr-relay-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ nip11 = ["nostr/nip11"]
async-utility.workspace = true
async-wsocket = { workspace = true, features = ["socks"] }
atomic-destructor.workspace = true
lru = { version = "0.13", default-features = false }
negentropy = { workspace = true, features = ["std"] }
negentropy-deprecated = { workspace = true, features = ["std"] }
nostr = { workspace = true, features = ["std"] }
Expand Down
13 changes: 11 additions & 2 deletions crates/nostr-relay-pool/src/relay/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1033,8 +1033,17 @@ impl InnerRelay {

// Check if event exists
if let DatabaseEventStatus::NotExistent = status {
// Verify event
event.verify()?;
// Check if event was already verified
//
// This is useful if someone continue to send the same invalid event:
// since invalid events aren't stored in the database,
// skipping this check would result in the re-verification of the event.
// This may also be useful to avoid double verification if the event is received at the exact same time by many different Relay instances.
//
// This is important since event signature verification is a heavy job!
if !self.state.verified(&event.id)? {
event.verify()?;
}

// Save into database
self.state.database().save_event(&event).await?;
Expand Down
56 changes: 46 additions & 10 deletions crates/nostr-relay-pool/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@
// Copyright (c) 2023-2025 Rust Nostr Developers
// Distributed under the MIT software license

use std::collections::hash_map::DefaultHasher;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use lru::LruCache;
use nostr::prelude::IntoNostrSigner;
use nostr::NostrSigner;
use nostr::{EventId, NostrSigner};
use nostr_database::{IntoNostrDatabase, MemoryDatabase, NostrDatabase};
use tokio::sync::RwLock;

Expand All @@ -16,9 +20,12 @@ use crate::transport::websocket::{
};
use crate::{RelayFiltering, RelayFilteringMode};

const MAX_VERIFICATION_CACHE_SIZE: usize = 4_000_000;

#[derive(Debug)]
pub enum SharedStateError {
SignerNotConfigured,
MutexPoisoned,
}

impl std::error::Error for SharedStateError {}
Expand All @@ -27,6 +34,7 @@ impl fmt::Display for SharedStateError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::SignerNotConfigured => write!(f, "signer not configured"),
Self::MutexPoisoned => write!(f, "mutex poisoned"),
}
}
}
Expand All @@ -40,19 +48,20 @@ pub struct SharedState {
nip42_auto_authentication: Arc<AtomicBool>,
min_pow_difficulty: Arc<AtomicU8>,
pub(crate) filtering: RelayFiltering,
verification_cache: Arc<Mutex<LruCache<u64, ()>>>,
// TODO: add a semaphore to limit number of concurrent websocket connections attempts?
}

impl Default for SharedState {
fn default() -> Self {
Self {
database: MemoryDatabase::new().into_nostr_database(),
transport: DefaultWebsocketTransport.into_transport(),
signer: Arc::new(RwLock::new(None)),
nip42_auto_authentication: Arc::new(AtomicBool::new(true)),
min_pow_difficulty: Arc::new(AtomicU8::new(0)),
filtering: RelayFiltering::default(),
}
Self::new(
MemoryDatabase::new().into_nostr_database(),
DefaultWebsocketTransport.into_transport(),
None,
RelayFilteringMode::default(),
true,
0,
)
}
}

Expand All @@ -65,13 +74,18 @@ impl SharedState {
nip42_auto_authentication: bool,
min_pow_difficulty: u8,
) -> Self {
let max_verification_cache_size: NonZeroUsize =
NonZeroUsize::new(MAX_VERIFICATION_CACHE_SIZE)
.expect("MAX_VERIFICATION_CACHE_SIZE must be greater than 0");

Self {
database,
transport,
signer: Arc::new(RwLock::new(signer)),
nip42_auto_authentication: Arc::new(AtomicBool::new(nip42_auto_authentication)),
filtering: RelayFiltering::new(filtering_mode),
min_pow_difficulty: Arc::new(AtomicU8::new(min_pow_difficulty)),
verification_cache: Arc::new(Mutex::new(LruCache::new(max_verification_cache_size))),
}
}

Expand Down Expand Up @@ -153,4 +167,26 @@ impl SharedState {
pub fn filtering(&self) -> &RelayFiltering {
&self.filtering
}

pub(crate) fn verified(&self, id: &EventId) -> Result<bool, SharedStateError> {
let mut cache = self
.verification_cache
.lock()
.map_err(|_| SharedStateError::MutexPoisoned)?;

// Hash event ID
let id: u64 = hash(&id);

// Returns `Some(T)` if the key already exists
Ok(cache.put(id, ()).is_some())
}
}

fn hash<T>(val: &T) -> u64
where
T: Hash,
{
let mut hasher: DefaultHasher = DefaultHasher::new();
val.hash(&mut hasher);
hasher.finish()
}

0 comments on commit 346f5d0

Please sign in to comment.