From 8086fbce2dee6ee4f768f17e13739542690a4d2e Mon Sep 17 00:00:00 2001 From: lihongjian Date: Wed, 2 Nov 2022 17:45:40 +0800 Subject: [PATCH] feat(tianmu): Improve the readability of stonedb. (#11) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit [summary] 1 class member variable refactor:exporter directory; 2 class member variable refactor:loader directory; --- storage/tianmu/exporter/data_exporter.cpp | 24 +-- storage/tianmu/exporter/data_exporter.h | 28 ++-- storage/tianmu/exporter/data_exporter_txt.cpp | 84 +++++----- storage/tianmu/exporter/data_exporter_txt.h | 22 +-- storage/tianmu/exporter/export2file.cpp | 6 +- storage/tianmu/exporter/export2file.h | 6 +- storage/tianmu/loader/load_parser.cpp | 106 ++++++------- storage/tianmu/loader/load_parser.h | 38 ++--- storage/tianmu/loader/parsing_strategy.cpp | 146 +++++++++--------- storage/tianmu/loader/parsing_strategy.h | 40 ++--- 10 files changed, 251 insertions(+), 249 deletions(-) diff --git a/storage/tianmu/exporter/data_exporter.cpp b/storage/tianmu/exporter/data_exporter.cpp index 400cca9614..3704dc0776 100644 --- a/storage/tianmu/exporter/data_exporter.cpp +++ b/storage/tianmu/exporter/data_exporter.cpp @@ -27,23 +27,23 @@ namespace exporter { void DataExporter::Init(std::shared_ptr buffer, std::vector source_deas, fields_t const &fields, std::vector &result_deas) { - _fields = fields; - buf = buffer; + fields_ = fields; + data_exporter_buf_ = buffer; - this->source_deas = source_deas; - this->deas = result_deas; - this->no_attrs = int(deas.size()); + this->source_attr_infos_ = source_deas; + this->attr_infos_ = result_deas; + this->no_attrs_ = int(attr_infos_.size()); - for (size_t i = 0; i < deas.size(); ++i) { + for (size_t i = 0; i < attr_infos_.size(); ++i) { common::CT f_at = ha_rcengine_->GetCorrespondingType(fields[i]); - if (core::ATI::IsStringType(deas[i].Type()) && !core::ATI::IsStringType(f_at)) - this->deas[i] = core::AttributeTypeInfo(f_at, 
deas[i].NotNull()); + if (core::ATI::IsStringType(attr_infos_[i].Type()) && !core::ATI::IsStringType(f_at)) + this->attr_infos_[i] = core::AttributeTypeInfo(f_at, attr_infos_[i].NotNull()); } - cur_attr = 0; - row = NULL; - row_ptr = NULL; - nulls_indicator = 0; + cur_attr_ = 0; + row_ = NULL; + row_ptr_ = NULL; + nulls_indicator_ = 0; } DataExporter::~DataExporter() {} diff --git a/storage/tianmu/exporter/data_exporter.h b/storage/tianmu/exporter/data_exporter.h index 23dd514bea..dd39ea3b04 100644 --- a/storage/tianmu/exporter/data_exporter.h +++ b/storage/tianmu/exporter/data_exporter.h @@ -29,13 +29,13 @@ namespace exporter { class DataExporter { public: - DataExporter() : progressout(NULL), row(NULL), row_ptr(NULL), nulls_indicator(NULL) {} + DataExporter() : progress_out_(NULL), row_(NULL), row_ptr_(NULL), nulls_indicator_(NULL) {} virtual void Init(std::shared_ptr buffer, std::vector source_deas, fields_t const &fields, std::vector &result_deas); virtual ~DataExporter(); void FlushBuffer(); - void SetProgressOut(system::ChannelOut *po) { progressout = po; } + void SetProgressOut(system::ChannelOut *po) { progress_out_ = po; } void ShowProgress(int no_eq); virtual void PutNull() = 0; @@ -46,22 +46,22 @@ class DataExporter { virtual void PutRowEnd() = 0; protected: - int cur_attr; - std::vector deas; - std::vector source_deas; - fields_t _fields; + int cur_attr_ = 0; + std::vector attr_infos_; + std::vector source_attr_infos_; + fields_t fields_; - std::shared_ptr buf; - system::ChannelOut *progressout; + std::shared_ptr data_exporter_buf_; + system::ChannelOut *progress_out_; - int no_attrs; + int no_attrs_ = 0; // the fields below should be moved to RCDEforBinIndicator - char *row; - char *row_ptr; - int max_row_size; - int nulls_indicator_len; - char *nulls_indicator; + char *row_; + char *row_ptr_; + int max_row_size_ = 0; + int nulls_indicator_len_ = 0; + char *nulls_indicator_; }; } // namespace exporter diff --git 
a/storage/tianmu/exporter/data_exporter_txt.cpp b/storage/tianmu/exporter/data_exporter_txt.cpp index 51bef9a168..bd204fab02 100644 --- a/storage/tianmu/exporter/data_exporter_txt.cpp +++ b/storage/tianmu/exporter/data_exporter_txt.cpp @@ -23,20 +23,20 @@ namespace Tianmu { namespace exporter { DEforTxt::DEforTxt(const system::IOParameters &iop) - : delim(iop.Delimiter()[0]), - str_q(iop.StringQualifier()), - esc(iop.EscapeCharacter()), - line_terminator(iop.LineTerminator()[0]), - nulls_str(iop.NullsStr()), - destination_cs(iop.CharsetInfoNumber() ? get_charset(iop.CharsetInfoNumber(), 0) : 0) {} + : delimiter_(iop.Delimiter()[0]), + str_qualifier_(iop.StringQualifier()), + escape_character_(iop.EscapeCharacter()), + line_terminator_(iop.LineTerminator()[0]), + nulls_str_(iop.NullsStr()), + destination_charset_(iop.CharsetInfoNumber() ? get_charset(iop.CharsetInfoNumber(), 0) : 0) {} void DEforTxt::PutText(const types::BString &str) { WriteStringQualifier(); - size_t char_len = - deas[cur_attr].GetCollation().collation->cset->numchars(deas[cur_attr].GetCollation().collation, str.val_, - str.val_ + str.len_); // len in chars - WriteString(str, str.len_); // len in bytes - if ((deas[cur_attr].Type() == common::CT::STRING) && (char_len < deas[cur_attr].CharLen())) + size_t char_len = attr_infos_[cur_attr_].GetCollation().collation->cset->numchars( + attr_infos_[cur_attr_].GetCollation().collation, str.val_, + str.val_ + str.len_); // len in chars + WriteString(str, str.len_); // len in bytes + if ((attr_infos_[cur_attr_].Type() == common::CT::STRING) && (char_len < attr_infos_[cur_attr_].CharLen())) // it can be necessary to change the WritePad implementation to something like: // collation->cset->fill(cs, copy->to_ptr+copy->from_length, // copy->to_length-copy->from_length, ' @@ -44,10 +44,10 @@ void DEforTxt::PutText(const types::BString &str) { // if ' ' (space) can have different codes. 
// for export data not fill space #if 0 - if (!destination_cs) { //export as binary - WritePad(deas[cur_attr].Precision() - bytes_written); + if (!destination_cs_) { //export as binary + WritePad(attr_infos_[cur_attr_].Precision() - bytes_written); } else { - WritePad(deas[cur_attr].CharLen() - char_len); + WritePad(attr_infos_[cur_attr_].CharLen() - char_len); } #endif WriteStringQualifier(); @@ -56,9 +56,9 @@ void DEforTxt::PutText(const types::BString &str) { void DEforTxt::PutBin(const types::BString &str) { int len = str.size(); - // if((rcdea[cur_attr].attrt == common::CT::BYTE) && (len < - // rcdea[cur_attr].size)) - // len = rcdea[cur_attr].size; + // if((rcdea[cur_attr_].attrt == common::CT::BYTE) && (len < + // rcdea[cur_attr_].size)) + // len = rcdea[cur_attr_].size; if (len > 0) { char *hex = new char[len * 2]; system::Convert2Hex((const unsigned char *)str.val_, len, hex, len * 2, false); @@ -69,58 +69,60 @@ void DEforTxt::PutBin(const types::BString &str) { } void DEforTxt::PutNumeric(int64_t num) { - types::RCNum rcn(num, source_deas[cur_attr].Scale(), core::ATI::IsRealType(source_deas[cur_attr].Type()), - source_deas[cur_attr].Type()); + types::RCNum rcn(num, source_attr_infos_[cur_attr_].Scale(), + core::ATI::IsRealType(source_attr_infos_[cur_attr_].Type()), source_attr_infos_[cur_attr_].Type()); types::BString rcs = rcn.ToBString(); WriteString(rcs); WriteValueEnd(); } void DEforTxt::PutDateTime(int64_t dt) { - types::RCDateTime rcdt(dt, deas[cur_attr].Type()); + types::RCDateTime rcdt(dt, attr_infos_[cur_attr_].Type()); types::BString rcs = rcdt.ToBString(); WriteString(rcs); WriteValueEnd(); } void DEforTxt::PutRowEnd() { - if (line_terminator == 0) + if (line_terminator_ == 0) WriteString("\r\n", 2); else - buf->WriteIfNonzero(line_terminator); + data_exporter_buf_->WriteIfNonzero(line_terminator_); } size_t DEforTxt::WriteString(const types::BString &str, int len) { int res_len = 0; - if (esc) { - escaped.erase(); + if (escape_character_) { + 
escaped_.erase(); for (size_t i = 0; i < str.size(); i++) { - if (str[i] == str_q || (!str_q && str[i] == delim)) - escaped.append(1, esc); - escaped.append(1, str[i]); + if (str[i] == str_qualifier_ || (!str_qualifier_ && str[i] == delimiter_)) + escaped_.append(1, escape_character_); + escaped_.append(1, str[i]); } - if (destination_cs) { - int max_res_len = - std::max(destination_cs->mbmaxlen * len + len, deas[cur_attr].GetCollation().collation->mbmaxlen * len + len); + if (destination_charset_) { + int max_res_len = std::max(destination_charset_->mbmaxlen * len + len, + attr_infos_[cur_attr_].GetCollation().collation->mbmaxlen * len + len); uint errors = 0; - res_len = copy_and_convert(buf->BufAppend(max_res_len), max_res_len, destination_cs, escaped.c_str(), - uint32_t(escaped.length()), deas[cur_attr].GetCollation().collation, &errors); - buf->SeekBack(max_res_len - res_len); + res_len = copy_and_convert(data_exporter_buf_->BufAppend(max_res_len), max_res_len, destination_charset_, + escaped_.c_str(), uint32_t(escaped_.length()), + attr_infos_[cur_attr_].GetCollation().collation, &errors); + data_exporter_buf_->SeekBack(max_res_len - res_len); } else { - std::strncpy(buf->BufAppend(uint(escaped.length())), escaped.c_str(), escaped.length()); + std::strncpy(data_exporter_buf_->BufAppend(uint(escaped_.length())), escaped_.c_str(), escaped_.length()); res_len = len; } } else { - if (destination_cs) { - int max_res_len = - std::max(destination_cs->mbmaxlen * len, deas[cur_attr].GetCollation().collation->mbmaxlen * len); + if (destination_charset_) { + int max_res_len = std::max(destination_charset_->mbmaxlen * len, + attr_infos_[cur_attr_].GetCollation().collation->mbmaxlen * len); uint errors = 0; - res_len = copy_and_convert(buf->BufAppend(max_res_len), max_res_len, destination_cs, str.GetDataBytesPointer(), - len, deas[cur_attr].GetCollation().collation, &errors); - buf->SeekBack(max_res_len - res_len); + res_len = + 
copy_and_convert(data_exporter_buf_->BufAppend(max_res_len), max_res_len, destination_charset_, + str.GetDataBytesPointer(), len, attr_infos_[cur_attr_].GetCollation().collation, &errors); + data_exporter_buf_->SeekBack(max_res_len - res_len); } else { - str.CopyTo(buf->BufAppend((uint)len), len); + str.CopyTo(data_exporter_buf_->BufAppend((uint)len), len); res_len = len; } } diff --git a/storage/tianmu/exporter/data_exporter_txt.h b/storage/tianmu/exporter/data_exporter_txt.h index 390a6c671a..f46f37dd87 100644 --- a/storage/tianmu/exporter/data_exporter_txt.h +++ b/storage/tianmu/exporter/data_exporter_txt.h @@ -43,27 +43,27 @@ class DEforTxt : public DataExporter { void PutRowEnd() override; protected: - void WriteStringQualifier() { /*buf->WriteIfNonzero(str_q);*/ + void WriteStringQualifier() { /*data_exporter_buf_->WriteIfNonzero(str_q);*/ } - void WriteDelimiter() { buf->WriteIfNonzero(delim); } - void WriteNull() { WriteString(nulls_str.c_str(), (int)nulls_str.length()); } + void WriteDelimiter() { data_exporter_buf_->WriteIfNonzero(delimiter_); } + void WriteNull() { WriteString(nulls_str_.c_str(), (int)nulls_str_.length()); } void WriteString(const types::BString &str) { WriteString(str, str.size()); } size_t WriteString(const types::BString &str, int len); - void WriteChar(char c, uint repeat = 1) { std::memset(buf->BufAppend(repeat), c, repeat); } + void WriteChar(char c, uint repeat = 1) { std::memset(data_exporter_buf_->BufAppend(repeat), c, repeat); } void WritePad(uint repeat) { WriteChar(' ', repeat); } void WriteValueEnd() { - if (cur_attr == no_attrs - 1) - cur_attr = 0; + if (cur_attr_ == no_attrs_ - 1) + cur_attr_ = 0; else { WriteDelimiter(); - cur_attr++; + cur_attr_++; } } - uchar delim, str_q, esc; - uchar line_terminator; - std::string nulls_str, escaped; - CHARSET_INFO *destination_cs; + uchar delimiter_, str_qualifier_, escape_character_; + uchar line_terminator_; + std::string nulls_str_, escaped_; + CHARSET_INFO *destination_charset_; 
}; } // namespace exporter diff --git a/storage/tianmu/exporter/export2file.cpp b/storage/tianmu/exporter/export2file.cpp index 7cea3958aa..6bc585ced9 100644 --- a/storage/tianmu/exporter/export2file.cpp +++ b/storage/tianmu/exporter/export2file.cpp @@ -24,7 +24,7 @@ namespace Tianmu { namespace exporter { select_tianmu_export::select_tianmu_export(Query_result_export *se) - : Query_result_export(se->get_sql_exchange()), se(se), prepared(false) {} + : Query_result_export(se->get_sql_exchange()), select_export_(se), prepared_(false) {} int select_tianmu_export::prepare(List &list, SELECT_LEX_UNIT *u) { bool blob_flag = 0; @@ -53,7 +53,7 @@ int select_tianmu_export::prepare(List &list, SELECT_LEX_UNIT *u) { exchange->field.opt_enclosed = 1; // A little quicker loop fixed_row_size = (!field_term_length && !exchange->field.enclosed->length() && !blob_flag); - prepared = true; + prepared_ = true; return 0; } @@ -63,7 +63,7 @@ void select_tianmu_export::SendOk(THD *thd) { ::my_ok(thd, row_count); } sql_exchange *select_tianmu_export::SqlExchange() { return exchange; } -bool select_tianmu_export::send_data(List &items) { return se->send_data(items); } +bool select_tianmu_export::send_data(List &items) { return select_export_->send_data(items); } } // namespace exporter } // namespace Tianmu diff --git a/storage/tianmu/exporter/export2file.h b/storage/tianmu/exporter/export2file.h index 4035bc1524..035dc9b91d 100644 --- a/storage/tianmu/exporter/export2file.h +++ b/storage/tianmu/exporter/export2file.h @@ -33,12 +33,12 @@ class select_tianmu_export : public Query_result_export { void SetRowCount(ha_rows x); void SendOk(THD *thd); sql_exchange *SqlExchange(); - bool IsPrepared() const { return prepared; }; + bool IsPrepared() const { return prepared_; }; bool send_data(List &items) override; private: - Query_result_export *se; - bool prepared; + Query_result_export *select_export_; + bool prepared_; }; } // namespace exporter diff --git 
a/storage/tianmu/loader/load_parser.cpp b/storage/tianmu/loader/load_parser.cpp index 424df0d93e..0edda138e7 100644 --- a/storage/tianmu/loader/load_parser.cpp +++ b/storage/tianmu/loader/load_parser.cpp @@ -29,44 +29,44 @@ namespace Tianmu { namespace loader { LoadParser::LoadParser(RCAttrPtrVect_t &attrs, const system::IOParameters &iop, uint packsize, std::unique_ptr &f) - : attrs(attrs), - start_time(types::RCDateTime::GetCurrent().GetInt64()), - ioparam(iop), - pack_size(packsize), - rejecter(packsize, iop.GetRejectFile(), iop.GetAbortOnCount(), iop.GetAbortOnThreshold()), - no_obj(attrs[0]->NumOfObj()) { + : attrs_(attrs), + start_time_(types::RCDateTime::GetCurrent().GetInt64()), + io_param_(iop), + pack_size_(packsize), + rejecter_(packsize, iop.GetRejectFile(), iop.GetAbortOnCount(), iop.GetAbortOnThreshold()), + num_of_obj_(attrs[0]->NumOfObj()) { std::vector columns_collations; for (auto &it : attrs) columns_collations.push_back(it->GetCollation().collation->number); - strategy = std::make_shared(iop, columns_collations); + strategy_ = std::make_shared(iop, columns_collations); utils::Timer timer; - if (!read_buffer.BufOpen(f)) + if (!read_buffer_.BufOpen(f)) throw common::NetStreamException("Unable to open file " + std::string(iop.Path())); - if (read_buffer.BufSize() == 0) + if (read_buffer_.BufSize() == 0) throw common::NetStreamException("File looks to be empty: " + std::string(iop.Path())); - cur_ptr = read_buffer.Buf(); - buf_end = cur_ptr + read_buffer.BufSize(); + cur_ptr_ = read_buffer_.Buf(); + buf_end_ = cur_ptr_ + read_buffer_.BufSize(); timer.Print(__PRETTY_FUNCTION__); - tab_index = ha_rcengine_->GetTableIndex("./" + ioparam.TableName()); + tab_index_ = ha_rcengine_->GetTableIndex("./" + io_param_.TableName()); } uint LoadParser::GetPackrow(uint no_of_rows, std::vector &value_buffers) { - value_buffers.reserve(attrs.size()); - for (uint att = 0; att < attrs.size(); att++) { + value_buffers.reserve(attrs_.size()); + for (uint att = 0; att < 
attrs_.size(); att++) { int64_t init_capacity; - if (last_pack_size.size() > att) - init_capacity = static_cast(last_pack_size[att] * 1.1) + 128; + if (last_pack_size_.size() > att) + init_capacity = static_cast(last_pack_size_[att] * 1.1) + 128; else { auto max_value_size = sizeof(int64_t); - if (core::ATI::IsStringType(attrs[att]->TypeName()) && attrs[att]->Type().GetPrecision() < max_value_size) - max_value_size = attrs[att]->Type().GetPrecision(); - init_capacity = pack_size * max_value_size + 512; + if (core::ATI::IsStringType(attrs_[att]->TypeName()) && attrs_[att]->Type().GetPrecision() < max_value_size) + max_value_size = attrs_[att]->Type().GetPrecision(); + init_capacity = pack_size_ * max_value_size + 512; } - value_buffers.emplace_back(pack_size, init_capacity); + value_buffers.emplace_back(pack_size_, init_capacity); } uint no_of_rows_returned; @@ -75,9 +75,9 @@ uint LoadParser::GetPackrow(uint no_of_rows, std::vector &value_buff break; } - last_pack_size.clear(); + last_pack_size_.clear(); for (auto &it : value_buffers) { - last_pack_size.push_back(it.SumarizedSize()); + last_pack_size_.push_back(it.SumarizedSize()); } return no_of_rows_returned; @@ -90,46 +90,46 @@ bool LoadParser::MakeRow(std::vector &value_buffers) { bool cont = true; while (cont) { bool make_value_ok; - switch (strategy->GetOneRow(cur_ptr, buf_end - cur_ptr, value_buffers, rowsize, errorinfo)) { + switch (strategy_->GetOneRow(cur_ptr_, buf_end_ - cur_ptr_, value_buffers, rowsize, errorinfo)) { case ParsingStrategy::ParseResult::EOB: if (mysql_bin_log.is_open()) - binlog_loaded_block(read_buffer.Buf(), cur_ptr); - if (read_buffer.BufFetch(int(buf_end - cur_ptr))) { - cur_ptr = read_buffer.Buf(); - buf_end = cur_ptr + read_buffer.BufSize(); + binlog_loaded_block(read_buffer_.Buf(), cur_ptr_); + if (read_buffer_.BufFetch(int(buf_end_ - cur_ptr_))) { + cur_ptr_ = read_buffer_.Buf(); + buf_end_ = cur_ptr_ + read_buffer_.BufSize(); } else { // reaching the end of the buffer - if 
(cur_ptr != buf_end) - rejecter.ConsumeBadRow(cur_ptr, buf_end - cur_ptr, cur_row + 1, errorinfo == -1 ? -1 : errorinfo + 1); - cur_row++; + if (cur_ptr_ != buf_end_) + rejecter_.ConsumeBadRow(cur_ptr_, buf_end_ - cur_ptr_, cur_row_ + 1, errorinfo == -1 ? -1 : errorinfo + 1); + cur_row_++; cont = false; } break; case ParsingStrategy::ParseResult::ERROR: - rejecter.ConsumeBadRow(cur_ptr, rowsize, cur_row + 1, errorinfo + 1); - cur_ptr += rowsize; - cur_row++; + rejecter_.ConsumeBadRow(cur_ptr_, rowsize, cur_row_ + 1, errorinfo + 1); + cur_ptr_ += rowsize; + cur_row_++; break; case ParsingStrategy::ParseResult::OK: make_value_ok = true; - for (uint att = 0; make_value_ok && att < attrs.size(); ++att) + for (uint att = 0; make_value_ok && att < attrs_.size(); ++att) if (!MakeValue(att, value_buffers[att])) { - rejecter.ConsumeBadRow(cur_ptr, rowsize, cur_row + 1, att + 1); + rejecter_.ConsumeBadRow(cur_ptr_, rowsize, cur_row_ + 1, att + 1); make_value_ok = false; } - cur_ptr += rowsize; - cur_row++; + cur_ptr_ += rowsize; + cur_row_++; if (make_value_ok) { - for (uint att = 0; att < attrs.size(); ++att) value_buffers[att].Commit(); + for (uint att = 0; att < attrs_.size(); ++att) value_buffers[att].Commit(); // check key - row_no++; - if (tab_index != nullptr) { - if (HA_ERR_FOUND_DUPP_KEY == ProcessInsertIndex(tab_index, value_buffers, row_no - 1)) { - row_no--; - dup_no++; - for (uint att = 0; att < attrs.size(); ++att) value_buffers[att].Rollback(); + num_of_row_++; + if (tab_index_ != nullptr) { + if (HA_ERR_FOUND_DUPP_KEY == ProcessInsertIndex(tab_index_, value_buffers, num_of_row_ - 1)) { + num_of_row_--; + num_of_dup_++; + for (uint att = 0; att < attrs_.size(); ++att) value_buffers[att].Rollback(); } } return true; @@ -142,25 +142,25 @@ bool LoadParser::MakeRow(std::vector &value_buffers) { } bool LoadParser::MakeValue(uint att, ValueCache &buffer) { - if (attrs[att]->TypeName() == common::CT::TIMESTAMP) { - if (buffer.ExpectedNull() && 
attrs[att]->Type().NotNull()) { - *reinterpret_cast(buffer.Prepare(sizeof(int64_t))) = start_time; + if (attrs_[att]->TypeName() == common::CT::TIMESTAMP) { + if (buffer.ExpectedNull() && attrs_[att]->Type().NotNull()) { + *reinterpret_cast(buffer.Prepare(sizeof(int64_t))) = start_time_; buffer.ExpectedSize(sizeof(int64_t)); buffer.ExpectedNull(false); } } // validate the value length - if (core::ATI::IsStringType(attrs[att]->TypeName()) && !buffer.ExpectedNull() && - (size_t)buffer.ExpectedSize() > attrs[att]->Type().GetPrecision()) + if (core::ATI::IsStringType(attrs_[att]->TypeName()) && !buffer.ExpectedNull() && + (size_t)buffer.ExpectedSize() > attrs_[att]->Type().GetPrecision()) return false; - if (attrs[att]->Type().IsLookup() && !buffer.ExpectedNull()) { + if (attrs_[att]->Type().IsLookup() && !buffer.ExpectedNull()) { types::BString s(ZERO_LENGTH_STRING, 0); buffer.Prepare(sizeof(int64_t)); s.val_ = static_cast(buffer.PreparedBuffer()); s.len_ = buffer.ExpectedSize(); - *reinterpret_cast(buffer.PreparedBuffer()) = attrs[att]->EncodeValue_T(s, true); + *reinterpret_cast(buffer.PreparedBuffer()) = attrs_[att]->EncodeValue_T(s, true); buffer.ExpectedSize(sizeof(int64_t)); } @@ -177,7 +177,7 @@ int LoadParser::ProcessInsertIndex(std::shared_ptr tab, std fields.emplace_back(vcs[col].GetDataBytesPointer(lastrow - 1), vcs[col].Size(lastrow - 1)); } - if (tab->InsertIndex(current_txn_, fields, no_obj + no_rows) == common::ErrorCode::DUPP_KEY) { + if (tab->InsertIndex(current_txn_, fields, num_of_obj_ + no_rows) == common::ErrorCode::DUPP_KEY) { TIANMU_LOG(LogCtl_Level::INFO, "Load discard this row for duplicate key"); return HA_ERR_FOUND_DUPP_KEY; } @@ -189,7 +189,7 @@ int LoadParser::binlog_loaded_block(const char *buf_start, const char *buf_end) LOAD_FILE_INFO *lf_info = nullptr; uint block_len = 0; - lf_info = static_cast(ioparam.GetLogInfo()); + lf_info = static_cast(io_param_.GetLogInfo()); uchar *buffer = reinterpret_cast(const_cast(buf_start)); uint 
max_event_size = lf_info->thd->variables.max_allowed_packet; diff --git a/storage/tianmu/loader/load_parser.h b/storage/tianmu/loader/load_parser.h index c9bd8c0d82..359d33d110 100644 --- a/storage/tianmu/loader/load_parser.h +++ b/storage/tianmu/loader/load_parser.h @@ -49,32 +49,32 @@ class LoadParser final { ~LoadParser() = default; uint GetPackrow(uint no_of_rows, std::vector &vcs); - int64_t GetNumOfRejectedRows() const { return rejecter.GetNumOfRejectedRows(); } - bool ThresholdExceeded(int64_t no_rows) const { return rejecter.ThresholdExceeded(no_rows); } + int64_t GetNumOfRejectedRows() const { return rejecter_.GetNumOfRejectedRows(); } + bool ThresholdExceeded(int64_t no_rows) const { return rejecter_.ThresholdExceeded(no_rows); } int ProcessInsertIndex(std::shared_ptr tab, std::vector &vcs, uint no_rows); - int64_t GetNoRow() const { return row_no; } - int64_t GetDuprow() const { return dup_no; } + int64_t GetNoRow() const { return num_of_row_; } + int64_t GetDuprow() const { return num_of_dup_; } private: - RCAttrPtrVect_t &attrs; + RCAttrPtrVect_t &attrs_; - std::vector last_pack_size; - int64_t start_time; + std::vector last_pack_size_; + int64_t start_time_ = 0; - ReadBuffer read_buffer; + ReadBuffer read_buffer_; - std::shared_ptr strategy; - std::shared_ptr tab_index; + std::shared_ptr strategy_; + std::shared_ptr tab_index_; - const char *cur_ptr; - const char *buf_end; - const system::IOParameters &ioparam; - uint pack_size; - Rejecter rejecter; - uint cur_row = 0; - int64_t no_obj = 0; - int64_t row_no = 0; - int64_t dup_no = 0; + const char *cur_ptr_; + const char *buf_end_; + const system::IOParameters &io_param_; + uint pack_size_ = 0; + Rejecter rejecter_; + uint cur_row_ = 0; + int64_t num_of_obj_ = 0; + int64_t num_of_row_ = 0; + int64_t num_of_dup_ = 0; bool MakeRow(std::vector &value_buffers); bool MakeValue(uint col, ValueCache &buffer); diff --git a/storage/tianmu/loader/parsing_strategy.cpp b/storage/tianmu/loader/parsing_strategy.cpp 
index 4c29c2fb43..c661032b31 100644 --- a/storage/tianmu/loader/parsing_strategy.cpp +++ b/storage/tianmu/loader/parsing_strategy.cpp @@ -41,45 +41,45 @@ static inline void PrepareKMP(ParsingStrategy::kmp_next_t &kmp_next, std::string } ParsingStrategy::ParsingStrategy(const system::IOParameters &iop, std::vector columns_collations) - : atis(iop.ATIs()), - prepared(false), - eol(iop.LineTerminator()), - delimiter(iop.Delimiter()), - string_qualifier(iop.StringQualifier()), - escape_char(iop.EscapeCharacter()), - temp_buf(65536) { - cs_info = get_charset(iop.CharsetInfoNumber(), 0); - for (ushort i = 0; i < atis.size(); ++i) { + : attr_infos_(iop.ATIs()), + prepared_(false), + terminator_(iop.LineTerminator()), + delimiter_(iop.Delimiter()), + string_qualifier_(iop.StringQualifier()), + escape_char_(iop.EscapeCharacter()), + temp_buf_(65536) { + charset_info_ = get_charset(iop.CharsetInfoNumber(), 0); + for (ushort i = 0; i < attr_infos_.size(); ++i) { if (core::ATI::IsStringType(GetATI(i).Type())) { GetATI(i).SetCollation(get_charset(columns_collations[i], 0)); } } - PrepareKMP(kmp_next_delimiter, delimiter); - if (string_qualifier) { - enclose_delimiter = std::string((char *)&string_qualifier, 1) + delimiter; - PrepareKMP(kmp_next_enclose_delimiter, enclose_delimiter); + PrepareKMP(kmp_next_delimiter_, delimiter_); + if (string_qualifier_) { + enclose_delimiter_ = std::string((char *)&string_qualifier_, 1) + delimiter_; + PrepareKMP(kmp_next_enclose_delimiter_, enclose_delimiter_); } std::string tmpName = iop.GetTableName(); if (tmpName.find("/") != std::string::npos) { - tablename = tmpName.substr(tmpName.find("/") + 1); - dbname = tmpName.substr(0, tmpName.find("/")); + tablename_ = tmpName.substr(tmpName.find("/") + 1); + dbname_ = tmpName.substr(0, tmpName.find("/")); } } inline void ParsingStrategy::GuessUnescapedEOL(const char *ptr, const char *const buf_end) { for (; ptr < buf_end; ptr++) { - if (escape_char && *ptr == escape_char) { + if (escape_char_ && 
*ptr == escape_char_) { if (ptr + 2 < buf_end && ptr[1] == '\r' && ptr[2] == '\n') ptr += 3; else ptr += 2; } else if (*ptr == '\n') { - eol = "\n"; + terminator_ = "\n"; break; } else if (*ptr == '\r' && ptr + 1 < buf_end && ptr[1] == '\n') { - eol = "\r\n"; + terminator_ = "\r\n"; break; } } @@ -87,14 +87,14 @@ inline void ParsingStrategy::GuessUnescapedEOL(const char *ptr, const char *cons inline void ParsingStrategy::GuessUnescapedEOLWithEnclose(const char *ptr, const char *const buf_end) { for (; ptr < buf_end; ptr++) { - if (escape_char && *ptr == escape_char) + if (escape_char_ && *ptr == escape_char_) ptr += 2; - else if (string_qualifier && *ptr == string_qualifier) { + else if (string_qualifier_ && *ptr == string_qualifier_) { if (ptr + 1 < buf_end && ptr[1] == '\n') { - eol = "\n"; + terminator_ = "\n"; break; } else if (ptr + 2 < buf_end && ptr[1] == '\r' && ptr[2] == '\n') { - eol = "\r\n"; + terminator_ = "\r\n"; break; } } @@ -108,7 +108,7 @@ inline bool ParsingStrategy::SearchUnescapedPattern(const char *&ptr, const char const char *search_end = buf_end; if (size == 1) { while (ptr < search_end && *ptr != *c_pattern) { - if (escape_char && *ptr == escape_char) + if (escape_char_ && *ptr == escape_char_) ptr += 2; else ++ptr; @@ -116,7 +116,7 @@ inline bool ParsingStrategy::SearchUnescapedPattern(const char *&ptr, const char } else if (size == 2) { --search_end; while (ptr < search_end && (*ptr != *c_pattern || ptr[1] != c_pattern[1])) { - if (escape_char && *ptr == escape_char) + if (escape_char_ && *ptr == escape_char_) ptr += 2; else ++ptr; @@ -124,7 +124,7 @@ inline bool ParsingStrategy::SearchUnescapedPattern(const char *&ptr, const char } else { int b = 0; for (; ptr < buf_end; ++ptr) { - if (escape_char && *ptr == escape_char) { + if (escape_char_ && *ptr == escape_char_) { b = 0; ++ptr; } else if (c_pattern[b] != *ptr) { @@ -144,14 +144,14 @@ inline bool ParsingStrategy::SearchUnescapedPattern(const char *&ptr, const char while(ptr < 
search_end && (*ptr != c_pattern[0] || *(ptr+1) != c_pattern[1] || *(ptr+2) != c_pattern[2])) { - if(*ptr == escape_char) + if(*ptr == escape_char_) ptr += 2; else ++ptr; } } else - ASSERT(0, (std::string("Unexpected pattern: '") + delimiter + + ASSERT(0, (std::string("Unexpected pattern: '") + delimiter_ + "'").c_str());*/ return (ptr < search_end); @@ -169,13 +169,13 @@ inline ParsingStrategy::SearchResult ParsingStrategy::SearchUnescapedPatternNoEO const std::vector &kmp_next) { const char *c_pattern = pattern.c_str(); size_t size = pattern.size(); - const char *c_eol = eol.c_str(); - size_t crlf = eol.size(); + const char *c_eol = terminator_.c_str(); + size_t crlf = terminator_.size(); const char *search_end = buf_end; if (size == 1) { while (ptr < search_end && *ptr != *c_pattern) { - if (escape_char && *ptr == escape_char) + if (escape_char_ && *ptr == escape_char_) ptr += 2; else if (*ptr == *c_eol && ptr + crlf <= buf_end && TailsMatch(ptr, c_eol, crlf)) return SearchResult::END_OF_LINE; @@ -185,7 +185,7 @@ inline ParsingStrategy::SearchResult ParsingStrategy::SearchUnescapedPatternNoEO } else if (size == 2) { --search_end; while (ptr < search_end && (*ptr != *c_pattern || ptr[1] != c_pattern[1])) { - if (escape_char && *ptr == escape_char) + if (escape_char_ && *ptr == escape_char_) ptr += 2; else if (*ptr == *c_eol && ptr + crlf <= buf_end && TailsMatch(ptr, c_eol, crlf)) return SearchResult::END_OF_LINE; @@ -195,7 +195,7 @@ inline ParsingStrategy::SearchResult ParsingStrategy::SearchUnescapedPatternNoEO } else { int b = 0; for (; ptr < buf_end; ++ptr) { - if (escape_char && *ptr == escape_char) { + if (escape_char_ && *ptr == escape_char_) { b = 0; ++ptr; } else if (*ptr == *c_eol && ptr + crlf <= buf_end && TailsMatch(ptr, c_eol, crlf)) @@ -217,52 +217,52 @@ inline ParsingStrategy::SearchResult ParsingStrategy::SearchUnescapedPatternNoEO while(ptr < search_end && (*ptr != c_pattern[0] || *(ptr+1) != c_pattern[1] || *(ptr+2) != c_pattern[2])) { - 
if(escape_char && *ptr == escape_char) + if(escape_char_ && *ptr == escape_char_) ptr += 2; - else if (*ptr == eol[0] && (crlf == 1 || *(ptr+1) == eol[1])) + else if (*ptr == terminator_[0] && (crlf == 1 || *(ptr+1) == terminator_[1])) return SearchResult::END_OF_LINE; else ++ptr; } } else - ASSERT(0, (std::string("Unexpected pattern: '") + delimiter + + ASSERT(0, (std::string("Unexpected pattern: '") + delimiter_ + "'").c_str());*/ return (ptr < search_end) ? SearchResult::PATTERN_FOUND : SearchResult::END_OF_BUFFER; } void ParsingStrategy::GetEOL(const char *const buf, const char *const buf_end) { - if (eol.size() == 0) { + if (terminator_.size() == 0) { const char *ptr = buf; - for (uint col = 0; col < atis.size() - 1; ++col) { - if (string_qualifier && *ptr == string_qualifier) { - if (!SearchUnescapedPattern(++ptr, buf_end, enclose_delimiter, kmp_next_enclose_delimiter)) + for (uint col = 0; col < attr_infos_.size() - 1; ++col) { + if (string_qualifier_ && *ptr == string_qualifier_) { + if (!SearchUnescapedPattern(++ptr, buf_end, enclose_delimiter_, kmp_next_enclose_delimiter_)) throw common::Exception( "Unable to detect the line terminating sequence, please specify " "it " "explicitly."); ++ptr; - } else if (!SearchUnescapedPattern(ptr, buf_end, delimiter, kmp_next_delimiter)) + } else if (!SearchUnescapedPattern(ptr, buf_end, delimiter_, kmp_next_delimiter_)) throw common::Exception( "Unable to detect the line terminating sequence, please specify it " "explicitly."); - ptr += delimiter.size(); + ptr += delimiter_.size(); } - if (string_qualifier && *ptr == string_qualifier) + if (string_qualifier_ && *ptr == string_qualifier_) GuessUnescapedEOLWithEnclose(++ptr, buf_end); else GuessUnescapedEOL(ptr, buf_end); } - if (eol.size() == 0) + if (terminator_.size() == 0) throw common::Exception( "Unable to detect the line terminating sequence, please specify it " "explicitly."); - PrepareKMP(kmp_next_eol, eol); - if (string_qualifier) { - enclose_eol = 
std::string((char *)&string_qualifier, 1) + eol; - PrepareKMP(kmp_next_enclose_eol, enclose_eol); + PrepareKMP(kmp_next_terminator_, terminator_); + if (string_qualifier_) { + enclose_terminator_ = std::string((char *)&string_qualifier_, 1) + terminator_; + PrepareKMP(kmp_next_enclose_terminator_, enclose_terminator_); } } @@ -270,9 +270,9 @@ ParsingStrategy::ParseResult ParsingStrategy::GetOneRow(const char *const buf, s std::vector &record, uint &rowsize, int &errorinfo) { const char *buf_end = buf + size; - if (!prepared) { + if (!prepared_) { GetEOL(buf, buf_end); - prepared = true; + prepared_ = true; } if (buf == buf_end) @@ -281,13 +281,13 @@ ParsingStrategy::ParseResult ParsingStrategy::GetOneRow(const char *const buf, s const char *ptr = buf; bool row_incomplete = false; errorinfo = -1; - for (uint col = 0; col < atis.size() - 1; ++col) { + for (uint col = 0; col < attr_infos_.size() - 1; ++col) { const char *val_beg = ptr; - if (string_qualifier && *ptr == string_qualifier) { - row_incomplete = !SearchUnescapedPattern(++ptr, buf_end, enclose_delimiter, kmp_next_enclose_delimiter); + if (string_qualifier_ && *ptr == string_qualifier_) { + row_incomplete = !SearchUnescapedPattern(++ptr, buf_end, enclose_delimiter_, kmp_next_enclose_delimiter_); ++ptr; } else { - SearchResult res = SearchUnescapedPatternNoEOL(ptr, buf_end, delimiter, kmp_next_delimiter); + SearchResult res = SearchUnescapedPatternNoEOL(ptr, buf_end, delimiter_, kmp_next_delimiter_); if (res == SearchResult::END_OF_LINE) { GetValue(val_beg, ptr - val_beg, col, record[col]); continue; @@ -306,32 +306,32 @@ ParsingStrategy::ParseResult ParsingStrategy::GetOneRow(const char *const buf, s if (errorinfo == -1) errorinfo = col; } - ptr += delimiter.size(); + ptr += delimiter_.size(); } if (!row_incomplete) { // the last column const char *val_beg = ptr; - if (string_qualifier && *ptr == string_qualifier) { - row_incomplete = !SearchUnescapedPattern(++ptr, buf_end, enclose_eol, 
kmp_next_enclose_eol); + if (string_qualifier_ && *ptr == string_qualifier_) { + row_incomplete = !SearchUnescapedPattern(++ptr, buf_end, enclose_terminator_, kmp_next_enclose_terminator_); ++ptr; } else - row_incomplete = !SearchUnescapedPattern(ptr, buf_end, eol, kmp_next_eol); + row_incomplete = !SearchUnescapedPattern(ptr, buf_end, terminator_, kmp_next_terminator_); if (!row_incomplete) { try { - GetValue(val_beg, ptr - val_beg, atis.size() - 1, record[atis.size() - 1]); + GetValue(val_beg, ptr - val_beg, attr_infos_.size() - 1, record[attr_infos_.size() - 1]); } catch (...) { if (errorinfo == -1) - errorinfo = atis.size() - 1; + errorinfo = attr_infos_.size() - 1; } - ptr += eol.size(); + ptr += terminator_.size(); } } if (row_incomplete) { if (errorinfo == -1) - errorinfo = atis.size() - 1; + errorinfo = attr_infos_.size() - 1; return ParsingStrategy::ParseResult::EOB; } rowsize = uint(ptr - buf); @@ -352,7 +352,7 @@ void ParsingStrategy::GetValue(const char *value_ptr, size_t value_size, ushort core::AttributeTypeInfo &ati = GetATI(col); bool is_enclosed = false; - if (string_qualifier && *value_ptr == string_qualifier) { + if (string_qualifier_ && *value_ptr == string_qualifier_) { // trim quotes ++value_ptr; value_size -= 2; @@ -418,8 +418,8 @@ void ParsingStrategy::GetValue(const char *value_ptr, size_t value_size, ushort if (ati.CharLen() < (uint)value_size) { std::string valueStr(value_ptr, value_size); value_size = ati.CharLen(); - TIANMU_LOG(LogCtl_Level::DEBUG, "Data format error. DbName:%s ,TableName:%s ,Col %d, value:%s", dbname.c_str(), - tablename.c_str(), col, valueStr.c_str()); + TIANMU_LOG(LogCtl_Level::DEBUG, "Data format error. 
DbName:%s ,TableName:%s ,Col %d, value:%s", dbname_.c_str(), + tablename_.c_str(), col, valueStr.c_str()); std::stringstream err_msg; err_msg << "data truncate,col num" << col << " value:" << valueStr << std::endl; common::PushWarning(current_txn_->Thd(), Sql_condition::SL_WARNING, ER_UNKNOWN_ERROR, err_msg.str().c_str()); @@ -429,26 +429,26 @@ void ParsingStrategy::GetValue(const char *value_ptr, size_t value_size, ushort char *buf = reinterpret_cast(buffer.Prepare(reserved)); size_t new_size = 0; for (size_t j = 0; j < value_size; j++) { - if (value_ptr[j] == escape_char) + if (value_ptr[j] == escape_char_) buf[new_size] = TranslateEscapedChar(value_ptr[++j]); - else if (value_ptr[j] == *delimiter.c_str()) + else if (value_ptr[j] == *delimiter_.c_str()) break; else buf[new_size] = value_ptr[j]; new_size++; } - if (ati.CharsetInfo() != cs_info) { + if (ati.CharsetInfo() != charset_info_) { // convert between charsets uint errors = 0; - if (ati.CharsetInfo()->mbmaxlen <= cs_info->mbmaxlen) - new_size = copy_and_convert(buf, reserved, ati.CharsetInfo(), buf, new_size, cs_info, &errors); + if (ati.CharsetInfo()->mbmaxlen <= charset_info_->mbmaxlen) + new_size = copy_and_convert(buf, reserved, ati.CharsetInfo(), buf, new_size, charset_info_, &errors); else { - if (new_size > temp_buf.size()) - temp_buf.resize(new_size); - char *tmpbuf = &temp_buf[0]; + if (new_size > temp_buf_.size()) + temp_buf_.resize(new_size); + char *tmpbuf = &temp_buf_[0]; std::memcpy(tmpbuf, buf, new_size); - new_size = copy_and_convert(buf, reserved, ati.CharsetInfo(), tmpbuf, new_size, cs_info, &errors); + new_size = copy_and_convert(buf, reserved, ati.CharsetInfo(), tmpbuf, new_size, charset_info_, &errors); } } diff --git a/storage/tianmu/loader/parsing_strategy.h b/storage/tianmu/loader/parsing_strategy.h index 567ec4283f..ce14b88763 100644 --- a/storage/tianmu/loader/parsing_strategy.h +++ b/storage/tianmu/loader/parsing_strategy.h @@ -37,28 +37,28 @@ class ParsingStrategy final { int 
&errorinfo); protected: - core::AttributeTypeInfo &GetATI(ushort col) { return atis[col]; } + core::AttributeTypeInfo &GetATI(ushort col) { return attr_infos_[col]; } private: - std::vector atis; - bool prepared; - - std::string eol; - std::string delimiter; // cloumn separator - uchar string_qualifier; - char escape_char; // row separator - std::string enclose_delimiter; - std::string enclose_eol; - std::string tablename; - std::string dbname; - - kmp_next_t kmp_next_delimiter; - kmp_next_t kmp_next_enclose_delimiter; - kmp_next_t kmp_next_eol; - kmp_next_t kmp_next_enclose_eol; - - CHARSET_INFO *cs_info; - std::vector temp_buf; + std::vector attr_infos_; + bool prepared_; + + std::string terminator_; + std::string delimiter_; // cloumn separator + uchar string_qualifier_; + char escape_char_; // row separator + std::string enclose_delimiter_; + std::string enclose_terminator_; + std::string tablename_; + std::string dbname_; + + kmp_next_t kmp_next_delimiter_; + kmp_next_t kmp_next_enclose_delimiter_; + kmp_next_t kmp_next_terminator_; + kmp_next_t kmp_next_enclose_terminator_; + + CHARSET_INFO *charset_info_; + std::vector temp_buf_; void GuessUnescapedEOL(const char *ptr, const char *buf_end); void GuessUnescapedEOLWithEnclose(const char *ptr, const char *const buf_end);