diff --git a/be/src/exec/plain_text_line_reader.cpp b/be/src/exec/plain_text_line_reader.cpp
index 6d9d25841b281c..06d55dccd2a3ef 100644
--- a/be/src/exec/plain_text_line_reader.cpp
+++ b/be/src/exec/plain_text_line_reader.cpp
@@ -200,9 +200,7 @@ Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* e
             // for multi bytes delimiter we cannot set offset to avoid incomplete
             // delimiter
             // read from file reader
-            if (_line_delimiter_length == 1) {
-                offset = output_buf_read_remaining();
-            }
+            offset = output_buf_read_remaining();
             extend_output_buf();
             if ((_input_buf_limit > _input_buf_pos) && _more_input_bytes == 0) {
                 // we still have data in input which is not decompressed.
diff --git a/regression-test/data/load_p0/stream_load/test_line_delimiter.csv b/regression-test/data/load_p0/stream_load/test_line_delimiter.csv
new file mode 100644
index 00000000000000..6a9e628eae5864
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_line_delimiter.csv
@@ -0,0 +1 @@
+1|aaweizuo2|bbweizuo3|cc
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
index 0ac4e8095b9f8c..0e283974108189 100644
--- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
@@ -88,4 +88,46 @@ suite("test_stream_load", "p0") {
             assertEquals(1, json.NumberFilteredRows)
         }
     }
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `value` varchar(64) NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "in_memory" = "false",
+            "storage_format" = "V2",
+            "disable_auto_compaction" = "false"
+        );
+    """
+
+    streamLoad {
+        table "${tableName}"
+
+        set 'line_delimiter', 'weizuo'
+        set 'column_separator', '|'
+        set 'columns', 'id, value'
+
+        file 'test_line_delimiter.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(3, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+        }
+    }
+
+    rowCount = sql "select count(1) from ${tableName}"
+    assertEquals(3, rowCount[0][0])
 }
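
Not part of the patch itself: a minimal, self-contained C++ sketch of what the new regression test expects from the multi-byte `line_delimiter`. The `split_lines` helper and hard-coded strings below are illustrative assumptions, not Doris internals; they only show how splitting on the full delimiter `weizuo` turns the fixture `1|aaweizuo2|bbweizuo3|cc` into the three records the test asserts on.

```cpp
#include <cstdio>
#include <string>
#include <vector>

// Split `data` on every full occurrence of the multi-byte `delimiter`,
// advancing past the whole delimiter (not just one byte) after each match.
static std::vector<std::string> split_lines(const std::string& data,
                                            const std::string& delimiter) {
    std::vector<std::string> lines;
    size_t pos = 0;
    while (pos <= data.size()) {
        size_t hit = data.find(delimiter, pos);
        if (hit == std::string::npos) {
            // trailing record with no terminating delimiter
            lines.push_back(data.substr(pos));
            break;
        }
        lines.push_back(data.substr(pos, hit - pos));
        pos = hit + delimiter.size();
    }
    return lines;
}

int main() {
    const std::string csv = "1|aaweizuo2|bbweizuo3|cc"; // content of test_line_delimiter.csv
    for (const std::string& line : split_lines(csv, "weizuo")) {
        std::printf("%s\n", line.c_str()); // prints "1|aa", "2|bb", "3|cc"
    }
    return 0;
}
```

With `column_separator` set to `|`, those three records map to the rows (1, "aa"), (2, "bb"), (3, "cc"), which is why the test asserts NumberTotalRows == 3 and a final count(1) of 3.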