From da6bf79c27b053e6324d2b8f631bfc33fa7eee24 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 11 Nov 2022 12:50:49 +0800 Subject: [PATCH 01/14] refactor(TypeDeserializer): rename uniform() to uniform_date() and make it pub. --- .../datavalues/src/types/deserializations/date.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/query/datavalues/src/types/deserializations/date.rs b/src/query/datavalues/src/types/deserializations/date.rs index 20074f4bf3c9..2fc4932a9100 100644 --- a/src/query/datavalues/src/types/deserializations/date.rs +++ b/src/query/datavalues/src/types/deserializations/date.rs @@ -75,7 +75,7 @@ where serde_json::Value::String(v) => { let mut reader = Cursor::new(v.as_bytes()); let date = reader.read_date_text(&format.timezone)?; - let days = uniform(date); + let days = uniform_date(date); check_date(days.as_i32())?; self.builder.append_value(days); Ok(()) @@ -87,7 +87,7 @@ where fn de_whole_text(&mut self, reader: &[u8], format: &FormatSettings) -> Result<()> { let mut reader = Cursor::new(reader); let date = reader.read_date_text(&format.timezone)?; - let days = uniform(date); + let days = uniform_date(date); check_date(days.as_i32())?; reader.must_eof()?; self.builder.append_value(days); @@ -105,7 +105,7 @@ where if date.is_err() { return Err(date.err().unwrap()); } - let days = uniform(date.unwrap()); + let days = uniform_date(date.unwrap()); check_date(days.as_i32())?; self.builder.append_value(days); @@ -118,7 +118,7 @@ where format: &FormatSettings, ) -> Result<()> { let date = reader.read_date_text(&format.timezone)?; - let days = uniform(date); + let days = uniform_date(date); check_date(days.as_i32())?; self.builder.append_value(days); Ok(()) @@ -156,7 +156,7 @@ where } #[inline] -fn uniform(date: NaiveDate) -> T +pub fn uniform_date(date: NaiveDate) -> T where i32: AsPrimitive, T: PrimitiveType, From f943005d9e17075630ae3832d8900bdd92d996df Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 11 Nov 2022 12:52:39 +0800 Subject: [PATCH 02/14] refactor(datavalues): TypeDeserializer::de_default do not need arg FormatSettings. --- .../src/types/deserializations/array.rs | 2 +- .../src/types/deserializations/boolean.rs | 2 +- .../src/types/deserializations/date.rs | 4 ++-- .../src/types/deserializations/mod.rs | 2 +- .../src/types/deserializations/null.rs | 2 +- .../src/types/deserializations/nullable.rs | 22 +++++++++---------- .../src/types/deserializations/number.rs | 2 +- .../src/types/deserializations/string.rs | 2 +- .../src/types/deserializations/struct_.rs | 2 +- .../src/types/deserializations/timestamp.rs | 2 +- .../src/types/deserializations/variant.rs | 2 +- .../input_formats/impls/input_format_csv.rs | 2 +- .../input_formats/impls/input_format_tsv.rs | 2 +- .../input_formats/impls/input_format_xml.rs | 4 ++-- 14 files changed, 26 insertions(+), 26 deletions(-) diff --git a/src/query/datavalues/src/types/deserializations/array.rs b/src/query/datavalues/src/types/deserializations/array.rs index 6c9ae4bf8bc7..4cf8affa60a3 100644 --- a/src/query/datavalues/src/types/deserializations/array.rs +++ b/src/query/datavalues/src/types/deserializations/array.rs @@ -44,7 +44,7 @@ impl TypeDeserializer for ArrayDeserializer { Ok(()) } - fn de_default(&mut self, _format: &FormatSettings) { + fn de_default(&mut self) { self.builder.append_default(); } diff --git a/src/query/datavalues/src/types/deserializations/boolean.rs b/src/query/datavalues/src/types/deserializations/boolean.rs index 54516ffd6e05..ea7b0e3468d7 100644 --- a/src/query/datavalues/src/types/deserializations/boolean.rs +++ b/src/query/datavalues/src/types/deserializations/boolean.rs @@ -37,7 +37,7 @@ impl TypeDeserializer for BooleanDeserializer { Ok(()) } - fn de_default(&mut self, _format: &FormatSettings) { + fn de_default(&mut self) { self.builder.append_value(false); } diff --git a/src/query/datavalues/src/types/deserializations/date.rs b/src/query/datavalues/src/types/deserializations/date.rs index 2fc4932a9100..cbe808334227 100644 --- a/src/query/datavalues/src/types/deserializations/date.rs +++ b/src/query/datavalues/src/types/deserializations/date.rs @@ -50,7 +50,7 @@ where Ok(()) } - fn de_default(&mut self, _format: &FormatSettings) { + fn de_default(&mut self) { self.builder.append_value(T::default()); } @@ -131,7 +131,7 @@ where ) -> Result<()> { reader.must_ignore_byte(b'"')?; let date = reader.read_date_text(&format.timezone)?; - let days = uniform(date); + let days = uniform_date(date); check_date(days.as_i32())?; reader.must_ignore_byte(b'"')?; diff --git a/src/query/datavalues/src/types/deserializations/mod.rs b/src/query/datavalues/src/types/deserializations/mod.rs index 1168c7ee688f..5b028ebd557d 100644 --- a/src/query/datavalues/src/types/deserializations/mod.rs +++ b/src/query/datavalues/src/types/deserializations/mod.rs @@ -48,7 +48,7 @@ pub trait TypeDeserializer: Send + Sync { fn de_binary(&mut self, reader: &mut &[u8], format: &FormatSettings) -> Result<()>; - fn de_default(&mut self, format: &FormatSettings); + fn de_default(&mut self); fn de_fixed_binary_batch( &mut self, diff --git a/src/query/datavalues/src/types/deserializations/null.rs b/src/query/datavalues/src/types/deserializations/null.rs index c7e91b7738fa..c485f023f965 100644 --- a/src/query/datavalues/src/types/deserializations/null.rs +++ b/src/query/datavalues/src/types/deserializations/null.rs @@ -37,7 +37,7 @@ impl TypeDeserializer for NullDeserializer { Ok(()) } - fn de_default(&mut self, _format: &FormatSettings) { + fn de_default(&mut self) { self.builder.append_default(); } diff --git a/src/query/datavalues/src/types/deserializations/nullable.rs b/src/query/datavalues/src/types/deserializations/nullable.rs index 3d4bb45ec919..ba6dfd7c30f8 100644 --- a/src/query/datavalues/src/types/deserializations/nullable.rs +++ b/src/query/datavalues/src/types/deserializations/nullable.rs @@ -42,14 +42,14 @@ impl TypeDeserializer for NullableDeserializer { if valid { self.inner.de_binary(reader, format)?; } else { - self.inner.de_default(format); + self.inner.de_default(); } self.bitmap.push(valid); Ok(()) } - fn de_default(&mut self, format: &FormatSettings) { - self.inner.de_default(format); + fn de_default(&mut self) { + self.inner.de_default(); self.bitmap.push(false); } @@ -83,7 +83,7 @@ impl TypeDeserializer for NullableDeserializer { format: &FormatSettings, ) -> Result<()> { if reader.ignore_insensitive_bytes(&format.null_bytes) { - self.de_default(format); + self.de_default(); return Ok(()); } self.inner.de_text_json(reader, format)?; @@ -97,14 +97,14 @@ impl TypeDeserializer for NullableDeserializer { format: &FormatSettings, ) -> Result<()> { if reader.eof() { - self.de_default(format); + self.de_default(); } else { if reader.ignore_insensitive_bytes(&format.null_bytes) { let buffer = reader.remaining_slice(); if buffer.is_empty() || (buffer[0] == b'\r' || buffer[0] == b'\n' || buffer[0] == b'\t') { - self.de_default(format); + self.de_default(); return Ok(()); } } @@ -120,7 +120,7 @@ impl TypeDeserializer for NullableDeserializer { format: &FormatSettings, ) -> Result<()> { if reader.ignore_insensitive_bytes(&format.null_bytes) { - self.de_default(format); + self.de_default(); } else { self.inner.de_text_quoted(reader, format)?; self.bitmap.push(true); @@ -130,7 +130,7 @@ impl TypeDeserializer for NullableDeserializer { fn de_whole_text(&mut self, reader: &[u8], format: &FormatSettings) -> Result<()> { if reader.eq_ignore_ascii_case(&format.null_bytes) { - self.de_default(format); + self.de_default(); return Ok(()); } @@ -139,15 +139,15 @@ impl TypeDeserializer for NullableDeserializer { Ok(()) } - fn de_null(&mut self, format: &FormatSettings) -> bool { - self.inner.de_default(format); + fn de_null(&mut self, _format: &FormatSettings) -> bool { + self.inner.de_default(); self.bitmap.push(false); true } fn append_data_value(&mut self, value: DataValue, format: &FormatSettings) -> Result<()> { if value.is_null() { - self.inner.de_default(format); + self.inner.de_default(); self.bitmap.push(false); } else { self.inner.append_data_value(value, format)?; diff --git a/src/query/datavalues/src/types/deserializations/number.rs b/src/query/datavalues/src/types/deserializations/number.rs index e2766ad8369d..0b0d369f474a 100644 --- a/src/query/datavalues/src/types/deserializations/number.rs +++ b/src/query/datavalues/src/types/deserializations/number.rs @@ -43,7 +43,7 @@ where Ok(()) } - fn de_default(&mut self, _format: &FormatSettings) { + fn de_default(&mut self) { self.builder.append_value(T::default()); } diff --git a/src/query/datavalues/src/types/deserializations/string.rs b/src/query/datavalues/src/types/deserializations/string.rs index 872b73979e8e..7e2e4d5ba664 100644 --- a/src/query/datavalues/src/types/deserializations/string.rs +++ b/src/query/datavalues/src/types/deserializations/string.rs @@ -57,7 +57,7 @@ impl TypeDeserializer for StringDeserializer { Ok(()) } - fn de_default(&mut self, _format: &FormatSettings) { + fn de_default(&mut self) { self.builder.append_value(""); } diff --git a/src/query/datavalues/src/types/deserializations/struct_.rs b/src/query/datavalues/src/types/deserializations/struct_.rs index 2f186c2aa809..49546844f1e8 100644 --- a/src/query/datavalues/src/types/deserializations/struct_.rs +++ b/src/query/datavalues/src/types/deserializations/struct_.rs @@ -41,7 +41,7 @@ impl TypeDeserializer for StructDeserializer { self.builder.append_data_value(DataValue::Struct(values)) } - fn de_default(&mut self, _format: &FormatSettings) { + fn de_default(&mut self) { self.builder.append_default(); } diff --git a/src/query/datavalues/src/types/deserializations/timestamp.rs b/src/query/datavalues/src/types/deserializations/timestamp.rs index e871d4cab0f9..184b50396154 100644 --- a/src/query/datavalues/src/types/deserializations/timestamp.rs +++ b/src/query/datavalues/src/types/deserializations/timestamp.rs @@ -38,7 +38,7 @@ impl TypeDeserializer for TimestampDeserializer { Ok(()) } - fn de_default(&mut self, _format: &FormatSettings) { + fn de_default(&mut self) { self.builder.append_value(i64::default()); } diff --git a/src/query/datavalues/src/types/deserializations/variant.rs b/src/query/datavalues/src/types/deserializations/variant.rs index c79aecc51b49..9576241bd320 100644 --- a/src/query/datavalues/src/types/deserializations/variant.rs +++ b/src/query/datavalues/src/types/deserializations/variant.rs @@ -59,7 +59,7 @@ impl TypeDeserializer for VariantDeserializer { Ok(()) } - fn de_default(&mut self, _format: &FormatSettings) { + fn de_default(&mut self) { self.builder .append_value(VariantValue::from(serde_json::Value::Null)); } diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs index 7cc89f31b48d..a446f04740cf 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs @@ -53,7 +53,7 @@ impl InputFormatCSV { let col_data = &buf[field_start..field_end]; let mut reader = Cursor::new(col_data); if reader.eof() { - deserializer.de_default(format_settings); + deserializer.de_default(); } else { // todo(youngsofun): do not need escape, already done in csv-core if let Err(e) = deserializer.de_text(&mut reader, format_settings) { diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs index 6ea4b19a41ad..c013e58b32dd 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs @@ -52,7 +52,7 @@ impl InputFormatTSV { if pos == buf_len || buf[pos] == b'\t' { let col_data = &buf[field_start..pos]; if col_data.is_empty() { - deserializers[column_index].de_default(format_settings); + deserializers[column_index].de_default(); } else { let mut reader = Cursor::new(col_data); reader.ignores(|c: u8| c == b' '); diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_xml.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_xml.rs index 3249044fa41b..c615c8bcff91 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_xml.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_xml.rs @@ -63,7 +63,7 @@ impl InputFormatXML { if let Some(value) = value { let mut reader = Cursor::new(&**value); if reader.eof() { - deserializer.de_default(format_settings); + deserializer.de_default(); } else { if let Err(e) = deserializer.de_text(&mut reader, format_settings) { let value_str = format!("{:?}", value); @@ -78,7 +78,7 @@ impl InputFormatXML { } } } else { - deserializer.de_default(format_settings); + deserializer.de_default(); } } Ok(()) From dde922aea9fd9fac47e9597fd2a8af81b011bc50 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 11 Nov 2022 12:53:30 +0800 Subject: [PATCH 03/14] refactor(format): mv CommonSettings to the root of the crate to reuse it in decoder too. --- src/query/formats/src/common_settings.rs | 23 +++++++++++++++++++ src/query/formats/src/field_encoder/csv.rs | 2 +- .../formats/src/field_encoder/helpers/mod.rs | 10 -------- .../field_encoder/helpers/number_helpers.rs | 2 +- src/query/formats/src/field_encoder/json.rs | 2 +- src/query/formats/src/field_encoder/mod.rs | 1 - .../formats/src/field_encoder/row_based.rs | 2 +- src/query/formats/src/field_encoder/tsv.rs | 2 +- src/query/formats/src/field_encoder/values.rs | 2 +- src/query/formats/src/lib.rs | 3 +++ 10 files changed, 32 insertions(+), 17 deletions(-) create mode 100644 src/query/formats/src/common_settings.rs diff --git a/src/query/formats/src/common_settings.rs b/src/query/formats/src/common_settings.rs new file mode 100644 index 000000000000..8d72d0e9a350 --- /dev/null +++ b/src/query/formats/src/common_settings.rs @@ -0,0 +1,23 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use chrono_tz::Tz; +pub struct CommonSettings { + pub true_bytes: Vec, + pub false_bytes: Vec, + pub null_bytes: Vec, + pub nan_bytes: Vec, + pub inf_bytes: Vec, + pub timezone: Tz, +} diff --git a/src/query/formats/src/field_encoder/csv.rs b/src/query/formats/src/field_encoder/csv.rs index 66cf32ff92b9..348bf877ed77 100644 --- a/src/query/formats/src/field_encoder/csv.rs +++ b/src/query/formats/src/field_encoder/csv.rs @@ -20,9 +20,9 @@ use common_io::consts::NAN_BYTES_LOWER; use common_io::consts::NULL_BYTES_ESCAPE; use common_io::consts::TRUE_BYTES_LOWER; -use crate::field_encoder::CommonSettings; use crate::field_encoder::FieldEncoderRowBased; use crate::field_encoder::FieldEncoderValues; +use crate::CommonSettings; use crate::FileFormatOptionsExt; pub struct FieldEncoderCSV { diff --git a/src/query/formats/src/field_encoder/helpers/mod.rs b/src/query/formats/src/field_encoder/helpers/mod.rs index 26fe68a6e9a6..6108c0918d7f 100644 --- a/src/query/formats/src/field_encoder/helpers/mod.rs +++ b/src/query/formats/src/field_encoder/helpers/mod.rs @@ -15,15 +15,5 @@ mod json; mod number_helpers; -use chrono_tz::Tz; pub use json::write_json_string; pub use number_helpers::PrimitiveWithFormat; - -pub struct CommonSettings { - pub true_bytes: Vec, - pub false_bytes: Vec, - pub null_bytes: Vec, - pub nan_bytes: Vec, - pub inf_bytes: Vec, - pub timezone: Tz, -} diff --git a/src/query/formats/src/field_encoder/helpers/number_helpers.rs b/src/query/formats/src/field_encoder/helpers/number_helpers.rs index e3eb9f3cf26b..1672118b8c55 100644 --- a/src/query/formats/src/field_encoder/helpers/number_helpers.rs +++ b/src/query/formats/src/field_encoder/helpers/number_helpers.rs @@ -14,7 +14,7 @@ use std::num::FpCategory; -use crate::field_encoder::helpers::CommonSettings; +use crate::CommonSettings; // 30% faster lexical_core::write to tmp buf and extend_from_slice #[inline] diff --git a/src/query/formats/src/field_encoder/json.rs b/src/query/formats/src/field_encoder/json.rs index e83d14a360d6..28a5a9c7755c 100644 --- a/src/query/formats/src/field_encoder/json.rs +++ b/src/query/formats/src/field_encoder/json.rs @@ -19,9 +19,9 @@ use common_io::consts::NULL_BYTES_LOWER; use common_io::consts::TRUE_BYTES_LOWER; use crate::field_encoder::helpers::write_json_string; -use crate::field_encoder::CommonSettings; use crate::field_encoder::FieldEncoderRowBased; use crate::field_encoder::FieldEncoderValues; +use crate::CommonSettings; use crate::FileFormatOptionsExt; pub struct FieldEncoderJSON { diff --git a/src/query/formats/src/field_encoder/mod.rs b/src/query/formats/src/field_encoder/mod.rs index 72f01e818798..96adaa970444 100644 --- a/src/query/formats/src/field_encoder/mod.rs +++ b/src/query/formats/src/field_encoder/mod.rs @@ -20,7 +20,6 @@ mod tsv; mod values; pub use csv::FieldEncoderCSV; -pub use helpers::CommonSettings; pub use json::FieldEncoderJSON; pub use row_based::FieldEncoderRowBased; pub use tsv::FieldEncoderTSV; diff --git a/src/query/formats/src/field_encoder/row_based.rs b/src/query/formats/src/field_encoder/row_based.rs index d86010542ab6..eff679c6dbdf 100644 --- a/src/query/formats/src/field_encoder/row_based.rs +++ b/src/query/formats/src/field_encoder/row_based.rs @@ -30,7 +30,7 @@ use micromarshal::Unmarshal; use num::cast::AsPrimitive; use crate::field_encoder::helpers::PrimitiveWithFormat; -use crate::field_encoder::CommonSettings; +use crate::CommonSettings; pub trait FieldEncoderRowBased { fn common_settings(&self) -> &CommonSettings; diff --git a/src/query/formats/src/field_encoder/tsv.rs b/src/query/formats/src/field_encoder/tsv.rs index 4f4bf03d1e67..6cb353553941 100644 --- a/src/query/formats/src/field_encoder/tsv.rs +++ b/src/query/formats/src/field_encoder/tsv.rs @@ -21,8 +21,8 @@ use common_io::consts::NAN_BYTES_LOWER; use common_io::consts::NULL_BYTES_ESCAPE; use common_io::consts::TRUE_BYTES_NUM; -use crate::field_encoder::CommonSettings; use crate::field_encoder::FieldEncoderRowBased; +use crate::CommonSettings; use crate::FileFormatOptionsExt; pub struct FieldEncoderTSV { diff --git a/src/query/formats/src/field_encoder/values.rs b/src/query/formats/src/field_encoder/values.rs index 39a9426fc585..db3629aec507 100644 --- a/src/query/formats/src/field_encoder/values.rs +++ b/src/query/formats/src/field_encoder/values.rs @@ -22,8 +22,8 @@ use common_io::consts::NAN_BYTES_LOWER; use common_io::consts::NULL_BYTES_UPPER; use common_io::consts::TRUE_BYTES_NUM; -use crate::field_encoder::CommonSettings; use crate::field_encoder::FieldEncoderRowBased; +use crate::CommonSettings; use crate::FileFormatOptionsExt; pub struct FieldEncoderValues { diff --git a/src/query/formats/src/lib.rs b/src/query/formats/src/lib.rs index 159d6bf2a7ae..524767c5e7c0 100644 --- a/src/query/formats/src/lib.rs +++ b/src/query/formats/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. mod clickhouse; +mod common_settings; pub mod field_encoder; mod file_format_type; pub mod output_format; @@ -20,3 +21,5 @@ pub mod output_format; pub use clickhouse::ClickhouseFormatType; pub use file_format_type::FileFormatOptionsExt; pub use file_format_type::FileFormatTypeExt; + +use crate::common_settings::CommonSettings; From 2c6ba61a49f8160558e26706f753b736e52de489 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 11 Nov 2022 15:34:17 +0800 Subject: [PATCH 04/14] chore(databvalues): remove unused TypeDeserializer::de_text_json. --- .../datavalues/src/types/deserializations/date.rs | 15 --------------- .../datavalues/src/types/deserializations/mod.rs | 8 -------- .../src/types/deserializations/nullable.rs | 14 -------------- .../src/types/deserializations/timestamp.rs | 15 --------------- 4 files changed, 52 deletions(-) diff --git a/src/query/datavalues/src/types/deserializations/date.rs b/src/query/datavalues/src/types/deserializations/date.rs index cbe808334227..d6875b390291 100644 --- a/src/query/datavalues/src/types/deserializations/date.rs +++ b/src/query/datavalues/src/types/deserializations/date.rs @@ -124,21 +124,6 @@ where Ok(()) } - fn de_text_json>( - &mut self, - reader: &mut Cursor, - format: &FormatSettings, - ) -> Result<()> { - reader.must_ignore_byte(b'"')?; - let date = reader.read_date_text(&format.timezone)?; - let days = uniform_date(date); - check_date(days.as_i32())?; - reader.must_ignore_byte(b'"')?; - - self.builder.append_value(days); - Ok(()) - } - fn append_data_value(&mut self, value: DataValue, _format: &FormatSettings) -> Result<()> { let v = value.as_i64()? as i32; check_date(v)?; diff --git a/src/query/datavalues/src/types/deserializations/mod.rs b/src/query/datavalues/src/types/deserializations/mod.rs index 5b028ebd557d..75fa70378f58 100644 --- a/src/query/datavalues/src/types/deserializations/mod.rs +++ b/src/query/datavalues/src/types/deserializations/mod.rs @@ -72,14 +72,6 @@ pub trait TypeDeserializer: Send + Sync { format: &FormatSettings, ) -> Result<()>; - fn de_text_json>( - &mut self, - reader: &mut Cursor, - format: &FormatSettings, - ) -> Result<()> { - self.de_text(reader, format) - } - fn de_text_quoted>( &mut self, reader: &mut Cursor, diff --git a/src/query/datavalues/src/types/deserializations/nullable.rs b/src/query/datavalues/src/types/deserializations/nullable.rs index ba6dfd7c30f8..784c1f954d98 100644 --- a/src/query/datavalues/src/types/deserializations/nullable.rs +++ b/src/query/datavalues/src/types/deserializations/nullable.rs @@ -77,20 +77,6 @@ impl TypeDeserializer for NullableDeserializer { } } - fn de_text_json>( - &mut self, - reader: &mut Cursor, - format: &FormatSettings, - ) -> Result<()> { - if reader.ignore_insensitive_bytes(&format.null_bytes) { - self.de_default(); - return Ok(()); - } - self.inner.de_text_json(reader, format)?; - self.bitmap.push(true); - Ok(()) - } - fn de_text>( &mut self, reader: &mut Cursor, diff --git a/src/query/datavalues/src/types/deserializations/timestamp.rs b/src/query/datavalues/src/types/deserializations/timestamp.rs index 184b50396154..cd2a34755702 100644 --- a/src/query/datavalues/src/types/deserializations/timestamp.rs +++ b/src/query/datavalues/src/types/deserializations/timestamp.rs @@ -113,21 +113,6 @@ impl TypeDeserializer for TimestampDeserializer { Ok(()) } - fn de_text_json>( - &mut self, - reader: &mut Cursor, - format: &FormatSettings, - ) -> Result<()> { - reader.must_ignore_byte(b'"')?; - let ts = reader.read_timestamp_text(&format.timezone)?; - let micros = ts.timestamp_micros(); - check_timestamp(micros)?; - reader.must_ignore_byte(b'"')?; - - self.builder.append_value(micros.as_()); - Ok(()) - } - fn append_data_value(&mut self, value: DataValue, _format: &FormatSettings) -> Result<()> { let v = value.as_i64()?; check_timestamp(v)?; From 33af46e4bed7e4d3603c2e2571178001cdd1146d Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 11 Nov 2022 15:38:56 +0800 Subject: [PATCH 05/14] chore(databvalues): remove unused TypeDeserializer::de_whole_text. --- .../datavalues/src/types/deserializations/array.rs | 5 ----- .../src/types/deserializations/boolean.rs | 11 ----------- .../datavalues/src/types/deserializations/date.rs | 10 ---------- .../datavalues/src/types/deserializations/mod.rs | 2 -- .../datavalues/src/types/deserializations/null.rs | 5 ----- .../src/types/deserializations/nullable.rs | 11 ----------- .../datavalues/src/types/deserializations/number.rs | 13 ------------- .../datavalues/src/types/deserializations/string.rs | 5 ----- .../src/types/deserializations/struct_.rs | 4 ---- .../src/types/deserializations/timestamp.rs | 10 ---------- .../src/types/deserializations/variant.rs | 6 ------ 11 files changed, 82 deletions(-) diff --git a/src/query/datavalues/src/types/deserializations/array.rs b/src/query/datavalues/src/types/deserializations/array.rs index 4cf8affa60a3..8fdfe4c91465 100644 --- a/src/query/datavalues/src/types/deserializations/array.rs +++ b/src/query/datavalues/src/types/deserializations/array.rs @@ -115,11 +115,6 @@ impl TypeDeserializer for ArrayDeserializer { Ok(()) } - fn de_whole_text(&mut self, reader: &[u8], format: &FormatSettings) -> Result<()> { - let mut reader = Cursor::new(reader); - self.de_text(&mut reader, format) - } - fn append_data_value(&mut self, value: DataValue, _format: &FormatSettings) -> Result<()> { self.builder.append_data_value(value) } diff --git a/src/query/datavalues/src/types/deserializations/boolean.rs b/src/query/datavalues/src/types/deserializations/boolean.rs index ea7b0e3468d7..3b6309f31948 100644 --- a/src/query/datavalues/src/types/deserializations/boolean.rs +++ b/src/query/datavalues/src/types/deserializations/boolean.rs @@ -65,17 +65,6 @@ impl TypeDeserializer for BooleanDeserializer { Ok(()) } - fn de_whole_text(&mut self, reader: &[u8], _format: &FormatSettings) -> Result<()> { - if reader.eq_ignore_ascii_case(b"true") { - self.builder.append_value(true); - } else if reader.eq_ignore_ascii_case(b"false") { - self.builder.append_value(false); - } else { - return Err(ErrorCode::BadBytes("Incorrect boolean value")); - } - Ok(()) - } - fn de_text>( &mut self, reader: &mut Cursor, diff --git a/src/query/datavalues/src/types/deserializations/date.rs b/src/query/datavalues/src/types/deserializations/date.rs index d6875b390291..190e008bfa71 100644 --- a/src/query/datavalues/src/types/deserializations/date.rs +++ b/src/query/datavalues/src/types/deserializations/date.rs @@ -84,16 +84,6 @@ where } } - fn de_whole_text(&mut self, reader: &[u8], format: &FormatSettings) -> Result<()> { - let mut reader = Cursor::new(reader); - let date = reader.read_date_text(&format.timezone)?; - let days = uniform_date(date); - check_date(days.as_i32())?; - reader.must_eof()?; - self.builder.append_value(days); - Ok(()) - } - fn de_text_quoted>( &mut self, reader: &mut Cursor, diff --git a/src/query/datavalues/src/types/deserializations/mod.rs b/src/query/datavalues/src/types/deserializations/mod.rs index 75fa70378f58..c2fe5e0ccb61 100644 --- a/src/query/datavalues/src/types/deserializations/mod.rs +++ b/src/query/datavalues/src/types/deserializations/mod.rs @@ -64,8 +64,6 @@ pub trait TypeDeserializer: Send + Sync { false } - fn de_whole_text(&mut self, reader: &[u8], format: &FormatSettings) -> Result<()>; - fn de_text>( &mut self, reader: &mut Cursor, diff --git a/src/query/datavalues/src/types/deserializations/null.rs b/src/query/datavalues/src/types/deserializations/null.rs index c485f023f965..da2db4f23bf7 100644 --- a/src/query/datavalues/src/types/deserializations/null.rs +++ b/src/query/datavalues/src/types/deserializations/null.rs @@ -59,11 +59,6 @@ impl TypeDeserializer for NullDeserializer { Ok(()) } - fn de_whole_text(&mut self, _reader: &[u8], _format: &FormatSettings) -> Result<()> { - self.builder.append_default(); - Ok(()) - } - fn de_text>( &mut self, _reader: &mut Cursor, diff --git a/src/query/datavalues/src/types/deserializations/nullable.rs b/src/query/datavalues/src/types/deserializations/nullable.rs index 784c1f954d98..75bf76749381 100644 --- a/src/query/datavalues/src/types/deserializations/nullable.rs +++ b/src/query/datavalues/src/types/deserializations/nullable.rs @@ -114,17 +114,6 @@ impl TypeDeserializer for NullableDeserializer { Ok(()) } - fn de_whole_text(&mut self, reader: &[u8], format: &FormatSettings) -> Result<()> { - if reader.eq_ignore_ascii_case(&format.null_bytes) { - self.de_default(); - return Ok(()); - } - - self.inner.de_whole_text(reader, format)?; - self.bitmap.push(true); - Ok(()) - } - fn de_null(&mut self, _format: &FormatSettings) -> bool { self.inner.de_default(); self.bitmap.push(false); diff --git a/src/query/datavalues/src/types/deserializations/number.rs b/src/query/datavalues/src/types/deserializations/number.rs index 0b0d369f474a..d2caf2ee0c7e 100644 --- a/src/query/datavalues/src/types/deserializations/number.rs +++ b/src/query/datavalues/src/types/deserializations/number.rs @@ -84,19 +84,6 @@ where false } - fn de_whole_text(&mut self, reader: &[u8], _format: &FormatSettings) -> Result<()> { - let mut reader = Cursor::new(reader); - let v: T = if !T::FLOATING { - reader.read_int_text() - } else { - reader.read_float_text() - }?; - reader.must_eof()?; - - self.builder.append_value(v); - Ok(()) - } - fn de_text>( &mut self, reader: &mut Cursor, diff --git a/src/query/datavalues/src/types/deserializations/string.rs b/src/query/datavalues/src/types/deserializations/string.rs index 7e2e4d5ba664..9ff4b6ad6f24 100644 --- a/src/query/datavalues/src/types/deserializations/string.rs +++ b/src/query/datavalues/src/types/deserializations/string.rs @@ -85,11 +85,6 @@ impl TypeDeserializer for StringDeserializer { } } - fn de_whole_text(&mut self, reader: &[u8], _format: &FormatSettings) -> Result<()> { - self.builder.append_value(reader); - Ok(()) - } - fn de_text>( &mut self, reader: &mut Cursor, diff --git a/src/query/datavalues/src/types/deserializations/struct_.rs b/src/query/datavalues/src/types/deserializations/struct_.rs index 49546844f1e8..77498fd139b7 100644 --- a/src/query/datavalues/src/types/deserializations/struct_.rs +++ b/src/query/datavalues/src/types/deserializations/struct_.rs @@ -109,10 +109,6 @@ impl TypeDeserializer for StructDeserializer { Ok(()) } - fn de_whole_text(&mut self, _reader: &[u8], _format: &FormatSettings) -> Result<()> { - Err(ErrorCode::Unimplemented("Unimplement error")) - } - fn append_data_value(&mut self, value: DataValue, _format: &FormatSettings) -> Result<()> { self.builder.append_data_value(value) } diff --git a/src/query/datavalues/src/types/deserializations/timestamp.rs b/src/query/datavalues/src/types/deserializations/timestamp.rs index cd2a34755702..440cef31093c 100644 --- a/src/query/datavalues/src/types/deserializations/timestamp.rs +++ b/src/query/datavalues/src/types/deserializations/timestamp.rs @@ -91,16 +91,6 @@ impl TypeDeserializer for TimestampDeserializer { Ok(()) } - fn de_whole_text(&mut self, reader: &[u8], format: &FormatSettings) -> Result<()> { - let mut reader = Cursor::new(reader); - let ts = reader.read_timestamp_text(&format.timezone)?; - let micros = ts.timestamp_micros(); - check_timestamp(micros)?; - reader.must_eof()?; - self.builder.append_value(micros.as_()); - Ok(()) - } - fn de_text>( &mut self, reader: &mut Cursor, diff --git a/src/query/datavalues/src/types/deserializations/variant.rs b/src/query/datavalues/src/types/deserializations/variant.rs index 9576241bd320..694df6116fa1 100644 --- a/src/query/datavalues/src/types/deserializations/variant.rs +++ b/src/query/datavalues/src/types/deserializations/variant.rs @@ -86,12 +86,6 @@ impl TypeDeserializer for VariantDeserializer { Ok(()) } - fn de_whole_text(&mut self, reader: &[u8], _format: &FormatSettings) -> Result<()> { - let val = serde_json::from_slice(reader)?; - self.builder.append_value(val); - Ok(()) - } - fn de_text>( &mut self, reader: &mut Cursor, From 994919d3406168c8eb12ca278e1cefeb2ddb29df Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Sun, 13 Nov 2022 22:53:53 +0800 Subject: [PATCH 06/14] refactor(datavalues): Date/TS Deserializers add internal buffer. --- src/query/datavalues/src/types/deserializations/date.rs | 1 + src/query/datavalues/src/types/deserializations/timestamp.rs | 1 + src/query/datavalues/src/types/type_date.rs | 1 + src/query/datavalues/src/types/type_interval.rs | 1 + src/query/datavalues/src/types/type_timestamp.rs | 1 + 5 files changed, 5 insertions(+) diff --git a/src/query/datavalues/src/types/deserializations/date.rs b/src/query/datavalues/src/types/deserializations/date.rs index 190e008bfa71..16bf8d904a54 100644 --- a/src/query/datavalues/src/types/deserializations/date.rs +++ b/src/query/datavalues/src/types/deserializations/date.rs @@ -28,6 +28,7 @@ use num::cast::AsPrimitive; use crate::prelude::*; pub struct DateDeserializer { + pub buffer: Vec, pub builder: MutablePrimitiveColumn, } diff --git a/src/query/datavalues/src/types/deserializations/timestamp.rs b/src/query/datavalues/src/types/deserializations/timestamp.rs index 440cef31093c..2e881771f129 100644 --- a/src/query/datavalues/src/types/deserializations/timestamp.rs +++ b/src/query/datavalues/src/types/deserializations/timestamp.rs @@ -23,6 +23,7 @@ use num::cast::AsPrimitive; use crate::columns::MutableColumn; use crate::prelude::*; pub struct TimestampDeserializer { + pub buffer: Vec, pub builder: MutablePrimitiveColumn, } diff --git a/src/query/datavalues/src/types/type_date.rs b/src/query/datavalues/src/types/type_date.rs index 37e541fce8bc..30c97b45bcad 100644 --- a/src/query/datavalues/src/types/type_date.rs +++ b/src/query/datavalues/src/types/type_date.rs @@ -100,6 +100,7 @@ impl DataType for DateType { fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl { DateDeserializer:: { + buffer: vec![], builder: MutablePrimitiveColumn::::with_capacity(capacity), } .into() diff --git a/src/query/datavalues/src/types/type_interval.rs b/src/query/datavalues/src/types/type_interval.rs index 3c435e4c2066..b1b423048b0f 100644 --- a/src/query/datavalues/src/types/type_interval.rs +++ b/src/query/datavalues/src/types/type_interval.rs @@ -140,6 +140,7 @@ impl DataType for IntervalType { fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl { DateDeserializer:: { + buffer: vec![], builder: MutablePrimitiveColumn::::with_capacity(capacity), } .into() diff --git a/src/query/datavalues/src/types/type_timestamp.rs b/src/query/datavalues/src/types/type_timestamp.rs index 758d3f6bae03..e1e99192d8c1 100644 --- a/src/query/datavalues/src/types/type_timestamp.rs +++ b/src/query/datavalues/src/types/type_timestamp.rs @@ -128,6 +128,7 @@ impl DataType for TimestampType { fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl { TimestampDeserializer { + buffer: vec![], builder: MutablePrimitiveColumn::::with_capacity(capacity), } .into() From f89e5f6237738072b4cecfa6a21e65a5f112eda6 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 11 Nov 2022 12:55:48 +0800 Subject: [PATCH 07/14] refactor(format): add trait FieldEncoder. --- Cargo.lock | 2 + src/query/formats/Cargo.toml | 2 + src/query/formats/src/common_settings.rs | 2 + src/query/formats/src/field_decoder/csv.rs | 132 +++++++++ src/query/formats/src/field_decoder/mod.rs | 31 ++ .../formats/src/field_decoder/row_based.rs | 269 ++++++++++++++++++ src/query/formats/src/field_decoder/tsv.rs | 151 ++++++++++ src/query/formats/src/field_decoder/values.rs | 182 ++++++++++++ src/query/formats/src/field_decoder/xml.rs | 132 +++++++++ src/query/formats/src/lib.rs | 4 + 10 files changed, 907 insertions(+) create mode 100644 src/query/formats/src/field_decoder/csv.rs create mode 100644 src/query/formats/src/field_decoder/mod.rs create mode 100644 src/query/formats/src/field_decoder/row_based.rs create mode 100644 src/query/formats/src/field_decoder/tsv.rs create mode 100644 src/query/formats/src/field_decoder/values.rs create mode 100644 src/query/formats/src/field_decoder/xml.rs diff --git a/Cargo.lock b/Cargo.lock index 10b38dc3113f..4419db31e5d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1421,6 +1421,7 @@ dependencies = [ name = "common-formats" version = "0.1.0" dependencies = [ + "bstr 1.0.1", "chrono-tz", "common-arrow", "common-datablocks", @@ -1434,6 +1435,7 @@ dependencies = [ "num", "once_cell", "pretty_assertions", + "serde_json", ] [[package]] diff --git a/src/query/formats/Cargo.toml b/src/query/formats/Cargo.toml index 7851a501c182..f96072991991 100644 --- a/src/query/formats/Cargo.toml +++ b/src/query/formats/Cargo.toml @@ -11,10 +11,12 @@ doctest = false test = false [dependencies] # In alphabetical order +bstr = "1.0.1" chrono-tz = "0.6.3" lexical-core = "0.8.5" micromarshal = "0.2.0" num = "0.4.0" +serde_json = { workspace = true } # Workspace dependencies common-datablocks = { path = "../datablocks" } diff --git a/src/query/formats/src/common_settings.rs b/src/query/formats/src/common_settings.rs index 8d72d0e9a350..28328566f3f9 100644 --- a/src/query/formats/src/common_settings.rs +++ b/src/query/formats/src/common_settings.rs @@ -13,6 +13,8 @@ // limitations under the License. use chrono_tz::Tz; + +#[derive(Clone)] pub struct CommonSettings { pub true_bytes: Vec, pub false_bytes: Vec, diff --git a/src/query/formats/src/field_decoder/csv.rs b/src/query/formats/src/field_decoder/csv.rs new file mode 100644 index 000000000000..aad021cde17e --- /dev/null +++ b/src/query/formats/src/field_decoder/csv.rs @@ -0,0 +1,132 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::io::BufRead; +use std::io::Cursor; + +use common_datavalues::ArrayDeserializer; +use common_datavalues::StringDeserializer; +use common_datavalues::StructDeserializer; +use common_datavalues::VariantDeserializer; +use common_exception::Result; +use common_io::consts::FALSE_BYTES_LOWER; +use common_io::consts::INF_BYTES_LOWER; +use common_io::consts::NAN_BYTES_LOWER; +use common_io::consts::NULL_BYTES_ESCAPE; +use common_io::consts::TRUE_BYTES_LOWER; +use common_io::cursor_ext::ReadBytesExt; + +use crate::field_decoder::row_based::FieldDecoderRowBased; +use crate::field_decoder::values::FieldDecoderValues; +use crate::CommonSettings; +use crate::FieldDecoder; +use crate::FileFormatOptionsExt; + +#[derive(Clone)] +pub struct FieldDecoderCSV { + pub nested: FieldDecoderValues, + pub common_settings: CommonSettings, +} + +impl FieldDecoderCSV { + pub fn create(options: &FileFormatOptionsExt) -> Self { + FieldDecoderCSV { + nested: FieldDecoderValues::create(options), + common_settings: CommonSettings { + true_bytes: TRUE_BYTES_LOWER.as_bytes().to_vec(), + false_bytes: FALSE_BYTES_LOWER.as_bytes().to_vec(), + null_bytes: NULL_BYTES_ESCAPE.as_bytes().to_vec(), + nan_bytes: NAN_BYTES_LOWER.as_bytes().to_vec(), + inf_bytes: INF_BYTES_LOWER.as_bytes().to_vec(), + timezone: options.timezone, + }, + } + } +} + +impl FieldDecoder for FieldDecoderCSV { + fn as_any(&self) -> &dyn Any { + self + } +} + +impl FieldDecoderRowBased for FieldDecoderCSV { + fn common_settings(&self) -> &CommonSettings { + &self.common_settings + } + + fn ignore_field_end>(&self, reader: &mut Cursor) -> bool { + reader.eof() + } + + fn read_string_inner>( + &self, + reader: &mut Cursor, + out_buf: &mut Vec, + _raw: bool, + ) -> Result<()> { + let buf = reader.remaining_slice(); + out_buf.extend_from_slice(buf); + reader.consume(buf.len()); + Ok(()) + } + + fn read_string>( + &self, + column: &mut StringDeserializer, + reader: &mut Cursor, + _raw: bool, + ) -> Result<()> { + let buf = reader.remaining_slice(); + column.builder.append_value(buf); + reader.consume(buf.len()); + Ok(()) + } + + fn read_variant>( + &self, + column: &mut VariantDeserializer, + reader: &mut Cursor, + _raw: bool, + ) -> Result<()> { + let buf = reader.remaining_slice(); + let len = buf.len(); + let val = serde_json::from_slice(buf)?; + reader.consume(len); + column.builder.append_value(val); + column.memory_size += len; + Ok(()) + } + + fn read_array>( + &self, + column: &mut ArrayDeserializer, + reader: &mut Cursor, + _raw: bool, + ) -> Result<()> { + self.nested.read_array(column, reader, false)?; + Ok(()) + } + + fn read_struct>( + &self, + column: &mut StructDeserializer, + reader: &mut Cursor, + _raw: bool, + ) -> Result<()> { + self.nested.read_struct(column, reader, false)?; + Ok(()) + } +} diff --git a/src/query/formats/src/field_decoder/mod.rs b/src/query/formats/src/field_decoder/mod.rs new file mode 100644 index 000000000000..971e3206da26 --- /dev/null +++ b/src/query/formats/src/field_decoder/mod.rs @@ -0,0 +1,31 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod csv; +mod row_based; +mod tsv; +mod values; +mod xml; + +use std::any::Any; + +pub use csv::FieldDecoderCSV; +pub use row_based::FieldDecoderRowBased; +pub use tsv::FieldDecoderTSV; +pub use values::FieldDecoderValues; +pub use xml::FieldDecoderXML; + +pub trait FieldDecoder: Send + Sync { + fn as_any(&self) -> &dyn Any; +} diff --git a/src/query/formats/src/field_decoder/row_based.rs b/src/query/formats/src/field_decoder/row_based.rs new file mode 100644 index 000000000000..424b0caa7bc7 --- /dev/null +++ b/src/query/formats/src/field_decoder/row_based.rs @@ -0,0 +1,269 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::Cursor; + +use bstr::ByteSlice; +use common_datavalues::check_date; +use common_datavalues::check_timestamp; +use common_datavalues::deserializations::ArrayDeserializer; +use common_datavalues::deserializations::BooleanDeserializer; +use common_datavalues::deserializations::DateDeserializer; +use common_datavalues::deserializations::NullableDeserializer; +use common_datavalues::deserializations::NumberDeserializer; +use common_datavalues::deserializations::StringDeserializer; +use common_datavalues::deserializations::StructDeserializer; +use common_datavalues::deserializations::TimestampDeserializer; +use common_datavalues::deserializations::VariantDeserializer; +use common_datavalues::uniform_date; +use common_datavalues::MutableColumn; +use common_datavalues::NullDeserializer; +use common_datavalues::PrimitiveType; +use common_datavalues::TypeDeserializer; +use common_datavalues::TypeDeserializerImpl; +use common_exception::ErrorCode; +use common_exception::Result; +use common_io::cursor_ext::BufferReadDateTimeExt; +use common_io::cursor_ext::ReadBytesExt; +use common_io::cursor_ext::ReadCheckPointExt; +use common_io::cursor_ext::ReadNumberExt; +use common_io::prelude::StatBuffer; +use lexical_core::FromLexical; +use micromarshal::Unmarshal; +use num::cast::AsPrimitive; + +use crate::field_decoder::FieldDecoder; +use crate::CommonSettings; + +pub trait FieldDecoderRowBased: FieldDecoder { + fn common_settings(&self) -> &CommonSettings; + + fn ignore_field_end>(&self, reader: &mut Cursor) -> bool; + + fn match_bytes>(&self, reader: &mut Cursor, bs: &[u8]) -> bool { + let pos = reader.checkpoint(); + if reader.ignore_bytes(bs) && self.ignore_field_end(reader) { + true + } else { + reader.rollback(pos); + false + } + } + + fn read_field>( + &self, + column: &mut TypeDeserializerImpl, + reader: &mut Cursor, + raw: bool, + ) -> Result<()> { + match column { + TypeDeserializerImpl::Null(c) => self.read_null(c, reader, raw), + TypeDeserializerImpl::Nullable(c) => self.read_nullable(c, reader, raw), + TypeDeserializerImpl::Boolean(c) => self.read_bool(c, reader, raw), + TypeDeserializerImpl::Int8(c) => self.read_int(c, reader, raw), + TypeDeserializerImpl::Int16(c) => self.read_int(c, reader, raw), + TypeDeserializerImpl::Int32(c) => self.read_int(c, reader, raw), + TypeDeserializerImpl::Int64(c) => self.read_int(c, reader, raw), + TypeDeserializerImpl::UInt8(c) => self.read_int(c, reader, raw), + TypeDeserializerImpl::UInt16(c) => self.read_int(c, reader, raw), + TypeDeserializerImpl::UInt32(c) => self.read_int(c, reader, raw), + TypeDeserializerImpl::UInt64(c) => self.read_int(c, reader, raw), + TypeDeserializerImpl::Float32(c) => self.read_float(c, reader, raw), + TypeDeserializerImpl::Float64(c) => self.read_float(c, reader, raw), + TypeDeserializerImpl::Date(c) => self.read_date(c, reader, raw), + TypeDeserializerImpl::Interval(c) => self.read_date(c, reader, raw), + TypeDeserializerImpl::Timestamp(c) => self.read_timestamp(c, reader, raw), + TypeDeserializerImpl::String(c) => self.read_string(c, reader, raw), + TypeDeserializerImpl::Array(c) => self.read_array(c, reader, raw), + TypeDeserializerImpl::Struct(c) => self.read_struct(c, reader, raw), + TypeDeserializerImpl::Variant(c) => self.read_variant(c, reader, raw), + } + } + + fn read_bool>( + &self, + column: &mut BooleanDeserializer, + reader: &mut Cursor, + _raw: bool, + ) -> Result<()> { + if self.match_bytes(reader, &self.common_settings().true_bytes) { + column.builder.append_value(true); + Ok(()) + } else if self.match_bytes(reader, &self.common_settings().false_bytes) { + column.builder.append_value(false); + Ok(()) + } else { + let err_msg = format!( + "Incorrect boolean value, expect {} or {}", + self.common_settings().true_bytes.to_str().unwrap(), + self.common_settings().false_bytes.to_str().unwrap() + ); + Err(ErrorCode::BadBytes(err_msg)) + } + } + + fn read_null>( + &self, + column: &mut NullDeserializer, + _reader: &mut Cursor, + _raw: bool, + ) -> Result<()> { + column.builder.append_default(); + Ok(()) + } + + fn read_nullable>( + &self, + column: &mut NullableDeserializer, + reader: &mut Cursor, + raw: bool, + ) -> Result<()> { + if reader.eof() { + column.de_default(); + } else if self.match_bytes(reader, &self.common_settings().null_bytes) + && self.ignore_field_end(reader) + { + column.de_default(); + return Ok(()); + } else { + self.read_field(&mut column.inner, reader, raw)?; + column.bitmap.push(true); + } + Ok(()) + } + + fn read_string_inner>( + &self, + reader: &mut Cursor, + out_buf: &mut Vec, + raw: bool, + ) -> Result<()>; + + fn read_int>( + &self, + column: &mut NumberDeserializer, + reader: &mut Cursor, + _raw: bool, + ) -> Result<()> + where + T: PrimitiveType + Unmarshal + StatBuffer + FromLexical, + { + let v: T = if !T::FLOATING { + reader.read_int_text() + } else { + reader.read_float_text() + }?; + column.builder.append_value(v); + Ok(()) + } + + fn read_float>( + &self, + column: &mut NumberDeserializer, + reader: &mut Cursor, + _raw: bool, + ) -> Result<()> + where + T: PrimitiveType + Unmarshal + StatBuffer + FromLexical, + { + let v: T = if !T::FLOATING { + reader.read_int_text() + } else { + reader.read_float_text() + }?; + column.builder.append_value(v); + Ok(()) + } + + fn read_string>( + &self, + column: &mut StringDeserializer, + reader: &mut Cursor, + _raw: bool, + ) -> Result<()>; + + fn read_date>( + &self, + column: &mut DateDeserializer, + reader: &mut Cursor, + raw: bool, + ) -> Result<()> + where + i32: AsPrimitive, + T: PrimitiveType, + T: Unmarshal + StatBuffer + FromLexical, + { + column.buffer.clear(); + self.read_string_inner(reader, &mut column.buffer, raw)?; + let mut buffer_readr = Cursor::new(&column.buffer); + let date = buffer_readr.read_date_text(&self.common_settings().timezone)?; + let days = uniform_date::(date); + check_date(days.as_i32())?; + column.builder.append_value(days); + Ok(()) + } + + fn read_timestamp>( + &self, + column: &mut TimestampDeserializer, + reader: &mut Cursor, + raw: bool, + ) -> Result<()> { + column.buffer.clear(); + self.read_string_inner(reader, &mut column.buffer, raw)?; + let mut buffer_readr = Cursor::new(&column.buffer); + let ts = buffer_readr.read_timestamp_text(&self.common_settings().timezone)?; + if !buffer_readr.eof() { + let data = column.buffer.to_str().unwrap_or("not utf8"); + let msg = format!( + "fail to deserialize timestamp, unexpected end at pos {} of {}", + buffer_readr.position(), + data + ); + return Err(ErrorCode::BadBytes(msg)); + } + let micros = ts.timestamp_micros(); + check_timestamp(micros)?; + column.builder.append_value(micros.as_()); + Ok(()) + } + + fn read_variant>( + &self, + column: &mut VariantDeserializer, + reader: &mut Cursor, + raw: bool, + ) -> Result<()> { + column.buffer.clear(); + self.read_string_inner(reader, &mut column.buffer, raw)?; + let val = serde_json::from_slice(column.buffer.as_slice())?; + column.builder.append_value(val); + column.memory_size += column.buffer.len(); + Ok(()) + } + + fn read_array>( + &self, + column: &mut ArrayDeserializer, + reader: &mut Cursor, + raw: bool, + ) -> Result<()>; + + fn read_struct>( + &self, + column: &mut StructDeserializer, + reader: &mut Cursor, + raw: bool, + ) -> Result<()>; +} diff --git a/src/query/formats/src/field_decoder/tsv.rs b/src/query/formats/src/field_decoder/tsv.rs new file mode 100644 index 000000000000..b318146c4271 --- /dev/null +++ b/src/query/formats/src/field_decoder/tsv.rs @@ -0,0 +1,151 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::io::Cursor; + +use common_datavalues::ArrayDeserializer; +use common_datavalues::ArrayValue; +use common_datavalues::StringDeserializer; +use common_datavalues::StructDeserializer; +use common_datavalues::StructValue; +use common_datavalues::TypeDeserializer; +use common_exception::Result; +use common_io::consts::FALSE_BYTES_NUM; +use common_io::consts::INF_BYTES_LOWER; +use common_io::consts::NAN_BYTES_LOWER; +use common_io::consts::NULL_BYTES_ESCAPE; +use common_io::consts::TRUE_BYTES_NUM; +use common_io::cursor_ext::BufferReadStringExt; +use common_io::cursor_ext::ReadBytesExt; + +use crate::field_decoder::row_based::FieldDecoderRowBased; +use crate::CommonSettings; +use crate::FieldDecoder; +use crate::FileFormatOptionsExt; + +#[derive(Clone)] +pub struct FieldDecoderTSV { + pub common_settings: CommonSettings, + pub quote_char: u8, +} + +impl FieldDecoderTSV { + pub fn create(options: &FileFormatOptionsExt) -> Self { + FieldDecoderTSV { + common_settings: CommonSettings { + true_bytes: TRUE_BYTES_NUM.as_bytes().to_vec(), + false_bytes: FALSE_BYTES_NUM.as_bytes().to_vec(), + null_bytes: NULL_BYTES_ESCAPE.as_bytes().to_vec(), + nan_bytes: NAN_BYTES_LOWER.as_bytes().to_vec(), + inf_bytes: INF_BYTES_LOWER.as_bytes().to_vec(), + timezone: options.timezone, + }, + quote_char: options.get_quote_char(), + } + } +} + +impl FieldDecoder for FieldDecoderTSV { + fn as_any(&self) -> &dyn Any { + self + } +} + +impl FieldDecoderRowBased for FieldDecoderTSV { + fn common_settings(&self) -> &CommonSettings { + &self.common_settings + } + + fn ignore_field_end>(&self, reader: &mut Cursor) -> bool { + reader.eof() + } + + fn read_string_inner>( + &self, + reader: &mut Cursor, + tmp_buf: &mut Vec, + raw: bool, + ) -> Result<()> { + tmp_buf.clear(); + if raw { + reader.read_escaped_string_text(tmp_buf) + } else { + reader.read_quoted_text(tmp_buf, self.quote_char) + }?; + Ok(()) + } + + fn read_string>( + &self, + column: &mut StringDeserializer, + reader: &mut Cursor, + raw: bool, + ) -> Result<()> { + self.read_string_inner(reader, &mut column.buffer, raw)?; + column.builder.append_value(column.buffer.as_slice()); + Ok(()) + } + + fn read_array>( + &self, + column: &mut ArrayDeserializer, + reader: &mut Cursor, + _raw: bool, + ) -> Result<()> { + reader.must_ignore_byte(b'[')?; + let mut idx = 0; + loop { + let _ = reader.ignore_white_spaces(); + if reader.ignore_byte(b']') { + break; + } + if idx != 0 { + reader.must_ignore_byte(b',')?; + } + let _ = reader.ignore_white_spaces(); + self.read_field(&mut column.inner, reader, false)?; + idx += 1; + } + let mut values = Vec::with_capacity(idx); + for _ in 0..idx { + values.push(column.inner.pop_data_value()?); + } + values.reverse(); + column.builder.append_value(ArrayValue::new(values)); + Ok(()) + } + + fn read_struct>( + &self, + column: &mut StructDeserializer, + reader: &mut Cursor, + _raw: bool, + ) -> Result<()> { + reader.must_ignore_byte(b'(')?; + let mut values = Vec::with_capacity(column.inners.len()); + for (idx, inner) in column.inners.iter_mut().enumerate() { + let _ = reader.ignore_white_spaces(); + if idx != 0 { + reader.must_ignore_byte(b',')?; + } + let _ = reader.ignore_white_spaces(); + self.read_field(inner, reader, false)?; + values.push(inner.pop_data_value()?); + } + reader.must_ignore_byte(b')')?; + column.builder.append_value(StructValue::new(values)); + Ok(()) + } +} diff --git a/src/query/formats/src/field_decoder/values.rs b/src/query/formats/src/field_decoder/values.rs new file mode 100644 index 000000000000..4370fe82e64e --- /dev/null +++ b/src/query/formats/src/field_decoder/values.rs @@ -0,0 +1,182 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::io::Cursor; + +use chrono_tz::Tz; +use common_datavalues::ArrayDeserializer; +use common_datavalues::ArrayValue; +use common_datavalues::NullableDeserializer; +use common_datavalues::StringDeserializer; +use common_datavalues::StructDeserializer; +use common_datavalues::StructValue; +use common_datavalues::TypeDeserializer; +use common_exception::Result; +use common_io::consts::FALSE_BYTES_LOWER; +use common_io::consts::INF_BYTES_LOWER; +use common_io::consts::NAN_BYTES_LOWER; +use common_io::consts::NULL_BYTES_UPPER; +use common_io::consts::TRUE_BYTES_LOWER; +use common_io::cursor_ext::BufferReadStringExt; +use common_io::cursor_ext::ReadBytesExt; + +use crate::field_decoder::row_based::FieldDecoderRowBased; +use crate::CommonSettings; +use crate::FieldDecoder; +use crate::FileFormatOptionsExt; + +#[derive(Clone)] +pub struct FieldDecoderValues { + pub common_settings: CommonSettings, +} + +impl FieldDecoderValues { + pub fn create(options: &FileFormatOptionsExt) -> Self { + FieldDecoderValues { + common_settings: CommonSettings { + true_bytes: TRUE_BYTES_LOWER.as_bytes().to_vec(), + false_bytes: FALSE_BYTES_LOWER.as_bytes().to_vec(), + null_bytes: NULL_BYTES_UPPER.as_bytes().to_vec(), + nan_bytes: NAN_BYTES_LOWER.as_bytes().to_vec(), + inf_bytes: INF_BYTES_LOWER.as_bytes().to_vec(), + timezone: options.timezone, + }, + } + } + + pub fn create_for_insert(timezone: Tz) -> Self { + FieldDecoderValues { + common_settings: CommonSettings { + true_bytes: TRUE_BYTES_LOWER.as_bytes().to_vec(), + false_bytes: FALSE_BYTES_LOWER.as_bytes().to_vec(), + null_bytes: NULL_BYTES_UPPER.as_bytes().to_vec(), + nan_bytes: NAN_BYTES_LOWER.as_bytes().to_vec(), + inf_bytes: INF_BYTES_LOWER.as_bytes().to_vec(), + timezone, + }, + } + } +} + +impl FieldDecoder for FieldDecoderValues { + fn as_any(&self) -> &dyn Any { + self + } +} + +impl FieldDecoderRowBased for FieldDecoderValues { + fn common_settings(&self) -> &CommonSettings { + &self.common_settings + } + + fn ignore_field_end>(&self, reader: &mut Cursor) -> bool { + reader.ignore_white_spaces(); + matches!(reader.peek(), None | Some(',') | Some(')')) + } + + fn read_nullable>( + &self, + column: &mut NullableDeserializer, + reader: &mut Cursor, + raw: bool, + ) -> Result<()> { + if reader.eof() { + column.de_default(); + } else if raw && self.match_bytes(reader, b"NULL") || self.match_bytes(reader, b"null") { + column.de_default(); + return Ok(()); + } else if !raw && reader.ignore_bytes(b"NULL") || reader.ignore_bytes(b"null") { + column.de_default(); + return Ok(()); + } else { + self.read_field(&mut column.inner, reader, raw)?; + column.bitmap.push(true); + } + Ok(()) + } + + fn read_string_inner>( + &self, + reader: &mut Cursor, + out_buf: &mut Vec, + _raw: bool, + ) -> Result<()> { + reader.read_quoted_text(out_buf, b'\'')?; + Ok(()) + } + + fn read_string>( + &self, + column: &mut StringDeserializer, + reader: &mut Cursor, + _raw: bool, + ) -> Result<()> { + column.buffer.clear(); + reader.read_quoted_text(&mut column.buffer, b'\'')?; + column.builder.append_value(column.buffer.as_slice()); + Ok(()) + } + + fn read_array>( + &self, + column: &mut ArrayDeserializer, + reader: &mut Cursor, + _raw: bool, + ) -> Result<()> { + reader.must_ignore_byte(b'[')?; + let mut idx = 0; + loop { + let _ = reader.ignore_white_spaces(); + if reader.ignore_byte(b']') { + break; + } + if idx != 0 { + reader.must_ignore_byte(b',')?; + } + let _ = reader.ignore_white_spaces(); + self.read_field(&mut column.inner, reader, false)?; + idx += 1; + } + let mut values = Vec::with_capacity(idx); + for _ in 0..idx { + values.push(column.inner.pop_data_value()?); + } + values.reverse(); + column.builder.append_value(ArrayValue::new(values)); + Ok(()) + } + + fn read_struct>( + &self, + column: &mut StructDeserializer, + reader: &mut Cursor, + _raw: bool, + ) -> Result<()> { + reader.must_ignore_byte(b'(')?; + let mut values = Vec::with_capacity(column.inners.len()); + for (idx, inner) in column.inners.iter_mut().enumerate() { + let _ = reader.ignore_white_spaces(); + if idx != 0 { + reader.must_ignore_byte(b',')?; + } + let _ = reader.ignore_white_spaces(); + self.read_field(inner, reader, false)?; + values.push(inner.pop_data_value()?); + } + reader.must_ignore_byte(b')')?; + column.builder.append_value(StructValue::new(values)); + Ok(()) + } +} diff --git a/src/query/formats/src/field_decoder/xml.rs b/src/query/formats/src/field_decoder/xml.rs new file mode 100644 index 000000000000..3a90cc310e88 --- /dev/null +++ b/src/query/formats/src/field_decoder/xml.rs @@ -0,0 +1,132 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::io::BufRead; +use std::io::Cursor; + +use common_datavalues::ArrayDeserializer; +use common_datavalues::StringDeserializer; +use common_datavalues::StructDeserializer; +use common_datavalues::VariantDeserializer; +use common_exception::Result; +use common_io::consts::FALSE_BYTES_LOWER; +use common_io::consts::INF_BYTES_LOWER; +use common_io::consts::NAN_BYTES_LOWER; +use common_io::consts::NULL_BYTES_LOWER; +use common_io::consts::TRUE_BYTES_LOWER; +use common_io::cursor_ext::ReadBytesExt; + +use crate::field_decoder::row_based::FieldDecoderRowBased; +use crate::field_decoder::values::FieldDecoderValues; +use crate::CommonSettings; +use crate::FieldDecoder; +use crate::FileFormatOptionsExt; + +#[derive(Clone)] +pub struct FieldDecoderXML { + pub nested: FieldDecoderValues, + pub common_settings: CommonSettings, +} + +impl FieldDecoderXML { + pub fn create(options: &FileFormatOptionsExt) -> Self { + FieldDecoderXML { + nested: FieldDecoderValues::create(options), + common_settings: CommonSettings { + true_bytes: TRUE_BYTES_LOWER.as_bytes().to_vec(), + false_bytes: FALSE_BYTES_LOWER.as_bytes().to_vec(), + null_bytes: NULL_BYTES_LOWER.as_bytes().to_vec(), + nan_bytes: NAN_BYTES_LOWER.as_bytes().to_vec(), + inf_bytes: INF_BYTES_LOWER.as_bytes().to_vec(), + timezone: options.timezone, + }, + } + } +} + +impl FieldDecoder for FieldDecoderXML { + fn as_any(&self) -> &dyn Any { + self + } +} + +impl FieldDecoderRowBased for FieldDecoderXML { + fn common_settings(&self) -> &CommonSettings { + &self.common_settings + } + + fn ignore_field_end>(&self, reader: &mut Cursor) -> bool { + reader.eof() + } + + fn read_string_inner>( + &self, + reader: &mut Cursor, + out_buf: &mut Vec, + _raw: bool, + ) -> Result<()> { + let buf = reader.remaining_slice(); + out_buf.extend_from_slice(buf); + reader.consume(buf.len()); + Ok(()) + } + + fn read_string>( + &self, + column: &mut StringDeserializer, + reader: &mut Cursor, + _raw: bool, + ) -> Result<()> { + let buf = reader.remaining_slice(); + column.builder.append_value(buf); + reader.consume(buf.len()); + Ok(()) + } + + fn read_variant>( + &self, + column: &mut VariantDeserializer, + reader: &mut Cursor, + _raw: bool, + ) -> Result<()> { + let buf = reader.remaining_slice(); + let len = buf.len(); + let val = serde_json::from_slice(buf)?; + reader.consume(len); + column.builder.append_value(val); + column.memory_size += len; + Ok(()) + } + + fn read_array>( + &self, + column: &mut ArrayDeserializer, + reader: &mut Cursor, + _raw: bool, + ) -> Result<()> { + self.nested.read_array(column, reader, false)?; + Ok(()) + } + + fn read_struct>( + &self, + column: &mut StructDeserializer, + reader: &mut Cursor, + _raw: bool, + ) -> Result<()> { + self.nested.read_struct(column, reader, false)?; + Ok(()) + } +} diff --git a/src/query/formats/src/lib.rs b/src/query/formats/src/lib.rs index 524767c5e7c0..f837abc7634a 100644 --- a/src/query/formats/src/lib.rs +++ b/src/query/formats/src/lib.rs @@ -1,3 +1,4 @@ +#![feature(cursor_remaining)] // Copyright 2022 Datafuse Labs. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,11 +15,14 @@ mod clickhouse; mod common_settings; +mod field_decoder; pub mod field_encoder; mod file_format_type; pub mod output_format; pub use clickhouse::ClickhouseFormatType; +pub use field_decoder::*; +pub use file_format_type::parse_timezone; pub use file_format_type::FileFormatOptionsExt; pub use file_format_type::FileFormatTypeExt; From 65adc66bedbc2d1b213cdee8d300f9453c194e7f Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Sun, 13 Nov 2022 19:43:20 +0800 Subject: [PATCH 08/14] refactor(input format): refactor with FieldEncoder. --- .../input_formats/impls/input_format_csv.rs | 21 +++++++++++---- .../impls/input_format_ndjson.rs | 8 ++++++ .../input_formats/impls/input_format_tsv.rs | 27 ++++++++++++++----- .../input_formats/impls/input_format_xml.rs | 17 +++++++++++- .../sources/input_formats/input_context.rs | 5 ++++ .../input_formats/input_format_text.rs | 7 +++++ 6 files changed, 73 insertions(+), 12 deletions(-) diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs index a446f04740cf..aed0923a2265 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs @@ -21,9 +21,12 @@ use common_datavalues::DataSchemaRef; use common_datavalues::TypeDeserializer; use common_exception::ErrorCode; use common_exception::Result; +use common_formats::FieldDecoder; +use common_formats::FieldDecoderCSV; +use common_formats::FieldDecoderRowBased; +use common_formats::FileFormatOptionsExt; use common_io::cursor_ext::*; use common_io::format_diagnostic::verbose_char; -use common_io::prelude::FormatSettings; use common_meta_types::StageFileFormatType; use csv_core::ReadRecordResult; @@ -39,11 +42,11 @@ pub struct InputFormatCSV {} impl InputFormatCSV { fn read_row( + field_decoder: &FieldDecoderCSV, buf: &[u8], deserializers: &mut [common_datavalues::TypeDeserializerImpl], schema: &DataSchemaRef, field_ends: &[usize], - format_settings: &FormatSettings, path: &str, row_index: usize, ) -> Result<()> { @@ -55,8 +58,7 @@ impl InputFormatCSV { if reader.eof() { deserializer.de_default(); } else { - // todo(youngsofun): do not need escape, already done in csv-core - if let Err(e) = deserializer.de_text(&mut reader, format_settings) { + if let Err(e) = field_decoder.read_field(deserializer, &mut reader, true) { let err_msg = format_column_error(schema, c, col_data, &e.message()); return Err(csv_error(&err_msg, path, row_index)); }; @@ -118,20 +120,29 @@ impl InputFormatTextBase for InputFormatCSV { b',' } + fn create_field_decoder(options: &FileFormatOptionsExt) -> Arc { + Arc::new(FieldDecoderCSV::create(options)) + } + fn deserialize(builder: &mut BlockBuilder, batch: RowBatch) -> Result<()> { let columns = &mut builder.mutable_columns; let n_column = columns.len(); let mut start = 0usize; let start_row = batch.start_row.expect("must success"); let mut field_end_idx = 0; + let field_decoder = builder + .field_decoder + .as_any() + .downcast_ref::() + .expect("must success"); for (i, end) in batch.row_ends.iter().enumerate() { let buf = &batch.data[start..*end]; Self::read_row( + field_decoder, buf, columns, &builder.ctx.schema, &batch.field_ends[field_end_idx..field_end_idx + n_column], - &builder.ctx.format_settings, &batch.path, start_row + i, )?; diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_ndjson.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_ndjson.rs index a3924fc21528..b445d1b414da 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_ndjson.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_ndjson.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::borrow::Cow; +use std::sync::Arc; use bstr::ByteSlice; use common_datavalues::DataSchemaRef; @@ -20,6 +21,9 @@ use common_datavalues::TypeDeserializer; use common_datavalues::TypeDeserializerImpl; use common_exception::ErrorCode; use common_exception::Result; +use common_formats::FieldDecoder; +use common_formats::FieldDecoderValues; +use common_formats::FileFormatOptionsExt; use common_io::prelude::FormatSettings; use common_meta_types::StageFileFormatType; @@ -76,6 +80,10 @@ impl InputFormatTextBase for InputFormatNDJson { true } + fn create_field_decoder(options: &FileFormatOptionsExt) -> Arc { + Arc::new(FieldDecoderValues::create(options)) + } + fn default_field_delimiter() -> u8 { b',' } diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs index c013e58b32dd..c5f46c6dedbe 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs @@ -13,14 +13,18 @@ // limitations under the License. use std::io::Cursor; +use std::sync::Arc; use common_datavalues::DataSchemaRef; use common_datavalues::TypeDeserializer; use common_exception::ErrorCode; use common_exception::Result; +use common_formats::FieldDecoder; +use common_formats::FieldDecoderRowBased; +use common_formats::FieldDecoderTSV; +use common_formats::FileFormatOptionsExt; use common_io::cursor_ext::*; use common_io::format_diagnostic::verbose_string; -use common_io::prelude::FormatSettings; use common_meta_types::StageFileFormatType; use crate::processors::sources::input_formats::input_format_text::AligningState; @@ -33,10 +37,10 @@ pub struct InputFormatTSV {} impl InputFormatTSV { #[allow(clippy::too_many_arguments)] fn read_row( + field_decoder: &FieldDecoderTSV, buf: &[u8], deserializers: &mut Vec, schema: &DataSchemaRef, - format_settings: &FormatSettings, path: &str, batch_id: usize, offset: usize, @@ -56,9 +60,11 @@ impl InputFormatTSV { } else { let mut reader = Cursor::new(col_data); reader.ignores(|c: u8| c == b' '); - if let Err(e) = - deserializers[column_index].de_text(&mut reader, format_settings) - { + if let Err(e) = field_decoder.read_field( + &mut deserializers[column_index], + &mut reader, + true, + ) { err_msg = Some(format_column_error( schema, column_index, @@ -126,6 +132,10 @@ impl InputFormatTextBase for InputFormatTSV { true } + fn create_field_decoder(options: &FileFormatOptionsExt) -> Arc { + Arc::new(FieldDecoderTSV::create(options)) + } + fn default_field_delimiter() -> u8 { b'\t' } @@ -138,6 +148,11 @@ impl InputFormatTextBase for InputFormatTSV { batch.start_row, batch.offset ); + let field_decoder = builder + .field_decoder + .as_any() + .downcast_ref::() + .expect("must success"); let schema = &builder.ctx.schema; let columns = &mut builder.mutable_columns; let mut start = 0usize; @@ -145,10 +160,10 @@ impl InputFormatTextBase for InputFormatTSV { for (i, end) in batch.row_ends.iter().enumerate() { let buf = &batch.data[start..*end]; // include \n Self::read_row( + field_decoder, buf, columns, schema, - &builder.ctx.format_settings, &batch.path, batch.batch_id, batch.offset + start, diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_xml.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_xml.rs index c615c8bcff91..41b2464cf2b7 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_xml.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_xml.rs @@ -20,6 +20,10 @@ use common_datavalues::TypeDeserializer; use common_datavalues::TypeDeserializerImpl; use common_exception::ErrorCode; use common_exception::Result; +use common_formats::FieldDecoder; +use common_formats::FieldDecoderRowBased; +use common_formats::FieldDecoderXML; +use common_formats::FileFormatOptionsExt; use common_io::cursor_ext::*; use common_io::prelude::FormatSettings; use common_meta_types::StageFileFormatType; @@ -37,6 +41,7 @@ pub struct InputFormatXML {} impl InputFormatXML { fn read_row( + field_decoder: &FieldDecoderXML, buf: &[u8], deserializers: &mut [TypeDeserializerImpl], schema: &DataSchemaRef, @@ -65,7 +70,7 @@ impl InputFormatXML { if reader.eof() { deserializer.de_default(); } else { - if let Err(e) = deserializer.de_text(&mut reader, format_settings) { + if let Err(e) = field_decoder.read_field(deserializer, &mut reader, true) { let value_str = format!("{:?}", value); let err_msg = format!("{}. column={} value={}", e, field.name(), value_str); return Err(xml_error(&err_msg, path, row_index)); @@ -90,6 +95,10 @@ impl InputFormatTextBase for InputFormatXML { StageFileFormatType::Xml } + fn create_field_decoder(options: &FileFormatOptionsExt) -> Arc { + Arc::new(FieldDecoderXML::create(options)) + } + fn default_field_delimiter() -> u8 { b',' } @@ -102,6 +111,11 @@ impl InputFormatTextBase for InputFormatXML { batch.start_row, batch.offset, ); + let field_decoder = builder + .field_decoder + .as_any() + .downcast_ref::() + .expect("must success"); let columns = &mut builder.mutable_columns; let mut start = 0usize; @@ -109,6 +123,7 @@ impl InputFormatTextBase for InputFormatXML { for (i, end) in batch.row_ends.iter().enumerate() { let buf = &batch.data[start..*end]; Self::read_row( + field_decoder, buf, columns, &builder.ctx.schema, diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs index 96da0b930f7c..1103c2899946 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs @@ -26,6 +26,7 @@ use common_datavalues::DataSchemaRef; use common_exception::ErrorCode; use common_exception::Result; use common_formats::ClickhouseFormatType; +use common_formats::FileFormatOptionsExt; use common_formats::FileFormatTypeExt; use common_io::prelude::FormatSettings; use common_meta_types::StageFileCompression; @@ -111,6 +112,7 @@ pub struct InputContext { pub format: Arc, pub splits: Vec>, + pub format_options: FileFormatOptionsExt, // row format only pub rows_to_skip: usize, pub field_delimiter: u8, @@ -215,6 +217,7 @@ impl InputContext { source: InputSource::Operator(operator), plan: InputPlan::CopyInto(plan), block_compact_thresholds, + format_options: file_format_options, }) } @@ -246,6 +249,7 @@ impl InputContext { let format = Self::get_input_format(&format_type)?; let format_settings = format_type.get_format_settings(&file_format_options, &settings)?; let read_batch_size = settings.get_input_read_buffer_size()? as usize; + let file_format_options_clone = file_format_options.clone(); let field_delimiter = file_format_options.stage.field_delimiter; let field_delimiter = { if field_delimiter.is_empty() { @@ -281,6 +285,7 @@ impl InputContext { plan: InputPlan::StreamingLoad(plan), splits: vec![], block_compact_thresholds, + format_options: file_format_options_clone, }) } diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs index e848df183611..0c1337b7e115 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs @@ -22,6 +22,8 @@ use common_datavalues::TypeDeserializer; use common_datavalues::TypeDeserializerImpl; use common_exception::ErrorCode; use common_exception::Result; +use common_formats::FieldDecoder; +use common_formats::FileFormatOptionsExt; use common_meta_types::StageFileFormatType; use common_meta_types::UserStageInfo; use common_pipeline_core::Pipeline; @@ -55,6 +57,8 @@ pub trait InputFormatTextBase: Sized + Send + Sync + 'static { RecordDelimiter::Crlf } + fn create_field_decoder(options: &FileFormatOptionsExt) -> Arc; + fn default_field_delimiter() -> u8; fn deserialize(builder: &mut BlockBuilder, batch: RowBatch) -> Result<()>; @@ -374,6 +378,7 @@ impl AligningStateTrait for AligningState { } pub struct BlockBuilder { + pub field_decoder: Arc, pub ctx: Arc, pub mutable_columns: Vec, pub num_rows: usize, @@ -407,11 +412,13 @@ impl BlockBuilderTrait for BlockBuilder { let columns = ctx .schema .create_deserializers(ctx.block_compact_thresholds.min_rows_per_block); + let field_decoder = T::create_field_decoder(&ctx.format_options); BlockBuilder { ctx, mutable_columns: columns, num_rows: 0, phantom: Default::default(), + field_decoder, } } From 517e7dd20cba23a92b9136ea1e249e81f78da54a Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Sun, 13 Nov 2022 20:56:19 +0800 Subject: [PATCH 09/14] refactor(insert): decode values with FieldEncoder. --- src/query/formats/src/file_format_type.rs | 2 +- .../service/src/interpreters/interpreter_insert_v2.rs | 11 +++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/query/formats/src/file_format_type.rs b/src/query/formats/src/file_format_type.rs index 872743b26dc0..d9d5e3818a87 100644 --- a/src/query/formats/src/file_format_type.rs +++ b/src/query/formats/src/file_format_type.rs @@ -445,7 +445,7 @@ fn format_setting_parquet(options: &FileFormatOptionsExt, timezone: Tz) -> Forma format_setting_csv(options, timezone) } -fn parse_timezone(settings: &Settings) -> Result { +pub fn parse_timezone(settings: &Settings) -> Result { let tz = settings.get_timezone()?; tz.parse::() .map_err(|_| ErrorCode::InvalidTimezone("Timezone has been checked and should be valid")) diff --git a/src/query/service/src/interpreters/interpreter_insert_v2.rs b/src/query/service/src/interpreters/interpreter_insert_v2.rs index a3b1aefd4a5d..e8d33949fd41 100644 --- a/src/query/service/src/interpreters/interpreter_insert_v2.rs +++ b/src/query/service/src/interpreters/interpreter_insert_v2.rs @@ -28,6 +28,9 @@ use common_datablocks::DataBlock; use common_datavalues::prelude::*; use common_exception::ErrorCode; use common_exception::Result; +use common_formats::parse_timezone; +use common_formats::FieldDecoderRowBased; +use common_formats::FieldDecoderValues; use common_io::cursor_ext::ReadBytesExt; use common_io::cursor_ext::ReadCheckPointExt; use common_io::prelude::FormatSettings; @@ -317,6 +320,8 @@ impl ValueSource { let col_size = desers.len(); let mut rows = 0; + let timezone = parse_timezone(&self.ctx.get_settings())?; + let field_decoder = FieldDecoderValues::create_for_insert(timezone); loop { let _ = reader.ignore_white_spaces(); @@ -329,6 +334,7 @@ impl ValueSource { } self.parse_next_row( + &field_decoder, reader, col_size, &mut desers, @@ -354,6 +360,7 @@ impl ValueSource { /// Parse single row value, like ('111', 222, 1 + 1) async fn parse_next_row>( &self, + field_decoder: &FieldDecoderValues, reader: &mut Cursor, col_size: usize, desers: &mut [TypeDeserializerImpl], @@ -383,8 +390,8 @@ impl ValueSource { .get_mut(col_idx) .ok_or_else(|| ErrorCode::BadBytes("Deserializer is None"))?; - let (need_fallback, pop_count) = deser - .de_text_quoted(reader, &format) + let (need_fallback, pop_count) = field_decoder + .read_field(deser, reader, false) .map(|_| { let _ = reader.ignore_white_spaces(); let need_fallback = reader.ignore_byte(col_end).not(); From c58bbc9674d338c1997d1e36bbc5539ed16572f8 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Sun, 13 Nov 2022 21:01:19 +0800 Subject: [PATCH 10/14] refactor(datavalues): rm unused de_text. --- .../src/types/deserializations/array.rs | 31 -------------- .../src/types/deserializations/boolean.rs | 20 ---------- .../src/types/deserializations/date.rs | 30 -------------- .../src/types/deserializations/mod.rs | 15 ------- .../src/types/deserializations/null.rs | 11 ----- .../src/types/deserializations/nullable.rs | 40 ------------------- .../src/types/deserializations/number.rs | 14 ------- .../src/types/deserializations/string.rs | 24 ----------- .../src/types/deserializations/struct_.rs | 24 ----------- .../src/types/deserializations/timestamp.rs | 29 -------------- .../src/types/deserializations/variant.rs | 30 -------------- 11 files changed, 268 deletions(-) diff --git a/src/query/datavalues/src/types/deserializations/array.rs b/src/query/datavalues/src/types/deserializations/array.rs index 8fdfe4c91465..614291d90e5a 100644 --- a/src/query/datavalues/src/types/deserializations/array.rs +++ b/src/query/datavalues/src/types/deserializations/array.rs @@ -12,11 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::Cursor; - use common_exception::ErrorCode; use common_exception::Result; -use common_io::cursor_ext::*; use common_io::prelude::BinaryRead; use common_io::prelude::FormatSettings; @@ -87,34 +84,6 @@ impl TypeDeserializer for ArrayDeserializer { } } - fn de_text>( - &mut self, - reader: &mut Cursor, - format: &FormatSettings, - ) -> Result<()> { - reader.must_ignore_byte(b'[')?; - let mut idx = 0; - loop { - let _ = reader.ignore_white_spaces(); - if reader.ignore_byte(b']') { - break; - } - if idx != 0 { - reader.must_ignore_byte(b',')?; - } - let _ = reader.ignore_white_spaces(); - self.inner.de_text_quoted(reader, format)?; - idx += 1; - } - let mut values = Vec::with_capacity(idx); - for _ in 0..idx { - values.push(self.inner.pop_data_value()?); - } - values.reverse(); - self.builder.append_value(ArrayValue::new(values)); - Ok(()) - } - fn append_data_value(&mut self, value: DataValue, _format: &FormatSettings) -> Result<()> { self.builder.append_data_value(value) } diff --git a/src/query/datavalues/src/types/deserializations/boolean.rs b/src/query/datavalues/src/types/deserializations/boolean.rs index 3b6309f31948..5ff6293e5552 100644 --- a/src/query/datavalues/src/types/deserializations/boolean.rs +++ b/src/query/datavalues/src/types/deserializations/boolean.rs @@ -12,11 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::Cursor; - use common_exception::ErrorCode; use common_exception::Result; -use common_io::cursor_ext::*; use common_io::prelude::BinaryRead; use common_io::prelude::FormatSettings; @@ -65,23 +62,6 @@ impl TypeDeserializer for BooleanDeserializer { Ok(()) } - fn de_text>( - &mut self, - reader: &mut Cursor, - _format: &FormatSettings, - ) -> Result<()> { - let v = if reader.ignore_insensitive_bytes(b"true") { - Ok(true) - } else if reader.ignore_insensitive_bytes(b"false") { - Ok(false) - } else { - Err(ErrorCode::BadBytes("Incorrect boolean value")) - }?; - - self.builder.append_value(v); - Ok(()) - } - fn append_data_value(&mut self, value: DataValue, _format: &FormatSettings) -> Result<()> { self.builder.append_value(value.as_bool()?); Ok(()) diff --git a/src/query/datavalues/src/types/deserializations/date.rs b/src/query/datavalues/src/types/deserializations/date.rs index 16bf8d904a54..e78938475720 100644 --- a/src/query/datavalues/src/types/deserializations/date.rs +++ b/src/query/datavalues/src/types/deserializations/date.rs @@ -85,36 +85,6 @@ where } } - fn de_text_quoted>( - &mut self, - reader: &mut Cursor, - format: &FormatSettings, - ) -> Result<()> { - reader.must_ignore_byte(b'\'')?; - let date = reader.read_date_text(&format.timezone); - reader.must_ignore_byte(b'\'')?; - if date.is_err() { - return Err(date.err().unwrap()); - } - let days = uniform_date(date.unwrap()); - check_date(days.as_i32())?; - - self.builder.append_value(days); - Ok(()) - } - - fn de_text>( - &mut self, - reader: &mut Cursor, - format: &FormatSettings, - ) -> Result<()> { - let date = reader.read_date_text(&format.timezone)?; - let days = uniform_date(date); - check_date(days.as_i32())?; - self.builder.append_value(days); - Ok(()) - } - fn append_data_value(&mut self, value: DataValue, _format: &FormatSettings) -> Result<()> { let v = value.as_i64()? as i32; check_date(v)?; diff --git a/src/query/datavalues/src/types/deserializations/mod.rs b/src/query/datavalues/src/types/deserializations/mod.rs index c2fe5e0ccb61..49de01b86113 100644 --- a/src/query/datavalues/src/types/deserializations/mod.rs +++ b/src/query/datavalues/src/types/deserializations/mod.rs @@ -28,7 +28,6 @@ mod string; mod struct_; mod timestamp; mod variant; -use std::io::Cursor; pub use array::*; pub use boolean::*; @@ -64,20 +63,6 @@ pub trait TypeDeserializer: Send + Sync { false } - fn de_text>( - &mut self, - reader: &mut Cursor, - format: &FormatSettings, - ) -> Result<()>; - - fn de_text_quoted>( - &mut self, - reader: &mut Cursor, - format: &FormatSettings, - ) -> Result<()> { - self.de_text(reader, format) - } - fn append_data_value(&mut self, value: DataValue, format: &FormatSettings) -> Result<()>; /// Note this method will return err only when inner builder is empty. diff --git a/src/query/datavalues/src/types/deserializations/null.rs b/src/query/datavalues/src/types/deserializations/null.rs index da2db4f23bf7..819ee7961e59 100644 --- a/src/query/datavalues/src/types/deserializations/null.rs +++ b/src/query/datavalues/src/types/deserializations/null.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::Cursor; - use common_exception::Result; use common_io::prelude::FormatSettings; @@ -59,15 +57,6 @@ impl TypeDeserializer for NullDeserializer { Ok(()) } - fn de_text>( - &mut self, - _reader: &mut Cursor, - _format: &FormatSettings, - ) -> Result<()> { - self.builder.append_default(); - Ok(()) - } - fn append_data_value(&mut self, _value: DataValue, _format: &FormatSettings) -> Result<()> { self.builder.append_default(); Ok(()) diff --git a/src/query/datavalues/src/types/deserializations/nullable.rs b/src/query/datavalues/src/types/deserializations/nullable.rs index 75bf76749381..9723d06c8fb5 100644 --- a/src/query/datavalues/src/types/deserializations/nullable.rs +++ b/src/query/datavalues/src/types/deserializations/nullable.rs @@ -12,12 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::Cursor; - use common_arrow::arrow::bitmap::MutableBitmap; use common_exception::ErrorCode; use common_exception::Result; -use common_io::cursor_ext::*; use common_io::prelude::BinaryRead; use common_io::prelude::FormatSettings; @@ -77,43 +74,6 @@ impl TypeDeserializer for NullableDeserializer { } } - fn de_text>( - &mut self, - reader: &mut Cursor, - format: &FormatSettings, - ) -> Result<()> { - if reader.eof() { - self.de_default(); - } else { - if reader.ignore_insensitive_bytes(&format.null_bytes) { - let buffer = reader.remaining_slice(); - if buffer.is_empty() - || (buffer[0] == b'\r' || buffer[0] == b'\n' || buffer[0] == b'\t') - { - self.de_default(); - return Ok(()); - } - } - self.inner.de_text(reader, format)?; - self.bitmap.push(true); - } - Ok(()) - } - - fn de_text_quoted>( - &mut self, - reader: &mut Cursor, - format: &FormatSettings, - ) -> Result<()> { - if reader.ignore_insensitive_bytes(&format.null_bytes) { - self.de_default(); - } else { - self.inner.de_text_quoted(reader, format)?; - self.bitmap.push(true); - } - Ok(()) - } - fn de_null(&mut self, _format: &FormatSettings) -> bool { self.inner.de_default(); self.bitmap.push(false); diff --git a/src/query/datavalues/src/types/deserializations/number.rs b/src/query/datavalues/src/types/deserializations/number.rs index d2caf2ee0c7e..552165012a79 100644 --- a/src/query/datavalues/src/types/deserializations/number.rs +++ b/src/query/datavalues/src/types/deserializations/number.rs @@ -84,20 +84,6 @@ where false } - fn de_text>( - &mut self, - reader: &mut Cursor, - _format: &FormatSettings, - ) -> Result<()> { - let v: T = if !T::FLOATING { - reader.read_int_text() - } else { - reader.read_float_text() - }?; - self.builder.append_value(v); - Ok(()) - } - fn append_data_value(&mut self, value: DataValue, _format: &FormatSettings) -> Result<()> { self.builder.append_data_value(value) } diff --git a/src/query/datavalues/src/types/deserializations/string.rs b/src/query/datavalues/src/types/deserializations/string.rs index 9ff4b6ad6f24..9ac96451dbfb 100644 --- a/src/query/datavalues/src/types/deserializations/string.rs +++ b/src/query/datavalues/src/types/deserializations/string.rs @@ -12,12 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::Cursor; use std::io::Read; use common_exception::ErrorCode; use common_exception::Result; -use common_io::cursor_ext::*; use common_io::prelude::BinaryRead; use common_io::prelude::FormatSettings; @@ -85,28 +83,6 @@ impl TypeDeserializer for StringDeserializer { } } - fn de_text>( - &mut self, - reader: &mut Cursor, - _format: &FormatSettings, - ) -> Result<()> { - self.buffer.clear(); - reader.read_escaped_string_text(&mut self.buffer)?; - self.builder.append_value(self.buffer.as_slice()); - Ok(()) - } - - fn de_text_quoted>( - &mut self, - reader: &mut Cursor, - _format: &FormatSettings, - ) -> Result<()> { - self.buffer.clear(); - reader.read_quoted_text(&mut self.buffer, b'\'')?; - self.builder.append_value(self.buffer.as_slice()); - Ok(()) - } - fn append_data_value(&mut self, value: DataValue, _format: &FormatSettings) -> Result<()> { self.builder.append_data_value(value) } diff --git a/src/query/datavalues/src/types/deserializations/struct_.rs b/src/query/datavalues/src/types/deserializations/struct_.rs index 77498fd139b7..cd168695427f 100644 --- a/src/query/datavalues/src/types/deserializations/struct_.rs +++ b/src/query/datavalues/src/types/deserializations/struct_.rs @@ -12,11 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::Cursor; - use common_exception::ErrorCode; use common_exception::Result; -use common_io::cursor_ext::*; use common_io::prelude::FormatSettings; use crate::prelude::*; @@ -88,27 +85,6 @@ impl TypeDeserializer for StructDeserializer { } } - fn de_text>( - &mut self, - reader: &mut Cursor, - format: &FormatSettings, - ) -> Result<()> { - reader.must_ignore_byte(b'(')?; - let mut values = Vec::with_capacity(self.inners.len()); - for (idx, inner) in self.inners.iter_mut().enumerate() { - let _ = reader.ignore_white_spaces(); - if idx != 0 { - reader.must_ignore_byte(b',')?; - } - let _ = reader.ignore_white_spaces(); - inner.de_text_quoted(reader, format)?; - values.push(inner.pop_data_value()?); - } - reader.must_ignore_byte(b')')?; - self.builder.append_value(StructValue::new(values)); - Ok(()) - } - fn append_data_value(&mut self, value: DataValue, _format: &FormatSettings) -> Result<()> { self.builder.append_data_value(value) } diff --git a/src/query/datavalues/src/types/deserializations/timestamp.rs b/src/query/datavalues/src/types/deserializations/timestamp.rs index 2e881771f129..aa5d952290e1 100644 --- a/src/query/datavalues/src/types/deserializations/timestamp.rs +++ b/src/query/datavalues/src/types/deserializations/timestamp.rs @@ -75,35 +75,6 @@ impl TypeDeserializer for TimestampDeserializer { } } - fn de_text_quoted>( - &mut self, - reader: &mut Cursor, - format: &FormatSettings, - ) -> Result<()> { - reader.must_ignore_byte(b'\'')?; - let ts = reader.read_timestamp_text(&format.timezone); - reader.must_ignore_byte(b'\'')?; - if ts.is_err() { - return Err(ts.err().unwrap()); - } - let micros = ts.unwrap().timestamp_micros(); - check_timestamp(micros)?; - self.builder.append_value(micros.as_()); - Ok(()) - } - - fn de_text>( - &mut self, - reader: &mut Cursor, - format: &FormatSettings, - ) -> Result<()> { - let ts = reader.read_timestamp_text(&format.timezone)?; - let micros = ts.timestamp_micros(); - check_timestamp(micros)?; - self.builder.append_value(micros.as_()); - Ok(()) - } - fn append_data_value(&mut self, value: DataValue, _format: &FormatSettings) -> Result<()> { let v = value.as_i64()?; check_timestamp(v)?; diff --git a/src/query/datavalues/src/types/deserializations/variant.rs b/src/query/datavalues/src/types/deserializations/variant.rs index 694df6116fa1..309749c16218 100644 --- a/src/query/datavalues/src/types/deserializations/variant.rs +++ b/src/query/datavalues/src/types/deserializations/variant.rs @@ -12,11 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::Cursor; use std::io::Read; use common_exception::Result; -use common_io::cursor_ext::*; use common_io::prelude::BinaryRead; use common_io::prelude::FormatSettings; @@ -86,34 +84,6 @@ impl TypeDeserializer for VariantDeserializer { Ok(()) } - fn de_text>( - &mut self, - reader: &mut Cursor, - _format: &FormatSettings, - ) -> Result<()> { - self.buffer.clear(); - reader.read_escaped_string_text(&mut self.buffer)?; - let val = serde_json::from_slice(self.buffer.as_slice())?; - self.builder.append_value(val); - self.memory_size += self.buffer.len(); - Ok(()) - } - - fn de_text_quoted>( - &mut self, - reader: &mut Cursor, - _format: &FormatSettings, - ) -> Result<()> { - self.buffer.clear(); - reader.read_quoted_text(&mut self.buffer, b'\'')?; - - let val = serde_json::from_slice(self.buffer.as_slice())?; - - self.builder.append_value(val); - self.memory_size += self.buffer.len(); - Ok(()) - } - fn append_data_value(&mut self, value: DataValue, _format: &FormatSettings) -> Result<()> { self.builder.append_data_value(value) } From 3a824fd262cda8b7045881c414d36e755aaa6092 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Sun, 13 Nov 2022 22:22:24 +0800 Subject: [PATCH 11/14] refactor(common/io): fix typo. --- src/common/io/src/cursor_ext/cursor_read_bytes_ext.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/io/src/cursor_ext/cursor_read_bytes_ext.rs b/src/common/io/src/cursor_ext/cursor_read_bytes_ext.rs index fae7c7411abf..86975f1e98af 100644 --- a/src/common/io/src/cursor_ext/cursor_read_bytes_ext.rs +++ b/src/common/io/src/cursor_ext/cursor_read_bytes_ext.rs @@ -122,7 +122,7 @@ where T: AsRef<[u8]> fn ignore_bytes(&mut self, bs: &[u8]) -> bool { let available = self.remaining_slice(); let len = bs.len(); - if available.len() <= len { + if available.len() < len { return false; } let eq = available[..len].iter().zip(bs).all(|(x, y)| x == y); From c6e69dfa7cfed3613e7363d435aa649bcc1edbfa Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Sun, 13 Nov 2022 22:24:44 +0800 Subject: [PATCH 12/14] refactor(test): clean up lazy to make load_unload_all.sh easier to debug. --- tests/suites/1_stateful/05_formats/05_00_load_unload_all.sh | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/suites/1_stateful/05_formats/05_00_load_unload_all.sh b/tests/suites/1_stateful/05_formats/05_00_load_unload_all.sh index 5a1e6884f0a3..1821cd2813c6 100755 --- a/tests/suites/1_stateful/05_formats/05_00_load_unload_all.sh +++ b/tests/suites/1_stateful/05_formats/05_00_load_unload_all.sh @@ -24,7 +24,10 @@ insert_data() { test_format() { echo "---${1}" + echo "truncate table test_load_unload" | $MYSQL_CLIENT_CONNECT insert_data + rm -f /tmp/test_load_unload2.txt /tmp/test_load_unload.txt + curl -s -u root: -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}" \ -d "select * from test_load_unload FORMAT ${1}" > /tmp/test_load_unload.txt @@ -40,8 +43,6 @@ test_format() { -d "select * from test_load_unload FORMAT ${1}" > /tmp/test_load_unload2.txt diff /tmp/test_load_unload2.txt /tmp/test_load_unload.txt - rm /tmp/test_load_unload2.txt /tmp/test_load_unload.txt - echo "truncate table test_load_unload" | $MYSQL_CLIENT_CONNECT } test_format "CSV" From 2d796026f2bdcafd4b4ecda1c8a1feb3cec2f5c8 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Sun, 13 Nov 2022 23:32:03 +0800 Subject: [PATCH 13/14] test(xml): update xml tests. --- .../05_formats/05_03_xml/05_03_01_xml_v1.sh | 26 +++++++++---------- .../05_formats/05_03_xml/05_03_02_xml_v2.sh | 12 ++++----- .../05_formats/05_03_xml/05_03_03_xml_v3.sh | 24 ++++++++--------- 3 files changed, 31 insertions(+), 31 deletions(-) diff --git a/tests/suites/1_stateful/05_formats/05_03_xml/05_03_01_xml_v1.sh b/tests/suites/1_stateful/05_formats/05_03_xml/05_03_01_xml_v1.sh index 050b88f1c093..247d367563af 100755 --- a/tests/suites/1_stateful/05_formats/05_03_xml/05_03_01_xml_v1.sh +++ b/tests/suites/1_stateful/05_formats/05_03_xml/05_03_01_xml_v1.sh @@ -18,20 +18,20 @@ cat << EOF > /tmp/simple_v1.xml 1 - shuai\"ge - {\"我是\":\"帅哥\"} + shuai"ge + {"我是":"帅哥"} 2022-11-01 10:51:14 2 - \"mengnan\" - \"猛\"男 + "mengnan" + "猛"男 2022-11-01 10:51:14 3 - \"mengnan\" - \"猛\"男 + "mengnan" + "猛"男 2022-11-01 10:51:14 123 @@ -44,27 +44,27 @@ cat << EOF > /tmp/simple_v2.xml 1 - shuai\"ge - {\"我是\":\"帅哥\"} + shuai"ge + {"我是":"帅哥"} 2022-11-01 10:51:14 2 - \"mengnan\" - \"猛\"男 + "mengnan" + "猛"男 2022-11-01 10:51:14 3 - \"mengnan\" - \"猛\"男 + "mengnan" + "猛"男 2022-11-01 10:51:14 123 EOF -curl -sH "insert_sql:insert into test_xml format XML" -F "upload=@/tmp/simple_v1.xml" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" | grep -c "SUCCESS" +curl -sH "insert_sql:insert into test_xml format XML" -F "upload=@/tmp/simple_v1.xml" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" | grep -c "SUCCESS" echo "select * from test_xml" | $MYSQL_CLIENT_CONNECT echo "truncate table test_xml" | $MYSQL_CLIENT_CONNECT diff --git a/tests/suites/1_stateful/05_formats/05_03_xml/05_03_02_xml_v2.sh b/tests/suites/1_stateful/05_formats/05_03_xml/05_03_02_xml_v2.sh index 9c75a16b3035..17d34a8b4f51 100755 --- a/tests/suites/1_stateful/05_formats/05_03_xml/05_03_02_xml_v2.sh +++ b/tests/suites/1_stateful/05_formats/05_03_xml/05_03_02_xml_v2.sh @@ -16,9 +16,9 @@ echo "CREATE TABLE test_xml ( cat << EOF > /tmp/simple_v2.xml - - - + + + EOF @@ -26,9 +26,9 @@ EOF cat << EOF > /tmp/simple_v3.xml - - - + + + EOF diff --git a/tests/suites/1_stateful/05_formats/05_03_xml/05_03_03_xml_v3.sh b/tests/suites/1_stateful/05_formats/05_03_xml/05_03_03_xml_v3.sh index 2fa52676d057..c9270beffdfc 100755 --- a/tests/suites/1_stateful/05_formats/05_03_xml/05_03_03_xml_v3.sh +++ b/tests/suites/1_stateful/05_formats/05_03_xml/05_03_03_xml_v3.sh @@ -8,20 +8,20 @@ cat << EOF > /tmp/simple_v3.xml 1 - shuai\"ge - {\"我是\":\"帅哥\"} + shuai"ge + {"我是":"帅哥"} 2022-11-01 10:51:14 2 - \"mengnan\" - \"猛\"男 + "mengnan" + "猛"男 2022-11-01 10:51:14 3 - \"mengnan\" - \"猛\"男 + "mengnan" + "猛"男 2022-11-01 10:51:14 123 @@ -34,20 +34,20 @@ cat << EOF > /tmp/simple_v4.xml 1 - shuai\"ge - {\"我是\":\"帅哥\"} + shuai"ge + {"我是":"帅哥"} 2022-11-01 10:51:14 2 - \"mengnan\" - \"猛\"男 + "mengnan" + "猛"男 2022-11-01 10:51:14 3 - \"mengnan\" - \"猛\"男 + "mengnan" + "猛"男 2022-11-01 10:51:14 123 From cda3baf96c59ae4cdec8d5fa909313a80d147a70 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Mon, 14 Nov 2022 13:34:57 +0800 Subject: [PATCH 14/14] fix clippy. --- src/query/formats/src/field_decoder/values.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/query/formats/src/field_decoder/values.rs b/src/query/formats/src/field_decoder/values.rs index 4370fe82e64e..3b2dddcccea4 100644 --- a/src/query/formats/src/field_decoder/values.rs +++ b/src/query/formats/src/field_decoder/values.rs @@ -94,10 +94,9 @@ impl FieldDecoderRowBased for FieldDecoderValues { ) -> Result<()> { if reader.eof() { column.de_default(); - } else if raw && self.match_bytes(reader, b"NULL") || self.match_bytes(reader, b"null") { - column.de_default(); - return Ok(()); - } else if !raw && reader.ignore_bytes(b"NULL") || reader.ignore_bytes(b"null") { + } else if (raw && (self.match_bytes(reader, b"NULL") || self.match_bytes(reader, b"null"))) + || (!raw && (reader.ignore_bytes(b"NULL") || reader.ignore_bytes(b"null"))) + { column.de_default(); return Ok(()); } else {