Skip to content

Commit

Permalink
feat(stonedb8.0:load binlog): support mysql load binlog. (#530)
Browse files Browse the repository at this point in the history
  • Loading branch information
lujiashun authored and mergify[bot] committed Sep 23, 2022
1 parent 479eef2 commit 124002c
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 97 deletions.
2 changes: 1 addition & 1 deletion sql/sql_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class Sql_cmd_load_table final : public Sql_cmd {

bool read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
READ_INFO &read_info, ulong skip_lines);

public: // stonedb8
bool write_execute_load_query_log_event(
THD *thd, const char *db, const char *table_name, bool is_concurrent,
enum enum_duplicates duplicates, bool transactional_table, int errocode);
Expand Down
101 changes: 11 additions & 90 deletions storage/tianmu/core/rc_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -771,99 +771,20 @@ uint64_t RCTable::ProceedNormal(system::IOParameters &iop) {
return no_loaded_rows;
}

// stonedb8 start TODO
/*
// stonedb8 start
int RCTable::binlog_load_query_log_event(system::IOParameters &iop) {
char *load_data_query, *end, *fname_start, *fname_end, *p = NULL;
size_t pl = 0;
List<Item> fv;
Item *item;
String *str;
String pfield, pfields, string_buf;
int n;
LOAD_FILE_INFO *lf_info;
std::string db_name, tab_name;
lf_info = (LOAD_FILE_INFO *)iop.GetLogInfo();
LOAD_FILE_INFO *lf_info = (LOAD_FILE_INFO *)iop.GetLogInfo();
THD *thd = lf_info->thd;
sql_exchange *ex = thd->lex->exchange;
auto cmd = down_cast<Sql_cmd_load_table *>(thd->lex->m_sql_cmd);
int errcode = query_error_code(thd, true);
TABLE_LIST *const table_list = thd->lex->query_tables;
TABLE *table = thd->lex->query_block->table_list.first->table;
if (ex == nullptr || table == nullptr) return -1;
auto pa = fs::path(iop.GetTableName());
std::tie(db_name, tab_name) = std::make_tuple(pa.parent_path().filename().native(), pa.filename().native());
append_identifier(thd, &string_buf, tab_name.c_str(), tab_name.length());
Load_log_event lle(thd, ex, db_name.c_str(), string_buf.c_ptr_safe(), fv, false, DUP_ERROR, false, true);
if (thd->lex->local_file) lle.set_fname_outside_temp_buf(ex->file_name, std::strlen(ex->file_name));
if (!thd->lex->load_field_list.elements) {
Field **field;
for (field = table->field; *field; field++)
thd->lex->load_field_list.push_back(new Item_field(*field));
// bitmap_set_all(table->write_set);
if (setup_fields(thd, Ref_ptr_array(), thd->lex->load_update_list, UPDATE_ACL, 0, 0, 0) ||
setup_fields(thd, Ref_ptr_array(), thd->lex->load_value_list, SELECT_ACL, 0, 0, 0))
return -1;
} else {
if (setup_fields(thd, Ref_ptr_array(), thd->lex->load_field_list, INSERT_ACL, 0, 0, 0) ||
setup_fields(thd, Ref_ptr_array(), thd->lex->load_update_list, UPDATE_ACL, 0, 0, 0))
return -1;
if (setup_fields(thd, Ref_ptr_array(), thd->lex->load_value_list, SELECT_ACL, 0, 0, 0))
return -1;
}
if (!thd->lex->load_field_list.is_empty()) {
List_iterator<Item> li(thd->lex->load_field_list);
pfields.append(" (");
n = 0;
while ((item = li++)) {
if (n++) pfields.append(", ");
if (item->type() == Item::FIELD_ITEM)
append_identifier(thd, &pfields, item->item_name.ptr(), std::strlen(item->item_name.ptr()));
else
item->print(thd, &pfields, QT_ORDINARY); // stonedb8: print() add param thd
}
pfields.append(")");
}
if (!thd->lex->load_update_list.is_empty()) {
List_iterator<Item> lu(thd->lex->load_update_list);
List_iterator<String> ls(thd->lex->load_set_str_list);
pfields.append(" SET ");
n = 0;
while ((item = lu++)) {
str = ls++;
if (n++) pfields.append(", ");
append_identifier(thd, &pfields, item->item_name.ptr(), std::strlen(item->item_name.ptr()));
str->copy();
pfields.append(str->ptr());
str->mem_free();
}
thd->lex->load_set_str_list.empty();
}
p = pfields.c_ptr_safe();
pl = std::strlen(p);
if (!(load_data_query = (char *)thd->alloc(lle.get_query_buffer_length() + 1 + pl))) return -1;
lle.print_query(false, ex->cs ? ex->cs->csname : NULL, load_data_query, &end, &fname_start, &fname_end);
std::strcpy(end, p);
end += pl;
Execute_load_query_log_event e(
thd, load_data_query, end - load_data_query, static_cast<uint>(fname_start - load_data_query - 1),
static_cast<uint>(fname_end - load_data_query), (binary_log::enum_load_dup_handling)0, true, false, false, 0);
return mysql_bin_log.write_event(&e);
}
*/
int RCTable::binlog_load_query_log_event(system::IOParameters &iop) {
return 0;
bool transactional_table = table->file->has_transactions();
bool is_concurrent =
(table_list->lock_descriptor().type == TL_WRITE_CONCURRENT_INSERT);
return cmd->write_execute_load_query_log_event(
thd, table_list->db, table_list->table_name, is_concurrent,
thd->lex->duplicates, transactional_table, errcode);
}
// stonedb8 end

Expand Down
14 changes: 8 additions & 6 deletions storage/tianmu/handler/ha_rcengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace dbhandler {
enum class TIANMUEngineReturnValues { LD_Successed = 100, LD_Failed = 101, LD_Continue = 102 };

void TIANMU_UpdateAndStoreColumnComment(TABLE *table, int field_id, Field *source_field, int source_field_id,
CHARSET_INFO *cs) {
CHARSET_INFO *cs) {
try {
ha_rcengine_->UpdateAndStoreColumnComment(table, field_id, source_field, source_field_id, cs);
} catch (std::exception &e) {
Expand Down Expand Up @@ -72,14 +72,14 @@ bool TIANMU_SetStatementAllowed(THD *thd, LEX *lex) {
}

int TIANMU_HandleSelect(THD *thd, LEX *lex, Query_result *&result, ulong setup_tables_done_option, int &res,
int &optimize_after_tianmu, int &tianmu_free_join, int with_insert=false) {
int &optimize_after_tianmu, int &tianmu_free_join, int with_insert ) {
int ret = RCBASE_QUERY_ROUTE;
try {
// handle_select_ret is introduced here because in case of some exceptions
// (e.g. thrown from ForbiddenMySQLQueryPath) we want to return
// RCBASE_QUERY_ROUTE
int handle_select_ret = ha_rcengine_->HandleSelect(thd, lex, result, setup_tables_done_option, res, optimize_after_tianmu,
tianmu_free_join, with_insert);
int handle_select_ret = ha_rcengine_->HandleSelect(thd, lex, result, setup_tables_done_option, res,
optimize_after_tianmu, tianmu_free_join, with_insert);
if (handle_select_ret == RETURN_QUERY_TO_MYSQL_ROUTE && AtLeastOneTIANMUTableInvolved(lex) &&
ForbiddenMySQLQueryPath(lex)) {
my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR),
Expand All @@ -99,7 +99,8 @@ Either restructure the query with supported syntax, or enable the MySQL core::Qu
return ret;
}

int TIANMU_LoadData(THD *thd, sql_exchange *ex, TABLE_LIST *table_list, void *arg, char *errmsg, int len, int &errcode) {
int TIANMU_LoadData(THD *thd, sql_exchange *ex, TABLE_LIST *table_list, void *arg, char *errmsg, int len,
int &errcode) {
common::TIANMUError tianmu_error;
int ret = static_cast<int>(TIANMUEngineReturnValues::LD_Failed);

Expand Down Expand Up @@ -129,7 +130,8 @@ int TIANMU_LoadData(THD *thd, sql_exchange *ex, TABLE_LIST *table_list, void *ar
bool tianmu_load(THD *thd, sql_exchange *ex, TABLE_LIST *table_list, void *arg) {
char tianmumsg[256];
int tianmu_errcode;
switch (static_cast<TIANMUEngineReturnValues>(TIANMU_LoadData(thd, ex, table_list, arg, tianmumsg, 256, tianmu_errcode))) {
switch (static_cast<TIANMUEngineReturnValues>(
TIANMU_LoadData(thd, ex, table_list, arg, tianmumsg, 256, tianmu_errcode))) {
case TIANMUEngineReturnValues::LD_Continue:
return true;
case TIANMUEngineReturnValues::LD_Failed:
Expand Down

0 comments on commit 124002c

Please sign in to comment.