
Enables the StreamingRawReader to detect incomplete text (#784)
* Enables the StreamingRawReader to detect incomplete text
* Fixes bug in EncodedBinaryValueData_1_0::body_span
* Fixes indentation for annotated text values
zslayton authored Jun 5, 2024
1 parent 1a0a33e commit 62d0755
Showing 10 changed files with 212 additions and 29 deletions.
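
For context, the failure mode the first bullet addresses (a condensed restatement of the scenario exercised by the new `incomplete_trailing_values` test below): when text Ion is pulled incrementally from a source such as a file or socket, a chunk boundary can fall in the middle of a value, for example a first chunk ending in `" 87"` and the next beginning with `"1.25"`. A reader that trusts whatever parses cleanly from the buffer at the end of the first chunk reports the integer `87`, while the stream actually contains the decimal `871.25`. With this change, the `StreamingRawReader` refuses to finalize a text item that ends exactly at the end of its buffer until the input source is confirmed to be exhausted.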
18 changes: 11 additions & 7 deletions src/lazy/any_encoding.rs
@@ -384,13 +384,7 @@ impl<'data> LazyRawReader<'data, AnyEncoding> for LazyRawAnyReader<'data> {

#[inline]
fn save_state(&self) -> <AnyEncoding as Decoder>::ReaderSavedState {
use RawReaderKind::*;
match &self.encoding {
Text_1_0(_) => IonEncoding::Text_1_0,
Binary_1_0(_) => IonEncoding::Binary_1_0,
Text_1_1(_) => IonEncoding::Text_1_1,
Binary_1_1(_) => IonEncoding::Binary_1_1,
}
self.encoding()
}

fn position(&self) -> usize {
@@ -402,6 +396,16 @@ impl<'data> LazyRawReader<'data, AnyEncoding> for LazyRawAnyReader<'data> {
Binary_1_1(r) => r.position(),
}
}

fn encoding(&self) -> IonEncoding {
use RawReaderKind::*;
match &self.encoding {
Text_1_0(_) => IonEncoding::Text_1_0,
Binary_1_0(_) => IonEncoding::Binary_1_0,
Text_1_1(_) => IonEncoding::Text_1_1,
Binary_1_1(_) => IonEncoding::Binary_1_1,
}
}
}

// ===== Values ======
5 changes: 5 additions & 0 deletions src/lazy/binary/raw/reader.rs
@@ -8,6 +8,7 @@ use crate::lazy::raw_stream_item::{EndPosition, LazyRawStreamItem, RawStreamItem
use crate::result::IonFailure;
use crate::IonResult;

use crate::lazy::any_encoding::IonEncoding;
use bumpalo::Bump as BumpAllocator;

/// A binary Ion 1.0 reader that yields [`LazyRawBinaryValue_1_0`]s representing the top level values found
@@ -135,6 +136,10 @@ impl<'data> LazyRawReader<'data, BinaryEncoding_1_0> for LazyRawBinaryReader_1_0
fn position(&self) -> usize {
self.data.buffer.offset() + self.data.bytes_to_skip
}

fn encoding(&self) -> IonEncoding {
IonEncoding::Binary_1_0
}
}

/// Wraps an [`ImmutableBuffer`], allowing the reader to advance each time an item is successfully
5 changes: 5 additions & 0 deletions src/lazy/binary/raw/v1_1/reader.rs
@@ -9,6 +9,7 @@ use crate::lazy::raw_stream_item::{EndPosition, LazyRawStreamItem, RawStreamItem
use crate::result::IonFailure;
use crate::IonResult;

use crate::lazy::any_encoding::IonEncoding;
use bumpalo::Bump as BumpAllocator;

pub struct LazyRawBinaryReader_1_1<'data> {
@@ -164,6 +165,10 @@ impl<'data> LazyRawReader<'data, BinaryEncoding_1_1> for LazyRawBinaryReader_1_1
fn position(&self) -> usize {
self.data.offset() + self.bytes_to_skip
}

fn encoding(&self) -> IonEncoding {
IonEncoding::Binary_1_1
}
}

#[cfg(test)]
10 changes: 5 additions & 5 deletions src/lazy/binary/raw/value.rs
@@ -268,11 +268,11 @@ impl<'a, 'top> EncodedBinaryValueData_1_0<'a, 'top> {
/// Returns the encoded bytes representing the value's body (that is: the content of the value
/// that follows its opcode and length).
pub fn body_span(&self) -> Span<'top> {
let stream_range = self.body_range();
let offset = self.value.input.offset();
let local_range = stream_range.start - offset..stream_range.end - offset;
let bytes = &self.span().bytes()[local_range];
Span::with_offset(stream_range.start, bytes)
let body_range = self.body_range();
let body_length = body_range.len();
let value_bytes = self.span().bytes();
let body_bytes = &value_bytes[value_bytes.len() - body_length..];
Span::with_offset(body_range.start, body_bytes)
}
}

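To make the corrected slicing concrete, here is a minimal sketch using plain byte slices instead of the crate's `Span` type; the `0x21 0x0A` encoding used in the example is an assumed illustration, not taken from the change itself.

```rust
/// The body is always the trailing `body_length` bytes of the value's encoded span,
/// wherever that span sits in the overall stream. (The previous version indexed the
/// span with a range computed from the input buffer's offset, which is the bug the
/// second bullet of the commit message fixes.)
fn body_bytes(value_bytes: &[u8], body_length: usize) -> &[u8] {
    &value_bytes[value_bytes.len() - body_length..]
}

fn main() {
    // In binary Ion 1.0, the integer 10 encodes as 0x21 0x0A: a one-byte type
    // descriptor (type = int, length = 1) followed by a one-byte body.
    let encoded_int_10 = [0x21u8, 0x0A];
    assert_eq!(body_bytes(&encoded_int_10, 1), &encoded_int_10[1..]);
}
```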
4 changes: 4 additions & 0 deletions src/lazy/decoder.rs
@@ -3,6 +3,7 @@ use std::ops::Range;

use bumpalo::Bump as BumpAllocator;

use crate::lazy::any_encoding::IonEncoding;
use crate::lazy::encoding::{BinaryEncoding_1_0, RawValueLiteral, TextEncoding_1_0};
use crate::lazy::expanded::macro_evaluator::RawEExpression;
use crate::lazy::raw_stream_item::LazyRawStreamItem;
@@ -348,6 +349,7 @@ pub trait LazyRawReader<'data, D: Decoder>: Sized {

fn resume_at_offset(data: &'data [u8], offset: usize, saved_state: D::ReaderSavedState)
-> Self;

fn next<'top>(
&'top mut self,
allocator: &'top BumpAllocator,
@@ -363,6 +365,8 @@ pub trait LazyRawReader<'data, D: Decoder>: Sized {
/// This position is not necessarily the first byte of the next value; it may be (e.g.) a NOP,
/// a comment, or whitespace that the reader will traverse as part of matching the next item.
fn position(&self) -> usize;

fn encoding(&self) -> IonEncoding;
}

pub trait LazyRawContainer<'top, D: Decoder> {
19 changes: 16 additions & 3 deletions src/lazy/encoder/text/v1_0/value_writer.rs
@@ -26,6 +26,8 @@ pub struct TextValueWriter_1_0<'value, W: Write + 'value> {
// (i.e. following an indented field name) which is the only time we don't write
// indentation before the value.
pub(crate) parent_type: ParentType,
// If `true`, this value had annotations and so should not write its own indentation.
pub(crate) has_annotations: bool,
}

pub(crate) fn write_symbol_token<O: Write, A: AsRawSymbolRef>(
@@ -60,15 +62,18 @@ impl<'value, W: Write + 'value> TextValueWriter_1_0<'value, W> {
depth,
value_delimiter: delimiter,
parent_type,
has_annotations: false,
}
}

/// Writes the `indentation` string set in the whitespace config to output `depth` times.
fn write_indentation(&mut self) -> IonResult<()> {
let indentation = self.whitespace_config().indentation;
if self.parent_type == ParentType::Struct {
if self.parent_type == ParentType::Struct || self.has_annotations {
// If this value is part of a struct field, the indentation was written before the
// field name. There's nothing to do here.
// field name.
// If this value has annotations, the indentation was written before the annotations.
// Either way, there's nothing to do here.
return Ok(());
}
if !indentation.is_empty() {
@@ -111,7 +116,13 @@ pub struct TextAnnotatedValueWriter_1_0<'value, W: Write> {
}

impl<'value, W: Write> TextAnnotatedValueWriter_1_0<'value, W> {
fn encode_annotations(self) -> IonResult<TextValueWriter_1_0<'value, W>> {
fn encode_annotations(mut self) -> IonResult<TextValueWriter_1_0<'value, W>> {
// The inner ValueWriter knows the indentation depth; we'll have it write the indentation
// before we write the value.
self.value_writer.write_indentation()?;
// After indenting, we set the `has_annotations` flag to `true` so the value won't write
// indentation a second time.
self.value_writer.has_annotations = !self.annotations.is_empty();
let output = &mut self.value_writer.writer.output;
for annotation in self.annotations {
match annotation.as_raw_symbol_token_ref() {
@@ -247,6 +258,7 @@ impl<'a, W: Write> TextContainerWriter_1_0<'a, W> {
depth: self.depth + 1,
value_delimiter: self.value_delimiter,
parent_type: self.container_type.into(),
has_annotations: false,
}
}
}
@@ -425,6 +437,7 @@ impl<'value, W: Write> MakeValueWriter for TextStructWriter_1_0<'value, W> {
depth: self.container_writer.depth + 1,
value_delimiter: ",",
parent_type: ParentType::Struct,
has_annotations: false,
}
}
}
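A hand-written illustration (not actual writer output) of the rule the new `has_annotations` flag enforces: the indentation for an annotated element is written exactly once, before its annotations, so a pretty-printed container nests as

```
[
  foo::bar::1,
  2,
]
```

rather than having the value's indentation emitted only after its annotations have already been written.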
133 changes: 124 additions & 9 deletions src/lazy/streaming_raw_reader.rs
@@ -8,7 +8,7 @@ use bumpalo::Bump as BumpAllocator;
use crate::lazy::any_encoding::IonEncoding;
use crate::lazy::decoder::{Decoder, LazyRawReader};
use crate::lazy::raw_stream_item::LazyRawStreamItem;
use crate::{AnyEncoding, IonError, IonResult};
use crate::{AnyEncoding, IonError, IonResult, LazyRawValue};

/// Wraps an implementation of [`IonDataSource`] and reads one top level value at a time from the input.
pub struct StreamingRawReader<Encoding: Decoder, Input: IonInput> {
@@ -81,6 +81,7 @@ impl<Encoding: Decoder, Input: IonInput> StreamingRawReader<Encoding, Input> {
&'top mut self,
allocator: &'top BumpAllocator,
) -> IonResult<LazyRawStreamItem<'top, Encoding>> {
let mut input_source_exhausted = false;
loop {
// If the input buffer is empty, try to pull more data from the source before proceeding.
// It's important that we do this _before_ reading from the buffer; any item returned
@@ -107,6 +108,7 @@ impl<Encoding: Decoder, Input: IonInput> StreamingRawReader<Encoding, Input> {
// the borrow checker's limitation (described in a comment on the StreamingRawReader type)
// by getting a second (read-only) reference to the reader.
let slice_reader_ref = unsafe { &*unsafe_cell_reader.get() };
let encoding = slice_reader_ref.encoding();
let end_position = slice_reader_ref.position();
// For the RawAnyReader, remember what encoding we detected for next time.
self.saved_state = slice_reader_ref.save_state();
@@ -123,12 +125,63 @@ impl<Encoding: Decoder, Input: IonInput> StreamingRawReader<Encoding, Input> {
continue;
}
// If there's nothing available, return the result we got.
} else if let Ok(ref item) = result {
// We have successfully read something from the buffer.
//
// In binary encodings, stream items contain enough data for the reader to tell
// whether they are complete.
//
// In text encodings, it's possible for the buffer to end with data that looks like
// a complete item but is not. The only way to be certain is to try to read again
// from the input source to confirm there's no more data. Consider the following
// examples in which Ion is being pulled from a `File` into a `Vec<u8>`:
//
// foo /* comment */ ::bar::baz::1000
// └────────┬───────┘ └────────┬───────┘
// buffer contents remaining in File
//
// $ion _1_0
// └────────┬───────┘ └────────┬───────┘
// buffer contents remaining in File
//
// 75 1.20
// └────────┬───────┘ └────────┬───────┘
// buffer contents remaining in File
//
// To avoid this, we perform a final check for text readers who have emptied their
// buffer: we do not consider the item complete unless the input source is exhausted.
if encoding.is_text()
&& bytes_read == available_bytes.len()
&& !input_source_exhausted
{
use crate::lazy::raw_stream_item::RawStreamItem::*;
match item {
// Text containers and e-expressions have closing delimiters that allow us
// to tell that they're complete.
Value(v) if v.ion_type().is_container() => {}
EExpression(_eexp) => {}
// IVMs (which look like symbols), scalar values, and the end of the
// stream are all cases where the reader looking at a fixed slice of the
// buffer may reach the wrong conclusion.
_ => {
// Try to pull more data from the input source. This invalidates the `result`
// variable because `fill_buffer()` may cause the buffer to be reallocated,
// so we start this iteration over. This results in the last value being parsed
// a second time from the (potentially updated) buffer.
if input.fill_buffer()? == 0 {
input_source_exhausted = true;
}
continue;
}
}
}

// Mark those input bytes as having been consumed so they are not read again.
input.consume(bytes_read);
// Update the streaming reader's position to reflect the number of bytes we
// just read.
self.stream_position = end_position;
}
// Mark those input bytes as having been consumed so they are not read again.
input.consume(bytes_read);
// Update the streaming reader's position to reflect the number of bytes we
// just read.
self.stream_position = end_position;

return result;
}
@@ -229,7 +282,7 @@ pub struct IonStream<R: Read> {
impl<R: Read> IonStream<R> {
const DEFAULT_IO_BUFFER_SIZE: usize = 4 * 1024;

pub(crate) fn new(input: R) -> Self {
pub fn new(input: R) -> Self {
IonStream {
input,
buffer: vec![0u8; Self::DEFAULT_IO_BUFFER_SIZE],
@@ -365,9 +418,18 @@ impl<T: Read, U: Read> IonInput for io::Chain<T, U> {
}
}

impl IonInput for Box<dyn Read> {
type DataSource = IonStream<Self>;

fn into_data_source(self) -> Self::DataSource {
IonStream::new(self)
}
}

#[cfg(test)]
mod tests {
use std::io::{BufReader, Cursor};
use std::io;
use std::io::{BufReader, Cursor, Read};

use bumpalo::Bump as BumpAllocator;

@@ -376,7 +438,8 @@ mod tests {
use crate::lazy::raw_stream_item::LazyRawStreamItem;
use crate::lazy::raw_value_ref::RawValueRef;
use crate::lazy::streaming_raw_reader::{IonInput, StreamingRawReader};
use crate::{IonError, IonResult};
use crate::raw_symbol_ref::AsRawSymbolRef;
use crate::{v1_0, Decimal, IonError, IonResult, IonStream, RawSymbolRef, RawVersionMarker};

fn expect_value<'a, D: Decoder>(
actual: LazyRawStreamItem<'a, D>,
@@ -485,4 +548,56 @@
read_invalid_example_stream(slice)?;
read_invalid_example_stream(vec)
}

#[test]
fn incomplete_trailing_values() -> IonResult<()> {
// Each read() call will return these UTF-8 byte sequences in turn:
let input_chunks = [
"$ion", // $ion_1_0
"_1_0",
" 87", // 871.25
"1.25",
" foo ", // foo::bar::baz::quux
" ::bar :",
":baz",
"::quux",
];
// We achieve this by wrapping each string in an `io::Chain`.
let mut input: Box<dyn Read> = Box::new(io::empty());
for input_chunk in input_chunks {
input = Box::new(input.chain(Cursor::new(input_chunk)));
}
// This guarantees that there are several intermediate reading states in which the buffer
// contains incomplete data that could be misinterpreted by a reader.
let allocator = BumpAllocator::new();
let mut reader = StreamingRawReader::new(v1_0::Text, IonStream::new(input));

assert_eq!(reader.next(&allocator)?.expect_ivm()?.version(), (1, 0));
assert_eq!(
reader
.next(&allocator)?
.expect_value()?
.read()?
.expect_decimal()?,
Decimal::new(87125, -2)
);
let value = reader.next(&allocator)?.expect_value()?;
let annotations = value
.annotations()
.collect::<IonResult<Vec<RawSymbolRef>>>()?;
assert_eq!(
annotations,
vec![
"foo".as_raw_symbol_token_ref(),
"bar".as_raw_symbol_token_ref(),
"baz".as_raw_symbol_token_ref(),
]
);
assert_eq!(
value.read()?.expect_symbol()?,
"quux".as_raw_symbol_token_ref()
);

Ok(())
}
}
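
The retry rule implemented above can be restated as a self-contained sketch. It uses only std and a toy whitespace-delimited tokenizer in place of the Ion parser, so `match_token` and `next_token` are illustrative names rather than crate APIs: an item that consumes every remaining byte of the buffer is re-parsed after pulling more data, and is only trusted once the source reports that it is exhausted.

```rust
use std::io::{self, Read};

/// Toy stand-in for the raw reader: matches one whitespace-delimited token and
/// reports how many bytes of the buffer it consumed.
fn match_token(buffer: &[u8]) -> Option<(usize, String)> {
    let text = std::str::from_utf8(buffer).ok()?;
    let leading_ws = text.len() - text.trim_start().len();
    let token: String = text[leading_ws..]
        .chars()
        .take_while(|c| !c.is_whitespace())
        .collect();
    if token.is_empty() {
        return None;
    }
    Some((leading_ws + token.len(), token))
}

/// Returns the next complete token, pulling more data whenever the match reaches
/// the very end of the buffer and the source might still have bytes to contribute.
fn next_token(buffer: &mut Vec<u8>, source: &mut impl Read) -> io::Result<Option<String>> {
    let mut source_exhausted = false;
    loop {
        if let Some((consumed, token)) = match_token(buffer) {
            if consumed < buffer.len() || source_exhausted {
                // The token is followed by more data (or nothing can follow it),
                // so it cannot grow any further: it is complete.
                buffer.drain(..consumed);
                return Ok(Some(token));
            }
        } else if source_exhausted {
            return Ok(None);
        }
        // Either nothing matched yet or the match ran to the end of the buffer:
        // pull another chunk and re-parse the (possibly longer) buffer from scratch.
        let mut chunk = [0u8; 16];
        let n = source.read(&mut chunk)?;
        if n == 0 {
            source_exhausted = true;
        } else {
            buffer.extend_from_slice(&chunk[..n]);
        }
    }
}

fn main() -> io::Result<()> {
    // The chunk boundary falls inside "871.25", mirroring the " 87" / "1.25" chunks
    // used by the `incomplete_trailing_values` test above.
    let mut source = io::Cursor::new(" 87").chain(io::Cursor::new("1.25 99"));
    let mut buffer = Vec::new();
    assert_eq!(next_token(&mut buffer, &mut source)?, Some("871.25".to_string()));
    assert_eq!(next_token(&mut buffer, &mut source)?, Some("99".to_string()));
    assert_eq!(next_token(&mut buffer, &mut source)?, None);
    Ok(())
}
```

The same trade-off appears in the real reader: the last value in the buffer may be parsed a second time, which is the price of never returning a truncated text value.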
11 changes: 10 additions & 1 deletion src/lazy/text/buffer.rs
@@ -327,7 +327,16 @@ impl<'top> TextBufferView<'top> {
terminated(
whitespace_and_then(match_and_span(Self::match_symbol)),
whitespace_and_then(terminated(
complete_tag("::"),
// The complete_tag/tag pair allows the parser to recognize that:
//
// foo::bar::baz:
//
// is incomplete while:
//
// foo::bar::baz
//
// is a symbol with two annotations.
pair(complete_tag(":"), tag(":")),
Self::match_optional_comments_and_whitespace,
)),
)(self)
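For readers unfamiliar with the distinction, here is a standalone sketch using nom's complete and streaming `tag` combinators directly; the crate's `complete_tag`/`tag` helpers are assumed to behave like these, and `annotation_delimiter` is a made-up name for the example.

```rust
use nom::bytes::{complete, streaming};
use nom::sequence::pair;
use nom::IResult;

// Matches the "::" between annotations. The first ':' must already be in the buffer,
// but the second one may still be on its way from the data source.
fn annotation_delimiter(input: &str) -> IResult<&str, (&str, &str)> {
    pair(complete::tag(":"), streaming::tag(":"))(input)
}

fn main() {
    // A full "::" matches normally.
    assert!(annotation_delimiter("::baz").is_ok());
    // A lone trailing ':' is reported as Incomplete (more input might finish it),
    // which is how "foo::bar::baz:" gets flagged as an incomplete item.
    assert!(matches!(annotation_delimiter(":"), Err(nom::Err::Incomplete(_))));
    // No ':' at all is an ordinary parse error: the annotation sequence has ended.
    assert!(matches!(annotation_delimiter(" 10"), Err(nom::Err::Error(_))));
}
```

With a single complete "::" match, the trailing ':' would simply fail to match and `baz` could be reported as a finished value; splitting the delimiter lets the parser signal that it needs more input before deciding.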