diff --git a/common/datavalues/src/types/deserializations/null.rs b/common/datavalues/src/types/deserializations/null.rs index 2b102bd16423..7561976df3ed 100644 --- a/common/datavalues/src/types/deserializations/null.rs +++ b/common/datavalues/src/types/deserializations/null.rs @@ -55,6 +55,7 @@ impl TypeDeserializer for NullDeserializer { } fn de_whole_text(&mut self, _reader: &[u8], _format: &FormatSettings) -> Result<()> { + self.builder.append_default(); Ok(()) } diff --git a/common/datavalues/src/types/deserializations/nullable.rs b/common/datavalues/src/types/deserializations/nullable.rs index e7ce25c72bc3..bb2971f2df5d 100644 --- a/common/datavalues/src/types/deserializations/nullable.rs +++ b/common/datavalues/src/types/deserializations/nullable.rs @@ -75,7 +75,7 @@ impl TypeDeserializer for NullableDeserializer { format: &FormatSettings, ) -> Result<()> { reader.push_checkpoint(); - if reader.ignore_insensitive_bytes(b"null")? { + if reader.ignore_insensitive_bytes(&format.null_bytes)? { reader.pop_checkpoint(); self.de_default(format); return Ok(()); @@ -87,24 +87,30 @@ impl TypeDeserializer for NullableDeserializer { Ok(()) } - // TODO: support null text setting fn de_text( &mut self, reader: &mut NestedCheckpointReader, format: &FormatSettings, ) -> Result<()> { - reader.push_checkpoint(); - if reader.ignore_insensitive_bytes(b"null")? { - reader.pop_checkpoint(); + if reader.eof()? { self.de_default(format); - return Ok(()); } else { + reader.push_checkpoint(); + if reader.ignore_insensitive_bytes(&format.null_bytes)? 
{ + let buffer = reader.fill_buf()?; + if buffer.is_empty() + || (buffer[0] == b'\r' || buffer[0] == b'\n' || buffer[0] == b'\t') + { + self.de_default(format); + reader.pop_checkpoint(); + return Ok(()); + } + } reader.rollback_to_checkpoint()?; reader.pop_checkpoint(); self.inner.de_text(reader, format)?; self.bitmap.push(true); } - Ok(()) } @@ -114,7 +120,7 @@ impl TypeDeserializer for NullableDeserializer { format: &FormatSettings, ) -> Result<()> { reader.push_checkpoint(); - if reader.ignore_insensitive_bytes(b"null")? { + if reader.ignore_insensitive_bytes(&format.null_bytes)? { reader.pop_checkpoint(); self.de_default(format); } else { @@ -135,10 +141,13 @@ impl TypeDeserializer for NullableDeserializer { self.de_default(format); } else { reader.push_checkpoint(); - if reader.ignore_insensitive_bytes(b"null")? { + if reader.ignore_insensitive_bytes(&format.null_bytes)? { let buffer = reader.fill_buf()?; - if !buffer.is_empty() - && (buffer[0] == b'\r' || buffer[0] == b'\n' || buffer[0] == b',') + + if buffer.is_empty() + || (buffer[0] == b'\r' + || buffer[0] == b'\n' + || buffer[0] == format.field_delimiter[0]) { self.de_default(format); reader.pop_checkpoint(); @@ -154,7 +163,7 @@ impl TypeDeserializer for NullableDeserializer { } fn de_whole_text(&mut self, reader: &[u8], format: &FormatSettings) -> Result<()> { - if reader.eq_ignore_ascii_case(b"null") { + if reader.eq_ignore_ascii_case(&format.null_bytes) { self.de_default(format); return Ok(()); } diff --git a/common/datavalues/src/types/deserializations/string.rs b/common/datavalues/src/types/deserializations/string.rs index 301cef5efd75..ce9f99397169 100644 --- a/common/datavalues/src/types/deserializations/string.rs +++ b/common/datavalues/src/types/deserializations/string.rs @@ -113,7 +113,8 @@ impl TypeDeserializer for StringDeserializer { ) -> Result<()> { let mut read_buffer = reader.fill_buf()?; if read_buffer.is_empty() { - return Err(ErrorCode::BadBytes("Read string after eof.")); + 
self.builder.append_default(); + return Ok(()); } let maybe_quote = read_buffer[0]; @@ -129,6 +130,10 @@ impl TypeDeserializer for StringDeserializer { if !settings.field_delimiter.is_empty() { field_delimiter = settings.field_delimiter[0]; } + if maybe_quote == field_delimiter || maybe_quote == b'\r' || maybe_quote == b'\n' { + self.builder.append_default(); + return Ok(()); + } if settings.record_delimiter.is_empty() || settings.record_delimiter[0] == b'\r' diff --git a/common/formats/src/format_csv.rs b/common/formats/src/format_csv.rs index a6e07f4731de..da9867be06bf 100644 --- a/common/formats/src/format_csv.rs +++ b/common/formats/src/format_csv.rs @@ -87,7 +87,7 @@ impl CsvInputFormat { pub fn try_create( _name: &str, schema: DataSchemaRef, - settings: FormatSettings, + mut settings: FormatSettings, skip_rows: usize, min_accepted_rows: usize, min_accepted_bytes: usize, @@ -106,6 +106,8 @@ impl CsvInputFormat { record_delimiter = Some(settings.record_delimiter[0]); } + settings.null_bytes = settings.csv_null_bytes.clone(); + Ok(Box::new(CsvInputFormat { schema, settings, diff --git a/common/formats/src/format_tsv.rs b/common/formats/src/format_tsv.rs index 17a24bf290f7..bc91a8fda794 100644 --- a/common/formats/src/format_tsv.rs +++ b/common/formats/src/format_tsv.rs @@ -88,11 +88,15 @@ impl TsvInputFormat { pub fn try_create( _name: &str, schema: DataSchemaRef, - settings: FormatSettings, + mut settings: FormatSettings, skip_rows: usize, min_accepted_rows: usize, min_accepted_bytes: usize, ) -> Result> { + settings.field_delimiter = vec![b'\t']; + settings.record_delimiter = vec![b'\n']; + settings.null_bytes = settings.tsv_null_bytes.clone(); + Ok(Box::new(TsvInputFormat { schema, settings, @@ -193,8 +197,7 @@ impl InputFormat for TsvInputFormat { if checkpoint_reader.ignore_white_spaces_and_byte(b'\t')? 
{ deserializers[column_index].de_default(&self.settings); } else { - deserializers[column_index] - .de_text_csv(&mut checkpoint_reader, &self.settings)?; + deserializers[column_index].de_text(&mut checkpoint_reader, &self.settings)?; if column_index + 1 != deserializers.len() { checkpoint_reader.must_ignore_white_spaces_and_byte(b'\t')?; diff --git a/common/io/src/buffer/buffer_read_ext.rs b/common/io/src/buffer/buffer_read_ext.rs index 77214c4b78c8..e68ad2088734 100644 --- a/common/io/src/buffer/buffer_read_ext.rs +++ b/common/io/src/buffer/buffer_read_ext.rs @@ -56,11 +56,13 @@ pub trait BufferReadExt: BufferRead { if b.is_empty() { return Err(std::io::Error::new( ErrorKind::InvalidData, - "Expected to have terminated string literal.".to_string(), + "Expected to have terminated string literal after escaped char '\\' ." + .to_string(), )); } let c = b[0]; self.ignore_byte(c)?; + match c { b'n' => buf.push(b'\n'), b't' => buf.push(b'\t'), @@ -80,13 +82,17 @@ pub trait BufferReadExt: BufferRead { } Err(std::io::Error::new( ErrorKind::InvalidData, - "Expected to have terminated string literal.".to_string(), + format!("Expected to have terminated string literal after quote {:?}, while consumed buf: {:?}", quota as char, buf), )) } fn read_escaped_string_text(&mut self, buf: &mut Vec<u8>) -> Result<()> { self.keep_read(buf, |f| f != b'\t' && f != b'\n' && f != b'\\')?; - // TODO judge escaped '\\' + if self.ignore_byte(b'\\')? 
{ + // TODO parse complex escape sequence + buf.push(b'\\'); + self.read_escaped_string_text(buf)?; + } Ok(()) } diff --git a/query/src/servers/clickhouse/writers/query_writer.rs b/query/src/servers/clickhouse/writers/query_writer.rs index dc8ccb87ed71..7532e7ee6074 100644 --- a/query/src/servers/clickhouse/writers/query_writer.rs +++ b/query/src/servers/clickhouse/writers/query_writer.rs @@ -14,6 +14,9 @@ use std::borrow::Cow; +use chrono::Date; +use chrono::DateTime; +use chrono::Datelike; use common_base::base::ProgressValues; use common_datablocks::DataBlock; use common_datavalues::prelude::*; @@ -29,7 +32,6 @@ use opensrv_clickhouse::errors::Result as CHResult; use opensrv_clickhouse::errors::ServerError; use opensrv_clickhouse::types::column::{self}; use opensrv_clickhouse::types::Block; -use opensrv_clickhouse::types::DateTimeType; use opensrv_clickhouse::types::SqlType; use crate::servers::clickhouse::interactive_worker_base::BlockItem; @@ -164,14 +166,20 @@ pub fn to_clickhouse_block(block: DataBlock, format: &FormatSettings) -> Result< pub fn from_clickhouse_block(schema: DataSchemaRef, block: Block) -> Result { let get_series = |block: &Block, index: usize| -> CHResult { let col = &block.columns()[index]; + match col.sql_type() { SqlType::UInt8 => Ok(UInt8Column::from_iterator(col.iter::()?.copied()).arc()), - SqlType::UInt16 | SqlType::Date => { - Ok(UInt16Column::from_iterator(col.iter::()?.copied()).arc()) - } - SqlType::UInt32 | SqlType::DateTime(DateTimeType::DateTime32) => { - Ok(UInt32Column::from_iterator(col.iter::()?.copied()).arc()) - } + SqlType::UInt16 => Ok(UInt16Column::from_iterator(col.iter::()?.copied()).arc()), + SqlType::Date => Ok(Int32Column::from_iterator( + col.iter::>()? 
+ .map(|v| v.naive_utc().num_days_from_ce()), + ) + .arc()), + SqlType::UInt32 => Ok(UInt32Column::from_iterator(col.iter::()?.copied()).arc()), + SqlType::DateTime(_) => Ok(Int64Column::from_iterator( + col.iter::>()?.map(|v| v.timestamp_micros()), + ) + .arc()), SqlType::UInt64 => Ok(UInt64Column::from_iterator(col.iter::()?.copied()).arc()), SqlType::Int8 => Ok(Int8Column::from_iterator(col.iter::()?.copied()).arc()), SqlType::Int16 => Ok(Int16Column::from_iterator(col.iter::()?.copied()).arc()), @@ -185,13 +193,21 @@ pub fn from_clickhouse_block(schema: DataSchemaRef, block: Block) -> Result Ok(Series::from_data( col.iter::>()?.map(|c| c.copied()), )), - SqlType::Nullable(SqlType::UInt16) | SqlType::Nullable(SqlType::Date) => Ok( - Series::from_data(col.iter::>()?.map(|c| c.copied())), - ), - SqlType::Nullable(SqlType::UInt32) - | SqlType::Nullable(SqlType::DateTime(DateTimeType::DateTime32)) => Ok( - Series::from_data(col.iter::>()?.map(|c| c.copied())), - ), + SqlType::Nullable(SqlType::UInt16) => Ok(Series::from_data( + col.iter::>()?.map(|c| c.copied()), + )), + SqlType::Nullable(SqlType::Date) => { + Ok(Series::from_data(col.iter::>>()?.map(|c| { + c.map(|v| v.naive_utc().num_days_from_ce() as u16) + }))) + } + SqlType::Nullable(SqlType::UInt32) => Ok(Series::from_data( + col.iter::>()?.map(|c| c.copied()), + )), + SqlType::Nullable(SqlType::DateTime(_)) => Ok(Series::from_data( + col.iter::>>()? 
+ .map(|c| c.map(|v| v.timestamp_micros())), + )), SqlType::Nullable(SqlType::UInt64) => Ok(Series::from_data( col.iter::>()?.map(|c| c.copied()), )), diff --git a/tests/suites/0_stateless/14_clickhouse_http_handler/14_0004_http_clickhouse_input_format.sh b/tests/suites/0_stateless/14_clickhouse_http_handler/14_0004_http_clickhouse_input_format.sh index ef6885a2ce43..5183ef9db96c 100755 --- a/tests/suites/0_stateless/14_clickhouse_http_handler/14_0004_http_clickhouse_input_format.sh +++ b/tests/suites/0_stateless/14_clickhouse_http_handler/14_0004_http_clickhouse_input_format.sh @@ -32,9 +32,9 @@ EOF cat << EOF > /tmp/databend_test_tsv_names_and_types.txt insert into a(a,b,c) format TabSeparatedWithNamesAndTypes a b c 'int' 'varchar' 'double' -100 '2' 100.3 -200 '3' 200.4 -300 '2' 300 +100 2 100.3 +200 3 200.4 +300 2 300 EOF diff --git a/tests/suites/0_stateless/14_clickhouse_http_handler/14_0004_http_clickhouse_input_format_v2.sh b/tests/suites/0_stateless/14_clickhouse_http_handler/14_0004_http_clickhouse_input_format_v2.sh index 8959a3d89863..964e0aa57b86 100755 --- a/tests/suites/0_stateless/14_clickhouse_http_handler/14_0004_http_clickhouse_input_format_v2.sh +++ b/tests/suites/0_stateless/14_clickhouse_http_handler/14_0004_http_clickhouse_input_format_v2.sh @@ -32,9 +32,9 @@ EOF cat << EOF > /tmp/databend_test_tsv_names_and_types.txt insert into a(a,b,c) format TabSeparatedWithNamesAndTypes a b c 'int' 'varchar' 'double' -100 '2' 100.3 -200 '3' 200.4 -300 '2' 300 +100 2 100.3 +200 3 200.4 +300 2 300 EOF diff --git a/tests/suites/1_stateful/01_load_v2/01_0000_streaming_load_books.result b/tests/suites/1_stateful/01_load_v2/01_0000_streaming_load_books.result index 60fcae126d90..e0f18b2a30fa 100755 --- a/tests/suites/1_stateful/01_load_v2/01_0000_streaming_load_books.result +++ b/tests/suites/1_stateful/01_load_v2/01_0000_streaming_load_books.result @@ -1 +1,2 @@ -0 0 0 0 +3 0 0 0 0 +9 0 0 2 0 diff --git 
a/tests/suites/1_stateful/01_load_v2/01_0000_streaming_load_books.sh b/tests/suites/1_stateful/01_load_v2/01_0000_streaming_load_books.sh index 1287eb6f379a..af6ba727742c 100755 --- a/tests/suites/1_stateful/01_load_v2/01_0000_streaming_load_books.sh +++ b/tests/suites/1_stateful/01_load_v2/01_0000_streaming_load_books.sh @@ -15,7 +15,12 @@ echo "CREATE TABLE books # load csv curl -H "insert_sql:insert into books format CSV" -F "upload=@${CURDIR}/books.csv" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" > /dev/null 2>&1 +echo "select count(), count_if(title is null), count_if(author is null), count_if(date is null), count_if(publish_time is null) from books " | $MYSQL_CLIENT_CONNECT + +# load tsv +curl -H "insert_sql:insert into books format TSV" -F "upload=@${CURDIR}/books.tsv" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" > /dev/null 2>&1 +echo "select count(), count_if(title is null), count_if(author is null), count_if(date is null), count_if(publish_time is null) from books " | $MYSQL_CLIENT_CONNECT -echo "select count_if(title is null), count_if(author is null), count_if(date is null), count_if(publish_time is null) from books " | $MYSQL_CLIENT_CONNECT echo "drop table books;" | $MYSQL_CLIENT_CONNECT + diff --git a/tests/suites/1_stateful/01_load_v2/books.tsv b/tests/suites/1_stateful/01_load_v2/books.tsv new file mode 100644 index 000000000000..fa1fa22aa9b9 --- /dev/null +++ b/tests/suites/1_stateful/01_load_v2/books.tsv @@ -0,0 +1,6 @@ +Transaction Processing Jim Gray 1992 2020-01-01 11:11:11.345 +Readings in Database Systems Michael Stonebraker 2004 2020-01-01T11:11:11Z +Three Body "NULL-liucixin 2019 2019-07-04T00:00:00 +Three Body NULL-liucixin\rxx \N 2019-07-04T00:00:00 +Three Body NULL-liucixin\fxx \N 2019-07-04T00:00:00 +Three Body NULL-liucixin\fxx NULL 2019-07-04T00:00:00