Skip to content

Commit

Permalink
Implement VarIntAsyncReader
Browse files Browse the repository at this point in the history
  • Loading branch information
dermesser committed Feb 21, 2020
1 parent c9e1a80 commit 3e6e503
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 25 deletions.
91 changes: 67 additions & 24 deletions src/reader.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::io;
use std::io::{Read, Result};

use fixed::FixedInt;
use varint::{VarInt, MSB};
use crate::fixed::FixedInt;
use crate::varint::{VarInt, MSB};

use futures::io::AsyncReadExt;
use futures::prelude::*;

/// A trait for reading VarInts from any other `Reader`.
///
Expand All @@ -17,37 +20,77 @@ pub trait VarIntReader {
fn read_varint<VI: VarInt>(&mut self) -> Result<VI>;
}

impl<R: Read> VarIntReader for R {
fn read_varint<VI: VarInt>(&mut self) -> Result<VI> {
const BUFLEN: usize = 10;
let mut buf = [0 as u8; BUFLEN];
let mut i = 0;

loop {
if i >= BUFLEN {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Unterminated varint",
));
}
/// Like a VarIntReader, but returns a future.
#[async_trait::async_trait]
pub trait VarIntAsyncReader {
async fn read_varint_async<VI: VarInt>(&mut self) -> Result<VI>;
}

/// VarIntProcessor encapsulates the logic for decoding a VarInt byte-by-byte.
#[derive(Default)]
pub struct VarIntProcessor {
buf: [u8; 10],
i: usize,
}

impl VarIntProcessor {
fn push(&mut self, b: u8) -> Result<()> {
if self.i >= 10 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Unterminated varint",
));
}
self.buf[self.i] = b;
self.i += 1;
Ok(())
}
fn finished(&self) -> bool {
(self.i > 0 && (self.buf[self.i - 1] & MSB == 0))
}
fn decode<VI: VarInt>(&self) -> VI {
VI::decode_var(&self.buf[0..self.i]).0
}
}

let read = try!(self.read(&mut buf[i..i + 1]));
#[async_trait::async_trait]
impl<AR: AsyncRead + Unpin + Send> VarIntAsyncReader for AR {
async fn read_varint_async<VI: VarInt>(&mut self) -> Result<VI> {
let mut buf = [0 as u8; 1];
let mut p = VarIntProcessor::default();

while !p.finished() {
let read = self.read(&mut buf).await?;

// EOF
if read == 0 && i == 0 {
if read == 0 && p.i == 0 {
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF"));
}

if buf[i] & MSB == 0 {
break;
p.push(buf[0])?;
}

Ok(p.decode())
}
}

impl<R: Read> VarIntReader for R {
fn read_varint<VI: VarInt>(&mut self) -> Result<VI> {
let mut buf = [0 as u8; 1];
let mut p = VarIntProcessor::default();

while !p.finished() {
let read = self.read(&mut buf)?;

// EOF
if read == 0 && p.i == 0 {
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF"));
}

i += 1;
p.push(buf[0])?;
}

let (result, _) = VI::decode_var(&buf[0..i + 1]);

Ok(result)
Ok(p.decode())
}
}

Expand All @@ -63,7 +106,7 @@ impl<R: Read> FixedIntReader for R {
fn read_fixedint<FI: FixedInt>(&mut self) -> Result<FI> {
let mut buf = [0 as u8; 8];

let read = try!(self.read(&mut buf[0..FI::required_space()]));
let read = self.read(&mut buf[0..FI::required_space()])?;

if read == 0 {
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF"));
Expand Down
37 changes: 36 additions & 1 deletion src/varint_tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[cfg(test)]
mod tests {
use crate::reader::VarIntReader;
use crate::reader::{VarIntAsyncReader, VarIntReader};
use crate::varint::VarInt;
use crate::writer::VarIntWriter;

Expand Down Expand Up @@ -105,4 +105,39 @@ mod tests {

assert!(reader.read_varint::<u32>().is_err());
}

#[test]
fn test_async_reader() {
let mut buf = Vec::with_capacity(128);

let i1: u32 = 1;
let i2: u32 = 65532;
let i3: u32 = 4200123456;
let i4: i64 = i3 as i64 * 1000;
let i5: i32 = -32456;

assert!(buf.write_varint(i1).is_ok());
assert!(buf.write_varint(i2).is_ok());
assert!(buf.write_varint(i3).is_ok());
assert!(buf.write_varint(i4).is_ok());
assert!(buf.write_varint(i5).is_ok());

let mut reader: &[u8] = buf.as_ref();

futures::executor::block_on(async {
assert_eq!(i1, reader.read_varint_async().await.unwrap());
assert_eq!(i2, reader.read_varint_async().await.unwrap());
assert_eq!(i3, reader.read_varint_async().await.unwrap());
assert_eq!(i4, reader.read_varint_async().await.unwrap());
assert_eq!(i5, reader.read_varint_async().await.unwrap());
assert!(reader.read_varint_async::<u32>().await.is_err());
});
}

#[test]
fn test_unterminated_varint() {
let mut buf = vec![0xff as u8; 12];
let mut read = buf.as_slice();
assert!(read.read_varint::<u64>().is_err());
}
}

0 comments on commit 3e6e503

Please sign in to comment.