Skip to content

Commit

Permalink
util/io: Add sync_bridge with ReadBridge and WriteBridge
Browse files Browse the repository at this point in the history
I'm doing quite a bit of stuff inside `spawn_blocking` because
I need to use some large synchronous APIs.

I found it not entirely obvious how to "bridge" the world of
async I/O with sync I/O.

Since we have a handy "tokio-util" module with an "io" feature,
these small helpers seem like a good fit hopefully!

Extracted these from my code in https://github.com/ostreedev/ostree-rs-ext/blob/main/lib/src/async_util.rs
  • Loading branch information
cgwalters committed Oct 1, 2021
1 parent 9ff7d8c commit 5040268
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 1 deletion.
8 changes: 7 additions & 1 deletion tokio-util/src/io/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
//! Helpers for IO related tasks.
//!
//! These types are often used in combination with hyper or reqwest, as they
//! The stream types are often used in combination with hyper or reqwest, as they
//! allow converting between a hyper [`Body`] and [`AsyncRead`].
//!
//! The [`ReadBridge`] and [`WriteBridge`] convert from the world of async I/O
//! to synchronous I/O; this may often come up when using synchronous APIs
//! inside [`tokio::task::spawn_blocking`].
//!
//! [`Body`]: https://docs.rs/hyper/0.13/hyper/struct.Body.html
//! [`AsyncRead`]: tokio::io::AsyncRead
mod read_buf;
mod reader_stream;
mod stream_reader;
mod sync_bridge;

pub use self::read_buf::read_buf;
pub use self::reader_stream::ReaderStream;
pub use self::stream_reader::StreamReader;
pub use self::sync_bridge::{ReadBridge, WriteBridge};
pub use crate::util::{poll_read_buf, poll_write_buf};
58 changes: 58 additions & 0 deletions tokio-util/src/io/sync_bridge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::io::{Read, Write};
use std::pin::Pin;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

/// A [`std::io::Read`] implementation backed by an asynchronous source.
#[derive(Debug)]
pub struct ReadBridge<T> {
reader: Pin<Box<T>>,
rt: tokio::runtime::Handle,
}

impl<T: AsyncRead> Read for ReadBridge<T> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let reader = &mut self.reader;
self.rt.block_on(async { reader.read(buf).await })
}
}

impl<T: AsyncRead> ReadBridge<T> {
/// Create a [`std::io::Read`] implementation backed by an asynchronous source.
///
/// This is useful with e.g. [`tokio::task::spawn_blocking`].
pub fn new(reader: T) -> Self {
let reader = Box::pin(reader);
let rt = tokio::runtime::Handle::current();
ReadBridge { reader, rt }
}
}

/// A [`std::io::Write`] implementation backed by an asynchronous source.
#[derive(Debug)]
pub struct WriteBridge<T> {
w: Pin<Box<T>>,
rt: tokio::runtime::Handle,
}

impl<T: AsyncWrite> Write for WriteBridge<T> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let w = &mut self.w;
self.rt.block_on(async { w.write(buf).await })
}

fn flush(&mut self) -> std::io::Result<()> {
let w = &mut self.w;
self.rt.block_on(async { w.flush().await })
}
}

impl<T: AsyncWrite> WriteBridge<T> {
/// Create a [`std::io::Write`] implementation backed by an asynchronous source.
///
/// This is useful with e.g. [`tokio::task::spawn_blocking`].
pub fn new(reader: T) -> Self {
let w = Box::pin(reader);
let rt = tokio::runtime::Handle::current();
WriteBridge { w, rt }
}
}
26 changes: 26 additions & 0 deletions tokio-util/tests/io_sync_bridge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use std::io::{Cursor, Read, Result as IoResult};
use tokio::io::AsyncRead;
use tokio_util::io::ReadBridge;

async fn test_reader_len(
r: impl AsyncRead + Unpin + Send + 'static,
expected_len: usize,
) -> IoResult<()> {
let mut r = ReadBridge::new(r);
let res = tokio::task::spawn_blocking(move || {
let mut buf = Vec::new();
r.read_to_end(&mut buf)?;
Ok::<_, std::io::Error>(buf)
})
.await?;
assert_eq!(res?.len(), expected_len);
Ok(())
}

#[tokio::test]
async fn test_async_read_to_sync() -> Result<(), Box<dyn std::error::Error>> {
test_reader_len(tokio::io::empty(), 0).await?;
let buf = b"hello world";
test_reader_len(Cursor::new(buf), buf.len()).await?;
Ok(())
}

0 comments on commit 5040268

Please sign in to comment.