Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(query): fix tsv deserialization #6453

Merged
merged 6 commits into from
Jul 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/datavalues/src/types/deserializations/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl TypeDeserializer for NullDeserializer {
}

fn de_whole_text(&mut self, _reader: &[u8], _format: &FormatSettings) -> Result<()> {
self.builder.append_default();
Ok(())
}

Expand Down
33 changes: 21 additions & 12 deletions common/datavalues/src/types/deserializations/nullable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl TypeDeserializer for NullableDeserializer {
format: &FormatSettings,
) -> Result<()> {
reader.push_checkpoint();
if reader.ignore_insensitive_bytes(b"null")? {
if reader.ignore_insensitive_bytes(&format.null_bytes)? {
reader.pop_checkpoint();
self.de_default(format);
return Ok(());
Expand All @@ -87,24 +87,30 @@ impl TypeDeserializer for NullableDeserializer {
Ok(())
}

// TODO: support null text setting
fn de_text<R: BufferRead>(
&mut self,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
reader.push_checkpoint();
if reader.ignore_insensitive_bytes(b"null")? {
reader.pop_checkpoint();
if reader.eof()? {
self.de_default(format);
return Ok(());
} else {
reader.push_checkpoint();
if reader.ignore_insensitive_bytes(&format.null_bytes)? {
let buffer = reader.fill_buf()?;
if buffer.is_empty()
|| (buffer[0] == b'\r' || buffer[0] == b'\n' || buffer[0] == b'\t')
{
self.de_default(format);
reader.pop_checkpoint();
return Ok(());
}
}
reader.rollback_to_checkpoint()?;
reader.pop_checkpoint();
self.inner.de_text(reader, format)?;
self.bitmap.push(true);
}

Ok(())
}

Expand All @@ -114,7 +120,7 @@ impl TypeDeserializer for NullableDeserializer {
format: &FormatSettings,
) -> Result<()> {
reader.push_checkpoint();
if reader.ignore_insensitive_bytes(b"null")? {
if reader.ignore_insensitive_bytes(&format.null_bytes)? {
reader.pop_checkpoint();
self.de_default(format);
} else {
Expand All @@ -135,10 +141,13 @@ impl TypeDeserializer for NullableDeserializer {
self.de_default(format);
} else {
reader.push_checkpoint();
if reader.ignore_insensitive_bytes(b"null")? {
if reader.ignore_insensitive_bytes(&format.null_bytes)? {
let buffer = reader.fill_buf()?;
if !buffer.is_empty()
&& (buffer[0] == b'\r' || buffer[0] == b'\n' || buffer[0] == b',')

if buffer.is_empty()
|| (buffer[0] == b'\r'
|| buffer[0] == b'\n'
|| buffer[0] == format.field_delimiter[0])
{
self.de_default(format);
reader.pop_checkpoint();
Expand All @@ -154,7 +163,7 @@ impl TypeDeserializer for NullableDeserializer {
}

fn de_whole_text(&mut self, reader: &[u8], format: &FormatSettings) -> Result<()> {
if reader.eq_ignore_ascii_case(b"null") {
if reader.eq_ignore_ascii_case(&format.null_bytes) {
self.de_default(format);
return Ok(());
}
Expand Down
7 changes: 6 additions & 1 deletion common/datavalues/src/types/deserializations/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ impl TypeDeserializer for StringDeserializer {
) -> Result<()> {
let mut read_buffer = reader.fill_buf()?;
if read_buffer.is_empty() {
return Err(ErrorCode::BadBytes("Read string after eof."));
self.builder.append_default();
return Ok(());
}

let maybe_quote = read_buffer[0];
Expand All @@ -129,6 +130,10 @@ impl TypeDeserializer for StringDeserializer {
if !settings.field_delimiter.is_empty() {
field_delimiter = settings.field_delimiter[0];
}
if maybe_quote == field_delimiter || maybe_quote == b'\r' || maybe_quote == b'\n' {
self.builder.append_default();
return Ok(());
}

if settings.record_delimiter.is_empty()
|| settings.record_delimiter[0] == b'\r'
Expand Down
4 changes: 3 additions & 1 deletion common/formats/src/format_csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl CsvInputFormat {
pub fn try_create(
_name: &str,
schema: DataSchemaRef,
settings: FormatSettings,
mut settings: FormatSettings,
skip_rows: usize,
min_accepted_rows: usize,
min_accepted_bytes: usize,
Expand All @@ -106,6 +106,8 @@ impl CsvInputFormat {
record_delimiter = Some(settings.record_delimiter[0]);
}

settings.null_bytes = settings.csv_null_bytes.clone();

Ok(Box::new(CsvInputFormat {
schema,
settings,
Expand Down
9 changes: 6 additions & 3 deletions common/formats/src/format_tsv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,15 @@ impl TsvInputFormat {
pub fn try_create(
_name: &str,
schema: DataSchemaRef,
settings: FormatSettings,
mut settings: FormatSettings,
skip_rows: usize,
min_accepted_rows: usize,
min_accepted_bytes: usize,
) -> Result<Box<dyn InputFormat>> {
settings.field_delimiter = vec![b'\t'];
settings.record_delimiter = vec![b'\n'];
settings.null_bytes = settings.tsv_null_bytes.clone();

Ok(Box::new(TsvInputFormat {
schema,
settings,
Expand Down Expand Up @@ -193,8 +197,7 @@ impl InputFormat for TsvInputFormat {
if checkpoint_reader.ignore_white_spaces_and_byte(b'\t')? {
deserializers[column_index].de_default(&self.settings);
} else {
deserializers[column_index]
.de_text_csv(&mut checkpoint_reader, &self.settings)?;
deserializers[column_index].de_text(&mut checkpoint_reader, &self.settings)?;

if column_index + 1 != deserializers.len() {
checkpoint_reader.must_ignore_white_spaces_and_byte(b'\t')?;
Expand Down
12 changes: 9 additions & 3 deletions common/io/src/buffer/buffer_read_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@ pub trait BufferReadExt: BufferRead {
if b.is_empty() {
return Err(std::io::Error::new(
ErrorKind::InvalidData,
"Expected to have terminated string literal.".to_string(),
"Expected to have terminated string literal after escaped char '\' ."
.to_string(),
));
}
let c = b[0];
self.ignore_byte(c)?;

match c {
b'n' => buf.push(b'\n'),
b't' => buf.push(b'\t'),
Expand All @@ -80,13 +82,17 @@ pub trait BufferReadExt: BufferRead {
}
Err(std::io::Error::new(
ErrorKind::InvalidData,
"Expected to have terminated string literal.".to_string(),
format!("Expected to have terminated string literal after quota {:?}, while consumed buf: {:?}", quota as char, buf),
))
}

fn read_escaped_string_text(&mut self, buf: &mut Vec<u8>) -> Result<()> {
self.keep_read(buf, |f| f != b'\t' && f != b'\n' && f != b'\\')?;
// TODO judge escaped '\\'
if self.ignore_byte(b'\\')? {
// TODO parse complex escape sequence
buf.push(b'\\');
self.read_escaped_string_text(buf)?;
}
Ok(())
}

Expand Down
44 changes: 30 additions & 14 deletions query/src/servers/clickhouse/writers/query_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

use std::borrow::Cow;

use chrono::Date;
use chrono::DateTime;
use chrono::Datelike;
use common_base::base::ProgressValues;
use common_datablocks::DataBlock;
use common_datavalues::prelude::*;
Expand All @@ -29,7 +32,6 @@ use opensrv_clickhouse::errors::Result as CHResult;
use opensrv_clickhouse::errors::ServerError;
use opensrv_clickhouse::types::column::{self};
use opensrv_clickhouse::types::Block;
use opensrv_clickhouse::types::DateTimeType;
use opensrv_clickhouse::types::SqlType;

use crate::servers::clickhouse::interactive_worker_base::BlockItem;
Expand Down Expand Up @@ -164,14 +166,20 @@ pub fn to_clickhouse_block(block: DataBlock, format: &FormatSettings) -> Result<
pub fn from_clickhouse_block(schema: DataSchemaRef, block: Block) -> Result<DataBlock> {
let get_series = |block: &Block, index: usize| -> CHResult<ColumnRef> {
let col = &block.columns()[index];

match col.sql_type() {
SqlType::UInt8 => Ok(UInt8Column::from_iterator(col.iter::<u8>()?.copied()).arc()),
SqlType::UInt16 | SqlType::Date => {
Ok(UInt16Column::from_iterator(col.iter::<u16>()?.copied()).arc())
}
SqlType::UInt32 | SqlType::DateTime(DateTimeType::DateTime32) => {
Ok(UInt32Column::from_iterator(col.iter::<u32>()?.copied()).arc())
}
SqlType::UInt16 => Ok(UInt16Column::from_iterator(col.iter::<u16>()?.copied()).arc()),
SqlType::Date => Ok(Int32Column::from_iterator(
col.iter::<Date<_>>()?
.map(|v| v.naive_utc().num_days_from_ce()),
)
.arc()),
SqlType::UInt32 => Ok(UInt32Column::from_iterator(col.iter::<u32>()?.copied()).arc()),
SqlType::DateTime(_) => Ok(Int64Column::from_iterator(
col.iter::<DateTime<_>>()?.map(|v| v.timestamp_micros()),
)
.arc()),
SqlType::UInt64 => Ok(UInt64Column::from_iterator(col.iter::<u64>()?.copied()).arc()),
SqlType::Int8 => Ok(Int8Column::from_iterator(col.iter::<i8>()?.copied()).arc()),
SqlType::Int16 => Ok(Int16Column::from_iterator(col.iter::<i16>()?.copied()).arc()),
Expand All @@ -185,13 +193,21 @@ pub fn from_clickhouse_block(schema: DataSchemaRef, block: Block) -> Result<Data
SqlType::Nullable(SqlType::UInt8) => Ok(Series::from_data(
col.iter::<Option<u8>>()?.map(|c| c.copied()),
)),
SqlType::Nullable(SqlType::UInt16) | SqlType::Nullable(SqlType::Date) => Ok(
Series::from_data(col.iter::<Option<u16>>()?.map(|c| c.copied())),
),
SqlType::Nullable(SqlType::UInt32)
| SqlType::Nullable(SqlType::DateTime(DateTimeType::DateTime32)) => Ok(
Series::from_data(col.iter::<Option<u32>>()?.map(|c| c.copied())),
),
SqlType::Nullable(SqlType::UInt16) => Ok(Series::from_data(
col.iter::<Option<u16>>()?.map(|c| c.copied()),
)),
SqlType::Nullable(SqlType::Date) => {
Ok(Series::from_data(col.iter::<Option<Date<_>>>()?.map(|c| {
c.map(|v| v.naive_utc().num_days_from_ce() as u16)
})))
}
SqlType::Nullable(SqlType::UInt32) => Ok(Series::from_data(
col.iter::<Option<u32>>()?.map(|c| c.copied()),
)),
SqlType::Nullable(SqlType::DateTime(_)) => Ok(Series::from_data(
col.iter::<Option<DateTime<_>>>()?
.map(|c| c.map(|v| v.timestamp_micros())),
)),
SqlType::Nullable(SqlType::UInt64) => Ok(Series::from_data(
col.iter::<Option<u64>>()?.map(|c| c.copied()),
)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ EOF
cat << EOF > /tmp/databend_test_tsv_names_and_types.txt
insert into a(a,b,c) format TabSeparatedWithNamesAndTypes a b c
'int' 'varchar' 'double'
100 '2' 100.3
200 '3' 200.4
300 '2' 300
100 2 100.3
200 3 200.4
300 2 300

EOF

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ EOF
cat << EOF > /tmp/databend_test_tsv_names_and_types.txt
insert into a(a,b,c) format TabSeparatedWithNamesAndTypes a b c
'int' 'varchar' 'double'
100 '2' 100.3
200 '3' 200.4
300 '2' 300
100 2 100.3
200 3 200.4
300 2 300

EOF

Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
0 0 0 0
3 0 0 0 0
9 0 0 2 0
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ echo "CREATE TABLE books

# load csv
curl -H "insert_sql:insert into books format CSV" -F "upload=@${CURDIR}/books.csv" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" > /dev/null 2>&1
echo "select count(), count_if(title is null), count_if(author is null), count_if(date is null), count_if(publish_time is null) from books " | $MYSQL_CLIENT_CONNECT

# load tsv
curl -H "insert_sql:insert into books format TSV" -F "upload=@${CURDIR}/books.tsv" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" > /dev/null 2>&1
echo "select count(), count_if(title is null), count_if(author is null), count_if(date is null), count_if(publish_time is null) from books " | $MYSQL_CLIENT_CONNECT

echo "select count_if(title is null), count_if(author is null), count_if(date is null), count_if(publish_time is null) from books " | $MYSQL_CLIENT_CONNECT

echo "drop table books;" | $MYSQL_CLIENT_CONNECT

6 changes: 6 additions & 0 deletions tests/suites/1_stateful/01_load_v2/books.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Transaction Processing Jim Gray 1992 2020-01-01 11:11:11.345
Readings in Database Systems Michael Stonebraker 2004 2020-01-01T11:11:11Z
Three Body "NULL-liucixin 2019 2019-07-04T00:00:00
Three Body NULL-liucixin\rxx \N 2019-07-04T00:00:00
Three Body NULL-liucixin\fxx \N 2019-07-04T00:00:00
Three Body NULL-liucixin\fxx NULL 2019-07-04T00:00:00