Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tokio 0.3 support #104

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "async-compression"
version = "0.3.5"
version = "0.3.6"
authors = ["Wim Looman <wim@nemo157.com>", "Allen Bui <fairingrey@gmail.com>"]
edition = "2018"
license = "MIT OR Apache-2.0"
Expand All @@ -19,7 +19,7 @@ rustdoc-args = ["--cfg", "docsrs"]
# groups
default = []
all = ["all-implementations", "all-algorithms"]
all-implementations = ["futures-io", "stream", "tokio-02"]
all-implementations = ["futures-io", "stream", "tokio-02", "tokio-03"]
all-algorithms = ["brotli", "bzip2", "deflate", "gzip", "lzma", "xz", "zlib", "zstd"]

# implementations
Expand Down Expand Up @@ -50,6 +50,7 @@ libzstd = { package = "zstd", version = "0.5.0", optional = true, default-featur
zstd-safe = { version = "2.0.0", optional = true, default-features = false }
memchr = "2.2.1"
tokio-02 = { package = "tokio", version = "0.2.21", optional = true, default-features = false }
tokio-03 = { package = "tokio", version = "0.3.0", optional = true, default-features = false }

[dev-dependencies]
proptest = "0.9.4"
Expand All @@ -61,6 +62,8 @@ ntest = "0.3.3"
timebomb = "0.1.2"
bytes = "0.5.0"
tokio-02 = { package = "tokio", version = "0.2.21", default-features = false, features = ["io-util", "stream"] }
tokio-03 = { package = "tokio", version = "0.3.0", default-features = false, features = ["io-util", "stream"] }
tokio-util = { version = "0.4.0", default-features = false, features = ["io"] }

[[test]]
name = "brotli"
Expand Down
11 changes: 11 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@
not(feature = "tokio-02"),
doc = "`tokio-02` (*inactive*) | `tokio::io::AsyncBufRead`, `tokio::io::AsyncWrite`"
)]
#![cfg_attr(
feature = "tokio-03",
doc = "[`tokio-03`](crate::tokio_03) | [`tokio::io::AsyncBufRead`](::tokio_03::io::AsyncBufRead), [`tokio::io::AsyncWrite`](::tokio_03::io::AsyncWrite)"
)]
#![cfg_attr(
not(feature = "tokio-03"),
doc = "`tokio-03` (*inactive*) | `tokio::io::AsyncBufRead`, `tokio::io::AsyncWrite`"
)]
//!

//! ## Compression algorithm
Expand Down Expand Up @@ -159,6 +167,9 @@ pub mod stream;
#[cfg(feature = "tokio-02")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-02")))]
pub mod tokio_02;
#[cfg(feature = "tokio-03")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-03")))]
pub mod tokio_03;

mod unshared;
mod util;
Expand Down
142 changes: 142 additions & 0 deletions src/tokio_03/bufread/generic/decoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
use core::{
pin::Pin,
task::{Context, Poll},
};
use std::io::Result;

use crate::{codec::Decode, util::PartialBuffer};
use futures_core::ready;
use pin_project_lite::pin_project;
use tokio_03::io::{AsyncBufRead, AsyncRead, ReadBuf};

#[derive(Debug)]
enum State {
Decoding,
Flushing,
Done,
Next,
}

pin_project! {
#[derive(Debug)]
pub struct Decoder<R, D: Decode> {
#[pin]
reader: R,
decoder: D,
state: State,
multiple_members: bool,
}
}

impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
pub fn new(reader: R, decoder: D) -> Self {
Self {
reader,
decoder,
state: State::Decoding,
multiple_members: false,
}
}

pub fn get_ref(&self) -> &R {
&self.reader
}

pub fn get_mut(&mut self) -> &mut R {
&mut self.reader
}

pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
self.project().reader
}

pub fn into_inner(self) -> R {
self.reader
}

pub fn multiple_members(&mut self, enabled: bool) {
self.multiple_members = enabled;
}

fn do_poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
output: &mut PartialBuffer<&mut [u8]>,
) -> Poll<Result<()>> {
let mut this = self.project();

loop {
*this.state = match this.state {
State::Decoding => {
let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
if input.is_empty() {
State::Flushing
} else {
let mut input = PartialBuffer::new(input);
let done = this.decoder.decode(&mut input, output)?;
let len = input.written().len();
this.reader.as_mut().consume(len);
if done {
State::Flushing
} else {
State::Decoding
}
}
}

State::Flushing => {
if this.decoder.finish(output)? {
if *this.multiple_members {
this.decoder.reinit()?;
State::Next
} else {
State::Done
}
} else {
State::Flushing
}
}

State::Done => State::Done,

State::Next => {
let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
if input.is_empty() {
State::Done
} else {
State::Decoding
}
}
};

if let State::Done = *this.state {
return Poll::Ready(Ok(()));
}
if output.unwritten().is_empty() {
return Poll::Ready(Ok(()));
}
}
}
}

impl<R: AsyncBufRead, D: Decode> AsyncRead for Decoder<R, D> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<Result<()>> {
if buf.remaining() == 0 {
return Poll::Ready(Ok(()));
}

let mut output = PartialBuffer::new(buf.initialize_unfilled());
match self.do_poll_read(cx, &mut output)? {
Poll::Pending if output.written().is_empty() => Poll::Pending,
_ => {
let len = output.written().len();
buf.advance(len);
Poll::Ready(Ok(()))
}
}
}
}
117 changes: 117 additions & 0 deletions src/tokio_03/bufread/generic/encoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
use core::{
pin::Pin,
task::{Context, Poll},
};
use std::io::Result;

use crate::{codec::Encode, util::PartialBuffer};
use futures_core::ready;
use pin_project_lite::pin_project;
use tokio_03::io::{AsyncBufRead, AsyncRead, ReadBuf};

#[derive(Debug)]
enum State {
Encoding,
Flushing,
Done,
}

pin_project! {
#[derive(Debug)]
pub struct Encoder<R, E: Encode> {
#[pin]
reader: R,
encoder: E,
state: State,
}
}

impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
pub fn new(reader: R, encoder: E) -> Self {
Self {
reader,
encoder,
state: State::Encoding,
}
}

pub fn get_ref(&self) -> &R {
&self.reader
}

pub fn get_mut(&mut self) -> &mut R {
&mut self.reader
}

pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
self.project().reader
}

pub fn into_inner(self) -> R {
self.reader
}

fn do_poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
output: &mut PartialBuffer<&mut [u8]>,
) -> Poll<Result<()>> {
let mut this = self.project();

loop {
*this.state = match this.state {
State::Encoding => {
let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
if input.is_empty() {
State::Flushing
} else {
let mut input = PartialBuffer::new(input);
this.encoder.encode(&mut input, output)?;
let len = input.written().len();
this.reader.as_mut().consume(len);
State::Encoding
}
}

State::Flushing => {
if this.encoder.finish(output)? {
State::Done
} else {
State::Flushing
}
}

State::Done => State::Done,
};

if let State::Done = *this.state {
return Poll::Ready(Ok(()));
}
if output.unwritten().is_empty() {
return Poll::Ready(Ok(()));
}
}
}
}

impl<R: AsyncBufRead, E: Encode> AsyncRead for Encoder<R, E> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<Result<()>> {
if buf.remaining() == 0 {
return Poll::Ready(Ok(()));
}

let mut output = PartialBuffer::new(buf.initialize_unfilled());
match self.do_poll_read(cx, &mut output)? {
Poll::Pending if output.written().is_empty() => Poll::Pending,
_ => {
let len = output.written().len();
buf.advance(len);
Poll::Ready(Ok(()))
}
}
}
}
4 changes: 4 additions & 0 deletions src/tokio_03/bufread/generic/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
mod decoder;
mod encoder;

pub use self::{decoder::Decoder, encoder::Encoder};
Loading