Skip to content

Commit

Permalink
Add OneByteReadWrapper to rc-zip-tokio
Browse files Browse the repository at this point in the history
  • Loading branch information
fasterthanlime committed Mar 19, 2024
1 parent 0c652c1 commit bf5e72e
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 7 deletions.
2 changes: 1 addition & 1 deletion rc-zip-sync/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ fn streaming() {
}
}

// This helps find bug in state machines!
// This helps find bugs in state machines!

struct OneByteReadWrapper<R>(R);

Expand Down
53 changes: 47 additions & 6 deletions rc-zip-tokio/tests/integration_tests.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use positioned_io::RandomAccessFile;
use positioned_io::{RandomAccessFile, Size};
use rc_zip::{
corpus::{self, zips_dir, Case, Files},
error::Error,
parse::Archive,
};
use rc_zip_tokio::{ArchiveHandle, HasCursor, ReadZip, ReadZipStreaming};
use tokio::io::AsyncReadExt;
use rc_zip_tokio::{ArchiveHandle, HasCursor, ReadZip, ReadZipStreaming, ReadZipWithSize};
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};

use std::sync::Arc;
use std::{pin::Pin, sync::Arc, task};

async fn check_case<F: HasCursor>(test: &Case, archive: Result<ArchiveHandle<'_, F>, Error>) {
corpus::check_case(test, archive.as_ref().map(|ar| -> &Archive { ar }));
Expand Down Expand Up @@ -49,8 +49,15 @@ async fn real_world_files() {

let guarded_path = case.absolute_path();
let file = Arc::new(RandomAccessFile::open(&guarded_path.path).unwrap());
let archive = file.read_zip().await;
check_case(&case, archive).await;
if let Ok("1") = std::env::var("ONE_BYTE_READ").as_deref() {
let size = file.size().unwrap().expect("file to have a size");
let file = OneByteReadWrapper(file);
let archive = file.read_zip_with_size(size).await;
check_case(&case, archive).await;
} else {
let archive = file.read_zip().await;
check_case(&case, archive).await;
}
drop(guarded_path)
}
}
Expand Down Expand Up @@ -79,3 +86,37 @@ async fn streaming() {
drop(guarded_path)
}
}

// This helps find bugs in state machines!

struct OneByteReadWrapper<R>(R);

impl<R> AsyncRead for OneByteReadWrapper<R>
where
R: AsyncRead,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> task::Poll<std::io::Result<()>> {
let mut inner_buf = buf.take(1);
futures::ready!(
unsafe { self.map_unchecked_mut(|s| &mut s.0) }.poll_read(cx, &mut inner_buf)
)?;
let n = inner_buf.filled().len();
buf.set_filled(n);
Ok(()).into()
}
}

impl<R> HasCursor for OneByteReadWrapper<R>
where
R: HasCursor,
{
type Cursor<'a> = OneByteReadWrapper<<R as HasCursor>::Cursor<'a>> where R: 'a;

fn cursor_at(&self, offset: u64) -> Self::Cursor<'_> {
OneByteReadWrapper(self.0.cursor_at(offset))
}
}

0 comments on commit bf5e72e

Please sign in to comment.