Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(query): Add NestedCheckpointReader for input format parser #6385

Merged
merged 8 commits into from
Jul 1, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions common/datavalues/src/types/deserializations/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ impl TypeDeserializer for ArrayDeserializer {
}
}

fn de_text<R: BufferRead>(&mut self, reader: &mut R, format: &FormatSettings) -> Result<()> {
fn de_text<R: BufferRead>(
&mut self,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
reader.must_ignore_byte(b'[')?;
let mut idx = 0;
loop {
Expand Down Expand Up @@ -107,7 +111,7 @@ impl TypeDeserializer for ArrayDeserializer {

fn de_text_csv<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
reader.must_ignore_byte(b'[')?;
Expand Down Expand Up @@ -137,7 +141,7 @@ impl TypeDeserializer for ArrayDeserializer {

fn de_whole_text(&mut self, reader: &[u8], format: &FormatSettings) -> Result<()> {
let reader = BufferReader::new(reader);
let mut reader = CheckpointReader::new(Box::new(reader));
let mut reader = NestedCheckpointReader::new(Box::new(reader));
self.de_text(&mut reader, format)
}

Expand Down
6 changes: 5 additions & 1 deletion common/datavalues/src/types/deserializations/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ impl TypeDeserializer for BooleanDeserializer {
Ok(())
}

fn de_text<R: BufferRead>(&mut self, reader: &mut R, _format: &FormatSettings) -> Result<()> {
fn de_text<R: BufferRead>(
&mut self,
reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
let v = if BufferReadExt::ignore_insensitive_bytes(reader, b"true")? {
Ok(true)
} else if BufferReadExt::ignore_insensitive_bytes(reader, b"false")? {
Expand Down
12 changes: 8 additions & 4 deletions common/datavalues/src/types/deserializations/date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ where

fn de_text_quoted<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
reader.must_ignore_byte(b'\'')?;
Expand All @@ -100,7 +100,11 @@ where
Ok(())
}

fn de_text<R: BufferRead>(&mut self, reader: &mut R, _format: &FormatSettings) -> Result<()> {
fn de_text<R: BufferRead>(
&mut self,
reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
let date = reader.read_date_text()?;
let days = uniform(date);
check_date(days.as_i32())?;
Expand All @@ -110,7 +114,7 @@ where

fn de_text_csv<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
let maybe_quote = reader.ignore(|f| f == b'\'' || f == b'"')?;
Expand All @@ -126,7 +130,7 @@ where

fn de_text_json<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
reader.must_ignore_byte(b'"')?;
Expand Down
12 changes: 8 additions & 4 deletions common/datavalues/src/types/deserializations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,27 +63,31 @@ pub trait TypeDeserializer: Send + Sync {

fn de_whole_text(&mut self, reader: &[u8], format: &FormatSettings) -> Result<()>;

fn de_text<R: BufferRead>(&mut self, reader: &mut R, format: &FormatSettings) -> Result<()>;
fn de_text<R: BufferRead>(
&mut self,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()>;

fn de_text_csv<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
self.de_text(reader, format)
}

fn de_text_json<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
self.de_text(reader, format)
}

fn de_text_quoted<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
self.de_text(reader, format)
Expand Down
6 changes: 5 additions & 1 deletion common/datavalues/src/types/deserializations/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ impl TypeDeserializer for NullDeserializer {
Ok(())
}

fn de_text<R: BufferRead>(&mut self, _reader: &mut R, _format: &FormatSettings) -> Result<()> {
fn de_text<R: BufferRead>(
&mut self,
_reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
self.builder.append_default();
Ok(())
}
Expand Down
53 changes: 36 additions & 17 deletions common/datavalues/src/types/deserializations/nullable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl TypeDeserializer for NullableDeserializer {

fn de_text_json<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
if reader.ignore_insensitive_bytes(b"null")? {
Expand All @@ -84,47 +84,66 @@ impl TypeDeserializer for NullableDeserializer {
}

// TODO: support null text setting
fn de_text<R: BufferRead>(&mut self, reader: &mut R, format: &FormatSettings) -> Result<()> {
fn de_text<R: BufferRead>(
&mut self,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
if reader.ignore_insensitive_bytes(b"null")? {
reader.pop_checkpoint();
youngsofun marked this conversation as resolved.
Show resolved Hide resolved
self.de_default(format);
return Ok(());
} else {
reader.rollback_to_checkpoint()?;
reader.pop_checkpoint();
self.inner.de_text(reader, format)?;
self.bitmap.push(true);
}
self.inner.de_text(reader, format)?;
self.bitmap.push(true);

Ok(())
}

fn de_text_quoted<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
reader.push_checkpoint();
if reader.ignore_insensitive_bytes(b"null")? {
reader.pop_checkpoint();
self.de_default(format);
return Ok(());
} else {
reader.rollback_to_checkpoint()?;
reader.pop_checkpoint();
self.inner.de_text_quoted(reader, format)?;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can not construct a tmp CheckpointReader only when needed because the recursive call to de_text_quoted() , compiler complain about potential type CheckpointReader<CheckpointReader<CheckpointReader<...>>>

self.bitmap.push(true);
}
self.inner.de_text_quoted(reader, format)?;
self.bitmap.push(true);
Ok(())
}

fn de_text_csv<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
if reader.eof()? {
self.de_default(format);
} else {
reader.ignore_insensitive_bytes(b"null")?;
let buffer = reader.fill_buf()?;
if !buffer.is_empty() && (buffer[0] == b'\r' || buffer[0] == b'\n' || buffer[0] == b',')
{
self.de_default(format);
} else {
self.inner.de_text_csv(reader, format)?;
self.bitmap.push(true);
reader.push_checkpoint();
if reader.ignore_insensitive_bytes(b"null")? {
let buffer = reader.fill_buf()?;
if !buffer.is_empty()
&& (buffer[0] == b'\r' || buffer[0] == b'\n' || buffer[0] == b',')
{
self.de_default(format);
reader.pop_checkpoint();
return Ok(());
}
}
reader.rollback_to_checkpoint()?;
reader.pop_checkpoint();
self.inner.de_text_csv(reader, format)?;
self.bitmap.push(true);
}
Ok(())
}
Expand Down
8 changes: 6 additions & 2 deletions common/datavalues/src/types/deserializations/number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ where
Ok(())
}

fn de_text<R: BufferRead>(&mut self, reader: &mut R, _format: &FormatSettings) -> Result<()> {
fn de_text<R: BufferRead>(
&mut self,
reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
let v: T = if !T::FLOATING {
reader.read_int_text()
} else {
Expand All @@ -101,7 +105,7 @@ where

fn de_text_csv<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
_settings: &FormatSettings,
) -> Result<()> {
let maybe_quote = reader.ignore(|f| f == b'\'' || f == b'"')?;
Expand Down
10 changes: 7 additions & 3 deletions common/datavalues/src/types/deserializations/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ impl TypeDeserializer for StringDeserializer {
Ok(())
}

fn de_text<R: BufferRead>(&mut self, reader: &mut R, _format: &FormatSettings) -> Result<()> {
fn de_text<R: BufferRead>(
&mut self,
reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
self.buffer.clear();
reader.read_escaped_string_text(&mut self.buffer)?;
self.builder.append_value(self.buffer.as_slice());
Expand All @@ -93,7 +97,7 @@ impl TypeDeserializer for StringDeserializer {

fn de_text_quoted<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
self.buffer.clear();
Expand All @@ -104,7 +108,7 @@ impl TypeDeserializer for StringDeserializer {

fn de_text_csv<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
settings: &FormatSettings,
) -> Result<()> {
let mut read_buffer = reader.fill_buf()?;
Expand Down
8 changes: 6 additions & 2 deletions common/datavalues/src/types/deserializations/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ impl TypeDeserializer for StructDeserializer {
}
}

fn de_text<R: BufferRead>(&mut self, reader: &mut R, format: &FormatSettings) -> Result<()> {
fn de_text<R: BufferRead>(
&mut self,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
reader.must_ignore_byte(b'(')?;
let mut values = Vec::with_capacity(self.inners.len());
for (idx, inner) in self.inners.iter_mut().enumerate() {
Expand All @@ -100,7 +104,7 @@ impl TypeDeserializer for StructDeserializer {

fn de_text_csv<R: BufferRead>(
&mut self,
_reader: &mut R,
_reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
Err(ErrorCode::UnImplement("Unimplement error"))
Expand Down
12 changes: 8 additions & 4 deletions common/datavalues/src/types/deserializations/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl TypeDeserializer for TimestampDeserializer {

fn de_text_quoted<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
reader.must_ignore_byte(b'\'')?;
Expand All @@ -92,7 +92,11 @@ impl TypeDeserializer for TimestampDeserializer {
Ok(())
}

fn de_text<R: BufferRead>(&mut self, reader: &mut R, format: &FormatSettings) -> Result<()> {
fn de_text<R: BufferRead>(
&mut self,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
let ts = reader.read_timestamp_text(&format.timezone)?;
let micros = ts.timestamp_micros();
check_timestamp(micros)?;
Expand All @@ -102,7 +106,7 @@ impl TypeDeserializer for TimestampDeserializer {

fn de_text_csv<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
let maybe_quote = reader.ignore(|f| f == b'\'' || f == b'"')?;
Expand All @@ -118,7 +122,7 @@ impl TypeDeserializer for TimestampDeserializer {

fn de_text_json<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
reader.must_ignore_byte(b'"')?;
Expand Down
10 changes: 7 additions & 3 deletions common/datavalues/src/types/deserializations/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ impl TypeDeserializer for VariantDeserializer {
Ok(())
}

fn de_text<R: BufferRead>(&mut self, reader: &mut R, _format: &FormatSettings) -> Result<()> {
fn de_text<R: BufferRead>(
&mut self,
reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
self.buffer.clear();
reader.read_escaped_string_text(&mut self.buffer)?;
let val = serde_json::from_slice(self.buffer.as_slice())?;
Expand All @@ -91,7 +95,7 @@ impl TypeDeserializer for VariantDeserializer {

fn de_text_quoted<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
self.buffer.clear();
Expand All @@ -105,7 +109,7 @@ impl TypeDeserializer for VariantDeserializer {

fn de_text_csv<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
self.buffer.clear();
Expand Down
Loading