diff --git a/common/datavalues/src/types/deserializations/number.rs b/common/datavalues/src/types/deserializations/number.rs index db341068b99b..95e91b966a76 100644 --- a/common/datavalues/src/types/deserializations/number.rs +++ b/common/datavalues/src/types/deserializations/number.rs @@ -105,15 +105,10 @@ where fn de_text_csv( &mut self, reader: &mut CheckpointReader, - settings: &FormatSettings, + _settings: &FormatSettings, ) -> Result<()> { let maybe_quote = reader.ignore(|f| f == b'\'' || f == b'"')?; - if maybe_quote && reader.ignore(|f| f == b'\'' || f == b'"')? && settings.empty_as_default { - self.de_default(settings); - return Ok(()); - } - let v: T = if !T::FLOATING { reader.read_int_text() } else { diff --git a/common/datavalues/src/types/deserializations/string.rs b/common/datavalues/src/types/deserializations/string.rs index 6685e9c9cc1a..7df6807b17bd 100644 --- a/common/datavalues/src/types/deserializations/string.rs +++ b/common/datavalues/src/types/deserializations/string.rs @@ -109,20 +109,136 @@ impl TypeDeserializer for StringDeserializer { fn de_text_csv( &mut self, reader: &mut CheckpointReader, - format: &FormatSettings, + settings: &FormatSettings, ) -> Result<()> { - self.buffer.clear(); - if reader.ignore_byte(b'"')? { - reader.keep_read(&mut self.buffer, |b| b != b'"')?; - reader.must_ignore_byte(b'"')?; - } else if format.field_delimiter.is_empty() { - reader.keep_read(&mut self.buffer, |b| b != b',')?; - } else { - reader.keep_read(&mut self.buffer, |b| b != format.field_delimiter[0])?; + let mut read_buffer = reader.fill_buf()?; + + if read_buffer.is_empty() { + return Err(ErrorCode::BadBytes("Read string after eof.")); } - self.builder.append_value(self.buffer.as_slice()); - Ok(()) + let maybe_quote = read_buffer[0]; + if maybe_quote == b'\'' || maybe_quote == b'"' { + let mut index = 1; + let mut bytes = 0; + + loop { + let begin = index; + while index < read_buffer.len() { + if read_buffer[index] == maybe_quote { + self.builder + .values_mut() + .extend_from_slice(&read_buffer[begin..index]); + self.builder.add_offset(bytes + index - begin); + reader.consume(index + 1); + return Ok(()); + } + + index += 1; + } + + bytes += index - begin; + self.builder + .values_mut() + .extend_from_slice(&read_buffer[begin..]); + reader.consume(index - begin); + + index = 0; + read_buffer = reader.fill_buf()?; + + if read_buffer.is_empty() { + break; + } + } + + Err(ErrorCode::BadBytes(format!( + "Not found '{}' before eof in parse string.", + maybe_quote as char + ))) + } else { + // Unquoted case. Look for field_delimiter or record_delimiter. + let mut field_delimiter = b','; + + if !settings.field_delimiter.is_empty() { + field_delimiter = settings.field_delimiter[0]; + } + + if settings.record_delimiter.is_empty() + || settings.record_delimiter[0] == b'\r' + || settings.record_delimiter[0] == b'\n' + { + let mut index = 0; + let mut bytes = 0; + + 'outer1: loop { + while index < read_buffer.len() { + if read_buffer[index] == field_delimiter + || read_buffer[index] == b'\r' + || read_buffer[index] == b'\n' + { + break 'outer1; + } + index += 1; + } + + bytes += index; + self.builder + .values_mut() + .extend_from_slice(&read_buffer[..index]); + reader.consume(index); + + index = 0; + read_buffer = reader.fill_buf()?; + + if read_buffer.is_empty() { + break 'outer1; + } + } + + self.builder + .values_mut() + .extend_from_slice(&read_buffer[..index]); + self.builder.add_offset(bytes + index); + reader.consume(index); + } else { + let record_delimiter = settings.record_delimiter[0]; + + let mut index = 0; + let mut bytes = 0; + + 'outer2: loop { + while index < read_buffer.len() { + if read_buffer[index] == field_delimiter + || read_buffer[index] == record_delimiter + { + break 'outer2; + } + index += 1; + } + + bytes += index; + self.builder + .values_mut() + .extend_from_slice(&read_buffer[..index]); + reader.consume(index); + + index = 0; + read_buffer = reader.fill_buf()?; + + if read_buffer.is_empty() { + break 'outer2; + } + } + + self.builder + .values_mut() + .extend_from_slice(&read_buffer[..index]); + self.builder.add_offset(bytes + index); + reader.consume(index); + } + + Ok(()) + } } fn append_data_value(&mut self, value: DataValue, _format: &FormatSettings) -> Result<()> { diff --git a/common/io/src/buffer/buffer_read_ext.rs b/common/io/src/buffer/buffer_read_ext.rs index 1d840e713f4f..ea19f778e44d 100644 --- a/common/io/src/buffer/buffer_read_ext.rs +++ b/common/io/src/buffer/buffer_read_ext.rs @@ -84,10 +84,9 @@ pub trait BufferReadExt: BufferRead { fn must_ignore_byte(&mut self, b: u8) -> Result<()> { if !self.ignore_byte(b)? { - let buf = self.fill_buf()?; return Err(std::io::Error::new( ErrorKind::InvalidData, - format!("Expected to have char {}, {}", b as char, buf[0] as char), + format!("Expected to have char {}.", b as char), )); } Ok(()) @@ -95,10 +94,9 @@ pub trait BufferReadExt: BufferRead { fn must_ignore_white_spaces_and_byte(&mut self, b: u8) -> Result<()> { if !self.ignore_white_spaces_and_byte(b)? { - let buf = self.fill_buf()?; return Err(std::io::Error::new( ErrorKind::InvalidData, - format!("Expected to have char {}, {}", b as char, buf[0] as char), + format!("Expected to have char {}", b as char), )); } Ok(()) diff --git a/common/io/src/buffer/buffer_read_number_ext.rs b/common/io/src/buffer/buffer_read_number_ext.rs index 0145bed13ce3..45c9c116c1df 100644 --- a/common/io/src/buffer/buffer_read_number_ext.rs +++ b/common/io/src/buffer/buffer_read_number_ext.rs @@ -66,9 +66,17 @@ where R: BufferRead let _ = self.ignores(|f| (b'0'..=b'9').contains(&f))?; } - FromLexical::from_lexical(buf.as_slice()).map_err_to_code(ErrorCode::BadBytes, || { - format!("Cannot parse value:{:?} to number type", buf) - }) + match buf.is_empty() { + true => Ok(T::default()), + false => match FromLexical::from_lexical(buf.as_slice()) { + Ok(value) => Ok(value), + Err(cause) => Err(ErrorCode::BadBytes(format!( + "Cannot parse value:{:?} to number type, cause: {:?}", + String::from_utf8(buf), + cause + ))), + }, + } } fn read_float_text(&mut self) -> Result { diff --git a/query/src/formats/format_csv.rs b/query/src/formats/format_csv.rs index 257de3b4813a..f695e648660e 100644 --- a/query/src/formats/format_csv.rs +++ b/query/src/formats/format_csv.rs @@ -210,19 +210,27 @@ impl InputFormat for CsvInputFormat { checkpoint_reader.ignore_white_spaces_and_byte(self.field_delimiter)?; if let Some(delimiter) = &self.row_delimiter { - if !checkpoint_reader.ignore_white_spaces_and_byte(*delimiter)? { + if !checkpoint_reader.ignore_white_spaces_and_byte(*delimiter)? + && !checkpoint_reader.eof()? + { return Err(ErrorCode::BadBytes(format!( "Parse csv error at line {}", row_index ))); } - } else if !checkpoint_reader.ignore_white_spaces_and_byte(b'\n')? - & !checkpoint_reader.ignore_white_spaces_and_byte(b'\r')? - { - return Err(ErrorCode::BadBytes(format!( - "Parse csv error at line {}", - row_index - ))); + } else { + if (!checkpoint_reader.ignore_white_spaces_and_byte(b'\n')? + & !checkpoint_reader.ignore_white_spaces_and_byte(b'\r')?) + && !checkpoint_reader.eof()? + { + return Err(ErrorCode::BadBytes(format!( + "Parse csv error at line {}", + row_index + ))); + } + + // \r\n + checkpoint_reader.ignore_white_spaces_and_byte(b'\n')?; } } diff --git a/query/src/servers/http/v1/multipart_format.rs b/query/src/servers/http/v1/multipart_format.rs index d27a93502524..2313f05ddd35 100644 --- a/query/src/servers/http/v1/multipart_format.rs +++ b/query/src/servers/http/v1/multipart_format.rs @@ -65,6 +65,15 @@ impl MultipartWorker { break 'outer; } Ok(Some(field)) => { + if let Err(cause) = tx.send(Ok(vec![])).await { + common_tracing::tracing::warn!( + "Multipart channel disconnect. {}", + cause + ); + + break 'outer; + } + let mut async_reader = field.into_async_read(); 'read: loop { @@ -269,7 +278,15 @@ impl Processor for SequentialInputFormatSource { async fn async_process(&mut self) -> Result<()> { if let State::NeedReceiveData = replace(&mut self.state, State::NeedReceiveData) { if let Some(receive_res) = self.data_receiver.recv().await { - self.state = State::ReceivedData(receive_res?); + let receive_bytes = receive_res?; + + if !receive_bytes.is_empty() { + self.state = State::ReceivedData(receive_bytes); + } else { + self.skipped_header = false; + self.state = State::NeedDeserialize; + } + return Ok(()); } }