Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tianmu): Improve tianmu readability(#11) #644

Merged
merged 2 commits into from
Oct 8, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sql/sql_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ bool Sql_cmd_load_table::execute_inner(THD *thd,
return true;
}
// TIANMU UPGRADE BEGIN
if (!Tianmu::dbhandler::tianmu_load(thd, &m_exchange, table_list,
if (!Tianmu::DBHandler::Tianmu_Load(thd, &m_exchange, table_list,
(void *)&lf_info)) {
return false;
}
Expand Down
4 changes: 2 additions & 2 deletions storage/tianmu/core/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
#include "util/thread_pool.h"

namespace Tianmu {
namespace dbhandler {
namespace DBHandler {
extern void resolve_async_join_settings(const std::string &settings);
}
namespace core {
Expand Down Expand Up @@ -303,7 +303,7 @@ int Engine::Init(uint engine_slot) {
}

if (tianmu_sysvar_start_async > 0) ResetTaskExecutor(tianmu_sysvar_start_async);
dbhandler::resolve_async_join_settings(tianmu_sysvar_async_join);
DBHandler::resolve_async_join_settings(tianmu_sysvar_async_join);

return 0;
}
Expand Down
39 changes: 28 additions & 11 deletions storage/tianmu/core/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class Engine final {
Engine();
~Engine();

// The basic operations.
int Init(uint engine_slot);
void CreateTable(const std::string &table, TABLE *from, dd::Table *table_def);
void DeleteTable(const char *table, const dd::Table *table_def, THD *thd);
Expand All @@ -96,43 +97,58 @@ class Engine final {
unsigned long GetLDT() const { return LDT; }
unsigned long GetUPM() const { return UPM; }
unsigned long GetUT() const { return UT; }

void IncTianmuStatUpdate() { ++tianmu_stat.update; }

std::vector<AttrInfo> GetTableAttributesInfo(const std::string &table_path, TABLE_SHARE *table_share);
void UpdateAndStoreColumnComment(TABLE *table, int field_id, Field *source_field, int source_field_id,
CHARSET_INFO *cs);

// table operations.
std::shared_ptr<RCTable> GetTableRD(const std::string &table_path);
std::shared_ptr<TableShare> GetTableShare(const TABLE_SHARE *table_share);
void UnRegisterTable(const std::string &table_path);
void GetTableIterator(const std::string &table_path, RCTable::Iterator &iter_begin, RCTable::Iterator &iter_end,
std::shared_ptr<RCTable> &table, const std::vector<bool> &, THD *thd);
void AddTableIndex(const std::string &table_path, TABLE *table, THD *thd);
std::shared_ptr<index::RCTableIndex> GetTableIndex(const std::string &table_path);
bool has_pk(TABLE *table) const { return table->s->primary_key != MAX_INDEXES; }
void RenameRdbTable(const std::string &from, const std::string &to);
void AddMemTable(TABLE *form, std::shared_ptr<TableShare> share);
void UnregisterMemTable(const std::string &from, const std::string &to);

// For Load Data.
common::TIANMUError RunLoader(THD *thd, sql_exchange *ex, TABLE_LIST *table_list, void *arg);

// transaction operations.
void CommitTx(THD *thd, bool all);
void Rollback(THD *thd, bool all, bool force_error_message = false);
Transaction *CreateTx(THD *thd);
Transaction *GetTx(THD *thd);
void ClearTx(THD *thd);
int HandleSelect(THD *thd, LEX *lex, Query_result *&result_output, ulong setup_tables_done_option, int &res,

// processing the queries which routed to Tianmu.
int Handle_Query(THD *thd, LEX *lex, Query_result *&result_output, ulong setup_tables_done_option, int &res,
int &optimize_after_tianmu, int &tianmu_free_join, int with_insert = false);

system::ResourceManager *getResourceManager() const { return m_resourceManager; }
std::shared_ptr<RCTable> GetTableRD(const std::string &table_path);

// row operations.
int InsertRow(const std::string &tablename, Transaction *trans_, TABLE *table, std::shared_ptr<TableShare> &share);
void InsertDelayed(const std::string &table_path, int tid, TABLE *table);
void InsertMemRow(const std::string &table_path, std::shared_ptr<TableShare> &share, TABLE *table);
std::string DelayedBufferStat() { return insert_buffer.Status(); }
std::string RowStoreStat();
void UnRegisterTable(const std::string &table_path);
std::shared_ptr<TableShare> GetTableShare(const TABLE_SHARE *table_share);

common::TX_ID MinXID() const { return min_xid; }
common::TX_ID MaxXID() const { return max_xid; }

void DeferRemove(const fs::path &file, int32_t cookie);
void HandleDeferredJobs();
// support for primary key
void AddTableIndex(const std::string &table_path, TABLE *table, THD *thd);
std::shared_ptr<index::RCTableIndex> GetTableIndex(const std::string &table_path);
bool has_pk(TABLE *table) const { return table->s->primary_key != MAX_INDEXES; }
void RenameRdbTable(const std::string &from, const std::string &to);

void DropSignal() { cv_drop_.notify_one(); }
void ResetTaskExecutor(int percent);
TaskExecutor *GetTaskExecutor() const { return task_executor.get(); }
void AddMemTable(TABLE *form, std::shared_ptr<TableShare> share);
void UnregisterMemTable(const std::string &from, const std::string &to);

public:
utils::thread_pool delay_insert_thread_pool;
Expand Down Expand Up @@ -342,6 +358,7 @@ int get_parameter(THD *thd, enum tianmu_var_name vn, int64_t &value);
int get_parameter(THD *thd, enum tianmu_var_name vn, std::string &value);

bool parameter_equals(THD *thd, enum tianmu_var_name vn, longlong value);

} // namespace core
} // namespace Tianmu

Expand Down
9 changes: 7 additions & 2 deletions storage/tianmu/core/engine_execute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ error through res'. If the query can not be compiled by Tianmu engine
RETURN_QUERY_TO_MYSQL_ROUTE is returned and MySQL engine continues query
execution.
*/
int Engine::HandleSelect(THD *thd, LEX *lex, Query_result *&result, ulong setup_tables_done_option, int &res,
int Engine::Handle_Query(THD *thd, LEX *lex, Query_result *&result, ulong setup_tables_done_option, int &res,
int &optimize_after_tianmu, int &tianmu_free_join, int with_insert) {
KillTimer timer(thd, tianmu_sysvar_max_execution_time);

Expand Down Expand Up @@ -327,11 +327,13 @@ int optimize_select(THD *thd, ulong select_options, Query_result *result, Query_
join = select_lex->join;
// here is EXPLAIN of subselect or derived table
if (select_lex->linkage != DERIVED_TABLE_TYPE || (select_options & (1ULL << 2))) {
// global_options_type means a global option, such as limit order by, etc., of a union query. refer to
// Query_expression::add_fake_query_block
if (select_lex->linkage != GLOBAL_OPTIONS_TYPE) {
if (result->prepare(thd, *select_lex->join->fields, select_lex->master_query_expression())) {
return true;
}
} else {
} else { // it is a global_opton query block, such as limit, order by, etc.
if ((err = select_lex->prepare(thd, nullptr))) // stonedb8
{
return err;
Expand All @@ -353,8 +355,11 @@ int optimize_select(THD *thd, ulong select_options, Query_result *result, Query_
if (!(join = new JOIN(thd, select_lex))) return true; /* purecov: inspected */
select_lex->set_join(thd, join);
}

join->best_rowcount = 2;
optimize_after_tianmu = true;
// all the preparation operations are done, therefore, we set the `part` to 1, then pass it to JOIN::optimize.
// 1 means we have done the preparation.
if ((err = join->optimize(1))) return err;
return false;
}
Expand Down
24 changes: 12 additions & 12 deletions storage/tianmu/handler/ha_my_tianmu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
#include "vc/virtual_column.h"

namespace Tianmu {
namespace dbhandler {
namespace DBHandler {

enum class TIANMUEngineReturnValues { LD_Successed = 100, LD_Failed = 101, LD_Continue = 102 };

void TIANMU_UpdateAndStoreColumnComment(TABLE *table, int field_id, Field *source_field, int source_field_id,
void Tianmu_UpdateAndStoreColumnComment(TABLE *table, int field_id, Field *source_field, int source_field_id,
CHARSET_INFO *cs) {
try {
ha_rcengine_->UpdateAndStoreColumnComment(table, field_id, source_field, source_field_id, cs);
Expand All @@ -52,7 +52,7 @@ bool AtLeastOneTIANMUTableInvolved(LEX *lex) {
bool ForbiddenMySQLQueryPath([[maybe_unused]] LEX *lex) { return (tianmu_sysvar_allowmysqlquerypath == 0); }
} // namespace

bool TIANMU_SetStatementAllowed(THD *thd, LEX *lex) {
bool Tianmu_SetStatementAllowed(THD *thd, LEX *lex) {
if (AtLeastOneTIANMUTableInvolved(lex)) {
if (ForbiddenMySQLQueryPath(lex)) {
my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR),
Expand All @@ -71,14 +71,14 @@ bool TIANMU_SetStatementAllowed(THD *thd, LEX *lex) {
return true;
}

int TIANMU_HandleSelect(THD *thd, LEX *lex, Query_result *&result, ulong setup_tables_done_option, int &res,
int &optimize_after_tianmu, int &tianmu_free_join, int with_insert) {
int Tianm_Handle_Query(THD *thd, LEX *lex, Query_result *&result, ulong setup_tables_done_option, int &res,
int &optimize_after_tianmu, int &tianmu_free_join, int with_insert) {
int ret = RCBASE_QUERY_ROUTE;
try {
// handle_select_ret is introduced here because in case of some exceptions
// (e.g. thrown from ForbiddenMySQLQueryPath) we want to return
// RCBASE_QUERY_ROUTE
int handle_select_ret = ha_rcengine_->HandleSelect(thd, lex, result, setup_tables_done_option, res,
int handle_select_ret = ha_rcengine_->Handle_Query(thd, lex, result, setup_tables_done_option, res,
optimize_after_tianmu, tianmu_free_join, with_insert);
if (handle_select_ret == RETURN_QUERY_TO_MYSQL_ROUTE && AtLeastOneTIANMUTableInvolved(lex) &&
ForbiddenMySQLQueryPath(lex)) {
Expand All @@ -91,16 +91,16 @@ Either restructure the query with supported syntax, or enable the MySQL core::Qu
ret = handle_select_ret;
} catch (std::exception &e) {
my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
TIANMU_LOG(LogCtl_Level::ERROR, "HandleSelect Error: %s", e.what());
TIANMU_LOG(LogCtl_Level::ERROR, "Handle_Query Error: %s", e.what());
} catch (...) {
my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), "An unknown system exception error caught.", MYF(0));
TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
}
return ret;
}

int TIANMU_LoadData(THD *thd, sql_exchange *ex, TABLE_LIST *table_list, void *arg, char *errmsg, int len,
int &errcode) {
static int Tianmu_LoadData(THD *thd, sql_exchange *ex, TABLE_LIST *table_list, void *arg, char *errmsg, int len,
int &errcode) {
common::TIANMUError tianmu_error;
int ret = static_cast<int>(TIANMUEngineReturnValues::LD_Failed);

Expand All @@ -127,11 +127,11 @@ int TIANMU_LoadData(THD *thd, sql_exchange *ex, TABLE_LIST *table_list, void *ar
}

// returning true means 'to continue'
bool tianmu_load(THD *thd, sql_exchange *ex, TABLE_LIST *table_list, void *arg) {
bool Tianmu_Load(THD *thd, sql_exchange *ex, TABLE_LIST *table_list, void *arg) {
char tianmumsg[256];
int tianmu_errcode;
switch (static_cast<TIANMUEngineReturnValues>(
TIANMU_LoadData(thd, ex, table_list, arg, tianmumsg, 256, tianmu_errcode))) {
Tianmu_LoadData(thd, ex, table_list, arg, tianmumsg, 256, tianmu_errcode))) {
case TIANMUEngineReturnValues::LD_Continue:
return true;
case TIANMUEngineReturnValues::LD_Failed:
Expand All @@ -146,5 +146,5 @@ bool tianmu_load(THD *thd, sql_exchange *ex, TABLE_LIST *table_list, void *arg)
return false;
}

} // namespace dbhandler
} // namespace DBHandler
} // namespace Tianmu
17 changes: 10 additions & 7 deletions storage/tianmu/handler/ha_my_tianmu.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,21 @@

// mysql <--> tianmu interface functions
namespace Tianmu {
namespace dbhandler {
namespace DBHandler {

void TIANMU_UpdateAndStoreColumnComment(TABLE *table, int field_id, Field *source_field, int source_field_id,
void Tianmu_UpdateAndStoreColumnComment(TABLE *table, int field_id, Field *source_field, int source_field_id,
CHARSET_INFO *cs);

bool TIANMU_SetStatementAllowed(THD *thd, LEX *lex);
bool Tianmu_SetStatementAllowed(THD *thd, LEX *lex);

int TIANMU_HandleSelect(THD *thd, LEX *lex, Query_result *&result_output, ulong setup_tables_done_option, int &res,
int &optimize_after_tianmu, int &tianmu_free_join, int with_insert = false);
bool tianmu_load(THD *thd, sql_exchange *ex, TABLE_LIST *table_list, void *arg);
// processing the queries which routed to Tianmu Engine.
int Tianm_Handle_Query(THD *thd, LEX *lex, Query_result *&result_output, ulong setup_tables_done_option, int &res,
int &optimize_after_tianmu, int &tianmu_free_join, int with_insert = false);

} // namespace dbhandler
// processing the load operation.
bool Tianmu_Load(THD *thd, sql_exchange *ex, TABLE_LIST *table_list, void *arg);

} // namespace DBHandler
} // namespace Tianmu

#endif // HA_MY_TIANMU_H_
14 changes: 7 additions & 7 deletions storage/tianmu/handler/ha_rcengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ struct SYS_VAR {
};

namespace Tianmu {
namespace dbhandler {
namespace DBHandler {

/*
If frm_error() is called then we will use this to to find out what file
Expand Down Expand Up @@ -737,22 +737,22 @@ static struct SYS_VAR *tianmu_showvars[] = {MYSQL_SYSVAR(bg_load_threads),
MYSQL_SYSVAR(start_async),
MYSQL_SYSVAR(result_sender_rows),
NULL};
} // namespace dbhandler
} // namespace DBHandler
} // namespace Tianmu

mysql_declare_plugin(tianmu){
MYSQL_STORAGE_ENGINE_PLUGIN,
&Tianmu::dbhandler::tianmu_storage_engine,
&Tianmu::DBHandler::tianmu_storage_engine,
"TIANMU",
"StoneAtom Group Holding Limited",
"Tianmu storage engine",
PLUGIN_LICENSE_GPL,
Tianmu::dbhandler::rcbase_init_func, /* Plugin Init */
Tianmu::DBHandler::rcbase_init_func, /* Plugin Init */
nullptr, /* Plugin Check uninstall */
Tianmu::dbhandler::rcbase_done_func, /* Plugin Deinit */
Tianmu::DBHandler::rcbase_done_func, /* Plugin Deinit */
0x0001 /* 0.1 */,
Tianmu::dbhandler::statusvars, /* status variables */
Tianmu::dbhandler::tianmu_showvars, /* system variables */
Tianmu::DBHandler::statusvars, /* status variables */
Tianmu::DBHandler::tianmu_showvars, /* system variables */
nullptr, /* config options */
0 /* flags for plugin */
} mysql_declare_plugin_end;
4 changes: 2 additions & 2 deletions storage/tianmu/handler/ha_tianmu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
#define MYSQL_SERVER 1

namespace Tianmu {
namespace dbhandler {
namespace DBHandler {

const Alter_inplace_info::HA_ALTER_FLAGS TianmuHandler::TIANMU_SUPPORTED_ALTER_ADD_DROP_ORDER =
Alter_inplace_info::ADD_COLUMN | Alter_inplace_info::DROP_COLUMN | Alter_inplace_info::ALTER_STORED_COLUMN_ORDER;
Expand Down Expand Up @@ -1684,5 +1684,5 @@ void TianmuHandler::key_convert(const uchar *key, uint key_len, std::vector<uint
}
}

} // namespace dbhandler
} // namespace DBHandler
} // namespace Tianmu
4 changes: 2 additions & 2 deletions storage/tianmu/handler/ha_tianmu.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include "core/engine.h"

namespace Tianmu {
namespace dbhandler {
namespace DBHandler {

// Class definition for the storage engine
class TianmuHandler final : public handler {
Expand Down Expand Up @@ -191,7 +191,7 @@ class TianmuHandler final : public handler {
bool m_partitioned = false;
};

} // namespace dbhandler
} // namespace DBHandler
} // namespace Tianmu

#endif // HA_TIANMU_H_