Skip to content

Commit

Permalink
react(tianmu): refactor some code about tianmu engine(#11) (#407)
Browse files Browse the repository at this point in the history
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
RingsC and mergify[bot] authored Aug 17, 2022
1 parent ad98bb3 commit b1a7141
Show file tree
Hide file tree
Showing 10 changed files with 532 additions and 380 deletions.
9 changes: 5 additions & 4 deletions storage/tianmu/core/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ int Engine::Init(uint engine_slot) {
system::ClearDirectory(cachefolder_path);

m_resourceManager = new system::ResourceManager();

//init the tianmu key-value store, aka, rocksdb engine.
ha_kvstore_ = new index::KVStore();
ha_kvstore_->Init();

#ifdef FUNCTIONS_EXECUTION_TIMES
fet = new FunctionsExecutionTimes();
Expand Down Expand Up @@ -1883,10 +1887,7 @@ void Engine::AddTableIndex(const std::string &table_path, TABLE *table, [[maybe_
auto iter = m_table_keys.find(table_path);
if (iter == m_table_keys.end()) {
std::shared_ptr<index::RCTableIndex> tab = std::make_shared<index::RCTableIndex>(table_path, table);
if (tab->Enable())
m_table_keys[table_path] = tab;
else
tab.reset();
m_table_keys[table_path] = tab;
}
}

Expand Down
2 changes: 1 addition & 1 deletion storage/tianmu/core/rc_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1348,7 +1348,7 @@ int RCTable::MergeMemTable(system::IOParameters &iop) {
if ((t3.tv_sec - t2.tv_sec > 15) && index_table) {
TIANMU_LOG(LogCtl_Level::WARN, "Latency of index table %s larger than 15s, compact manually.",
share->Path().c_str());
ha_kvstore_->GetRdb()->CompactRange(rocksdb::CompactRangeOptions(), index_table->rdbkey_->get_cf(), nullptr, nullptr);
ha_kvstore_->GetRdb()->CompactRange(rocksdb::CompactRangeOptions(), index_table->rocksdb_key_->get_cf(), nullptr, nullptr);
}

return no_loaded_rows;
Expand Down
5 changes: 2 additions & 3 deletions storage/tianmu/handler/tianmu_handler_com.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,7 @@ int rcbase_init_func(void *p) {
if (hent) strmov_str(global_hostIP_, inet_ntoa(*(struct in_addr *)(hent->h_addr_list[0])));
my_snprintf(global_serverinfo_, sizeof(global_serverinfo_), "\tServerIp:%s\tServerHostName:%s\tServerPort:%d",
global_hostIP_, glob_hostname, mysqld_port);
ha_kvstore_ = new index::KVStore();
ha_kvstore_->Init();
//startup tianmu engine.
ha_rcengine_ = new core::Engine();
ret = ha_rcengine_->Init(total_ha);
{
Expand Down Expand Up @@ -367,7 +366,7 @@ int get_UpdatePerMinute_StatusVar([[maybe_unused]] MYSQL_THD thd, SHOW_VAR *outv
return 0;
}

char masteslave_info[8192];
char masteslave_info[8192]={0};

SHOW_VAR tianmu_masterslave_dump[] = {{"info", masteslave_info, SHOW_CHAR, SHOW_SCOPE_UNDEF}, {NullS, NullS, SHOW_LONG, SHOW_SCOPE_UNDEF}};

Expand Down
78 changes: 77 additions & 1 deletion storage/tianmu/index/kv_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ common::ErrorCode KVStore::KVDelTableMeta(const std::string &tablename) {

// Remove the table entry in data dictionary (this will also remove it from
// the persistent data dictionary).
dict_manager_.add_drop_table(tbl->m_rdbkeys, batch);
dict_manager_.add_drop_table(tbl->GetRdbTableKeys(), batch);
ddl_manager_.remove(tbl, batch);
if (!dict_manager_.commit(batch)) {
return common::ErrorCode::FAILED;
Expand Down Expand Up @@ -308,6 +308,82 @@ bool KVStore::KVWriteBatch(rocksdb::WriteOptions &wopts, rocksdb::WriteBatch *ba
return true;
}

//Derives the column-family name for the index at position `index` of `table`
//from that key's comment string. When RdbKey::parse_comment() extracts no
//explicit cf name but a comment was supplied, the raw comment itself is used.
std::string KVStore::generate_cf_name(uint index, TABLE *table) {
  const char *comment = table->key_info[index].comment.str;
  std::string key_comment = (comment != nullptr) ? comment : "";

  std::string cf_name = RdbKey::parse_comment(key_comment);
  if (cf_name.empty() && !key_comment.empty()) return key_comment;

  return cf_name;
}
//Builds the RdbKey definition for the index at position `pos` of `table` and
//stores it into `new_key_def`.
//  table       : server-side table definition owning the index.
//  pos         : index position inside table->key_info[].
//  new_key_def : out-param receiving the freshly created key definition.
//  cf_handle   : rocksdb column family that will hold this index's entries.
//Fixes vs. previous revision: C-style cast replaced by static_cast, and
//unsigned_flag is declared (initialized) at its point of use.
void KVStore::create_rdbkey(TABLE *table, uint pos, std::shared_ptr<RdbKey> &new_key_def,
                            rocksdb::ColumnFamilyHandle *cf_handle) {
  //assign a new id for this index.
  uint index_id = ha_kvstore_->GetNextIndexId();

  //collect type and signedness of every column taking part in the key.
  std::vector<ColAttr> vcols;
  KEY *key_info = &table->key_info[pos];

  for (uint n = 0; n < key_info->actual_key_parts; n++) {
    Field *f = key_info->key_part[n].field;
    bool unsigned_flag = false;  //non-integer column types are treated as signed.
    switch (f->type()) {
      case MYSQL_TYPE_LONGLONG:
      case MYSQL_TYPE_LONG:
      case MYSQL_TYPE_INT24:
      case MYSQL_TYPE_SHORT:
      case MYSQL_TYPE_TINY:
        //integer columns carry their own signedness flag.
        unsigned_flag = static_cast<Field_num *>(f)->unsigned_flag;
        break;
      default:
        break;
    }
    vcols.emplace_back(ColAttr{f->field_index, f->type(), unsigned_flag});
  }

  const char *const key_name = table->key_info[pos].name;
  ///* primary_key: Primary key index number(aka:pos), used in TABLE::key_info[] */
  uchar index_type = (pos == table->s->primary_key) ? static_cast<uchar>(IndexType::INDEX_TYPE_PRIMARY)
                                                    : static_cast<uchar>(IndexType::INDEX_TYPE_SECONDARY);
  //multi-part keys use the COLS info version; single-part keys the initial one.
  uint16_t index_ver = (key_info->actual_key_parts > 1)
                           ? static_cast<uint16_t>(IndexInfoType::INDEX_INFO_VERSION_COLS)
                           : static_cast<uint16_t>(IndexInfoType::INDEX_INFO_VERSION_INITIAL);

  new_key_def = std::make_shared<RdbKey>(index_id, pos, cf_handle, index_ver, index_type, false, key_name, vcols);
}

//Creates the key definitions — each bound to its resolved column family — for
//every index of a rocksdb-backed table.
//Returns SUCCESS, or FAILED when a cf handle cannot be obtained.
//NOTE(review): naming the system cf in an index comment raises
//common::Exception rather than returning an error code; callers must expect both.
common::ErrorCode KVStore::create_keys_and_cf(TABLE *table, std::shared_ptr<RdbTable> rdb_tbl) {
  auto &tbl_keys = rdb_tbl->GetRdbTableKeys();

  //walk all indexes in order, resolving a cf and building a key def for each.
  for (uint pos = 0; pos < tbl_keys.size(); pos++) {
    //gens the column family(cf) name of the pos-th key.
    std::string cf_name = generate_cf_name(pos, table);
    if (cf_name == DEFAULT_SYSTEM_CF_NAME)
      throw common::Exception("column family not valid for storing index data. cf: " + DEFAULT_SYSTEM_CF_NAME);

    //look up the cf handle for that name.
    rocksdb::ColumnFamilyHandle *cf_handle = ha_kvstore_->GetCfHandle(cf_name);
    if (cf_handle == nullptr) return common::ErrorCode::FAILED;

    //create the pos-th index key bound to this cf.
    create_rdbkey(table, pos, tbl_keys.at(pos), cf_handle);
  }

  return common::ErrorCode::SUCCESS;
}

//Returns the index position of the primary key; when the table declares no
//explicit primary key (MAX_INDEXES), the last slot of tbl_def's key array is
//used instead.
uint KVStore::pk_index(const TABLE *const table, std::shared_ptr<RdbTable> tbl_def) {
  if (table->s->primary_key == MAX_INDEXES) return tbl_def->GetRdbTableKeys().size() - 1;

  return table->s->primary_key;
}


bool IndexCompactFilter::Filter([[maybe_unused]] int level, const rocksdb::Slice &key,
[[maybe_unused]] const rocksdb::Slice &existing_value,
[[maybe_unused]] std::string *new_value, [[maybe_unused]] bool *value_changed) const {
Expand Down
25 changes: 16 additions & 9 deletions storage/tianmu/index/kv_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,34 +67,41 @@ class KVStore final {
//Checks whether a drop-index operation is still in flight for `index`.
bool IndexDroping(GlobalId &index) {
  const bool ongoing = dict_manager_.is_drop_index_ongoing(index, MetaType::DDL_DROP_INDEX_ONGOING);
  return ongoing;
}
// kv table meta operation
// find a table by table name returns this table handler
//kv table meta operation
//find a table by table name returns this table handler
std::shared_ptr<RdbTable> FindTable(std::string &name) { return ddl_manager_.find(name); }
// Put table definition of `tbl` into the mapping, and also write it to the
// on-disk data dictionary.
//Put table definition of `tbl` into the mapping, and also write it to the
//on-disk data dictionary.
common::ErrorCode KVWriteTableMeta(std::shared_ptr<RdbTable> tbl);
common::ErrorCode KVDelTableMeta(const std::string &tablename);
common::ErrorCode KVRenameTableMeta(const std::string &s_name, const std::string &d_name);

// kv memory table meta operation
// as KVWriteTableMeta does, but not to on-disk but in-mem
//kv memory table meta operation
//as KVWriteTableMeta does, but not to on-disk but in-mem
std::shared_ptr<core::RCMemTable> FindMemTable(std::string &name) { return ddl_manager_.find_mem(name); }
common::ErrorCode KVWriteMemTableMeta(std::shared_ptr<core::RCMemTable> tb_mem);
common::ErrorCode KVDelMemTableMeta(std::string table_name);
common::ErrorCode KVRenameMemTableMeta(std::string s_name, std::string d_name);

// kv data operation
//kv data operation
bool KVDeleteKey(rocksdb::WriteOptions &wopts, rocksdb::ColumnFamilyHandle *cf, rocksdb::Slice &key);
//Opens a rocksdb iterator over `cf` with the supplied read options.
rocksdb::Iterator *GetScanIter(rocksdb::ReadOptions &ropts, rocksdb::ColumnFamilyHandle *cf) {
  auto *iter = txn_db_->NewIterator(ropts, cf);
  return iter;
}

//write mult-rows in batch mode with write options.
bool KVWriteBatch(rocksdb::WriteOptions &wopts, rocksdb::WriteBatch *batch);
//gets snapshot from rocksdb.
const rocksdb::Snapshot *GetRdbSnapshot() { return txn_db_->GetSnapshot(); }
//release the specific snapshot
void ReleaseRdbSnapshot(const rocksdb::Snapshot *snapshot) { txn_db_->ReleaseSnapshot(snapshot); }

//gets the column family name by table handler.
static std::string generate_cf_name(uint index, TABLE *table);
//creates a ith key of rocksdb table.
static void create_rdbkey(TABLE *table, uint pos, std::shared_ptr<RdbKey> &new_key_def, rocksdb::ColumnFamilyHandle *cf_handle);
//create keys and column family for a rocksdb table.
static common::ErrorCode create_keys_and_cf(TABLE *table, std::shared_ptr<RdbTable> rdb_tbl);
//Returns index of primary key
static uint pk_index(const TABLE *const table, std::shared_ptr<RdbTable> tbl_def);
private:
//initializationed?
bool inited_ = false;
Expand Down
27 changes: 19 additions & 8 deletions storage/tianmu/index/kv_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,23 @@ rocksdb::Status KVTransaction::Get(rocksdb::ColumnFamilyHandle *column_family, c

//Buffers an index-data put into the index write batch; the write reaches
//rocksdb only when Commit() flushes the batch.
//Fix: the body contained an unconditional `return rocksdb::Status::OK();`
//that both swallowed the batch's real status and made the following return
//unreachable — a single status-propagating return remains.
rocksdb::Status KVTransaction::Put(rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Slice &key,
                                   const rocksdb::Slice &value) {
  return index_batch_->Put(column_family, key, value);
}

//Buffers an index-data delete into the index write batch; applied to rocksdb
//when Commit() flushes the batch.
//Fix: removed the premature `return rocksdb::Status::OK();` which hid the
//batch's real status and left the following return unreachable.
rocksdb::Status KVTransaction::Delete(rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Slice &key) {
  return index_batch_->Delete(column_family, key);
}

//Creates an iterator over `column_family` that overlays the uncommitted index
//batch on top of the base rocksdb data. `skip_filter` selects a total-order
//scan; otherwise the scan is prefix-bounded to the seek key's prefix.
rocksdb::Iterator *KVTransaction::GetIterator(rocksdb::ColumnFamilyHandle *const column_family, bool skip_filter) {
  read_opts_.total_order_seek = skip_filter;
  if (!skip_filter) {
    read_opts_.prefix_same_as_start = true;
  }

  auto *base_iter = ha_kvstore_->GetRdb()->NewIterator(read_opts_, column_family);
  return index_batch_->NewIteratorWithBase(base_iter);
}

Expand All @@ -64,43 +65,53 @@ rocksdb::Status KVTransaction::GetData(rocksdb::ColumnFamilyHandle *column_famil

//Buffers a table-data put into the data write batch; flushed to rocksdb by
//Commit().
//Fix: removed the premature `return rocksdb::Status::OK();` which swallowed
//the batch's real status and made the following return unreachable.
rocksdb::Status KVTransaction::PutData(rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Slice &key,
                                       const rocksdb::Slice &value) {
  return data_batch_->Put(column_family, key, value);
}

//Buffers a table-data SingleDelete into the data write batch.
// notice: if a key is overwritten (by calling Put() multiple times), then the
// result of calling SingleDelete() on this key is undefined, delete is better
//Fix: removed the premature `return rocksdb::Status::OK();` which hid the
//batch's real status and made the following return unreachable.
rocksdb::Status KVTransaction::SingleDeleteData(rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Slice &key) {
  return data_batch_->SingleDelete(column_family, key);
}

//Opens a plain rocksdb iterator over `column_family` for table-data scans
//(no overlay of pending batches).
rocksdb::Iterator *KVTransaction::GetDataIterator(rocksdb::ReadOptions &ropts,
                                                  rocksdb::ColumnFamilyHandle *const column_family) {
  auto *rdb = ha_kvstore_->GetRdb();
  return rdb->NewIterator(ropts, column_family);
}

//Lazily pins a rocksdb snapshot into read_opts_; a no-op when one is already held.
void KVTransaction::Acquiresnapshot() {
  if (read_opts_.snapshot != nullptr) return;

  read_opts_.snapshot = ha_kvstore_->GetRdbSnapshot();
}

void KVTransaction::Releasesnapshot() {

if (read_opts_.snapshot != nullptr) {
ha_kvstore_->ReleaseRdbSnapshot(read_opts_.snapshot);
read_opts_.snapshot = nullptr;
}
}

bool KVTransaction::Commit() {

bool res = true;
//firstly, release the snapshot.
Releasesnapshot();
//if we have data to commit, then do writing index data ops by KVWriteBatch.
auto index_write_batch = index_batch_->GetWriteBatch();
if (index_write_batch && index_write_batch->Count() > 0 && !ha_kvstore_->KVWriteBatch(write_opts_, index_write_batch)) {
//write failed.
res = false;
}
//write the data.
if (res && data_batch_->Count() > 0 && !ha_kvstore_->KVWriteBatch(write_opts_, data_batch_.get())) {
//write failed.
res = false;
}

//writes the data sucessfully, then clean up xxx_batch.
index_batch_->Clear();
data_batch_->Clear();
return res;
Expand Down
Loading

0 comments on commit b1a7141

Please sign in to comment.