Skip to content

Commit

Permalink
feat(tianmu): support incomplete column data during load data. (sto…
Browse files Browse the repository at this point in the history
…neatom#1209)

[summary]
1. support rows with too few columns;
2. support rows with too many columns;
3. support incomplete column data, such as:
3.1 a field that starts with the enclosing character but ends without it;
3.2 a row with no line-termination string before the end of the file is reached;
  • Loading branch information
lujiashun authored and mergify[bot] committed Feb 20, 2023
1 parent 6a21184 commit a1d0ad2
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 44 deletions.
45 changes: 45 additions & 0 deletions mysql-test/suite/tianmu/r/issue1209.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
DROP DATABASE IF EXISTS issue1209_test;
CREATE DATABASE issue1209_test;
USE issue1209_test;
CREATE TABLE `t1_tianmu` (
`id` int(11) DEFAULT NULL,
`a` char(20) DEFAULT NULL,
`id2` int(11) DEFAULT NULL
) ENGINE=tianmu DEFAULT CHARSET=utf8mb4;
LOAD DATA LOCAL infile 'MYSQL_TEST_DIR/suite/tianmu/std_data/issue1209-1.txt' into table t1_tianmu FIELDS TERMINATED BY ',' LINES TERMINATED BY ';';
Warnings:
Warning 1262 Row 2 was truncated; it contained more data than there were input columns
Warning 1261 Row 3 doesn't contain data for all columns
Warning 1261 Row 4 doesn't contain data for all columns
Warning 1261 Row 4 doesn't contain data for all columns
Warning 1366 Incorrect integer value: '
' for column 'id' at row 5
Warning 1261 Row 5 doesn't contain data for all columns
Warning 1261 Row 5 doesn't contain data for all columns
select * from t1_tianmu;
id a id2
1 chai 6
2 测试 3
3 chayicha NULL
NULL NULL NULL
0 NULL NULL
truncate table t1_tianmu;
LOAD DATA LOCAL infile 'MYSQL_TEST_DIR/suite/tianmu/std_data/issue1209-2.txt' into table t1_tianmu FIELDS TERMINATED BY ',' enclosed by '"' LINES TERMINATED BY ';';
Warnings:
Warning 1262 Row 2 was truncated; it contained more data than there were input columns
Warning 1261 Row 3 doesn't contain data for all columns
Warning 1261 Row 4 doesn't contain data for all columns
Warning 1261 Row 4 doesn't contain data for all columns
Warning 1261 Row 5 doesn't contain data for all columns
Warning 1261 Row 5 doesn't contain data for all columns
Warning 1261 Row 6 doesn't contain data for all columns
select * from t1_tianmu;
id a id2
1 chai 7
2 测试 8
3 chayicha NULL
NULL NULL NULL
5 NULL NULL
4 ";
NULL
DROP DATABASE issue1209_test;
2 changes: 1 addition & 1 deletion mysql-test/suite/tianmu/r/issue1263.result
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ CREATIONDATE=NULLif(@CREATIONDATE,''),
MODIFIEDDATE=NULLif(@MODIFIEDDATE,''),
ISACTIVE=NULLif(@ISACTIVE,'')
;
ERROR HY000: Incorrect integer value: ' line 49' for column 'OWNERID' at row 1
ERROR 01000: Row 1 doesn't contain data for all columns
select * from AD_PINSTANCE_LOG_DOUBLE_ENCLOSED;
ID AD_CLIENT_ID AD_ORG_ID AD_PINSTANCE_ID P_DATE P_MSG OWNERID MODIFIERID CREATIONDATE MODIFIEDDATE ISACTIVE
765893 37 27 44221 2021-06-28 02:01:16 计算售罄率出错:ORA-04036: PGA memory used by the instance exceeds PGA_AGGREGATE_LIMITORA-06512: at "BOSNDS3.O2O_STORESALERATE_SET", line 49
Expand Down
1 change: 1 addition & 0 deletions mysql-test/suite/tianmu/std_data/issue1209-1.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1,chai,6;2,测试,3,4;3,chayicha;;
1 change: 1 addition & 0 deletions mysql-test/suite/tianmu/std_data/issue1209-2.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1,"chai",7;2,"测试",8,9;3,"chayicha";;"5";"4",";
27 changes: 27 additions & 0 deletions mysql-test/suite/tianmu/t/issue1209.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Regression test for issue 1209: LOAD DATA into a Tianmu table when the input
# rows do not match the column list exactly (too many fields, too few fields,
# empty rows, or a final field/row that is not properly terminated).
--source include/have_tianmu.inc

# Start from a clean database; suppress the "unknown database" note on first run.
--disable_warnings
DROP DATABASE IF EXISTS issue1209_test;
--enable_warnings

CREATE DATABASE issue1209_test;

USE issue1209_test;

# Three-column target table (int, char, int); every column is nullable so that
# missing fields can be loaded as NULL.
CREATE TABLE `t1_tianmu` (
`id` int(11) DEFAULT NULL,
`a` char(20) DEFAULT NULL,
`id2` int(11) DEFAULT NULL
) ENGINE=tianmu DEFAULT CHARSET=utf8mb4;

# Case 1: fields are not enclosed. issue1209-1.txt holds a row with exactly
# three fields, a row with four fields, a row with two fields and an empty row,
# each terminated by ';'.
# Mask the machine-specific test directory so the recorded result is portable.
--replace_result $MYSQL_TEST_DIR MYSQL_TEST_DIR
eval LOAD DATA LOCAL infile '$MYSQL_TEST_DIR/suite/tianmu/std_data/issue1209-1.txt' into table t1_tianmu FIELDS TERMINATED BY ',' LINES TERMINATED BY ';';
select * from t1_tianmu;

# Empty the table so the second load is checked in isolation.
truncate table t1_tianmu;

# Case 2: same shapes of rows, but with fields enclosed by '"'. issue1209-2.txt
# additionally ends with a field whose opening '"' is never closed before the
# end of the file.
--replace_result $MYSQL_TEST_DIR MYSQL_TEST_DIR
eval LOAD DATA LOCAL infile '$MYSQL_TEST_DIR/suite/tianmu/std_data/issue1209-2.txt' into table t1_tianmu FIELDS TERMINATED BY ',' enclosed by '"' LINES TERMINATED BY ';';
select * from t1_tianmu;

# Clean up everything this test created.
DROP DATABASE issue1209_test;
2 changes: 1 addition & 1 deletion mysql-test/suite/tianmu/t/issue1263.test
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ ISACTIVE=NULLif(@ISACTIVE,'')
select * from AD_PINSTANCE_LOG_DOUBLE_ENCLOSED;

--replace_result $MYSQL_TEST_DIR MYSQL_TEST_DIR
--error 1366
--error 1261
eval load data infile '$MYSQL_TEST_DIR/suite/tianmu/std_data/issue1263-3.txt' into table AD_PINSTANCE_LOG_DOUBLE_ENCLOSED
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
Expand Down
17 changes: 12 additions & 5 deletions storage/tianmu/loader/load_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@ bool LoadParser::MakeRow(std::vector<ValueCache> &value_buffers) {
int errorinfo;

bool cont = true;
bool eof = false;

while (cont) {
switch (strategy_->GetOneRow(cur_ptr_, buf_end_ - cur_ptr_, value_buffers, rowsize, errorinfo)) {
switch (strategy_->GetOneRow(cur_ptr_, buf_end_ - cur_ptr_, value_buffers, rowsize, errorinfo, eof)) {
case ParsingStrategy::ParseResult::EOB:
if (mysql_bin_log.is_open())
binlog_loaded_block(read_buffer_.Buf(), cur_ptr_);
Expand All @@ -98,10 +100,15 @@ bool LoadParser::MakeRow(std::vector<ValueCache> &value_buffers) {
buf_end_ = cur_ptr_ + read_buffer_.BufSize();
} else {
// reaching the end of the buffer
if (cur_ptr_ != buf_end_)
rejecter_.ConsumeBadRow(cur_ptr_, buf_end_ - cur_ptr_, cur_row_ + 1, errorinfo == -1 ? -1 : errorinfo + 1);
cur_row_++;
cont = false;
if (cur_ptr_ != buf_end_) {
// rejecter_.ConsumeBadRow(cur_ptr_, buf_end_ - cur_ptr_, cur_row_ + 1, errorinfo == -1 ? -1 : errorinfo +
// 1);
// do not cousume the row, take this as the normal line
eof = true;
} else {
cur_row_++;
cont = false;
}
}
break;

Expand Down
98 changes: 64 additions & 34 deletions storage/tianmu/loader/parsing_strategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,12 @@ inline bool TailsMatch(const char *s1, const char *s2, const size_t size) {
inline ParsingStrategy::SearchResult ParsingStrategy::SearchUnescapedPatternNoEOL(const char *&ptr,
const char *const buf_end,
const std::string &pattern,
const std::string &line_termination,
const std::vector<int> &kmp_next) {
const char *c_pattern = pattern.c_str();
size_t size = pattern.size();
const char *c_eol = terminator_.c_str();
size_t crlf = terminator_.size();
const char *c_eol = line_termination.c_str();
size_t crlf = line_termination.size();
const char *search_end = buf_end;

if (size == 1) {
Expand Down Expand Up @@ -315,12 +316,12 @@ class Field_tmp_nullability_guard {

void ParsingStrategy::ReadField(const char *&ptr, const char *&val_beg, Item *&item, uint &index_of_field,
std::vector<std::pair<const char *, size_t>> &vec_ptr_field,
uint &field_index_in_field_list, const CHARSET_INFO *char_info) {
uint &field_index_in_field_list, const CHARSET_INFO *char_info, bool completed_row) {
bool is_enclosed = false;
char *val_start{nullptr};
size_t val_len{0};

if (string_qualifier_ && *val_beg == string_qualifier_) {
if (string_qualifier_ && *val_beg == string_qualifier_ && completed_row) {
// first char is enclose char, skip it
val_start = const_cast<char *>(val_beg) + 1;
// skip the first and the last char which is encolose char
Expand Down Expand Up @@ -407,8 +408,8 @@ void ParsingStrategy::ReadField(const char *&ptr, const char *&val_beg, Item *&i
}

ParsingStrategy::ParseResult ParsingStrategy::GetOneRow(const char *const buf, size_t size,
std::vector<ValueCache> &record, uint &rowsize,
int &errorinfo) {
std::vector<ValueCache> &record, uint &rowsize, int &errorinfo,
bool eof) {
const char *buf_end = buf + size;
if (!prepared_) {
GetEOL(buf, buf_end);
Expand Down Expand Up @@ -464,58 +465,87 @@ ParsingStrategy::ParseResult ParsingStrategy::GetOneRow(const char *const buf, s
uint index_of_field{0};

// step2, fill the field list with data file content;

// three condition for matching one field:
// (1) enclosed-char(optional) + content + enclosed-char(optional) + delimiter-str, return PATTERN_FOUND
// (2) enclosed-char(optional) + content + enclosed-char(optional) + termination_str, return END_OF_LINE
// (3) enclosed-char(optional) + content +(without enclosed-char(optional) + delimiter-str/termination_str), return
// END_OF_BUFFER
bool enclosed_column = false;

while ((item = it++)) {
index++;

if (index == fields_vars.elements)
break;
const char *val_beg = ptr;
if (string_qualifier_ && *ptr == string_qualifier_) {
row_incomplete = !SearchUnescapedPattern(++ptr, buf_end, enclose_delimiter_, kmp_next_enclose_delimiter_);

enclosed_column = false;
if (string_qualifier_ && *ptr == string_qualifier_)
enclosed_column = true;
const std::string &delimitor = enclosed_column ? enclose_delimiter_ : delimiter_;
const std::string &line_termination = enclosed_column ? enclose_terminator_ : terminator_;
const std::vector<int> &kmp_local = enclosed_column ? kmp_next_enclose_delimiter_ : kmp_next_delimiter_;

if (enclosed_column)
++ptr;
} else {
SearchResult res = SearchUnescapedPatternNoEOL(ptr, buf_end, delimiter_, kmp_next_delimiter_);
if (res == SearchResult::END_OF_LINE) {
ReadField(ptr, val_beg, item, index_of_field, vec_ptr_field, field_index_in_field_list, char_info);
continue;
}
row_incomplete = (res == SearchResult::END_OF_BUFFER);

SearchResult res = SearchUnescapedPatternNoEOL(ptr, buf_end, delimitor, line_termination, kmp_local);
row_incomplete = (res == SearchResult::END_OF_BUFFER);

if (row_incomplete && eof) {
// field is incompleted,and reach to the end of buffer, take the incompeted data as the field content;
ReadField(buf_end, val_beg, item, index_of_field, vec_ptr_field, field_index_in_field_list, char_info, false);
ptr = buf_end;
row_incomplete = false;
++item;
break;
}

if (row_incomplete) {
errorinfo = index;
goto end;
}

if (enclosed_column)
++ptr;
ReadField(ptr, val_beg, item, index_of_field, vec_ptr_field, field_index_in_field_list, char_info);

ptr += delimiter_.size();
}
if (res == SearchResult::PATTERN_FOUND) {
ptr += delimiter_.size();
}

if (!row_incomplete) {
// the last column
index++;
const char *val_beg = ptr;
if (string_qualifier_ && *ptr == string_qualifier_) {
row_incomplete = !SearchUnescapedPattern(++ptr, buf_end, enclose_terminator_, kmp_next_enclose_terminator_);
++ptr;
} else
row_incomplete = !SearchUnescapedPattern(ptr, buf_end, terminator_, kmp_next_terminator_);
if (res == SearchResult::END_OF_LINE) {
++item;
break;
}
}

if (!row_incomplete) {
ReadField(ptr, val_beg, item, index_of_field, vec_ptr_field, field_index_in_field_list, char_info);
} else {
errorinfo = index;
goto end;
if (item) {
while ((item = it++)) {
// field is few, warn if occurs;
ReadField(ptr, ptr, item, index_of_field, vec_ptr_field, field_index_in_field_list, char_info, false);
push_warning_printf(thd_, Sql_condition::SL_WARNING, ER_WARN_TOO_FEW_RECORDS, ER(ER_WARN_TOO_FEW_RECORDS),
thd_->get_stmt_da()->current_row_for_condition());
}
}

ptr += terminator_.size();
if (!row_incomplete && !eof) {
const char *orig_ptr = ptr;
SearchResult res = SearchUnescapedPatternNoEOL(ptr, buf_end, terminator_, terminator_, kmp_next_terminator_);
// check too many records, warn if occurs
if (res != SearchResult::END_OF_BUFFER) {
if (orig_ptr != ptr) {
push_warning_printf(thd_, Sql_condition::SL_WARNING, ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
thd_->get_stmt_da()->current_row_for_condition());
}
ptr += terminator_.size();
}
}

if (thd_->killed) {
row_data_error = true;
goto end;
}

it.rewind();
while ((item = it++)) {
Item *real_item = item->real_item();
Expand Down
6 changes: 3 additions & 3 deletions storage/tianmu/loader/parsing_strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ class ParsingStrategy final {
ParsingStrategy(const system::IOParameters &iop, std::vector<uchar> columns_collations);
~ParsingStrategy() {}
ParseResult GetOneRow(const char *const buf, size_t size, std::vector<ValueCache> &values, uint &rowsize,
int &errorinfo);
int &errorinfo, bool eof = false);
void ReadField(const char *&ptr, const char *&val_beg, Item *&item, uint &index_of_field,
std::vector<std::pair<const char *, size_t>> &vec_ptr_field, uint &field_index_in_field_list,
const CHARSET_INFO *char_info);
const CHARSET_INFO *char_info, bool completed_row = true);
void SetTHD(THD *thd) { thd_ = thd; }
THD *GetTHD() const { return thd_; };

Expand Down Expand Up @@ -79,7 +79,7 @@ class ParsingStrategy final {
const std::vector<int> &kmp_next);
enum class SearchResult { PATTERN_FOUND, END_OF_BUFFER, END_OF_LINE };
SearchResult SearchUnescapedPatternNoEOL(const char *&ptr, const char *const buf_end, const std::string &pattern,
const std::vector<int> &kmp_next);
const std::string &line_termination, const std::vector<int> &kmp_next);

void GetEOL(const char *const buf, const char *const buf_end);
void GetValue(const char *const value_ptr, size_t value_size, ushort col, ValueCache &value);
Expand Down

0 comments on commit a1d0ad2

Please sign in to comment.