Skip to content

Commit

Permalink
Merge of #6453
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Jul 5, 2022
2 parents b338ab2 + 1ea84d8 commit c18ebb5
Show file tree
Hide file tree
Showing 12 changed files with 96 additions and 42 deletions.
1 change: 1 addition & 0 deletions common/datavalues/src/types/deserializations/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl TypeDeserializer for NullDeserializer {
}

fn de_whole_text(&mut self, _reader: &[u8], _format: &FormatSettings) -> Result<()> {
self.builder.append_default();
Ok(())
}

Expand Down
33 changes: 21 additions & 12 deletions common/datavalues/src/types/deserializations/nullable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl TypeDeserializer for NullableDeserializer {
format: &FormatSettings,
) -> Result<()> {
reader.push_checkpoint();
if reader.ignore_insensitive_bytes(b"null")? {
if reader.ignore_insensitive_bytes(&format.null_bytes)? {
reader.pop_checkpoint();
self.de_default(format);
return Ok(());
Expand All @@ -87,24 +87,30 @@ impl TypeDeserializer for NullableDeserializer {
Ok(())
}

// TODO: support null text setting
fn de_text<R: BufferRead>(
&mut self,
reader: &mut NestedCheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
reader.push_checkpoint();
if reader.ignore_insensitive_bytes(b"null")? {
reader.pop_checkpoint();
if reader.eof()? {
self.de_default(format);
return Ok(());
} else {
reader.push_checkpoint();
if reader.ignore_insensitive_bytes(&format.null_bytes)? {
let buffer = reader.fill_buf()?;
if buffer.is_empty()
|| (buffer[0] == b'\r' || buffer[0] == b'\n' || buffer[0] == b'\t')
{
self.de_default(format);
reader.pop_checkpoint();
return Ok(());
}
}
reader.rollback_to_checkpoint()?;
reader.pop_checkpoint();
self.inner.de_text(reader, format)?;
self.bitmap.push(true);
}

Ok(())
}

Expand All @@ -114,7 +120,7 @@ impl TypeDeserializer for NullableDeserializer {
format: &FormatSettings,
) -> Result<()> {
reader.push_checkpoint();
if reader.ignore_insensitive_bytes(b"null")? {
if reader.ignore_insensitive_bytes(&format.null_bytes)? {
reader.pop_checkpoint();
self.de_default(format);
} else {
Expand All @@ -135,10 +141,13 @@ impl TypeDeserializer for NullableDeserializer {
self.de_default(format);
} else {
reader.push_checkpoint();
if reader.ignore_insensitive_bytes(b"null")? {
if reader.ignore_insensitive_bytes(&format.null_bytes)? {
let buffer = reader.fill_buf()?;
if !buffer.is_empty()
&& (buffer[0] == b'\r' || buffer[0] == b'\n' || buffer[0] == b',')

if buffer.is_empty()
|| (buffer[0] == b'\r'
|| buffer[0] == b'\n'
|| buffer[0] == format.field_delimiter[0])
{
self.de_default(format);
reader.pop_checkpoint();
Expand All @@ -154,7 +163,7 @@ impl TypeDeserializer for NullableDeserializer {
}

fn de_whole_text(&mut self, reader: &[u8], format: &FormatSettings) -> Result<()> {
if reader.eq_ignore_ascii_case(b"null") {
if reader.eq_ignore_ascii_case(&format.null_bytes) {
self.de_default(format);
return Ok(());
}
Expand Down
7 changes: 6 additions & 1 deletion common/datavalues/src/types/deserializations/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ impl TypeDeserializer for StringDeserializer {
) -> Result<()> {
let mut read_buffer = reader.fill_buf()?;
if read_buffer.is_empty() {
return Err(ErrorCode::BadBytes("Read string after eof."));
self.builder.append_default();
return Ok(());
}

let maybe_quote = read_buffer[0];
Expand All @@ -129,6 +130,10 @@ impl TypeDeserializer for StringDeserializer {
if !settings.field_delimiter.is_empty() {
field_delimiter = settings.field_delimiter[0];
}
if maybe_quote == field_delimiter || maybe_quote == b'\r' || maybe_quote == b'\n' {
self.builder.append_default();
return Ok(());
}

if settings.record_delimiter.is_empty()
|| settings.record_delimiter[0] == b'\r'
Expand Down
4 changes: 3 additions & 1 deletion common/formats/src/format_csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl CsvInputFormat {
pub fn try_create(
_name: &str,
schema: DataSchemaRef,
settings: FormatSettings,
mut settings: FormatSettings,
skip_rows: usize,
min_accepted_rows: usize,
min_accepted_bytes: usize,
Expand All @@ -106,6 +106,8 @@ impl CsvInputFormat {
record_delimiter = Some(settings.record_delimiter[0]);
}

settings.null_bytes = settings.csv_null_bytes.clone();

Ok(Box::new(CsvInputFormat {
schema,
settings,
Expand Down
9 changes: 6 additions & 3 deletions common/formats/src/format_tsv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,15 @@ impl TsvInputFormat {
pub fn try_create(
_name: &str,
schema: DataSchemaRef,
settings: FormatSettings,
mut settings: FormatSettings,
skip_rows: usize,
min_accepted_rows: usize,
min_accepted_bytes: usize,
) -> Result<Box<dyn InputFormat>> {
settings.field_delimiter = vec![b'\t'];
settings.record_delimiter = vec![b'\n'];
settings.null_bytes = settings.tsv_null_bytes.clone();

Ok(Box::new(TsvInputFormat {
schema,
settings,
Expand Down Expand Up @@ -193,8 +197,7 @@ impl InputFormat for TsvInputFormat {
if checkpoint_reader.ignore_white_spaces_and_byte(b'\t')? {
deserializers[column_index].de_default(&self.settings);
} else {
deserializers[column_index]
.de_text_csv(&mut checkpoint_reader, &self.settings)?;
deserializers[column_index].de_text(&mut checkpoint_reader, &self.settings)?;

if column_index + 1 != deserializers.len() {
checkpoint_reader.must_ignore_white_spaces_and_byte(b'\t')?;
Expand Down
12 changes: 9 additions & 3 deletions common/io/src/buffer/buffer_read_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@ pub trait BufferReadExt: BufferRead {
if b.is_empty() {
return Err(std::io::Error::new(
ErrorKind::InvalidData,
"Expected to have terminated string literal.".to_string(),
"Expected to have terminated string literal after escaped char '\' ."
.to_string(),
));
}
let c = b[0];
self.ignore_byte(c)?;

match c {
b'n' => buf.push(b'\n'),
b't' => buf.push(b'\t'),
Expand All @@ -80,13 +82,17 @@ pub trait BufferReadExt: BufferRead {
}
Err(std::io::Error::new(
ErrorKind::InvalidData,
"Expected to have terminated string literal.".to_string(),
format!("Expected to have terminated string literal after quota {:?}, while consumed buf: {:?}", quota as char, buf),
))
}

fn read_escaped_string_text(&mut self, buf: &mut Vec<u8>) -> Result<()> {
self.keep_read(buf, |f| f != b'\t' && f != b'\n' && f != b'\\')?;
// TODO judge escaped '\\'
if self.ignore_byte(b'\\')? {
// TODO parse complex escape sequence
buf.push(b'\\');
self.read_escaped_string_text(buf)?;
}
Ok(())
}

Expand Down
44 changes: 30 additions & 14 deletions query/src/servers/clickhouse/writers/query_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

use std::borrow::Cow;

use chrono::Date;
use chrono::DateTime;
use chrono::Datelike;
use common_base::base::ProgressValues;
use common_datablocks::DataBlock;
use common_datavalues::prelude::*;
Expand All @@ -29,7 +32,6 @@ use opensrv_clickhouse::errors::Result as CHResult;
use opensrv_clickhouse::errors::ServerError;
use opensrv_clickhouse::types::column::{self};
use opensrv_clickhouse::types::Block;
use opensrv_clickhouse::types::DateTimeType;
use opensrv_clickhouse::types::SqlType;

use crate::servers::clickhouse::interactive_worker_base::BlockItem;
Expand Down Expand Up @@ -164,14 +166,20 @@ pub fn to_clickhouse_block(block: DataBlock, format: &FormatSettings) -> Result<
pub fn from_clickhouse_block(schema: DataSchemaRef, block: Block) -> Result<DataBlock> {
let get_series = |block: &Block, index: usize| -> CHResult<ColumnRef> {
let col = &block.columns()[index];

match col.sql_type() {
SqlType::UInt8 => Ok(UInt8Column::from_iterator(col.iter::<u8>()?.copied()).arc()),
SqlType::UInt16 | SqlType::Date => {
Ok(UInt16Column::from_iterator(col.iter::<u16>()?.copied()).arc())
}
SqlType::UInt32 | SqlType::DateTime(DateTimeType::DateTime32) => {
Ok(UInt32Column::from_iterator(col.iter::<u32>()?.copied()).arc())
}
SqlType::UInt16 => Ok(UInt16Column::from_iterator(col.iter::<u16>()?.copied()).arc()),
SqlType::Date => Ok(Int32Column::from_iterator(
col.iter::<Date<_>>()?
.map(|v| v.naive_utc().num_days_from_ce()),
)
.arc()),
SqlType::UInt32 => Ok(UInt32Column::from_iterator(col.iter::<u32>()?.copied()).arc()),
SqlType::DateTime(_) => Ok(Int64Column::from_iterator(
col.iter::<DateTime<_>>()?.map(|v| v.timestamp_micros()),
)
.arc()),
SqlType::UInt64 => Ok(UInt64Column::from_iterator(col.iter::<u64>()?.copied()).arc()),
SqlType::Int8 => Ok(Int8Column::from_iterator(col.iter::<i8>()?.copied()).arc()),
SqlType::Int16 => Ok(Int16Column::from_iterator(col.iter::<i16>()?.copied()).arc()),
Expand All @@ -185,13 +193,21 @@ pub fn from_clickhouse_block(schema: DataSchemaRef, block: Block) -> Result<Data
SqlType::Nullable(SqlType::UInt8) => Ok(Series::from_data(
col.iter::<Option<u8>>()?.map(|c| c.copied()),
)),
SqlType::Nullable(SqlType::UInt16) | SqlType::Nullable(SqlType::Date) => Ok(
Series::from_data(col.iter::<Option<u16>>()?.map(|c| c.copied())),
),
SqlType::Nullable(SqlType::UInt32)
| SqlType::Nullable(SqlType::DateTime(DateTimeType::DateTime32)) => Ok(
Series::from_data(col.iter::<Option<u32>>()?.map(|c| c.copied())),
),
SqlType::Nullable(SqlType::UInt16) => Ok(Series::from_data(
col.iter::<Option<u16>>()?.map(|c| c.copied()),
)),
SqlType::Nullable(SqlType::Date) => {
Ok(Series::from_data(col.iter::<Option<Date<_>>>()?.map(|c| {
c.map(|v| v.naive_utc().num_days_from_ce() as u16)
})))
}
SqlType::Nullable(SqlType::UInt32) => Ok(Series::from_data(
col.iter::<Option<u32>>()?.map(|c| c.copied()),
)),
SqlType::Nullable(SqlType::DateTime(_)) => Ok(Series::from_data(
col.iter::<Option<DateTime<_>>>()?
.map(|c| c.map(|v| v.timestamp_micros())),
)),
SqlType::Nullable(SqlType::UInt64) => Ok(Series::from_data(
col.iter::<Option<u64>>()?.map(|c| c.copied()),
)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ EOF
cat << EOF > /tmp/databend_test_tsv_names_and_types.txt
insert into a(a,b,c) format TabSeparatedWithNamesAndTypes a b c
'int' 'varchar' 'double'
100 '2' 100.3
200 '3' 200.4
300 '2' 300
100 2 100.3
200 3 200.4
300 2 300
EOF

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ EOF
cat << EOF > /tmp/databend_test_tsv_names_and_types.txt
insert into a(a,b,c) format TabSeparatedWithNamesAndTypes a b c
'int' 'varchar' 'double'
100 '2' 100.3
200 '3' 200.4
300 '2' 300
100 2 100.3
200 3 200.4
300 2 300
EOF

Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
0 0 0 0
3 0 0 0 0
9 0 0 2 0
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ echo "CREATE TABLE books

# load csv
curl -H "insert_sql:insert into books format CSV" -F "upload=@${CURDIR}/books.csv" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" > /dev/null 2>&1
echo "select count(), count_if(title is null), count_if(author is null), count_if(date is null), count_if(publish_time is null) from books " | $MYSQL_CLIENT_CONNECT

# load tsv
curl -H "insert_sql:insert into books format TSV" -F "upload=@${CURDIR}/books.tsv" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" > /dev/null 2>&1
echo "select count(), count_if(title is null), count_if(author is null), count_if(date is null), count_if(publish_time is null) from books " | $MYSQL_CLIENT_CONNECT

echo "select count_if(title is null), count_if(author is null), count_if(date is null), count_if(publish_time is null) from books " | $MYSQL_CLIENT_CONNECT

echo "drop table books;" | $MYSQL_CLIENT_CONNECT

6 changes: 6 additions & 0 deletions tests/suites/1_stateful/01_load_v2/books.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Transaction Processing Jim Gray 1992 2020-01-01 11:11:11.345
Readings in Database Systems Michael Stonebraker 2004 2020-01-01T11:11:11Z
Three Body "NULL-liucixin 2019 2019-07-04T00:00:00
Three Body NULL-liucixin\rxx \N 2019-07-04T00:00:00
Three Body NULL-liucixin\fxx \N 2019-07-04T00:00:00
Three Body NULL-liucixin\fxx NULL 2019-07-04T00:00:00

0 comments on commit c18ebb5

Please sign in to comment.