Skip to content

Commit

Permalink
fix bug that last line of data lost for stream load when line delimit…
Browse files Browse the repository at this point in the history
…er is more than one character (apache#13066)
  • Loading branch information
weizuo93 authored and FreeOnePlus committed Oct 8, 2022
1 parent 8c0e39c commit bb4c385
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 3 deletions.
4 changes: 1 addition & 3 deletions be/src/exec/plain_text_line_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,7 @@ Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* e
// for multi bytes delimiter we cannot set offset to avoid incomplete
// delimiter
// read from file reader
if (_line_delimiter_length == 1) {
offset = output_buf_read_remaining();
}
offset = output_buf_read_remaining();
extend_output_buf();
if ((_input_buf_limit > _input_buf_pos) && _more_input_bytes == 0) {
// we still have data in input which is not decompressed.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1|aaweizuo2|bbweizuo3|cc
42 changes: 42 additions & 0 deletions regression-test/suites/load_p0/stream_load/test_stream_load.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,46 @@ suite("test_stream_load", "p0") {
assertEquals(1, json.NumberFilteredRows)
}
}

sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
`id` int(11) NULL,
`value` varchar(64) NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
);
"""

streamLoad {
table "${tableName}"

set 'line_delimiter', 'weizuo'
set 'column_separator', '|'
set 'columns', 'id, value'

file 'test_line_delimiter.csv'
time 10000 // limit inflight 10s

check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(3, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
}
}

rowCount = sql "select count(1) from ${tableName}"
assertEquals(3, rowCount[0][0])
}

0 comments on commit bb4c385

Please sign in to comment.