Skip to content

Commit

Permalink
feat(format): support use \\t as delimiter in csv format
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang2014 committed May 8, 2022
1 parent 4f9a2e9 commit 914658e
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 30 deletions.
7 changes: 1 addition & 6 deletions common/datavalues/src/types/deserializations/number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,10 @@ where
fn de_text_csv<R: BufferRead>(
&mut self,
reader: &mut CheckpointReader<R>,
settings: &FormatSettings,
_settings: &FormatSettings,
) -> Result<()> {
let maybe_quote = reader.ignore(|f| f == b'\'' || f == b'"')?;

if maybe_quote && reader.ignore(|f| f == b'\'' || f == b'"')? && settings.empty_as_default {
self.de_default(settings);
return Ok(());
}

let v: T = if !T::FLOATING {
reader.read_int_text()
} else {
Expand Down
138 changes: 127 additions & 11 deletions common/datavalues/src/types/deserializations/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,136 @@ impl TypeDeserializer for StringDeserializer {
fn de_text_csv<R: BufferRead>(
&mut self,
reader: &mut CheckpointReader<R>,
format: &FormatSettings,
settings: &FormatSettings,
) -> Result<()> {
self.buffer.clear();
if reader.ignore_byte(b'"')? {
reader.keep_read(&mut self.buffer, |b| b != b'"')?;
reader.must_ignore_byte(b'"')?;
} else if format.field_delimiter.is_empty() {
reader.keep_read(&mut self.buffer, |b| b != b',')?;
} else {
reader.keep_read(&mut self.buffer, |b| b != format.field_delimiter[0])?;
let mut read_buffer = reader.fill_buf()?;

if read_buffer.is_empty() {
return Err(ErrorCode::BadBytes("Read string after eof."));
}

self.builder.append_value(self.buffer.as_slice());
Ok(())
let maybe_quote = read_buffer[0];
if maybe_quote == b'\'' || maybe_quote == b'"' {
let mut index = 1;
let mut bytes = 0;

loop {
let begin = index;
while index < read_buffer.len() {
if read_buffer[index] == maybe_quote {
self.builder
.values_mut()
.extend_from_slice(&read_buffer[begin..index]);
self.builder.add_offset(bytes + index - begin);
reader.consume(index + 1);
return Ok(());
}

index += 1;
}

bytes += index - begin;
self.builder
.values_mut()
.extend_from_slice(&read_buffer[begin..]);
reader.consume(index - begin);

index = 0;
read_buffer = reader.fill_buf()?;

if read_buffer.is_empty() {
break;
}
}

Err(ErrorCode::BadBytes(format!(
"Not found '{}' before eof in parse string.",
maybe_quote as char
)))
} else {
// Unquoted case. Look for field_delimiter or record_delimiter.
let mut field_delimiter = b',';

if !settings.field_delimiter.is_empty() {
field_delimiter = settings.field_delimiter[0];
}

if settings.record_delimiter.is_empty()
|| settings.record_delimiter[0] == b'\r'
|| settings.record_delimiter[0] == b'\n'
{
let mut index = 0;
let mut bytes = 0;

'outer1: loop {
while index < read_buffer.len() {
if read_buffer[index] == field_delimiter
|| read_buffer[index] == b'\r'
|| read_buffer[index] == b'\n'
{
break 'outer1;
}
index += 1;
}

bytes += index;
self.builder
.values_mut()
.extend_from_slice(&read_buffer[..index]);
reader.consume(index);

index = 0;
read_buffer = reader.fill_buf()?;

if read_buffer.is_empty() {
break 'outer1;
}
}

self.builder
.values_mut()
.extend_from_slice(&read_buffer[..index]);
self.builder.add_offset(bytes + index);
reader.consume(index);
} else {
let record_delimiter = settings.record_delimiter[0];

let mut index = 0;
let mut bytes = 0;

'outer2: loop {
while index < read_buffer.len() {
if read_buffer[index] == field_delimiter
|| read_buffer[index] == record_delimiter
{
break 'outer2;
}
index += 1;
}

bytes += index;
self.builder
.values_mut()
.extend_from_slice(&read_buffer[..index]);
reader.consume(index);

index = 0;
read_buffer = reader.fill_buf()?;

if read_buffer.is_empty() {
break 'outer2;
}
}

self.builder
.values_mut()
.extend_from_slice(&read_buffer[..index]);
self.builder.add_offset(bytes + index);
reader.consume(index);
}

Ok(())
}
}

fn append_data_value(&mut self, value: DataValue, _format: &FormatSettings) -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion common/io/src/buffer/buffer_read_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ pub trait BufferReadExt: BufferRead {
}

impl<R> BufferReadExt for R
where R: BufferRead
where R: BufferRead
{
fn ignores(&mut self, f: impl Fn(u8) -> bool) -> Result<usize> {
let mut bytes = 0;
Expand Down
14 changes: 11 additions & 3 deletions common/io/src/buffer/buffer_read_number_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,17 @@ where R: BufferRead
let _ = self.ignores(|f| (b'0'..=b'9').contains(&f))?;
}

FromLexical::from_lexical(buf.as_slice()).map_err_to_code(ErrorCode::BadBytes, || {
format!("Cannot parse value:{:?} to number type", buf)
})
match buf.is_empty() {
true => Ok(T::default()),
false => match FromLexical::from_lexical(buf.as_slice()) {
Ok(value) => Ok(value),
Err(cause) => Err(ErrorCode::BadBytes(format!(
"Cannot parse value:{:?} to number type, cause: {:?}",
String::from_utf8(buf),
cause
))),
},
}
}

fn read_float_text<T: FromLexical>(&mut self) -> Result<T> {
Expand Down
24 changes: 16 additions & 8 deletions query/src/formats/format_csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,19 +210,27 @@ impl InputFormat for CsvInputFormat {
checkpoint_reader.ignore_white_spaces_and_byte(self.field_delimiter)?;

if let Some(delimiter) = &self.row_delimiter {
if !checkpoint_reader.ignore_white_spaces_and_byte(*delimiter)? {
if !checkpoint_reader.ignore_white_spaces_and_byte(*delimiter)?
&& !checkpoint_reader.eof()?
{
return Err(ErrorCode::BadBytes(format!(
"Parse csv error at line {}",
row_index
)));
}
} else if !checkpoint_reader.ignore_white_spaces_and_byte(b'\n')?
& !checkpoint_reader.ignore_white_spaces_and_byte(b'\r')?
{
return Err(ErrorCode::BadBytes(format!(
"Parse csv error at line {}",
row_index
)));
} else {
if (!checkpoint_reader.ignore_white_spaces_and_byte(b'\n')?
& !checkpoint_reader.ignore_white_spaces_and_byte(b'\r')?)
&& !checkpoint_reader.eof()?
{
return Err(ErrorCode::BadBytes(format!(
"Parse csv error at line {}",
row_index
)));
}

// \r\n
checkpoint_reader.ignore_white_spaces_and_byte(b'\n')?;
}
}

Expand Down
19 changes: 18 additions & 1 deletion query/src/servers/http/v1/multipart_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ impl MultipartWorker {
break 'outer;
}
Ok(Some(field)) => {
if let Err(cause) = tx.send(Ok(vec![])).await {
common_tracing::tracing::warn!(
"Multipart channel disconnect. {}",
cause
);

break 'outer;
}

let mut async_reader = field.into_async_read();

'read: loop {
Expand Down Expand Up @@ -269,7 +278,15 @@ impl Processor for SequentialInputFormatSource {
async fn async_process(&mut self) -> Result<()> {
if let State::NeedReceiveData = replace(&mut self.state, State::NeedReceiveData) {
if let Some(receive_res) = self.data_receiver.recv().await {
self.state = State::ReceivedData(receive_res?);
let receive_bytes = receive_res?;

if !receive_bytes.is_empty() {
self.state = State::ReceivedData(receive_bytes);
} else {
self.skipped_header = false;
self.state = State::NeedDeserialize;
}

return Ok(());
}
}
Expand Down

0 comments on commit 914658e

Please sign in to comment.