From b62e90409f43aa06cbfb1f45a2ee2f9ae2af77de Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Fri, 10 May 2024 20:41:23 +0200 Subject: [PATCH] feat(iroh-net): Own the public QUIC API (#2279) ## Description This export a lot of quinn types directly from iroh-net::magic_endpoint. These are all the types we need to interact with iroh-net in our own code. The goal is that users should not need to figure out how to add their own (iroh-)quinn dependency to use iroh-net, instead all types should be provided by iroh-net. Not all APIs are re-exported however, to avoid exporting too much. Hopefully what iroh itself needs is a reasonable indication of what is needed, we can always add more. ## Breaking Changes ## Notes & open questions - This ducks re-writing the iroh-blobs examples. - This leaves quinn as a dependency of iroh itself, this is still needed because quic-rpc likewise does not provide all the needed types in it's public API. We should probably do a similar effort there. ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [x] Tests if relevant. - [x] All breaking changes documented. --- Cargo.lock | 4 --- iroh-blobs/Cargo.toml | 8 ++++- iroh-blobs/src/downloader.rs | 4 +-- iroh-blobs/src/downloader/get.rs | 11 +++++-- iroh-blobs/src/get.rs | 55 +++++++++++++++++--------------- iroh-blobs/src/get/db.rs | 11 +++---- iroh-blobs/src/get/error.rs | 47 +++++++++++++++------------ iroh-blobs/src/get/request.rs | 9 +++--- iroh-blobs/src/protocol.rs | 11 ++++--- iroh-blobs/src/provider.rs | 12 +++---- iroh-cli/Cargo.toml | 1 - iroh-cli/src/commands/doctor.rs | 24 +++++++------- iroh-docs/Cargo.toml | 3 +- iroh-gossip/Cargo.toml | 3 +- iroh-gossip/src/net.rs | 16 ++++++---- iroh-net/bench/Cargo.toml | 1 - iroh-net/bench/src/bin/bulk.rs | 9 ++++-- iroh-net/bench/src/lib.rs | 11 ++++--- iroh-net/src/magic_endpoint.rs | 6 ++-- 19 files changed, 131 insertions(+), 115 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c760c4a5d2..e925ef3be4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2515,7 +2515,6 @@ dependencies = [ "indicatif", "iroh", "iroh-metrics", - "iroh-quinn", "nix 0.27.1", "parking_lot", "pkarr", @@ -2606,7 +2605,6 @@ dependencies = [ "iroh-blake3", "iroh-metrics", "iroh-net", - "iroh-quinn", "iroh-test", "lru", "num_enum", @@ -2645,7 +2643,6 @@ dependencies = [ "iroh-blake3", "iroh-metrics", "iroh-net", - "iroh-quinn", "iroh-test", "postcard", "rand", @@ -2789,7 +2786,6 @@ dependencies = [ "clap", "hdrhistogram", "iroh-net", - "iroh-quinn", "tokio", "tracing", "tracing-subscriber", diff --git a/iroh-blobs/Cargo.toml b/iroh-blobs/Cargo.toml index aa4fba33c8..a66aa98ed8 100644 --- a/iroh-blobs/Cargo.toml +++ b/iroh-blobs/Cargo.toml @@ -34,7 +34,6 @@ iroh-net = { version = "0.15.0", path = "../iroh-net" } num_cpus = "1.15.0" parking_lot = { version = "0.12.1", optional = true } postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } -quinn = { package = "iroh-quinn", version = "0.10" } rand = "0.8" range-collections = "0.4.0" redb = { version = "2.0.0", optional = true } @@ -65,6 +64,13 @@ rustls = { version = "0.21.11", default-features = false, features = ["quic"] } tempfile = "3.10.0" futures-util = "0.3.30" +[dev-dependencies.quinn] +# This allows writing the examples without relying on iroh-net. +# Though as they still depend on iroh-quinn this is perhaps not very +# useful right now. Changing them is a bit more work however. +package = "iroh-quinn" +version = "0.10" + [features] default = ["fs-store"] downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"] diff --git a/iroh-blobs/src/downloader.rs b/iroh-blobs/src/downloader.rs index 481a2dc304..dc788195f4 100644 --- a/iroh-blobs/src/downloader.rs +++ b/iroh-blobs/src/downloader.rs @@ -40,7 +40,7 @@ use std::{ use futures_lite::{future::BoxedLocal, Stream, StreamExt}; use hashlink::LinkedHashSet; use iroh_base::hash::{BlobFormat, Hash, HashAndFormat}; -use iroh_net::{MagicEndpoint, NodeAddr, NodeId}; +use iroh_net::{magic_endpoint, MagicEndpoint, NodeAddr, NodeId}; use tokio::{ sync::{mpsc, oneshot}, task::JoinSet, @@ -1452,7 +1452,7 @@ impl Queue { } impl Dialer for iroh_net::dialer::Dialer { - type Connection = quinn::Connection; + type Connection = magic_endpoint::Connection; fn queue_dial(&mut self, node_id: NodeId) { self.queue_dial(node_id, crate::protocol::ALPN) diff --git a/iroh-blobs/src/downloader/get.rs b/iroh-blobs/src/downloader/get.rs index 6cf4edbcd8..bd7cb04931 100644 --- a/iroh-blobs/src/downloader/get.rs +++ b/iroh-blobs/src/downloader/get.rs @@ -1,4 +1,6 @@ -//! [`Getter`] implementation that performs requests over [`quinn::Connection`]s. +//! [`Getter`] implementation that performs requests over [`Connection`]s. +//! +//! [`Connection`]: iroh_net::magic_endpoint::Connection use crate::{ get::{db::get_to_db, error::GetError}, @@ -7,6 +9,7 @@ use crate::{ use futures_lite::FutureExt; #[cfg(feature = "metrics")] use iroh_metrics::{inc, inc_by}; +use iroh_net::magic_endpoint; #[cfg(feature = "metrics")] use crate::metrics::Metrics; @@ -27,13 +30,15 @@ impl From for FailureAction { } } -/// [`Getter`] implementation that performs requests over [`quinn::Connection`]s. +/// [`Getter`] implementation that performs requests over [`Connection`]s. +/// +/// [`Connection`]: iroh_net::magic_endpoint::Connection pub(crate) struct IoGetter { pub store: S, } impl Getter for IoGetter { - type Connection = quinn::Connection; + type Connection = magic_endpoint::Connection; fn get( &mut self, diff --git a/iroh-blobs/src/get.rs b/iroh-blobs/src/get.rs index 9975007d39..cf9378355a 100644 --- a/iroh-blobs/src/get.rs +++ b/iroh-blobs/src/get.rs @@ -20,7 +20,7 @@ use crate::Hash; use anyhow::Result; use bao_tree::io::fsm::BaoContentItem; use bao_tree::ChunkNum; -use quinn::RecvStream; +use iroh_net::magic_endpoint::{self, RecvStream, SendStream}; use serde::{Deserialize, Serialize}; use tracing::{debug, error}; @@ -72,6 +72,7 @@ pub mod fsm { }; use derive_more::From; use iroh_io::{AsyncSliceWriter, AsyncStreamReader, TokioStreamReader}; + use iroh_net::magic_endpoint::Connection; use tokio::io::AsyncWriteExt; type WrappedRecvStream = TrackingReader>; @@ -85,7 +86,7 @@ pub mod fsm { } /// The entry point of the get response machine - pub fn start(connection: quinn::Connection, request: GetRequest) -> AtInitial { + pub fn start(connection: Connection, request: GetRequest) -> AtInitial { AtInitial::new(connection, request) } @@ -125,7 +126,7 @@ pub mod fsm { /// Initial state of the get response machine #[derive(Debug)] pub struct AtInitial { - connection: quinn::Connection, + connection: Connection, request: GetRequest, } @@ -134,7 +135,7 @@ pub mod fsm { /// /// `connection` is an existing connection /// `request` is the request to be sent - pub fn new(connection: quinn::Connection, request: GetRequest) -> Self { + pub fn new(connection: Connection, request: GetRequest) -> Self { Self { connection, request, @@ -142,7 +143,7 @@ pub mod fsm { } /// Initiate a new bidi stream to use for the get response - pub async fn next(self) -> Result { + pub async fn next(self) -> Result { let start = Instant::now(); let (writer, reader) = self.connection.open_bi().await?; let reader = TrackingReader::new(TokioStreamReader::new(reader)); @@ -161,7 +162,7 @@ pub mod fsm { pub struct AtConnected { start: Instant, reader: WrappedRecvStream, - writer: TrackingWriter, + writer: TrackingWriter, request: GetRequest, } @@ -185,9 +186,9 @@ pub mod fsm { /// The serialized request is too long to be sent #[error("request too big")] RequestTooBig, - /// Error when writing the request to the [`quinn::SendStream`] + /// Error when writing the request to the [`SendStream`]. #[error("write: {0}")] - Write(#[from] quinn::WriteError), + Write(#[from] magic_endpoint::WriteError), /// A generic io error #[error("io {0}")] Io(io::Error), @@ -196,7 +197,7 @@ pub mod fsm { impl ConnectedNextError { fn from_io(cause: io::Error) -> Self { if let Some(inner) = cause.get_ref() { - if let Some(e) = inner.downcast_ref::() { + if let Some(e) = inner.downcast_ref::() { Self::Write(e.clone()) } else { Self::Io(cause) @@ -295,7 +296,7 @@ pub mod fsm { #[derive(Debug)] pub struct AtStartRoot { ranges: ChunkRanges, - reader: TrackingReader>, + reader: TrackingReader>, misc: Box, hash: Hash, } @@ -304,7 +305,7 @@ pub mod fsm { #[derive(Debug)] pub struct AtStartChild { ranges: ChunkRanges, - reader: TrackingReader>, + reader: TrackingReader>, misc: Box, child_offset: u64, } @@ -379,7 +380,7 @@ pub mod fsm { #[derive(Debug)] pub struct AtBlobHeader { ranges: ChunkRanges, - reader: TrackingReader>, + reader: TrackingReader>, misc: Box, hash: Hash, } @@ -394,7 +395,7 @@ pub mod fsm { NotFound, /// Quinn read error when reading the size header #[error("read: {0}")] - Read(quinn::ReadError), + Read(magic_endpoint::ReadError), /// Generic io error #[error("io: {0}")] Io(io::Error), @@ -420,7 +421,7 @@ pub mod fsm { AtBlobHeaderNextError::NotFound } else if let Some(e) = cause .get_ref() - .and_then(|x| x.downcast_ref::()) + .and_then(|x| x.downcast_ref::()) { AtBlobHeaderNextError::Read(e.clone()) } else { @@ -525,8 +526,8 @@ pub mod fsm { /// decoding the response, e.g. from [`AtBlobContent::next`]. /// /// This is similar to [`bao_tree::io::DecodeError`], but takes into account - /// that we are reading from a [`quinn::RecvStream`], so read errors will be - /// propagated as [`DecodeError::Read`], containing a [`quinn::ReadError`]. + /// that we are reading from a [`RecvStream`], so read errors will be + /// propagated as [`DecodeError::Read`], containing a [`ReadError`]. /// This carries more concrete information about the error than an [`io::Error`]. /// /// When the provider finds that it does not have a chunk that we requested, @@ -541,7 +542,9 @@ pub mod fsm { /// not behaving correctly. /// /// The [`DecodeError::Io`] variant is just a fallback for any other io error that - /// is not actually a [`quinn::ReadError`]. + /// is not actually a [`ReadError`]. + /// + /// [`ReadError`]: magic_endpoint::ReadError #[derive(Debug, thiserror::Error)] pub enum DecodeError { /// A chunk was not found or invalid, so the provider stopped sending data @@ -561,7 +564,7 @@ pub mod fsm { LeafHashMismatch(ChunkNum), /// Error when reading from the stream #[error("read: {0}")] - Read(quinn::ReadError), + Read(magic_endpoint::ReadError), /// A generic io error #[error("io: {0}")] Io(#[from] io::Error), @@ -602,7 +605,7 @@ pub mod fsm { bao_tree::io::DecodeError::LeafHashMismatch(chunk) => Self::LeafHashMismatch(chunk), bao_tree::io::DecodeError::Io(cause) => { if let Some(inner) = cause.get_ref() { - if let Some(e) = inner.downcast_ref::() { + if let Some(e) = inner.downcast_ref::() { Self::Read(e.clone()) } else { Self::Io(cause) @@ -844,7 +847,7 @@ pub mod fsm { } /// Finish the get response, returning statistics - pub async fn next(self) -> result::Result { + pub async fn next(self) -> result::Result { // Shut down the stream let (reader, bytes_read) = self.reader.into_parts(); let mut reader = reader.into_inner(); @@ -881,13 +884,13 @@ pub mod fsm { pub enum GetResponseError { /// Error when opening a stream #[error("connection: {0}")] - Connection(#[from] quinn::ConnectionError), + Connection(#[from] magic_endpoint::ConnectionError), /// Error when writing the handshake or request to the stream #[error("write: {0}")] - Write(#[from] quinn::WriteError), + Write(#[from] magic_endpoint::WriteError), /// Error when reading from the stream #[error("read: {0}")] - Read(#[from] quinn::ReadError), + Read(#[from] magic_endpoint::ReadError), /// Error when decoding, e.g. hash mismatch #[error("decode: {0}")] Decode(bao_tree::io::DecodeError), @@ -908,13 +911,13 @@ impl From for GetResponseError { bao_tree::io::DecodeError::Io(cause) => { // try to downcast to specific quinn errors if let Some(source) = cause.source() { - if let Some(error) = source.downcast_ref::() { + if let Some(error) = source.downcast_ref::() { return Self::Connection(error.clone()); } - if let Some(error) = source.downcast_ref::() { + if let Some(error) = source.downcast_ref::() { return Self::Read(error.clone()); } - if let Some(error) = source.downcast_ref::() { + if let Some(error) = source.downcast_ref::() { return Self::Write(error.clone()); } } diff --git a/iroh-blobs/src/get/db.rs b/iroh-blobs/src/get/db.rs index 85db7d08bd..5fe0a8e796 100644 --- a/iroh-blobs/src/get/db.rs +++ b/iroh-blobs/src/get/db.rs @@ -7,6 +7,7 @@ use std::num::NonZeroU64; use futures_lite::StreamExt; use iroh_base::hash::Hash; use iroh_base::rpc::RpcError; +use iroh_net::magic_endpoint::Connection; use serde::{Deserialize, Serialize}; use crate::hashseq::parse_hash_seq; @@ -44,7 +45,7 @@ use tracing::trace; pub async fn get_to_db< D: BaoStore, C: FnOnce() -> F, - F: Future>, + F: Future>, >( db: &D, get_conn: C, @@ -62,11 +63,7 @@ pub async fn get_to_db< /// /// We need to create our own files and handle the case where an outboard /// is not needed. -async fn get_blob< - D: BaoStore, - C: FnOnce() -> F, - F: Future>, ->( +async fn get_blob F, F: Future>>( db: &D, get_conn: C, hash: &Hash, @@ -305,7 +302,7 @@ async fn blob_infos(db: &D, hash_seq: &[Hash]) -> io::Result F, - F: Future>, + F: Future>, >( db: &D, get_conn: C, diff --git a/iroh-blobs/src/get/error.rs b/iroh-blobs/src/get/error.rs index 06d3d99f9d..fcaae08e22 100644 --- a/iroh-blobs/src/get/error.rs +++ b/iroh-blobs/src/get/error.rs @@ -1,5 +1,7 @@ //! Error returned from get operations +use iroh_net::magic_endpoint; + use crate::util::progress::ProgressSendError; /// Failures for a get operation @@ -33,39 +35,40 @@ impl From for GetError { } } -impl From for GetError { - fn from(value: quinn::ConnectionError) -> Self { +impl From for GetError { + fn from(value: magic_endpoint::ConnectionError) -> Self { // explicit match just to be sure we are taking everything into account + use magic_endpoint::ConnectionError; match value { - e @ quinn::ConnectionError::VersionMismatch => { + e @ ConnectionError::VersionMismatch => { // > The peer doesn't implement any supported version // unsupported version is likely a long time error, so this peer is not usable GetError::NoncompliantNode(e.into()) } - e @ quinn::ConnectionError::TransportError(_) => { + e @ ConnectionError::TransportError(_) => { // > The peer violated the QUIC specification as understood by this implementation // bad peer we don't want to keep around GetError::NoncompliantNode(e.into()) } - e @ quinn::ConnectionError::ConnectionClosed(_) => { + e @ ConnectionError::ConnectionClosed(_) => { // > The peer's QUIC stack aborted the connection automatically // peer might be disconnecting or otherwise unavailable, drop it GetError::Io(e.into()) } - e @ quinn::ConnectionError::ApplicationClosed(_) => { + e @ ConnectionError::ApplicationClosed(_) => { // > The peer closed the connection // peer might be disconnecting or otherwise unavailable, drop it GetError::Io(e.into()) } - e @ quinn::ConnectionError::Reset => { + e @ ConnectionError::Reset => { // > The peer is unable to continue processing this connection, usually due to having restarted GetError::RemoteReset(e.into()) } - e @ quinn::ConnectionError::TimedOut => { + e @ ConnectionError::TimedOut => { // > Communication with the peer has lapsed for longer than the negotiated idle timeout GetError::Io(e.into()) } - e @ quinn::ConnectionError::LocallyClosed => { + e @ ConnectionError::LocallyClosed => { // > The local application closed the connection // TODO(@divma): don't see how this is reachable but let's just not use the peer GetError::Io(e.into()) @@ -74,14 +77,15 @@ impl From for GetError { } } -impl From for GetError { - fn from(value: quinn::ReadError) -> Self { +impl From for GetError { + fn from(value: magic_endpoint::ReadError) -> Self { + use magic_endpoint::ReadError; match value { - e @ quinn::ReadError::Reset(_) => GetError::RemoteReset(e.into()), - quinn::ReadError::ConnectionLost(conn_error) => conn_error.into(), - quinn::ReadError::UnknownStream - | quinn::ReadError::IllegalOrderedRead - | quinn::ReadError::ZeroRttRejected => { + e @ ReadError::Reset(_) => GetError::RemoteReset(e.into()), + ReadError::ConnectionLost(conn_error) => conn_error.into(), + ReadError::UnknownStream + | ReadError::IllegalOrderedRead + | ReadError::ZeroRttRejected => { // all these errors indicate the peer is not usable at this moment GetError::Io(value.into()) } @@ -89,12 +93,13 @@ impl From for GetError { } } -impl From for GetError { - fn from(value: quinn::WriteError) -> Self { +impl From for GetError { + fn from(value: magic_endpoint::WriteError) -> Self { + use magic_endpoint::WriteError; match value { - e @ quinn::WriteError::Stopped(_) => GetError::RemoteReset(e.into()), - quinn::WriteError::ConnectionLost(conn_error) => conn_error.into(), - quinn::WriteError::UnknownStream | quinn::WriteError::ZeroRttRejected => { + e @ WriteError::Stopped(_) => GetError::RemoteReset(e.into()), + WriteError::ConnectionLost(conn_error) => conn_error.into(), + WriteError::UnknownStream | WriteError::ZeroRttRejected => { // all these errors indicate the peer is not usable at this moment GetError::Io(value.into()) } diff --git a/iroh-blobs/src/get/request.rs b/iroh-blobs/src/get/request.rs index a24534ef16..27f2e208d4 100644 --- a/iroh-blobs/src/get/request.rs +++ b/iroh-blobs/src/get/request.rs @@ -8,6 +8,7 @@ use crate::{ }; use bao_tree::{ChunkNum, ChunkRanges}; use bytes::Bytes; +use iroh_net::magic_endpoint::Connection; use rand::Rng; use super::{fsm, Stats}; @@ -17,7 +18,7 @@ use super::{fsm, Stats}; /// This is just reading the size header and then immediately closing the connection. /// It can be used to check if a peer has any data at all. pub async fn get_unverified_size( - connection: &quinn::Connection, + connection: &Connection, hash: &Hash, ) -> anyhow::Result<(u64, Stats)> { let request = GetRequest::new( @@ -40,7 +41,7 @@ pub async fn get_unverified_size( /// This asks for the last chunk of the blob and validates the response. /// Note that this does not validate that the peer has all the data. pub async fn get_verified_size( - connection: &quinn::Connection, + connection: &Connection, hash: &Hash, ) -> anyhow::Result<(u64, Stats)> { tracing::trace!("Getting verified size of {}", hash.to_hex()); @@ -83,7 +84,7 @@ pub async fn get_verified_size( /// /// This can be used to compute the total size when requesting a hash seq. pub async fn get_hash_seq_and_sizes( - connection: &quinn::Connection, + connection: &Connection, hash: &Hash, max_size: u64, ) -> anyhow::Result<(HashSeq, Arc<[u64]>)> { @@ -139,7 +140,7 @@ pub async fn get_hash_seq_and_sizes( /// /// This is used to check if a peer has a specific chunk. pub async fn get_chunk_probe( - connection: &quinn::Connection, + connection: &Connection, hash: &Hash, chunk: ChunkNum, ) -> anyhow::Result { diff --git a/iroh-blobs/src/protocol.rs b/iroh-blobs/src/protocol.rs index 82b90b085a..5579022de0 100644 --- a/iroh-blobs/src/protocol.rs +++ b/iroh-blobs/src/protocol.rs @@ -340,7 +340,7 @@ //! keep a connection open and reuse it for multiple requests. use bao_tree::{ChunkNum, ChunkRanges}; use derive_more::From; -use quinn::VarInt; +use iroh_net::magic_endpoint::VarInt; use serde::{Deserialize, Serialize}; mod range_spec; pub use range_spec::{NonEmptyRequestRangeSpecIter, RangeSpec, RangeSpecSeq}; @@ -427,11 +427,14 @@ impl GetRequest { #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] #[repr(u16)] pub enum Closed { - /// The [`quinn::RecvStream`] was dropped. + /// The [`RecvStream`] was dropped. /// - /// Used implicitly when a [`quinn::RecvStream`] is dropped without explicit call to - /// [`quinn::RecvStream::stop`]. We don't use this explicitly but this is here as + /// Used implicitly when a [`RecvStream`] is dropped without explicit call to + /// [`RecvStream::stop`]. We don't use this explicitly but this is here as /// documentation as to what happened to `0`. + /// + /// [`RecvStream`]: iroh_net::magic_endpoint::RecvStream + /// [`RecvStream::stop`]: iroh_net::magic_endpoint::RecvStream::stop StreamDropped = 0, /// The provider is terminating. /// diff --git a/iroh-blobs/src/provider.rs b/iroh-blobs/src/provider.rs index bdba0315d6..854964ffd4 100644 --- a/iroh-blobs/src/provider.rs +++ b/iroh-blobs/src/provider.rs @@ -11,7 +11,7 @@ use iroh_io::stats::{ SliceReaderStats, StreamWriterStats, TrackingSliceReader, TrackingStreamWriter, }; use iroh_io::{AsyncSliceReader, AsyncStreamWriter, TokioStreamWriter}; -use iroh_net::magic_endpoint; +use iroh_net::magic_endpoint::{self, RecvStream, SendStream}; use serde::{Deserialize, Serialize}; use tokio_util::task::LocalPoolHandle; use tracing::{debug, debug_span, info, trace, warn}; @@ -159,7 +159,7 @@ pub enum AddProgress { /// contains more data than the Request, or if no valid request is sent. /// /// When successful, the buffer is empty after this function call. -pub async fn read_request(mut reader: quinn::RecvStream) -> Result { +pub async fn read_request(mut reader: RecvStream) -> Result { let payload = reader .read_to_end(crate::protocol::MAX_MESSAGE_SIZE) .await?; @@ -318,7 +318,7 @@ pub async fn handle_connection( async fn handle_stream( db: D, - reader: quinn::RecvStream, + reader: RecvStream, writer: ResponseWriter, ) -> Result<()> { // 1. Decode the request. @@ -398,15 +398,13 @@ pub async fn handle_get( /// A helper struct that combines a quinn::SendStream with auxiliary information #[derive(Debug)] pub struct ResponseWriter { - inner: quinn::SendStream, + inner: SendStream, events: E, connection_id: u64, } impl ResponseWriter { - fn tracking_writer( - &mut self, - ) -> TrackingStreamWriter> { + fn tracking_writer(&mut self) -> TrackingStreamWriter> { TrackingStreamWriter::new(TokioStreamWriter(&mut self.inner)) } diff --git a/iroh-cli/Cargo.toml b/iroh-cli/Cargo.toml index 9cec697935..5ae4aa4d8c 100644 --- a/iroh-cli/Cargo.toml +++ b/iroh-cli/Cargo.toml @@ -45,7 +45,6 @@ pkarr = { version = "1.1.5", default-features = false } portable-atomic = "1" postcard = "1.0.8" quic-rpc = { version = "0.9.0", features = ["flume-transport", "quinn-transport"] } -quinn = { package = "iroh-quinn", version = "0.10.2"} rand = "0.8.5" rustyline = "12.0.0" serde = { version = "1.0.197", features = ["derive"] } diff --git a/iroh-cli/src/commands/doctor.rs b/iroh-cli/src/commands/doctor.rs index 06ee1ba504..492806fcb3 100644 --- a/iroh-cli/src/commands/doctor.rs +++ b/iroh-cli/src/commands/doctor.rs @@ -32,7 +32,8 @@ use iroh::{ }, dns::default_resolver, key::{PublicKey, SecretKey}, - magic_endpoint, netcheck, portmapper, + magic_endpoint::{self, Connection, RecvStream, SendStream}, + netcheck, portmapper, relay::{RelayMap, RelayMode, RelayUrl}, ticket::NodeTicket, util::AbortingJoinHandle, @@ -248,8 +249,8 @@ fn update_pb( /// handle a test stream request async fn handle_test_request( - mut send: quinn::SendStream, - mut recv: quinn::RecvStream, + mut send: SendStream, + mut recv: RecvStream, gui: &Gui, ) -> anyhow::Result<()> { let mut buf = [0u8; TestStreamRequest::POSTCARD_MAX_SIZE]; @@ -477,7 +478,7 @@ Ipv6: } async fn active_side( - connection: quinn::Connection, + connection: Connection, config: &TestConfig, gui: Option<&Gui>, ) -> anyhow::Result<()> { @@ -504,7 +505,7 @@ async fn active_side( } async fn send_test_request( - send: &mut quinn::SendStream, + send: &mut SendStream, request: &TestStreamRequest, ) -> anyhow::Result<()> { let mut buf = [0u8; TestStreamRequest::POSTCARD_MAX_SIZE]; @@ -514,7 +515,7 @@ async fn send_test_request( } async fn echo_test( - connection: &quinn::Connection, + connection: &Connection, config: &TestConfig, pb: Option<&indicatif::ProgressBar>, ) -> anyhow::Result { @@ -535,7 +536,7 @@ async fn echo_test( } async fn send_test( - connection: &quinn::Connection, + connection: &Connection, config: &TestConfig, pb: Option<&indicatif::ProgressBar>, ) -> anyhow::Result { @@ -559,7 +560,7 @@ async fn send_test( } async fn recv_test( - connection: &quinn::Connection, + connection: &Connection, config: &TestConfig, pb: Option<&indicatif::ProgressBar>, ) -> anyhow::Result { @@ -586,10 +587,7 @@ async fn recv_test( } /// Passive side that just accepts connections and answers requests (echo, drain or send) -async fn passive_side( - endpoint: MagicEndpoint, - connection: quinn::Connection, -) -> anyhow::Result<()> { +async fn passive_side(endpoint: MagicEndpoint, connection: Connection) -> anyhow::Result<()> { let remote_peer_id = magic_endpoint::get_remote_node_id(&connection)?; let gui = Gui::new(endpoint, remote_peer_id); loop { @@ -626,7 +624,7 @@ async fn make_endpoint( ); tracing::info!("relay map {:#?}", relay_map); - let mut transport_config = quinn::TransportConfig::default(); + let mut transport_config = magic_endpoint::TransportConfig::default(); transport_config.keep_alive_interval(Some(Duration::from_secs(5))); transport_config.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap())); diff --git a/iroh-docs/Cargo.toml b/iroh-docs/Cargo.toml index fb517fb62e..6b9d5dba81 100644 --- a/iroh-docs/Cargo.toml +++ b/iroh-docs/Cargo.toml @@ -43,7 +43,6 @@ tempfile = { version = "3.4" } iroh-net = { version = "0.15.0", optional = true, path = "../iroh-net" } tokio-util = { version = "0.7", optional = true, features = ["codec", "io-util", "io"] } tokio-stream = { version = "0.1", optional = true, features = ["sync"]} -quinn = { package = "iroh-quinn", version = "0.10", optional = true } futures-util = { version = "0.3.25", optional = true } lru = "0.12" self_cell = "1.0.3" @@ -58,7 +57,7 @@ test-strategy = "0.3.1" [features] default = ["net", "metrics"] -net = ["dep:iroh-net", "tokio/io-util", "dep:tokio-stream", "dep:tokio-util", "dep:quinn", "dep:futures-util"] +net = ["dep:iroh-net", "tokio/io-util", "dep:tokio-stream", "dep:tokio-util", "dep:futures-util"] metrics = ["dep:iroh-metrics"] [package.metadata.docs.rs] diff --git a/iroh-gossip/Cargo.toml b/iroh-gossip/Cargo.toml index 172a6e5e74..56f13bdcd6 100644 --- a/iroh-gossip/Cargo.toml +++ b/iroh-gossip/Cargo.toml @@ -33,7 +33,6 @@ iroh-base = { version = "0.15.0", path = "../iroh-base" } # net dependencies (optional) futures-lite = { version = "2.3", optional = true } iroh-net = { path = "../iroh-net", version = "0.15.0", optional = true, default-features = false } -quinn = { package = "iroh-quinn", version = "0.10", optional = true } tokio = { version = "1", optional = true, features = ["io-util", "sync", "rt", "macros", "net", "fs"] } tokio-util = { version = "0.7.8", optional = true, features = ["codec"] } genawaiter = { version = "0.99.1", default-features = false, features = ["futures03"] } @@ -47,7 +46,7 @@ url = "2.4.0" [features] default = ["net"] -net = ["dep:futures-lite", "dep:iroh-net", "dep:quinn", "dep:tokio", "dep:tokio-util"] +net = ["dep:futures-lite", "dep:iroh-net", "dep:tokio", "dep:tokio-util"] [[example]] name = "chat" diff --git a/iroh-gossip/src/net.rs b/iroh-gossip/src/net.rs index 06f67911c6..ec72f65466 100644 --- a/iroh-gossip/src/net.rs +++ b/iroh-gossip/src/net.rs @@ -5,8 +5,10 @@ use bytes::{Bytes, BytesMut}; use futures_lite::stream::Stream; use genawaiter::sync::{Co, Gen}; use iroh_net::{ - dialer::Dialer, key::PublicKey, magic_endpoint::get_remote_node_id, AddrInfo, MagicEndpoint, - NodeAddr, + dialer::Dialer, + key::PublicKey, + magic_endpoint::{get_remote_node_id, Connection}, + AddrInfo, MagicEndpoint, NodeAddr, }; use rand::rngs::StdRng; use rand_core::SeedableRng; @@ -229,10 +231,10 @@ impl Gossip { } } - /// Handle an incoming [`quinn::Connection`]. + /// Handle an incoming [`Connection`]. /// /// Make sure to check the ALPN protocol yourself before passing the connection. - pub async fn handle_connection(&self, conn: quinn::Connection) -> anyhow::Result<()> { + pub async fn handle_connection(&self, conn: Connection) -> anyhow::Result<()> { let peer_id = get_remote_node_id(&conn)?; self.send(ToActor::ConnIncoming(peer_id, ConnOrigin::Accept, conn)) .await?; @@ -297,7 +299,7 @@ enum ConnOrigin { enum ToActor { /// Handle a new QUIC connection, either from accept (external to the actor) or from connect /// (happens internally in the actor). - ConnIncoming(PublicKey, ConnOrigin, #[debug(skip)] quinn::Connection), + ConnIncoming(PublicKey, ConnOrigin, #[debug(skip)] Connection), /// Join a topic with a list of peers. Reply with oneshot once at least one peer joined. Join( TopicId, @@ -344,7 +346,7 @@ struct Actor { /// Queued timers timers: Timers, /// Currently opened quinn connections to peers - conns: HashMap, + conns: HashMap, /// Channels to send outbound messages into the connection loops conn_send_tx: HashMap>, /// Queued messages that were to be sent before a dial completed @@ -603,7 +605,7 @@ async fn wait_for_neighbor_up(mut sub: broadcast::Receiver) -> anyhow::Re async fn connection_loop( from: PublicKey, - conn: quinn::Connection, + conn: Connection, origin: ConnOrigin, mut send_rx: mpsc::Receiver, in_event_tx: &mpsc::Sender, diff --git a/iroh-net/bench/Cargo.toml b/iroh-net/bench/Cargo.toml index 1415a6cbe3..112b97b069 100644 --- a/iroh-net/bench/Cargo.toml +++ b/iroh-net/bench/Cargo.toml @@ -10,7 +10,6 @@ anyhow = "1.0.22" bytes = "1" hdrhistogram = { version = "7.2", default-features = false } iroh-net = { path = ".." } -quinn = { package = "iroh-quinn", version = "0.10"} clap = { version = "4", features = ["derive"] } tokio = { version = "1.0.1", features = ["rt", "sync"] } tracing = "0.1" diff --git a/iroh-net/bench/src/bin/bulk.rs b/iroh-net/bench/src/bin/bulk.rs index a62dc01a55..3639d98cf6 100644 --- a/iroh-net/bench/src/bin/bulk.rs +++ b/iroh-net/bench/src/bin/bulk.rs @@ -5,7 +5,10 @@ use std::{ use anyhow::{Context, Result}; use clap::Parser; -use iroh_net::{MagicEndpoint, NodeAddr}; +use iroh_net::{ + magic_endpoint::{self, Connection}, + MagicEndpoint, NodeAddr, +}; use tokio::sync::Semaphore; use tracing::{info, trace}; @@ -72,7 +75,7 @@ async fn server(endpoint: MagicEndpoint, opt: Opt) -> Result<()> { server_tasks.push(tokio::spawn(async move { loop { let (mut send_stream, mut recv_stream) = match connection.accept_bi().await { - Err(quinn::ConnectionError::ApplicationClosed(_)) => break, + Err(magic_endpoint::ConnectionError::ApplicationClosed(_)) => break, Err(e) => { eprintln!("accepting stream failed: {e:?}"); break; @@ -167,7 +170,7 @@ async fn client(server_addr: NodeAddr, opt: Opt) -> Result { } async fn handle_client_stream( - connection: Arc, + connection: Arc, upload_size: u64, read_unordered: bool, ) -> Result<(TransferResult, TransferResult)> { diff --git a/iroh-net/bench/src/lib.rs b/iroh-net/bench/src/lib.rs index 09adfb4d97..e317aed0f3 100644 --- a/iroh-net/bench/src/lib.rs +++ b/iroh-net/bench/src/lib.rs @@ -3,6 +3,7 @@ use std::{net::SocketAddr, num::ParseIntError, str::FromStr}; use anyhow::{Context, Result}; use bytes::Bytes; use clap::Parser; +use iroh_net::magic_endpoint::{self, Connection, RecvStream, SendStream}; use iroh_net::{relay::RelayMode, MagicEndpoint, NodeAddr}; use tokio::runtime::{Builder, Runtime}; use tracing::trace; @@ -42,7 +43,7 @@ pub fn server_endpoint(rt: &tokio::runtime::Runtime, opt: &Opt) -> (NodeAddr, Ma pub async fn connect_client( server_addr: NodeAddr, opt: Opt, -) -> Result<(MagicEndpoint, quinn::Connection)> { +) -> Result<(MagicEndpoint, Connection)> { let endpoint = MagicEndpoint::builder() .alpns(vec![ALPN.to_vec()]) .relay_mode(RelayMode::Disabled) @@ -64,7 +65,7 @@ pub async fn connect_client( Ok((endpoint, connection)) } -pub async fn drain_stream(stream: &mut quinn::RecvStream, read_unordered: bool) -> Result { +pub async fn drain_stream(stream: &mut RecvStream, read_unordered: bool) -> Result { let mut read = 0; if read_unordered { @@ -93,7 +94,7 @@ pub async fn drain_stream(stream: &mut quinn::RecvStream, read_unordered: bool) Ok(read) } -pub async fn send_data_on_stream(stream: &mut quinn::SendStream, stream_size: u64) -> Result<()> { +pub async fn send_data_on_stream(stream: &mut SendStream, stream_size: u64) -> Result<()> { const DATA: &[u8] = &[0xAB; 1024 * 1024]; let bytes_data = Bytes::from_static(DATA); @@ -123,10 +124,10 @@ pub fn rt() -> Runtime { Builder::new_current_thread().enable_all().build().unwrap() } -pub fn transport_config(opt: &Opt) -> quinn::TransportConfig { +pub fn transport_config(opt: &Opt) -> magic_endpoint::TransportConfig { // High stream windows are chosen because the amount of concurrent streams // is configurable as a parameter. - let mut config = quinn::TransportConfig::default(); + let mut config = magic_endpoint::TransportConfig::default(); config.max_concurrent_uni_streams(opt.max_streams.try_into().unwrap()); config.initial_mtu(opt.initial_mtu); diff --git a/iroh-net/src/magic_endpoint.rs b/iroh-net/src/magic_endpoint.rs index ba4f760672..77ed98b81a 100644 --- a/iroh-net/src/magic_endpoint.rs +++ b/iroh-net/src/magic_endpoint.rs @@ -14,7 +14,6 @@ use std::time::Duration; use anyhow::{anyhow, bail, ensure, Context, Result}; use derive_more::Debug; use futures_lite::StreamExt; -use quinn::VarInt; use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; use tracing::{debug, info_span, trace, warn}; @@ -33,7 +32,10 @@ mod rtt_actor; use self::rtt_actor::RttMessage; -pub use quinn::Connection; +pub use quinn::{ + Connection, ConnectionError, ReadError, RecvStream, SendStream, TransportConfig, VarInt, + WriteError, +}; pub use super::magicsock::{ ConnectionInfo, ConnectionType, ConnectionTypeStream, ControlMsg, DirectAddrInfo,