Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iroh-willow): Event subscriptions #2681

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 94 additions & 5 deletions iroh-willow/src/engine/actor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{sync::Arc, thread::JoinHandle};

use anyhow::Result;
use futures_lite::stream::Stream;
use futures_lite::{stream::Stream, StreamExt};
use iroh_base::key::NodeId;
use tokio::{
sync::{mpsc, oneshot},
Expand All @@ -16,14 +16,16 @@ use crate::{
net::ConnHandle,
proto::{
data_model::{AuthorisedEntry, Path, SubspaceId},
grouping::Range3d,
grouping::{Area, Range3d},
keys::{NamespaceId, NamespaceKind, UserId, UserSecretKey},
meadowcap::{self, AccessMode},
},
session::{intents::Intent, run_session, Error, EventSender, SessionHandle},
store::{
entry::EntryOrigin,
traits::{EntryReader, SecretStorage, Storage},
traits::{
EntryOrigin, EntryReader, EntryStorage, SecretStorage, Storage, StoreEvent,
SubscribeParams,
},
Store,
},
};
Expand Down Expand Up @@ -212,6 +214,42 @@ impl ActorHandle {
reply_rx.await?;
Ok(())
}

pub async fn subscribe_area(
&self,
namespace: NamespaceId,
area: Area,
params: SubscribeParams,
sender: mpsc::Sender<StoreEvent>,
) -> Result<()> {
self.send(Input::SubscribeArea {
namespace,
area,
params,
sender,
})
.await?;
Ok(())
}

pub async fn resume_subscription(
&self,
progress_id: u64,
namespace: NamespaceId,
area: Area,
params: SubscribeParams,
sender: mpsc::Sender<StoreEvent>,
) -> Result<()> {
self.send(Input::ResumeSubscription {
progress_id,
namespace,
area,
params,
sender,
})
.await?;
Ok(())
}
}

impl Drop for ActorHandle {
Expand Down Expand Up @@ -300,6 +338,19 @@ pub enum Input {
#[debug(skip)]
reply: Option<oneshot::Sender<()>>,
},
SubscribeArea {
namespace: NamespaceId,
area: Area,
params: SubscribeParams,
sender: mpsc::Sender<StoreEvent>,
},
ResumeSubscription {
progress_id: u64,
namespace: NamespaceId,
area: Area,
params: SubscribeParams,
sender: mpsc::Sender<StoreEvent>,
},
}

#[derive(Debug)]
Expand Down Expand Up @@ -436,7 +487,7 @@ impl<S: Storage> Actor<S> {
origin,
reply,
} => {
let res = self.store.entries().ingest(&authorised_entry, origin);
let res = self.store.entries().ingest_entry(&authorised_entry, origin);
send_reply(reply, res)
}
Input::InsertEntry { entry, auth, reply } => {
Expand Down Expand Up @@ -480,6 +531,44 @@ impl<S: Storage> Actor<S> {
let res = self.store.auth().resolve_interests(interests);
send_reply(reply, res.map_err(anyhow::Error::from))
}
Input::SubscribeArea {
namespace,
area,
params,
sender,
} => {
let store = self.store.clone();
self.tasks.spawn_local(async move {
let mut stream = store.entries().subscribe_area(namespace, area, params);
while let Some(event) = stream.next().await {
if let Err(_) = sender.send(event).await {
break;
}
}
});
Ok(())
}
Input::ResumeSubscription {
progress_id,
namespace,
area,
params,
sender,
} => {
let store = self.store.clone();
self.tasks.spawn_local(async move {
let mut stream =
store
.entries()
.resume_subscription(progress_id, namespace, area, params);
while let Some(event) = stream.next().await {
if let Err(_) = sender.send(event).await {
break;
}
}
});
Ok(())
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions iroh-willow/src/proto/grouping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub type AreaOfInterest = willow_data_model::grouping::AreaOfInterest<
>;

/// Extension methods for [`AreaOfInterest`].
// TODO: Upstream to willow-rs as methods on [`AreaOfInterest].
pub trait AreaOfInterestExt {
/// Creates a new area of interest with the specified area and no other limits.
fn with_area(area: Area) -> AreaOfInterest;
Expand All @@ -53,6 +54,7 @@ impl AreaOfInterestExt for AreaOfInterest {
}

/// Extension methods for [`Area`].
// TODO: Upstream to willow-rs as methods on [`Area`].
pub trait AreaExt {
/// Returns `true` if the area contains `point`.
fn includes_point(&self, point: &Point) -> bool;
Expand Down Expand Up @@ -93,6 +95,7 @@ impl AreaExt for Area {
/// A single point in the 3D range space.
///
/// I.e. an entry.
// TODO: Upstream to willow-rs.
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
pub struct Point {
#[serde(with = "data_model::serde_encoding::path")]
Expand Down
47 changes: 24 additions & 23 deletions iroh-willow/src/session/data.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use futures_lite::StreamExt;
use tokio::sync::broadcast;

use crate::{
proto::{
Expand All @@ -8,8 +7,7 @@ use crate::{
},
session::{channels::ChannelSenders, static_tokens::StaticTokens, Error, SessionId},
store::{
entry::{EntryChannel, EntryOrigin},
traits::Storage,
traits::{EntryOrigin, EntryStorage, Storage, StoreEvent, SubscribeParams},
Store,
},
util::stream::CancelableReceiver,
Expand Down Expand Up @@ -51,29 +49,36 @@ impl<S: Storage> DataSender<S> {
}
}
pub async fn run(mut self) -> Result<(), Error> {
let mut entry_stream = self.store.entries().subscribe(self.session_id);
let mut entry_stream = futures_concurrency::stream::StreamGroup::new();
loop {
tokio::select! {
input = self.inbox.next() => {
let Some(input) = input else {
break;
};
let Input::AoiIntersection(intersection) = input;
self.store.entries().watch_area(
self.session_id,
intersection.namespace,
intersection.intersection.area.clone(),
);
let params = SubscribeParams::default().ingest_only().ignore_remote(self.session_id);
// TODO: We could start at the progress id at the beginning of the session.
let stream = self
.store
.entries()
.subscribe_area(
intersection.namespace,
intersection.intersection.area.clone(),
params,
)
.filter_map(|event| match event {
StoreEvent::Ingested(_id, entry, _origin) => Some(entry),
// We get only Ingested events because we set ingest_only() param above.
_ => unreachable!("expected only Ingested event but got another event"),
});
entry_stream.insert(stream);
},
entry = entry_stream.recv() => {
entry = entry_stream.next(), if !entry_stream.is_empty() => {
match entry {
Ok(entry) => self.send_entry(entry).await?,
Err(broadcast::error::RecvError::Closed) => break,
Err(broadcast::error::RecvError::Lagged(_count)) => {
// TODO: Queue another reconciliation
}
Some(entry) => self.send_entry(entry).await?,
None => break,
}

}
}
}
Expand Down Expand Up @@ -149,13 +154,9 @@ impl<S: Storage> DataReceiver<S> {
message.dynamic_token,
)
.await?;
self.store.entries().ingest(
&authorised_entry,
EntryOrigin::Remote {
session: self.session_id,
channel: EntryChannel::Data,
},
)?;
self.store
.entries()
.ingest_entry(&authorised_entry, EntryOrigin::Remote(self.session_id))?;
let (entry, _token) = authorised_entry.into_parts();
// TODO: handle offset
self.current_payload.set(
Expand Down
10 changes: 3 additions & 7 deletions iroh-willow/src/session/reconciler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ use crate::{
Error, Role, SessionId,
},
store::{
entry::{EntryChannel, EntryOrigin},
traits::{EntryReader, EntryStorage, SplitAction, SplitOpts, Storage},
traits::{EntryOrigin, EntryReader, EntryStorage, SplitAction, SplitOpts, Storage},
Store,
},
util::{
Expand Down Expand Up @@ -164,12 +163,9 @@ impl<S: Storage> Reconciler<S> {
authorised_entry.entry().payload_length(),
message.entry.available,
)?;
self.shared.store.entries().ingest(
self.shared.store.entries().ingest_entry(
&authorised_entry,
EntryOrigin::Remote {
session: self.shared.session_id,
channel: EntryChannel::Reconciliation,
},
EntryOrigin::Remote(self.shared.session_id),
)?;
}
ReconciliationMessage::SendPayload(message) => {
Expand Down
3 changes: 0 additions & 3 deletions iroh-willow/src/session/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,6 @@ pub(crate) async fn run_session<S: Storage>(
.try_join()
.await;

// Unsubscribe from the store.
store.entries().unsubscribe(&session_id);

// Track if we closed the session by triggering the cancel token, or if the remote peer closed
// the session by closing the control channel.
let we_cancelled = close_session_token.is_cancelled();
Expand Down
27 changes: 13 additions & 14 deletions iroh-willow/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use anyhow::{anyhow, Context, Result};
use rand_core::CryptoRngCore;
use traits::EntryStorage;

use crate::{
form::{AuthForm, EntryForm, EntryOrForm, SubspaceForm, TimestampForm},
Expand All @@ -22,42 +23,40 @@ use crate::{
use self::auth::{Auth, AuthError};
use self::traits::Storage;

pub(crate) use self::entry::{EntryOrigin, WatchableEntryStore};
pub(crate) use self::traits::EntryOrigin;

pub(crate) mod auth;
pub(crate) mod entry;
pub mod memory;
pub mod traits;

/// Storage for the Willow engine.
///
/// Wraps a `Storage` instance and adds the [`Auth`] struct that uses the secret and caps storage to provide
/// authentication when inserting entries.
#[derive(Debug, Clone)]
pub(crate) struct Store<S: Storage> {
entries: WatchableEntryStore<S::Entries>,
secrets: S::Secrets,
payloads: S::Payloads,
storage: S,
auth: Auth<S>,
}

impl<S: Storage> Store<S> {
pub fn new(storage: S) -> Self {
Self {
entries: WatchableEntryStore::new(storage.entries().clone()),
secrets: storage.secrets().clone(),
payloads: storage.payloads().clone(),
auth: Auth::new(storage.secrets().clone(), storage.caps().clone()),
storage,
}
}

pub fn entries(&self) -> &WatchableEntryStore<S::Entries> {
&self.entries
pub fn entries(&self) -> &S::Entries {
self.storage.entries()
}

pub fn secrets(&self) -> &S::Secrets {
&self.secrets
self.storage.secrets()
}

pub fn payloads(&self) -> &S::Payloads {
&self.payloads
self.storage.payloads()
}

pub fn auth(&self) -> &Auth<S> {
Expand Down Expand Up @@ -97,7 +96,7 @@ impl<S: Storage> Store<S> {
let authorised_entry = AuthorisedEntry::new_unchecked(entry, token);
let inserted = self
.entries()
.ingest(&authorised_entry, EntryOrigin::Local)?;
.ingest_entry(&authorised_entry, EntryOrigin::Local)?;
Ok((authorised_entry, inserted))
}

Expand All @@ -118,7 +117,7 @@ impl<S: Storage> Store<S> {
/// the provided [`Store`].
///
/// `user_id` must be set to the user who is authenticating the entry.
pub async fn form_to_entry(
async fn form_to_entry(
&self,
form: EntryForm,
user_id: UserId, // auth: AuthForm,
Expand Down
3 changes: 1 addition & 2 deletions iroh-willow/src/store/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,7 @@ impl<S: Storage> Auth<S> {
} else {
None
};
let pack =
CapabilityPack::Read(ReadAuthorisation::new(new_read_cap, new_subspace_cap));
let pack = CapabilityPack::Read(ReadAuthorisation::new(new_read_cap, new_subspace_cap));
Ok(pack)
}

Expand Down
Loading
Loading