Skip to content

Commit

Permalink
Merge pull request #6385 from youngsofun/fmt
Browse files Browse the repository at this point in the history
fix(query):  Add NestedCheckpointReader for input format parser
  • Loading branch information
databend-bot authored Jul 1, 2022
2 parents e3d3f09 + 2674061 commit bff565e
Show file tree
Hide file tree
Showing 28 changed files with 395 additions and 185 deletions.
10 changes: 7 additions & 3 deletions common/datavalues/src/types/deserializations/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ impl TypeDeserializer for ArrayDeserializer {
}
}

fn de_text<R: BufferRead>(&mut self, reader: &mut R, format: &FormatSettings) -> Result<()> {
fn de_text<R: BufferRead>(
&mut self,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
reader.must_ignore_byte(b'[')?;
let mut idx = 0;
loop {
Expand Down Expand Up @@ -107,7 +111,7 @@ impl TypeDeserializer for ArrayDeserializer {

fn de_text_csv<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
reader.must_ignore_byte(b'[')?;
Expand Down Expand Up @@ -137,7 +141,7 @@ impl TypeDeserializer for ArrayDeserializer {

// Deserializes a value from an in-memory byte slice: wraps `reader` in a
// `BufferReader`, layers a checkpoint-capable reader on top of it, and then
// delegates to `de_text` with the same `format`.
// NOTE(review): this text is a rendered diff without +/- markers. The
// `CheckpointReader` line looks like the removed line and the
// `NestedCheckpointReader` line its replacement — confirm which one is live.
fn de_whole_text(&mut self, reader: &[u8], format: &FormatSettings) -> Result<()> {
let reader = BufferReader::new(reader);
let mut reader = CheckpointReader::new(Box::new(reader));
let mut reader = NestedCheckpointReader::new(Box::new(reader));
self.de_text(&mut reader, format)
}

Expand Down
6 changes: 5 additions & 1 deletion common/datavalues/src/types/deserializations/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ impl TypeDeserializer for BooleanDeserializer {
Ok(())
}

fn de_text<R: BufferRead>(&mut self, reader: &mut R, _format: &FormatSettings) -> Result<()> {
fn de_text<R: BufferRead>(
&mut self,
reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
let v = if BufferReadExt::ignore_insensitive_bytes(reader, b"true")? {
Ok(true)
} else if BufferReadExt::ignore_insensitive_bytes(reader, b"false")? {
Expand Down
12 changes: 8 additions & 4 deletions common/datavalues/src/types/deserializations/date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ where

fn de_text_quoted<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
reader.must_ignore_byte(b'\'')?;
Expand All @@ -100,7 +100,11 @@ where
Ok(())
}

fn de_text<R: BufferRead>(&mut self, reader: &mut R, _format: &FormatSettings) -> Result<()> {
fn de_text<R: BufferRead>(
&mut self,
reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
let date = reader.read_date_text()?;
let days = uniform(date);
check_date(days.as_i32())?;
Expand All @@ -110,7 +114,7 @@ where

fn de_text_csv<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
let maybe_quote = reader.ignore(|f| f == b'\'' || f == b'"')?;
Expand All @@ -126,7 +130,7 @@ where

fn de_text_json<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
reader.must_ignore_byte(b'"')?;
Expand Down
12 changes: 8 additions & 4 deletions common/datavalues/src/types/deserializations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,27 +63,31 @@ pub trait TypeDeserializer: Send + Sync {

fn de_whole_text(&mut self, reader: &[u8], format: &FormatSettings) -> Result<()>;

// Required text-deserialization method of the trait. The default
// `de_text_csv`, `de_text_json`, and `de_text_quoted` implementations in this
// trait all forward to it.
// NOTE(review): two declarations appear here because this is a rendered diff
// without +/- markers. The one-line form taking `&mut R` looks like the removed
// signature; the multi-line form taking `&mut NestedCheckpointReader<R>` looks
// like its replacement — confirm.
fn de_text<R: BufferRead>(&mut self, reader: &mut R, format: &FormatSettings) -> Result<()>;
fn de_text<R: BufferRead>(
&mut self,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()>;

// Default CSV deserialization: forwards `reader` and `format` to `de_text`
// unchanged. Implementors that need CSV-specific handling override this.
// NOTE(review): the two `reader` parameter lines come from a rendered diff
// (old `&mut R` vs. new `&mut NestedCheckpointReader<R>`); only one can be live.
fn de_text_csv<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
self.de_text(reader, format)
}

// Default JSON deserialization: forwards `reader` and `format` to `de_text`
// unchanged. Implementors that need JSON-specific handling override this.
// NOTE(review): the two `reader` parameter lines come from a rendered diff
// (old `&mut R` vs. new `&mut NestedCheckpointReader<R>`); only one can be live.
fn de_text_json<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
self.de_text(reader, format)
}

fn de_text_quoted<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
self.de_text(reader, format)
Expand Down
6 changes: 5 additions & 1 deletion common/datavalues/src/types/deserializations/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ impl TypeDeserializer for NullDeserializer {
Ok(())
}

// Text deserialization for the null type: the reader and format are unused
// (both parameters are underscore-prefixed); it appends one default entry to
// `self.builder` and returns `Ok(())`.
// NOTE(review): the first one-line signature (taking `&mut R`) appears to be
// the removed diff line and the multi-line signature its replacement — confirm.
fn de_text<R: BufferRead>(&mut self, _reader: &mut R, _format: &FormatSettings) -> Result<()> {
fn de_text<R: BufferRead>(
&mut self,
_reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
self.builder.append_default();
Ok(())
}
Expand Down
58 changes: 41 additions & 17 deletions common/datavalues/src/types/deserializations/nullable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,60 +71,84 @@ impl TypeDeserializer for NullableDeserializer {

// JSON deserialization for a nullable column.
// Probes the input for the literal `null` (case-insensitively, judging by the
// helper's name). On a match it records a default via `de_default` and returns.
// Otherwise it rolls the reader back to the checkpoint, delegates to the inner
// deserializer's `de_text_json`, and pushes `true` onto `self.bitmap`
// (presumably marking the row as non-null — confirm against the bitmap's use).
// NOTE(review): the two `reader` parameter lines come from a rendered diff
// (old `&mut R` vs. new `&mut NestedCheckpointReader<R>`); only one can be live.
fn de_text_json<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
// Checkpoint before probing so a failed/partial `null` match can be undone.
reader.push_checkpoint();
if reader.ignore_insensitive_bytes(b"null")? {
// Matched: the checkpoint is no longer needed.
reader.pop_checkpoint();
self.de_default(format);
return Ok(());
}
// Not `null`: rewind to the checkpoint, drop it, and let the inner type parse.
reader.rollback_to_checkpoint()?;
reader.pop_checkpoint();
self.inner.de_text_json(reader, format)?;
self.bitmap.push(true);
Ok(())
}

// TODO: support null text setting
// Plain-text deserialization for a nullable column.
// Probes the input for the literal `null` (case-insensitively, judging by the
// helper's name). On a match it records a default via `de_default` and returns.
// Otherwise it rolls the reader back to the checkpoint, delegates to the inner
// deserializer's `de_text`, and pushes `true` onto `self.bitmap`.
// NOTE(review): this span is a rendered diff without +/- markers. The one-line
// signature and the `self.inner.de_text` / `self.bitmap.push(true)` pair that
// follows the `else` block look like removed lines; the same two calls inside
// the `else` block look like their replacements. As written both pairs are
// present — confirm which lines are live.
fn de_text<R: BufferRead>(&mut self, reader: &mut R, format: &FormatSettings) -> Result<()> {
fn de_text<R: BufferRead>(
&mut self,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
// Checkpoint before probing so a failed/partial `null` match can be undone.
reader.push_checkpoint();
if reader.ignore_insensitive_bytes(b"null")? {
reader.pop_checkpoint();
self.de_default(format);
return Ok(());
} else {
// Not `null`: rewind to the checkpoint, drop it, and let the inner type parse.
reader.rollback_to_checkpoint()?;
reader.pop_checkpoint();
self.inner.de_text(reader, format)?;
self.bitmap.push(true);
}
self.inner.de_text(reader, format)?;
self.bitmap.push(true);

Ok(())
}

// Quoted-text deserialization for a nullable column.
// Probes the input for the literal `null` (case-insensitively, judging by the
// helper's name). On a match it records a default via `de_default` and returns.
// Otherwise it rolls the reader back to the checkpoint, delegates to the inner
// deserializer's `de_text_quoted`, and pushes `true` onto `self.bitmap`.
// NOTE(review): this span is a rendered diff without +/- markers. The first
// `reader: &mut R` parameter and the `self.inner.de_text_quoted` /
// `self.bitmap.push(true)` pair after the `else` block look like removed lines
// whose replacements sit next to them — confirm which lines are live.
fn de_text_quoted<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
// Checkpoint before probing so a failed/partial `null` match can be undone.
reader.push_checkpoint();
if reader.ignore_insensitive_bytes(b"null")? {
reader.pop_checkpoint();
self.de_default(format);
return Ok(());
} else {
// Not `null`: rewind to the checkpoint, drop it, and let the inner type parse.
reader.rollback_to_checkpoint()?;
reader.pop_checkpoint();
self.inner.de_text_quoted(reader, format)?;
self.bitmap.push(true);
}
self.inner.de_text_quoted(reader, format)?;
self.bitmap.push(true);
Ok(())
}

// CSV deserialization for a nullable column.
// At end of input it records a default via `de_default`. Otherwise it probes
// for the literal `null`; the field is treated as null only when the byte
// right after the match is `\r`, `\n`, or `,`. In every other case the reader
// is rolled back to the checkpoint and the inner deserializer's `de_text_csv`
// parses the field, after which `true` is pushed onto `self.bitmap`.
// NOTE(review): this span is a rendered diff without +/- markers, so removed
// and added lines are interleaved. The un-checkpointed probe at the top of the
// `else` branch (bare `ignore_insensitive_bytes`, `fill_buf`, and the
// single-line delimiter test with its `else` arm) looks like the old code; the
// `push_checkpoint` ... `rollback_to_checkpoint` sequence looks like the new
// code. Confirm which lines are live before relying on this text.
fn de_text_csv<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
if reader.eof()? {
// Nothing left to read: record a default for this row.
self.de_default(format);
} else {
reader.ignore_insensitive_bytes(b"null")?;
let buffer = reader.fill_buf()?;
if !buffer.is_empty() && (buffer[0] == b'\r' || buffer[0] == b'\n' || buffer[0] == b',')
{
self.de_default(format);
} else {
self.inner.de_text_csv(reader, format)?;
self.bitmap.push(true);
// Checkpoint before probing so a `null` prefix of a longer field can be undone.
reader.push_checkpoint();
if reader.ignore_insensitive_bytes(b"null")? {
let buffer = reader.fill_buf()?;
// Accept `null` only when a line break or comma follows immediately.
if !buffer.is_empty()
&& (buffer[0] == b'\r' || buffer[0] == b'\n' || buffer[0] == b',')
{
self.de_default(format);
reader.pop_checkpoint();
return Ok(());
}
}
// Not a standalone `null`: rewind, drop the checkpoint, and let the inner type parse.
reader.rollback_to_checkpoint()?;
reader.pop_checkpoint();
self.inner.de_text_csv(reader, format)?;
self.bitmap.push(true);
}
Ok(())
}
Expand Down
8 changes: 6 additions & 2 deletions common/datavalues/src/types/deserializations/number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ where
Ok(())
}

fn de_text<R: BufferRead>(&mut self, reader: &mut R, _format: &FormatSettings) -> Result<()> {
fn de_text<R: BufferRead>(
&mut self,
reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
let v: T = if !T::FLOATING {
reader.read_int_text()
} else {
Expand All @@ -101,7 +105,7 @@ where

fn de_text_csv<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
_settings: &FormatSettings,
) -> Result<()> {
let maybe_quote = reader.ignore(|f| f == b'\'' || f == b'"')?;
Expand Down
10 changes: 7 additions & 3 deletions common/datavalues/src/types/deserializations/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ impl TypeDeserializer for StringDeserializer {
Ok(())
}

fn de_text<R: BufferRead>(&mut self, reader: &mut R, _format: &FormatSettings) -> Result<()> {
fn de_text<R: BufferRead>(
&mut self,
reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
self.buffer.clear();
reader.read_escaped_string_text(&mut self.buffer)?;
self.builder.append_value(self.buffer.as_slice());
Expand All @@ -93,7 +97,7 @@ impl TypeDeserializer for StringDeserializer {

fn de_text_quoted<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
self.buffer.clear();
Expand All @@ -104,7 +108,7 @@ impl TypeDeserializer for StringDeserializer {

fn de_text_csv<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
settings: &FormatSettings,
) -> Result<()> {
let mut read_buffer = reader.fill_buf()?;
Expand Down
8 changes: 6 additions & 2 deletions common/datavalues/src/types/deserializations/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ impl TypeDeserializer for StructDeserializer {
}
}

fn de_text<R: BufferRead>(&mut self, reader: &mut R, format: &FormatSettings) -> Result<()> {
fn de_text<R: BufferRead>(
&mut self,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
reader.must_ignore_byte(b'(')?;
let mut values = Vec::with_capacity(self.inners.len());
for (idx, inner) in self.inners.iter_mut().enumerate() {
Expand All @@ -100,7 +104,7 @@ impl TypeDeserializer for StructDeserializer {

fn de_text_csv<R: BufferRead>(
&mut self,
_reader: &mut R,
_reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
Err(ErrorCode::UnImplement("Unimplement error"))
Expand Down
12 changes: 8 additions & 4 deletions common/datavalues/src/types/deserializations/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl TypeDeserializer for TimestampDeserializer {

fn de_text_quoted<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
reader.must_ignore_byte(b'\'')?;
Expand All @@ -92,7 +92,11 @@ impl TypeDeserializer for TimestampDeserializer {
Ok(())
}

fn de_text<R: BufferRead>(&mut self, reader: &mut R, format: &FormatSettings) -> Result<()> {
fn de_text<R: BufferRead>(
&mut self,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
let ts = reader.read_timestamp_text(&format.timezone)?;
let micros = ts.timestamp_micros();
check_timestamp(micros)?;
Expand All @@ -102,7 +106,7 @@ impl TypeDeserializer for TimestampDeserializer {

fn de_text_csv<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
let maybe_quote = reader.ignore(|f| f == b'\'' || f == b'"')?;
Expand All @@ -118,7 +122,7 @@ impl TypeDeserializer for TimestampDeserializer {

fn de_text_json<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
reader.must_ignore_byte(b'"')?;
Expand Down
10 changes: 7 additions & 3 deletions common/datavalues/src/types/deserializations/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ impl TypeDeserializer for VariantDeserializer {
Ok(())
}

fn de_text<R: BufferRead>(&mut self, reader: &mut R, _format: &FormatSettings) -> Result<()> {
fn de_text<R: BufferRead>(
&mut self,
reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
self.buffer.clear();
reader.read_escaped_string_text(&mut self.buffer)?;
let val = serde_json::from_slice(self.buffer.as_slice())?;
Expand All @@ -91,7 +95,7 @@ impl TypeDeserializer for VariantDeserializer {

fn de_text_quoted<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
self.buffer.clear();
Expand All @@ -105,7 +109,7 @@ impl TypeDeserializer for VariantDeserializer {

fn de_text_csv<R: BufferRead>(
&mut self,
reader: &mut R,
reader: &mut NestedCheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
self.buffer.clear();
Expand Down
Loading

0 comments on commit bff565e

Please sign in to comment.