-
Notifications
You must be signed in to change notification settings - Fork 161
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(iroh-willow): Event subscriptions #2682
Conversation
Documentation for this PR has been generated and is available at: https://n0-computer.github.io/iroh/pr/2682/docs/iroh/ Last updated: 2024-09-09T17:31:26Z |
0a93a71
to
de494dd
Compare
de494dd
to
a2d8316
Compare
I think I've played with this enough in tauri-todos to confirm the subscription bits are working. Still need to take a look at the actual code, but honestly, that'll take a bit until I warm up to the codebase generally. |
iroh-willow/src/engine/actor.rs
Outdated
self.tasks.spawn_local(async move { | ||
let mut stream = store.entries().subscribe_area(namespace, area, params); | ||
while let Some(event) = stream.next().await { | ||
if sender.send(event).await.is_err() { | ||
break; | ||
} | ||
} | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This kind of forwarding of events seems slightly weird to me. Although I guess it's either this or more invasive changes to the trait Storage
/trait EntryStorage
(allowing you to just give it a sender instead of it returning a stream).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that this is not ideal. We do similar things already currently in the RPC boundary for old iroh-docs.
Maybe it makes sense for the subscribe methods on the store trait to take in a sender instead of returning a stream.
This brings up the question though which trait it should be generic over: Sink? ufotofu::Consumer? a custom trait that has only async fn send(item: Event)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a comment for now.
@@ -93,6 +95,7 @@ impl AreaExt for Area { | |||
/// A single point in the 3D range space. | |||
/// | |||
/// I.e. an entry. | |||
// TODO: Upstream to willow-rs. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes these are really good TODOs. Also need this on PathExt
and EntryExt
in the data_model.rs
file.
@@ -132,3 +136,87 @@ async fn spaces_smoke() -> Result<()> { | |||
|
|||
Ok(()) | |||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test fails on this branch, but works in the willow
branch.
It's waiting on syncs.complete_all().await
, which never completes (neither with SessionMode::ReconcileOnce
nor SessionMode::Continuous
).
The test succeeds if we share the ticket using RestrictArea::None
instead of RestrictArea::Restrict(...)
.
#[tokio::test] | |
async fn test_regression_restricted_area_sync() -> testresult::TestResult { | |
iroh_test::logging::setup_multithreaded(); | |
const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2); | |
let (alfie_addr, alfie) = spawn_node().await; | |
let (betty_addr, betty) = spawn_node().await; | |
info!("alfie is {}", alfie_addr.node_id.fmt_short()); | |
info!("betty is {}", betty_addr.node_id.fmt_short()); | |
let alfie_user = alfie.spaces().create_user().await?; | |
let betty_user = betty.spaces().create_user().await?; | |
let alfie_space = alfie | |
.spaces() | |
.create(NamespaceKind::Owned, alfie_user) | |
.await?; | |
let space_ticket = alfie_space | |
.share( | |
betty_user, | |
AccessMode::Write, | |
// RestrictArea::None, // succeeds with this | |
RestrictArea::Restrict(Area::new_subspace(betty_user)), | |
) | |
.await?; | |
let (betty_space, syncs) = betty | |
.spaces() | |
.import_and_sync(space_ticket, SessionMode::ReconcileOnce) | |
.await?; | |
let completion = tokio::time::timeout(TIMEOUT, syncs.complete_all()).await?; | |
println!("Completed syncs: {completion:#?}"); | |
let stream = betty_space.get_many(Range3d::new_full()).await?; | |
let entries: Vec<_> = stream.try_collect().await?; | |
println!("{entries:#?}"); | |
Ok(()) | |
} | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've also seen Space::sync_once
just hang when two nodes try to sync that have never exchanged capabilities.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the test failed to pass because this branch was missing a commit from main: #2683
Or at least the test passes just fine for me after rebasing this branch on willow!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've also seen Space::sync_once just hang when two nodes try to sync that have never exchanged capabilities.
I think this is currently expected behavior: PAI waits forever for matching capabilities to be submitted. We need to close our side after we have submitted everything I think. Aljoscha made some recent modifications to WGPS to support this properly (I still have to check them out)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test passes reliably for me.
a2d8316
to
46c386c
Compare
Description
EntryStorage
traitNotes & open questions
Should look into remaining differences between our trait and willow-rs with regard to event subscriptions, and see if anything from here should be proposed for change in the willow-rs trait. Notably:
subscribe_area
takesSubscribeOpts
notQueryOpts
, with option to ignore ingested event for some session id, and to receive only Ingested event. This lets me replace the previous impl easily in the WGPS impl, and it makes sense IMO.Has a few TODOs on optimizing things.
Will need much more tests.
Replaces #2681 (had wrong branch)
Change checklist