From 955352bfd2b2a807b13659c6ce74ca2818070513 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 12 Feb 2020 13:23:08 +0100 Subject: [PATCH] Extract rw-stream-sink to its own repo (#1448) --- Cargo.toml | 1 - core/Cargo.toml | 2 +- misc/rw-stream-sink/Cargo.toml | 18 --- misc/rw-stream-sink/src/lib.rs | 202 -------------------------------- protocols/plaintext/Cargo.toml | 2 +- protocols/secio/Cargo.toml | 2 +- transports/websocket/Cargo.toml | 2 +- 7 files changed, 4 insertions(+), 225 deletions(-) delete mode 100644 misc/rw-stream-sink/Cargo.toml delete mode 100644 misc/rw-stream-sink/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 7b1f4c57934..82c7729c59a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,7 +60,6 @@ members = [ "misc/multihash", "misc/multistream-select", "misc/peer-id-generator", - "misc/rw-stream-sink", "muxers/mplex", "muxers/yamux", "protocols/floodsub", diff --git a/core/Cargo.toml b/core/Cargo.toml index 4b846f7348b..83e6fd4c16d 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -26,7 +26,7 @@ parking_lot = "0.10.0" pin-project = "0.4.6" prost = "0.6.1" rand = "0.7" -rw-stream-sink = { version = "0.2.0", path = "../misc/rw-stream-sink" } +rw-stream-sink = "0.2.0" sha2 = "0.8.0" smallvec = "1.0" thiserror = "1.0" diff --git a/misc/rw-stream-sink/Cargo.toml b/misc/rw-stream-sink/Cargo.toml deleted file mode 100644 index 07e1f81ebe9..00000000000 --- a/misc/rw-stream-sink/Cargo.toml +++ /dev/null @@ -1,18 +0,0 @@ -[package] -name = "rw-stream-sink" -edition = "2018" -description = "Adaptator between Stream/Sink and AsyncRead/AsyncWrite" -version = "0.2.1" -authors = ["Parity Technologies "] -license = "MIT" -repository = "https://github.com/libp2p/rust-libp2p" -keywords = ["networking"] -categories = ["network-programming", "asynchronous"] - -[dependencies] -futures = "0.3.1" -pin-project = "0.4.6" -static_assertions = "1" - -[dev-dependencies] -async-std = "1.0" diff --git a/misc/rw-stream-sink/src/lib.rs b/misc/rw-stream-sink/src/lib.rs deleted file mode 100644 index 0eb2feb0835..00000000000 --- a/misc/rw-stream-sink/src/lib.rs +++ /dev/null @@ -1,202 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! This crate provides the [`RwStreamSink`] type. It wraps around a [`Stream`] -//! and [`Sink`] that produces and accepts byte arrays, and implements -//! [`AsyncRead`] and [`AsyncWrite`]. -//! -//! Each call to [`AsyncWrite::poll_write`] will send one packet to the sink. -//! Calls to [`AsyncRead::poll_read`] will read from the stream's incoming packets. - -use futures::{prelude::*, ready}; -use std::{io::{self, Read}, pin::Pin, task::{Context, Poll}}; - -static_assertions::const_assert!(std::mem::size_of::() <= std::mem::size_of::()); - -/// Wraps a [`Stream`] and [`Sink`] whose items are buffers. -/// Implements [`AsyncRead`] and [`AsyncWrite`]. -#[pin_project::pin_project] -pub struct RwStreamSink { - #[pin] - inner: S, - current_item: Option::Ok>> -} - -impl RwStreamSink { - /// Wraps around `inner`. - pub fn new(inner: S) -> Self { - RwStreamSink { inner, current_item: None } - } -} - -impl AsyncRead for RwStreamSink -where - S: TryStream, - ::Ok: AsRef<[u8]> -{ - fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { - let mut this = self.project(); - - // Grab the item to copy from. - let item_to_copy = loop { - if let Some(ref mut i) = this.current_item { - if i.position() < i.get_ref().as_ref().len() as u64 { - break i - } - } - *this.current_item = Some(match ready!(this.inner.as_mut().try_poll_next(cx)) { - Some(Ok(i)) => std::io::Cursor::new(i), - Some(Err(e)) => return Poll::Ready(Err(e)), - None => return Poll::Ready(Ok(0)) // EOF - }); - }; - - // Copy it! - Poll::Ready(Ok(item_to_copy.read(buf)?)) - } -} - -impl AsyncWrite for RwStreamSink -where - S: TryStream + Sink<::Ok, Error = io::Error>, - ::Ok: for<'r> From<&'r [u8]> -{ - fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { - let mut this = self.project(); - ready!(this.inner.as_mut().poll_ready(cx)?); - let n = buf.len(); - if let Err(e) = this.inner.start_send(buf.into()) { - return Poll::Ready(Err(e)) - } - Poll::Ready(Ok(n)) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let this = self.project(); - this.inner.poll_flush(cx) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let this = self.project(); - this.inner.poll_close(cx) - } -} - -#[cfg(test)] -mod tests { - use async_std::task; - use futures::{channel::mpsc, prelude::*, stream}; - use std::{pin::Pin, task::{Context, Poll}}; - use super::RwStreamSink; - - // This struct merges a stream and a sink and is quite useful for tests. - struct Wrapper(St, Si); - - impl Stream for Wrapper - where - St: Stream + Unpin, - Si: Unpin - { - type Item = St::Item; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.0.poll_next_unpin(cx) - } - } - - impl Sink for Wrapper - where - St: Unpin, - Si: Sink + Unpin, - { - type Error = Si::Error; - - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Pin::new(&mut self.1).poll_ready(cx) - } - - fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - Pin::new(&mut self.1).start_send(item) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Pin::new(&mut self.1).poll_flush(cx) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Pin::new(&mut self.1).poll_close(cx) - } - } - - #[test] - fn basic_reading() { - let (tx1, _) = mpsc::channel::>(10); - let (mut tx2, rx2) = mpsc::channel(10); - - let mut wrapper = RwStreamSink::new(Wrapper(rx2.map(Ok), tx1)); - - task::block_on(async move { - tx2.send(Vec::from("hel")).await.unwrap(); - tx2.send(Vec::from("lo wor")).await.unwrap(); - tx2.send(Vec::from("ld")).await.unwrap(); - tx2.close().await.unwrap(); - - let mut data = Vec::new(); - wrapper.read_to_end(&mut data).await.unwrap(); - assert_eq!(data, b"hello world"); - }) - } - - #[test] - fn skip_empty_stream_items() { - let data: Vec<&[u8]> = vec![b"", b"foo", b"", b"bar", b"", b"baz", b""]; - let mut rws = RwStreamSink::new(stream::iter(data).map(Ok)); - let mut buf = [0; 9]; - task::block_on(async move { - assert_eq!(3, rws.read(&mut buf).await.unwrap()); - assert_eq!(3, rws.read(&mut buf[3..]).await.unwrap()); - assert_eq!(3, rws.read(&mut buf[6..]).await.unwrap()); - assert_eq!(0, rws.read(&mut buf).await.unwrap()); - assert_eq!(b"foobarbaz", &buf[..]) - }) - } - - #[test] - fn partial_read() { - let data: Vec<&[u8]> = vec![b"hell", b"o world"]; - let mut rws = RwStreamSink::new(stream::iter(data).map(Ok)); - let mut buf = [0; 3]; - task::block_on(async move { - assert_eq!(3, rws.read(&mut buf).await.unwrap()); - assert_eq!(b"hel", &buf[..3]); - assert_eq!(0, rws.read(&mut buf[..0]).await.unwrap()); - assert_eq!(1, rws.read(&mut buf).await.unwrap()); - assert_eq!(b"l", &buf[..1]); - assert_eq!(3, rws.read(&mut buf).await.unwrap()); - assert_eq!(b"o w", &buf[..3]); - assert_eq!(0, rws.read(&mut buf[..0]).await.unwrap()); - assert_eq!(3, rws.read(&mut buf).await.unwrap()); - assert_eq!(b"orl", &buf[..3]); - assert_eq!(1, rws.read(&mut buf).await.unwrap()); - assert_eq!(b"d", &buf[..1]); - assert_eq!(0, rws.read(&mut buf).await.unwrap()); - }) - } -} diff --git a/protocols/plaintext/Cargo.toml b/protocols/plaintext/Cargo.toml index ff7a12afdf1..38ba0850b6f 100644 --- a/protocols/plaintext/Cargo.toml +++ b/protocols/plaintext/Cargo.toml @@ -16,7 +16,7 @@ futures_codec = "0.3.4" libp2p-core = { version = "0.15.0", path = "../../core" } log = "0.4.8" prost = "0.6.1" -rw-stream-sink = { version = "0.2.0", path = "../../misc/rw-stream-sink" } +rw-stream-sink = "0.2.0" unsigned-varint = { version = "0.3", features = ["futures-codec"] } void = "1.0.2" diff --git a/protocols/secio/Cargo.toml b/protocols/secio/Cargo.toml index 43ac50f3fb3..b9d91d606f6 100644 --- a/protocols/secio/Cargo.toml +++ b/protocols/secio/Cargo.toml @@ -22,7 +22,7 @@ prost = "0.6.1" pin-project = "0.4.6" quicksink = "0.1" rand = "0.7" -rw-stream-sink = { version = "0.2.0", path = "../../misc/rw-stream-sink" } +rw-stream-sink = "0.2.0" sha2 = "0.8.0" static_assertions = "1" twofish = "0.2.0" diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index 7ebb2d9cce2..8cb8fed76ec 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -18,7 +18,7 @@ libp2p-core = { version = "0.15.0", path = "../../core" } log = "0.4.8" quicksink = "0.1" rustls = "0.16" -rw-stream-sink = { version = "0.2.0", path = "../../misc/rw-stream-sink" } +rw-stream-sink = "0.2.0" soketto = { version = "0.3", features = ["deflate"] } url = "2.1" webpki = "0.21"