Skip to content

Commit

Permalink
feat: import and subscribe to a document in a single call (#2303)
Browse files Browse the repository at this point in the history
## Description

This adds the much-needed `import_and_subscribe` call to make live sync
events more reliable.

Did not test much yet but I think it should work as expected! 

## Breaking Changes

<!-- Optional, if there are any breaking changes document them,
including how to migrate older code. -->

## Notes & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.
- [x] All breaking changes documented.
  • Loading branch information
Frando authored May 18, 2024
1 parent fc73502 commit 370075c
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 11 deletions.
35 changes: 32 additions & 3 deletions iroh/src/client/docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use iroh_blobs::{export::ExportProgress, store::ExportMode, Hash};
use iroh_docs::{
actor::OpenState,
store::{DownloadPolicy, Query},
AuthorId, CapabilityKind, ContentStatus, DocTicket, NamespaceId, PeerIdBytes, RecordIdentifier,
AuthorId, Capability, CapabilityKind, ContentStatus, DocTicket, NamespaceId, PeerIdBytes,
RecordIdentifier,
};
use iroh_net::NodeAddr;
use portable_atomic::{AtomicBool, Ordering};
Expand Down Expand Up @@ -63,13 +64,41 @@ where
Ok(())
}

/// Import a document from a namespace capability.
///
/// This does not start sync automatically. Use [`Doc::start_sync`] to start sync.
pub async fn import_namespace(&self, capability: Capability) -> Result<Doc<C>> {
let res = self.rpc.rpc(DocImportRequest { capability }).await??;
let doc = Doc::new(self.rpc.clone(), res.doc_id);
Ok(doc)
}

/// Import a document from a ticket and join all peers in the ticket.
pub async fn import(&self, ticket: DocTicket) -> Result<Doc<C>> {
let res = self.rpc.rpc(DocImportRequest(ticket)).await??;
let doc = Doc::new(self.rpc.clone(), res.doc_id);
let DocTicket { capability, nodes } = ticket;
let doc = self.import_namespace(capability).await?;
doc.start_sync(nodes).await?;
Ok(doc)
}

/// Import a document from a ticket, create a subscription stream and join all peers in the ticket.
///
/// Returns the [`Doc`] and a [`Stream`] of [`LiveEvent`]s
///
/// The subscription stream is created before the sync is started, so the first call to this
/// method after starting the node is guaranteed to not miss any sync events.
pub async fn import_and_subscribe(
&self,
ticket: DocTicket,
) -> Result<(Doc<C>, impl Stream<Item = anyhow::Result<LiveEvent>>)> {
let DocTicket { capability, nodes } = ticket;
let res = self.rpc.rpc(DocImportRequest { capability }).await??;
let doc = Doc::new(self.rpc.clone(), res.doc_id);
let events = doc.subscribe().await?;
doc.start_sync(nodes).await?;
Ok((doc, events))
}

/// List all documents.
pub async fn list(&self) -> Result<impl Stream<Item = Result<(NamespaceId, CapabilityKind)>>> {
let stream = self.rpc.server_streaming(DocListRequest {}).await?;
Expand Down
6 changes: 1 addition & 5 deletions iroh/src/docs_engine/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,9 @@ impl Engine {
}

pub async fn doc_import(&self, req: DocImportRequest) -> RpcResult<DocImportResponse> {
let DocImportRequest(DocTicket {
capability,
nodes: peers,
}) = req;
let DocImportRequest { capability } = req;
let doc_id = self.sync.import_namespace(capability).await?;
self.sync.open(doc_id, Default::default()).await?;
self.start_sync(doc_id, peers).await?;
Ok(DocImportResponse { doc_id })
}

Expand Down
10 changes: 7 additions & 3 deletions iroh/src/rpc_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ use iroh_net::{
use iroh_docs::{
actor::OpenState,
store::{DownloadPolicy, Query},
Author, AuthorId, CapabilityKind, DocTicket, Entry, NamespaceId, PeerIdBytes, SignedEntry,
Author, AuthorId, Capability, CapabilityKind, DocTicket, Entry, NamespaceId, PeerIdBytes,
SignedEntry,
};
use quic_rpc::{
message::{BidiStreaming, BidiStreamingMsg, Msg, RpcMsg, ServerStreaming, ServerStreamingMsg},
Expand Down Expand Up @@ -548,9 +549,12 @@ pub struct DocCreateResponse {
pub id: NamespaceId,
}

/// Import a document from a ticket.
/// Import a document from a capability.
#[derive(Serialize, Deserialize, Debug)]
pub struct DocImportRequest(pub DocTicket);
pub struct DocImportRequest {
/// The namespace capability.
pub capability: Capability,
}

impl RpcMsg<RpcService> for DocImportRequest {
type Response = RpcResult<DocImportResponse>;
Expand Down

0 comments on commit 370075c

Please sign in to comment.