diff --git a/storage/tianmu/core/engine.cpp b/storage/tianmu/core/engine.cpp index 3e6ea7fe7..c024c1065 100644 --- a/storage/tianmu/core/engine.cpp +++ b/storage/tianmu/core/engine.cpp @@ -228,6 +228,10 @@ int Engine::Init(uint engine_slot) { system::ClearDirectory(cachefolder_path); m_resourceManager = new system::ResourceManager(); + + //init the tianmu key-value store, aka, rocksdb engine. + ha_kvstore_ = new index::KVStore(); + ha_kvstore_->Init(); #ifdef FUNCTIONS_EXECUTION_TIMES fet = new FunctionsExecutionTimes(); @@ -1883,10 +1887,7 @@ void Engine::AddTableIndex(const std::string &table_path, TABLE *table, [[maybe_ auto iter = m_table_keys.find(table_path); if (iter == m_table_keys.end()) { std::shared_ptr tab = std::make_shared(table_path, table); - if (tab->Enable()) - m_table_keys[table_path] = tab; - else - tab.reset(); + m_table_keys[table_path] = tab; } } diff --git a/storage/tianmu/core/rc_table.cpp b/storage/tianmu/core/rc_table.cpp index 88fec6ff8..3c90e7272 100644 --- a/storage/tianmu/core/rc_table.cpp +++ b/storage/tianmu/core/rc_table.cpp @@ -1348,7 +1348,7 @@ int RCTable::MergeMemTable(system::IOParameters &iop) { if ((t3.tv_sec - t2.tv_sec > 15) && index_table) { TIANMU_LOG(LogCtl_Level::WARN, "Latency of index table %s larger than 15s, compact manually.", share->Path().c_str()); - ha_kvstore_->GetRdb()->CompactRange(rocksdb::CompactRangeOptions(), index_table->rdbkey_->get_cf(), nullptr, nullptr); + ha_kvstore_->GetRdb()->CompactRange(rocksdb::CompactRangeOptions(), index_table->rocksdb_key_->get_cf(), nullptr, nullptr); } return no_loaded_rows; diff --git a/storage/tianmu/handler/tianmu_handler_com.cpp b/storage/tianmu/handler/tianmu_handler_com.cpp index 328deb8be..ecb6009f3 100644 --- a/storage/tianmu/handler/tianmu_handler_com.cpp +++ b/storage/tianmu/handler/tianmu_handler_com.cpp @@ -218,8 +218,7 @@ int rcbase_init_func(void *p) { if (hent) strmov_str(global_hostIP_, inet_ntoa(*(struct in_addr *)(hent->h_addr_list[0]))); my_snprintf(global_serverinfo_, sizeof(global_serverinfo_), "\tServerIp:%s\tServerHostName:%s\tServerPort:%d", global_hostIP_, glob_hostname, mysqld_port); - ha_kvstore_ = new index::KVStore(); - ha_kvstore_->Init(); + //startup tianmu engine. ha_rcengine_ = new core::Engine(); ret = ha_rcengine_->Init(total_ha); { @@ -367,7 +366,7 @@ int get_UpdatePerMinute_StatusVar([[maybe_unused]] MYSQL_THD thd, SHOW_VAR *outv return 0; } -char masteslave_info[8192]; +char masteslave_info[8192]={0}; SHOW_VAR tianmu_masterslave_dump[] = {{"info", masteslave_info, SHOW_CHAR, SHOW_SCOPE_UNDEF}, {NullS, NullS, SHOW_LONG, SHOW_SCOPE_UNDEF}}; diff --git a/storage/tianmu/index/kv_store.cpp b/storage/tianmu/index/kv_store.cpp index 4f623ee35..b583acf5e 100644 --- a/storage/tianmu/index/kv_store.cpp +++ b/storage/tianmu/index/kv_store.cpp @@ -199,7 +199,7 @@ common::ErrorCode KVStore::KVDelTableMeta(const std::string &tablename) { // Remove the table entry in data dictionary (this will also remove it from // the persistent data dictionary). - dict_manager_.add_drop_table(tbl->m_rdbkeys, batch); + dict_manager_.add_drop_table(tbl->GetRdbTableKeys(), batch); ddl_manager_.remove(tbl, batch); if (!dict_manager_.commit(batch)) { return common::ErrorCode::FAILED; @@ -308,6 +308,82 @@ bool KVStore::KVWriteBatch(rocksdb::WriteOptions &wopts, rocksdb::WriteBatch *ba return true; } +std::string KVStore::generate_cf_name(uint index, TABLE *table) +{ + char *comment = table->key_info[index].comment.str; + std::string key_comment = comment ? comment : ""; + std::string cf_name = RdbKey::parse_comment(key_comment); + if (cf_name.empty() && !key_comment.empty()) + return key_comment; + + return cf_name; +} +void KVStore::create_rdbkey(TABLE *table, uint pos, std::shared_ptr &new_key_def, rocksdb::ColumnFamilyHandle *cf_handle) +{ + //assign a new id for this index. + uint index_id = ha_kvstore_->GetNextIndexId(); + + std::vector vcols; + KEY *key_info = &table->key_info[pos]; + bool unsigned_flag; + + for (uint n = 0; n < key_info->actual_key_parts; n++) { + Field *f = key_info->key_part[n].field; + switch (f->type()) { + case MYSQL_TYPE_LONGLONG: + case MYSQL_TYPE_LONG: + case MYSQL_TYPE_INT24: + case MYSQL_TYPE_SHORT: + case MYSQL_TYPE_TINY: { + unsigned_flag = ((Field_num *)f)->unsigned_flag; + + } break; + default: + unsigned_flag = false; + break; + } + vcols.emplace_back(ColAttr{key_info->key_part[n].field->field_index, f->type(), unsigned_flag}); + } + + const char *const key_name = table->key_info[pos].name; + ///* primary_key: Primary key index number(aka:pos), used in TABLE::key_info[] */ + uchar index_type = (pos == table->s->primary_key) ? static_cast(IndexType::INDEX_TYPE_PRIMARY) + : static_cast(IndexType::INDEX_TYPE_SECONDARY); + uint16_t index_ver = (key_info->actual_key_parts > 1) + ? static_cast(IndexInfoType::INDEX_INFO_VERSION_COLS) + : static_cast(IndexInfoType::INDEX_INFO_VERSION_INITIAL); + + new_key_def = std::make_shared(index_id, pos, cf_handle, index_ver, index_type, false, key_name, vcols); + +} + +common::ErrorCode KVStore::create_keys_and_cf(TABLE *table, std::shared_ptr rdb_tbl) +{ + //processing the all indexes in order. + for (uint pos = 0; pos < rdb_tbl->GetRdbTableKeys().size(); pos++) { + //gens the column family(cf) name of ith key. + std::string cf_name = generate_cf_name(pos, table); + if (cf_name == DEFAULT_SYSTEM_CF_NAME) + throw common::Exception("column family not valid for storing index data. cf: " + DEFAULT_SYSTEM_CF_NAME); + + //isnot default cf, then get the cf name. + rocksdb::ColumnFamilyHandle *cf_handle = ha_kvstore_->GetCfHandle(cf_name); + + if (!cf_handle) { + return common::ErrorCode::FAILED; + } + //create the ith index key with cf. + create_rdbkey(table, pos, rdb_tbl->GetRdbTableKeys().at(pos), cf_handle); + } + + return common::ErrorCode::SUCCESS; +} + +uint KVStore::pk_index(const TABLE *const table, std::shared_ptr tbl_def) { + return table->s->primary_key == MAX_INDEXES ? tbl_def->GetRdbTableKeys().size() - 1 : table->s->primary_key; +} + + bool IndexCompactFilter::Filter([[maybe_unused]] int level, const rocksdb::Slice &key, [[maybe_unused]] const rocksdb::Slice &existing_value, [[maybe_unused]] std::string *new_value, [[maybe_unused]] bool *value_changed) const { diff --git a/storage/tianmu/index/kv_store.h b/storage/tianmu/index/kv_store.h index ffed44054..04928fdb8 100644 --- a/storage/tianmu/index/kv_store.h +++ b/storage/tianmu/index/kv_store.h @@ -67,34 +67,41 @@ class KVStore final { bool IndexDroping(GlobalId &index) { return dict_manager_.is_drop_index_ongoing(index, MetaType::DDL_DROP_INDEX_ONGOING); } - // kv table meta operation - // find a table by table name returns this table handler + //kv table meta operation + //find a table by table name returns this table handler std::shared_ptr FindTable(std::string &name) { return ddl_manager_.find(name); } - // Put table definition of `tbl` into the mapping, and also write it to the - // on-disk data dictionary. + //Put table definition of `tbl` into the mapping, and also write it to the + //on-disk data dictionary. common::ErrorCode KVWriteTableMeta(std::shared_ptr tbl); common::ErrorCode KVDelTableMeta(const std::string &tablename); common::ErrorCode KVRenameTableMeta(const std::string &s_name, const std::string &d_name); - // kv memory table meta operation - // as KVWriteTableMeta does, but not to on-disk but in-mem + //kv memory table meta operation + //as KVWriteTableMeta does, but not to on-disk but in-mem std::shared_ptr FindMemTable(std::string &name) { return ddl_manager_.find_mem(name); } common::ErrorCode KVWriteMemTableMeta(std::shared_ptr tb_mem); common::ErrorCode KVDelMemTableMeta(std::string table_name); common::ErrorCode KVRenameMemTableMeta(std::string s_name, std::string d_name); - // kv data operation + //kv data operation bool KVDeleteKey(rocksdb::WriteOptions &wopts, rocksdb::ColumnFamilyHandle *cf, rocksdb::Slice &key); rocksdb::Iterator *GetScanIter(rocksdb::ReadOptions &ropts, rocksdb::ColumnFamilyHandle *cf) { return txn_db_->NewIterator(ropts, cf); } - + //write mult-rows in batch mode with write options. bool KVWriteBatch(rocksdb::WriteOptions &wopts, rocksdb::WriteBatch *batch); //gets snapshot from rocksdb. const rocksdb::Snapshot *GetRdbSnapshot() { return txn_db_->GetSnapshot(); } //release the specific snapshot void ReleaseRdbSnapshot(const rocksdb::Snapshot *snapshot) { txn_db_->ReleaseSnapshot(snapshot); } - + //gets the column family name by table handler. + static std::string generate_cf_name(uint index, TABLE *table); + //creates a ith key of rocksdb table. + static void create_rdbkey(TABLE *table, uint pos, std::shared_ptr &new_key_def, rocksdb::ColumnFamilyHandle *cf_handle); + //create keys and column family for a rocksdb table. + static common::ErrorCode create_keys_and_cf(TABLE *table, std::shared_ptr rdb_tbl); + //Returns index of primary key + static uint pk_index(const TABLE *const table, std::shared_ptr tbl_def); private: //initializationed? bool inited_ = false; diff --git a/storage/tianmu/index/kv_transaction.cpp b/storage/tianmu/index/kv_transaction.cpp index 7bc81d87b..9940e2fb1 100644 --- a/storage/tianmu/index/kv_transaction.cpp +++ b/storage/tianmu/index/kv_transaction.cpp @@ -37,22 +37,23 @@ rocksdb::Status KVTransaction::Get(rocksdb::ColumnFamilyHandle *column_family, c rocksdb::Status KVTransaction::Put(rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Slice &key, const rocksdb::Slice &value) { - index_batch_->Put(column_family, key, value); - return rocksdb::Status::OK(); + return index_batch_->Put(column_family, key, value); } rocksdb::Status KVTransaction::Delete(rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Slice &key) { - index_batch_->Delete(column_family, key); - return rocksdb::Status::OK(); + + return index_batch_->Delete(column_family, key); } rocksdb::Iterator *KVTransaction::GetIterator(rocksdb::ColumnFamilyHandle *const column_family, bool skip_filter) { + if (skip_filter) { read_opts_.total_order_seek = true; } else { read_opts_.total_order_seek = false; read_opts_.prefix_same_as_start = true; } + return index_batch_->NewIteratorWithBase(ha_kvstore_->GetRdb()->NewIterator(read_opts_, column_family)); } @@ -64,27 +65,29 @@ rocksdb::Status KVTransaction::GetData(rocksdb::ColumnFamilyHandle *column_famil rocksdb::Status KVTransaction::PutData(rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Slice &key, const rocksdb::Slice &value) { - data_batch_->Put(column_family, key, value); - return rocksdb::Status::OK(); + + return data_batch_->Put(column_family, key, value); } rocksdb::Status KVTransaction::SingleDeleteData(rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Slice &key) { // notice: if a key is overwritten (by calling Put() multiple times), then the // result of calling SingleDelete() on this key is undefined, delete is better - data_batch_->SingleDelete(column_family, key); - return rocksdb::Status::OK(); + return data_batch_->SingleDelete(column_family, key); } rocksdb::Iterator *KVTransaction::GetDataIterator(rocksdb::ReadOptions &ropts, rocksdb::ColumnFamilyHandle *const column_family) { + return ha_kvstore_->GetRdb()->NewIterator(ropts, column_family); } void KVTransaction::Acquiresnapshot() { + if (read_opts_.snapshot == nullptr) read_opts_.snapshot = ha_kvstore_->GetRdbSnapshot(); } void KVTransaction::Releasesnapshot() { + if (read_opts_.snapshot != nullptr) { ha_kvstore_->ReleaseRdbSnapshot(read_opts_.snapshot); read_opts_.snapshot = nullptr; @@ -92,15 +95,23 @@ void KVTransaction::Releasesnapshot() { } bool KVTransaction::Commit() { + bool res = true; + //firstly, release the snapshot. Releasesnapshot(); + //if we have data to commit, then do writing index data ops by KVWriteBatch. auto index_write_batch = index_batch_->GetWriteBatch(); if (index_write_batch && index_write_batch->Count() > 0 && !ha_kvstore_->KVWriteBatch(write_opts_, index_write_batch)) { + //write failed. res = false; } + //write the data. if (res && data_batch_->Count() > 0 && !ha_kvstore_->KVWriteBatch(write_opts_, data_batch_.get())) { + //write failed. res = false; } + + //writes the data sucessfully, then clean up xxx_batch. index_batch_->Clear(); data_batch_->Clear(); return res; diff --git a/storage/tianmu/index/rc_table_index.cpp b/storage/tianmu/index/rc_table_index.cpp index 386661141..639bf558a 100644 --- a/storage/tianmu/index/rc_table_index.cpp +++ b/storage/tianmu/index/rc_table_index.cpp @@ -27,92 +27,21 @@ namespace Tianmu { namespace index { - -const std::string generate_cf_name(uint index, TABLE *table) { - char *comment = table->key_info[index].comment.str; - std::string key_comment = comment ? comment : ""; - std::string cf_name = RdbKey::parse_comment(key_comment); - if (cf_name.empty() && !key_comment.empty()) return key_comment; - - return cf_name; -} - -void create_rdbkey(TABLE *table, uint i, std::shared_ptr &new_key_def, rocksdb::ColumnFamilyHandle *cf_handle) { - uint index_id = ha_kvstore_->GetNextIndexId(); - std::vector vcols; - KEY *key_info = &table->key_info[i]; - bool unsigned_flag; - for (uint n = 0; n < key_info->actual_key_parts; n++) { - Field *f = key_info->key_part[n].field; - switch (f->type()) { - case MYSQL_TYPE_LONGLONG: - case MYSQL_TYPE_LONG: - case MYSQL_TYPE_INT24: - case MYSQL_TYPE_SHORT: - case MYSQL_TYPE_TINY: { - unsigned_flag = ((Field_num *)f)->unsigned_flag; - - } break; - default: - unsigned_flag = false; - break; - } - vcols.emplace_back(ColAttr{key_info->key_part[n].field->field_index, f->type(), unsigned_flag}); - } - - const char *const key_name = table->key_info[i].name; - uchar index_type = (i == table->s->primary_key) ? static_cast(enumIndexType::INDEX_TYPE_PRIMARY) - : static_cast(enumIndexType::INDEX_TYPE_SECONDARY); - uint16_t index_ver = (key_info->actual_key_parts > 1) - ? static_cast(enumIndexInfo::INDEX_INFO_VERSION_COLS) - : static_cast(enumIndexInfo::INDEX_INFO_VERSION_INITIAL); - new_key_def = std::make_shared(index_id, i, cf_handle, index_ver, index_type, false, key_name, vcols); -} - -// Create structures needed for storing data in rocksdb. This is called when the -// table is created. -common::ErrorCode create_keys_and_cf(TABLE *table, std::shared_ptr rdb_tbl) { - for (uint i = 0; i < rdb_tbl->m_rdbkeys.size(); i++) { - std::string cf_name = generate_cf_name(i, table); - if (cf_name == DEFAULT_SYSTEM_CF_NAME) - throw common::Exception("column family not valid for storing index data. cf: " + DEFAULT_SYSTEM_CF_NAME); - - rocksdb::ColumnFamilyHandle *cf_handle = ha_kvstore_->GetCfHandle(cf_name); - - if (!cf_handle) { - return common::ErrorCode::FAILED; - } - create_rdbkey(table, i, rdb_tbl->m_rdbkeys[i], cf_handle); - } - - return common::ErrorCode::SUCCESS; -} - -/* Returns index of primary key */ -uint pk_index(const TABLE *const table, std::shared_ptr tbl_def) { - return table->s->primary_key == MAX_INDEXES ? tbl_def->m_rdbkeys.size() - 1 : table->s->primary_key; -} - RCTableIndex::RCTableIndex(const std::string &name, TABLE *table) { std::string fullname; + //normalize the table name. + NormalizeName(name, fullname); + //does the table exists now. + rocksdb_tbl_ = ha_kvstore_->FindTable(fullname); + TIANMU_LOG(LogCtl_Level::WARN, "normalize tablename %s, table_full_name %s!", name.c_str(), fullname.c_str()); - if (!NormalizeName(name, fullname)) { - TIANMU_LOG(LogCtl_Level::WARN, "normalize tablename %s fail!", name.c_str()); - return; - } - tbl_ = ha_kvstore_->FindTable(fullname); - if (tbl_ == nullptr) { - TIANMU_LOG(LogCtl_Level::WARN, "find table %s fail!", fullname.c_str()); - return; - } keyid_ = table->s->primary_key; - rdbkey_ = tbl_->m_rdbkeys[pk_index(table, tbl_)]; + rocksdb_key_ = rocksdb_tbl_->GetRdbTableKeys().at(KVStore::pk_index(table, rocksdb_tbl_)); // compatible version that primary key make up of one part if (table->key_info[keyid_].actual_key_parts == 1) - cols_.push_back(table->key_info[keyid_].key_part[0].field->field_index); + index_of_columns_.push_back(table->key_info[keyid_].key_part[0].field->field_index); else - rdbkey_->get_key_cols(cols_); - enable_ = true; + rocksdb_key_->get_key_cols(index_of_columns_); } bool RCTableIndex::FindIndexTable(const std::string &name) { @@ -143,9 +72,9 @@ common::ErrorCode RCTableIndex::CreateIndexTable(const std::string &name, TABLE // Create table/key descriptions and put them into the data dictionary std::shared_ptr tbl = std::make_shared(str); - tbl->m_rdbkeys.resize(table->s->keys); + tbl->GetRdbTableKeys().resize(table->s->keys); - if (create_keys_and_cf(table, tbl) != common::ErrorCode::SUCCESS) { + if (KVStore::create_keys_and_cf(table, tbl) != common::ErrorCode::SUCCESS) { return common::ErrorCode::FAILED; } @@ -168,12 +97,12 @@ common::ErrorCode RCTableIndex::RefreshIndexTable(const std::string &name) { if (!NormalizeName(name, fullname)) { return common::ErrorCode::FAILED; } - tbl_ = ha_kvstore_->FindTable(fullname); - if (tbl_ == nullptr) { + rocksdb_tbl_ = ha_kvstore_->FindTable(fullname); + if (rocksdb_tbl_ == nullptr) { TIANMU_LOG(LogCtl_Level::WARN, "table %s init ddl error", fullname.c_str()); return common::ErrorCode::FAILED; } - rdbkey_ = tbl_->m_rdbkeys[keyid_]; + rocksdb_key_ = rocksdb_tbl_->GetRdbTableKeys().at(keyid_); return common::ErrorCode::SUCCESS; } @@ -195,15 +124,15 @@ void RCTableIndex::TruncateIndexTable() { rocksdb::ReadOptions ropts; ropts.total_order_seek = true; uchar key_buf[INDEX_NUMBER_SIZE]; - for (auto &rdbkey_ : tbl_->m_rdbkeys) { - be_store_index(key_buf, rdbkey_->get_gl_index_id().index_id); - auto cf = rdbkey_->get_cf(); + for (auto &rocksdb_key_ : rocksdb_tbl_->GetRdbTableKeys()) { + be_store_index(key_buf, rocksdb_key_->get_gl_index_id().index_id); + auto cf = rocksdb_key_->get_cf(); std::unique_ptr it(ha_kvstore_->GetScanIter(ropts, cf)); it->Seek({(const char *)key_buf, INDEX_NUMBER_SIZE}); while (it->Valid()) { auto key = it->key(); - if (!rdbkey_->covers_key(key)) { + if (!rocksdb_key_->covers_key(key)) { break; } if (!ha_kvstore_->KVDeleteKey(wopts, cf, key)) { @@ -217,16 +146,15 @@ void RCTableIndex::TruncateIndexTable() { common::ErrorCode RCTableIndex::CheckUniqueness(core::Transaction *tx, const rocksdb::Slice &pk_slice) { std::string retrieved_value; - rocksdb::Status s = tx->KVTrans().Get(rdbkey_->get_cf(), pk_slice, &retrieved_value); + rocksdb::Status s = tx->KVTrans().Get(rocksdb_key_->get_cf(), pk_slice, &retrieved_value); if (s.IsBusy() && !s.IsDeadlock()) { tx->KVTrans().Releasesnapshot(); tx->KVTrans().Acquiresnapshot(); - s = tx->KVTrans().Get(rdbkey_->get_cf(), pk_slice, &retrieved_value); + s = tx->KVTrans().Get(rocksdb_key_->get_cf(), pk_slice, &retrieved_value); } if (!s.ok() && !s.IsNotFound()) { - // TIANMU_LOG(LogCtl_Level::ERROR, "RockDb read fail:%s", s.ToString().c_str()); TIANMU_LOG(LogCtl_Level::ERROR, "RockDb read fail:%s", s.getState()); return common::ErrorCode::FAILED; } @@ -242,13 +170,13 @@ common::ErrorCode RCTableIndex::InsertIndex(core::Transaction *tx, std::vectorpack_key(key, fields, value); + rocksdb_key_->pack_key(key, fields, value); common::ErrorCode rc = CheckUniqueness(tx, {(const char *)key.ptr(), key.length()}); if (rc != common::ErrorCode::SUCCESS) return rc; value.write_uint64(row); - const auto cf = rdbkey_->get_cf(); + const auto cf = rocksdb_key_->get_cf(); const auto s = tx->KVTrans().Put(cf, {(const char *)key.ptr(), key.length()}, {(const char *)value.ptr(), value.length()}); if (!s.ok()) { @@ -266,10 +194,10 @@ common::ErrorCode RCTableIndex::UpdateIndex(core::Transaction *tx, std::string_v ofields.emplace_back(okey); nfields.emplace_back(nkey); - rdbkey_->pack_key(packkey, ofields, value); + rocksdb_key_->pack_key(packkey, ofields, value); common::ErrorCode rc = CheckUniqueness(tx, {(const char *)packkey.ptr(), packkey.length()}); if (rc == common::ErrorCode::DUPP_KEY) { - const auto cf = rdbkey_->get_cf(); + const auto cf = rocksdb_key_->get_cf(); tx->KVTrans().Delete(cf, {(const char *)packkey.ptr(), packkey.length()}); } else { TIANMU_LOG(LogCtl_Level::WARN, "RockDb: don't have the key for update!"); @@ -282,8 +210,8 @@ common::ErrorCode RCTableIndex::GetRowByKey(core::Transaction *tx, std::vectorpack_key(packkey, fields, info); - rocksdb::Status s = tx->KVTrans().Get(rdbkey_->get_cf(), {(const char *)packkey.ptr(), packkey.length()}, &value); + rocksdb_key_->pack_key(packkey, fields, info); + rocksdb::Status s = tx->KVTrans().Get(rocksdb_key_->get_cf(), {(const char *)packkey.ptr(), packkey.length()}, &value); if (!s.IsNotFound() && !s.ok()) { return common::ErrorCode::FAILED; @@ -293,7 +221,7 @@ common::ErrorCode RCTableIndex::GetRowByKey(core::Transaction *tx, std::vectorm_index_ver > static_cast(enumIndexInfo::INDEX_INFO_VERSION_INITIAL)) { + if (rocksdb_key_->GetIndexVersion() > static_cast(IndexInfoType::INDEX_INFO_VERSION_INITIAL)) { uint16_t packlen; reader.read_uint16(&packlen); reader.read(packlen); @@ -304,35 +232,35 @@ common::ErrorCode RCTableIndex::GetRowByKey(core::Transaction *tx, std::vector tab, std::vector &fields, common::Operator op) { - if (!tab || !trans_) { + if (!tab || !txn_) { return; } StringWriter packkey, info; valid = true; - rdbkey_ = tab->rdbkey_; - rdbkey_->pack_key(packkey, fields, info); + rocksdb_key_ = tab->rocksdb_key_; + rocksdb_key_->pack_key(packkey, fields, info); rocksdb::Slice key_slice((const char *)packkey.ptr(), packkey.length()); - iter_ = std::shared_ptr(trans_->GetIterator(rdbkey_->get_cf(), true)); + iter_ = std::shared_ptr(txn_->GetIterator(rocksdb_key_->get_cf(), true)); switch (op) { case common::Operator::O_EQ: //== iter_->Seek(key_slice); - if (!iter_->Valid() || !rdbkey_->value_matches_prefix(iter_->key(), key_slice)) valid = false; + if (!iter_->Valid() || !rocksdb_key_->value_matches_prefix(iter_->key(), key_slice)) valid = false; break; case common::Operator::O_MORE_EQ: //'>=' iter_->Seek(key_slice); - if (!iter_->Valid() || !rdbkey_->covers_key(iter_->key())) valid = false; + if (!iter_->Valid() || !rocksdb_key_->covers_key(iter_->key())) valid = false; break; case common::Operator::O_MORE: //'>' iter_->Seek(key_slice); - if (!iter_->Valid() || rdbkey_->value_matches_prefix(iter_->key(), key_slice)) { - if (rdbkey_->m_is_reverse) { + if (!iter_->Valid() || rocksdb_key_->value_matches_prefix(iter_->key(), key_slice)) { + if (rocksdb_key_->IsInReverseOrder()) { iter_->Prev(); } else { iter_->Next(); } } - if (!iter_->Valid() || !rdbkey_->covers_key(iter_->key())) valid = false; + if (!iter_->Valid() || !rocksdb_key_->covers_key(iter_->key())) valid = false; break; default: TIANMU_LOG(LogCtl_Level::ERROR, "key not support this op:%d", op); @@ -342,37 +270,37 @@ void KeyIterator::ScanToKey(std::shared_ptr tab, std::vector tab, bool forward) { - if (!tab || !trans_) { + if (!tab || !txn_) { return; } valid = true; - rdbkey_ = tab->rdbkey_; - iter_ = std::shared_ptr(trans_->GetIterator(rdbkey_->get_cf(), true)); - std::string key = rdbkey_->get_boundary_key(forward); + rocksdb_key_ = tab->rocksdb_key_; + iter_ = std::shared_ptr(txn_->GetIterator(rocksdb_key_->get_cf(), true)); + std::string key = rocksdb_key_->get_boundary_key(forward); rocksdb::Slice key_slice((const char *)key.data(), key.length()); iter_->Seek(key_slice); if (forward) { - if (!iter_->Valid() || !rdbkey_->value_matches_prefix(iter_->key(), key_slice)) valid = false; + if (!iter_->Valid() || !rocksdb_key_->value_matches_prefix(iter_->key(), key_slice)) valid = false; } else { if (!iter_) valid = false; else { iter_->Prev(); - if (!iter_->Valid() || !rdbkey_->covers_key(iter_->key())) valid = false; + if (!iter_->Valid() || !rocksdb_key_->covers_key(iter_->key())) valid = false; } } } KeyIterator &KeyIterator::operator++() { - if (!iter_ || !iter_->Valid() || !rdbkey_) { + if (!iter_ || !iter_->Valid() || !rocksdb_key_) { valid = false; return *this; } iter_->Next(); - if (!iter_->Valid() || !rdbkey_->covers_key(iter_->key())) { + if (!iter_->Valid() || !rocksdb_key_->covers_key(iter_->key())) { valid = false; return *this; } @@ -381,13 +309,13 @@ KeyIterator &KeyIterator::operator++() { } KeyIterator &KeyIterator::operator--() { - if (!iter_ || !iter_->Valid() || !rdbkey_) { + if (!iter_ || !iter_->Valid() || !rocksdb_key_) { valid = false; return *this; } iter_->Prev(); - if (!iter_->Valid() || !rdbkey_->covers_key(iter_->key())) { + if (!iter_->Valid() || !rocksdb_key_->covers_key(iter_->key())) { valid = false; return *this; } @@ -399,7 +327,7 @@ common::ErrorCode KeyIterator::GetCurKV(std::vector &keys, uint64_t StringReader key({iter_->key().data(), iter_->key().size()}); StringReader value({iter_->value().data(), iter_->value().size()}); - common::ErrorCode ret = rdbkey_->unpack_key(key, value, keys); + common::ErrorCode ret = rocksdb_key_->unpack_key(key, value, keys); value.read_uint64(&row); return ret; } diff --git a/storage/tianmu/index/rc_table_index.h b/storage/tianmu/index/rc_table_index.h index 3340b39fc..f44dcb0d4 100644 --- a/storage/tianmu/index/rc_table_index.h +++ b/storage/tianmu/index/rc_table_index.h @@ -29,47 +29,50 @@ namespace Tianmu { namespace core { class Transaction; } + namespace index { class RdbKey; class RdbTable; class KVTransaction; + class RCTableIndex final { public: RCTableIndex(const RCTableIndex &) = delete; - RCTableIndex &operator=(RCTableIndex &) = delete; RCTableIndex(const std::string &name, TABLE *table); + + RCTableIndex &operator=(RCTableIndex &) = delete; RCTableIndex() = delete; - ~RCTableIndex() = default; + virtual ~RCTableIndex() = default; + + const std::vector &KeyCols() { return index_of_columns_; } - bool Enable() const { return enable_; } - const std::vector &KeyCols() { return cols_; } static common::ErrorCode CreateIndexTable(const std::string &name, TABLE *table); static common::ErrorCode DropIndexTable(const std::string &name); static bool FindIndexTable(const std::string &name); - static bool NormalLizeName(const std::string &path, std::string &name); + + void TruncateIndexTable(); + common::ErrorCode RefreshIndexTable(const std::string &name); common::ErrorCode RenameIndexTable(const std::string &from, const std::string &to); - void TruncateIndexTable(); common::ErrorCode InsertIndex(core::Transaction *tx, std::vector &fields, uint64_t row); common::ErrorCode UpdateIndex(core::Transaction *tx, std::string_view &nkey, std::string_view &okey, uint64_t row); common::ErrorCode GetRowByKey(core::Transaction *tx, std::vector &fields, uint64_t &row); - private: - common::ErrorCode CheckUniqueness(core::Transaction *tx, const rocksdb::Slice &pk_slice); - public: - std::shared_ptr tbl_; - std::shared_ptr rdbkey_; - std::vector cols_; - bool enable_ = false; + std::shared_ptr rocksdb_tbl_; + std::shared_ptr rocksdb_key_; + std::vector index_of_columns_; + uint keyid_ = 0; + private: + common::ErrorCode CheckUniqueness(core::Transaction *tx, const rocksdb::Slice &pk_slice); }; class KeyIterator final { public: KeyIterator() = delete; - KeyIterator(const KeyIterator &sec) : valid(sec.valid), iter_(sec.iter_), rdbkey_(sec.rdbkey_){}; - KeyIterator(KVTransaction *tx) : trans_(tx){}; + KeyIterator(const KeyIterator &sec) : valid(sec.valid), iter_(sec.iter_), rocksdb_key_(sec.rocksdb_key_){}; + KeyIterator(KVTransaction *tx) : txn_(tx){}; void ScanToKey(std::shared_ptr tab, std::vector &fields, common::Operator op); void ScanToEdge(std::shared_ptr tab, bool forward); common::ErrorCode GetCurKV(std::vector &keys, uint64_t &row); @@ -84,8 +87,8 @@ class KeyIterator final { protected: bool valid = false; std::shared_ptr iter_; - std::shared_ptr rdbkey_; - KVTransaction *trans_; + std::shared_ptr rocksdb_key_; + KVTransaction *txn_; }; } // namespace index diff --git a/storage/tianmu/index/rdb_meta_manager.cpp b/storage/tianmu/index/rdb_meta_manager.cpp index 62d1ab9b0..c2751eee1 100644 --- a/storage/tianmu/index/rdb_meta_manager.cpp +++ b/storage/tianmu/index/rdb_meta_manager.cpp @@ -37,30 +37,29 @@ namespace Tianmu { namespace index { -RdbKey::RdbKey(uint indexnr, uint keyno, rocksdb::ColumnFamilyHandle *cf_handle, uint16_t index_ver, uchar index_type, +RdbKey::RdbKey(uint pos, uint keyno, rocksdb::ColumnFamilyHandle *cf_handle, uint16_t index_ver, uchar index_type, bool is_reverse_cf, const char *_name, std::vector &cols) - : m_indexnr(indexnr), - m_cf_handle(cf_handle), - m_index_ver(index_ver), - m_index_type(index_type), - m_is_reverse(is_reverse_cf), - m_name(_name), - m_keyno(keyno), + : index_pos_(pos), + cf_handle_(cf_handle), + index_ver_(index_ver), + index_type_(index_type), + is_reverse_order_(is_reverse_cf), + key_name_(_name), cols_(cols) { - be_store_index(m_index_nr_be, m_indexnr); - ASSERT(m_cf_handle != nullptr, "m_cf_handle is NULL"); + be_store_index(index_pos_be_, index_pos_); + ASSERT(cf_handle_ != nullptr, "cf_handle_ is NULL"); } RdbKey::RdbKey(const RdbKey &k) - : m_indexnr(k.m_indexnr), - m_cf_handle(k.m_cf_handle), - m_index_ver(k.m_index_ver), - m_is_reverse(k.m_is_reverse), - m_name(k.m_name), - m_keyno(k.m_keyno), + : index_pos_(k.index_pos_), + cf_handle_(k.cf_handle_), + index_ver_(k.index_ver_), + is_reverse_order_(k.is_reverse_order_), + key_name_(k.key_name_), cols_(k.cols_) { - be_store_index(m_index_nr_be, m_indexnr); + + be_store_index(index_pos_be_, index_pos_); } const std::vector RdbKey::parse_into_tokens(const std::string &s, const char delim) { @@ -75,26 +74,30 @@ const std::vector RdbKey::parse_into_tokens(const std::string &s, c } const std::string RdbKey::parse_comment(const std::string &comment) { - std::string empty_result; + std::string result; if (comment.empty()) { - return empty_result; + return result; } std::vector v = parse_into_tokens(comment, QUALIFIER_SEP); - std::string search = CF_NAME_QUALIFIER; + std::string qualifier = CF_NAME_QUALIFIER; + for (const auto &it : v) { - if (it.substr(0, search.length()) == search) { - std::vector tokens = parse_into_tokens(it, QUALIFIER_VALUE_SEP); - if (tokens.size() == 2) { - return tokens[1]; - } else { - return empty_result; - } - } - } + //can not find the qualifier, return; + if (it.substr(0, qualifier.length()) != qualifier) + break; + + std::vector tokens = parse_into_tokens(it, QUALIFIER_VALUE_SEP); + if (tokens.size() == 2) { + //tokens[0] is comment char. + result = tokens[1]; + break; + }//if tokens.size() == 2 + } //for - return empty_result; + return result; } + void RdbKey::get_key_cols(std::vector &cols) { for (auto &col : cols_) { cols.push_back(col.col_no); @@ -102,48 +105,54 @@ void RdbKey::get_key_cols(std::vector &cols) { } void RdbKey::pack_field_number(StringWriter &key, std::string_view &field, uchar flag) { + uchar tuple[INTSIZE] = {0}; copy_integer(tuple, INTSIZE, (const uchar *)field.data(), INTSIZE, flag); key.write(tuple, sizeof(tuple)); } -int RdbKey::unpack_field_number(StringReader &key, std::string &field, uchar flag) { +common::ErrorCode RdbKey::unpack_field_number(StringReader &key, std::string &field, uchar flag) { const uchar *from; - if (!(from = (const uchar *)key.read(INTSIZE))) return -1; + if (!(from = (const uchar *)key.read(INTSIZE))) return common::ErrorCode::FAILED; int64_t value = 0; char *buf = reinterpret_cast(&value); char sign_byte = from[0]; - if (flag) - buf[INTSIZE - 1] = sign_byte; - else - buf[INTSIZE - 1] = sign_byte ^ 128; // Reverse the sign bit. + + flag? buf[INTSIZE - 1] = sign_byte : buf[INTSIZE - 1] = sign_byte ^ 128; // Reverse the sign bit. + for (uint i = 0, j = INTSIZE - 1; i < INTSIZE - 1; ++i, --j) buf[i] = from[j]; field.append(reinterpret_cast(&value), INTSIZE); - return 0; + return common::ErrorCode::SUCCESS; } void RdbKey::pack_field_string(StringWriter &info, StringWriter &key, std::string_view &field) { // version compatible - if (m_index_ver == static_cast(enumIndexInfo::INDEX_INFO_VERSION_INITIAL)) { + if (index_ver_ == static_cast(IndexInfoType::INDEX_INFO_VERSION_INITIAL)) { key.write((const uchar *)field.data(), field.length()); return; } + + //pack the string into data. StringReader data(field); size_t pad_bytes = 0; while (true) { size_t copy_len = std::min(CHUNKSIZE - 1, data.remain_len()); pad_bytes = CHUNKSIZE - 1 - copy_len; + //write the data len. key.write((const uchar *)data.read(copy_len), copy_len); + Separator separator; - if (pad_bytes) { + if (pad_bytes) { //not full of A pack string. + //write the data. key.write((const uchar *)SPACE.data(), pad_bytes); separator = Separator::EQ_SPACES; - } else { + } else { //a full pack string. int cmp = 0; size_t bytes = std::min(CHUNKSIZE - 1, data.remain_len()); if (bytes > 0) cmp = memcmp(data.current_ptr(), SPACE.data(), bytes); + if (cmp < 0) { separator = Separator::LE_SPACES; } else if (cmp > 0) { @@ -153,6 +162,7 @@ void RdbKey::pack_field_string(StringWriter &info, StringWriter &key, std::strin separator = Separator::EQ_SPACES; } } + key.write_uint8(static_cast(separator)); // last segment if (separator == Separator::EQ_SPACES) break; @@ -161,14 +171,14 @@ void RdbKey::pack_field_string(StringWriter &info, StringWriter &key, std::strin info.write_uint16(pad_bytes); } -int RdbKey::unpack_field_string(StringReader &key, StringReader &info, std::string &field) { +common::ErrorCode RdbKey::unpack_field_string(StringReader &key, StringReader &info, std::string &field) { bool finished = false; const char *ptr; uint16_t pad_bytes = 0; // version compatible - if (m_index_ver == static_cast(enumIndexInfo::INDEX_INFO_VERSION_INITIAL)) { + if (index_ver_ == static_cast(IndexInfoType::INDEX_INFO_VERSION_INITIAL)) { field.append(key.current_ptr(), key.remain_len()); - return 0; + return common::ErrorCode::SUCCESS; } // Decode the length of padding bytes of field for value info.read_uint16(&pad_bytes); @@ -179,24 +189,24 @@ int RdbKey::unpack_field_string(StringReader &key, StringReader &info, std::stri if (last_byte == static_cast(Separator::EQ_SPACES)) { // this is the last segment - if (pad_bytes > (CHUNKSIZE - 1)) return -1; + if (pad_bytes > (CHUNKSIZE - 1)) + return common::ErrorCode::FAILED; used_bytes = (CHUNKSIZE - 1) - pad_bytes; finished = true; } else { if (last_byte != static_cast(Separator::LE_SPACES) && last_byte != static_cast(Separator::GE_SPACES)) - return -1; + return common::ErrorCode::FAILED; used_bytes = CHUNKSIZE - 1; } // Now, need to decode used_bytes of data and append them to the value. - field.append(ptr, used_bytes); if (finished) { break; } } - return 0; + return common::ErrorCode::SUCCESS; } // cmp packed @@ -204,9 +214,9 @@ void RdbKey::pack_key(StringWriter &key, std::vector &fields, ASSERT(cols_.size() >= fields.size(), "fields size larger than keyparts size"); key.clear(); info.clear(); - key.write_uint32(m_indexnr); + key.write_uint32(index_pos_); // version compatible - if (m_index_ver > static_cast(enumIndexInfo::INDEX_INFO_VERSION_INITIAL)) info.write_uint16(0); + if (index_ver_ > static_cast(IndexInfoType::INDEX_INFO_VERSION_INITIAL)) info.write_uint16(0); size_t pos = info.length(); for (uint i = 0; i < fields.size(); i++) { @@ -232,6 +242,7 @@ void RdbKey::pack_key(StringWriter &key, std::vector &fields, pack_field_number(key, fields[i], cols_[i].col_flag); break; } + case MYSQL_TYPE_VARCHAR: case MYSQL_TYPE_TINY_BLOB: case MYSQL_TYPE_MEDIUM_BLOB: @@ -242,13 +253,14 @@ void RdbKey::pack_key(StringWriter &key, std::vector &fields, pack_field_string(info, key, fields[i]); break; } + default: break; } } // version compatible - if (m_index_ver > static_cast(enumIndexInfo::INDEX_INFO_VERSION_INITIAL)) { + if (index_ver_ > static_cast(IndexInfoType::INDEX_INFO_VERSION_INITIAL)) { // process packinfo len size_t len = info.length() - pos; info.write_uint16_at(0, len); @@ -261,7 +273,7 @@ common::ErrorCode RdbKey::unpack_key(StringReader &key, StringReader &value, std key.read_uint32(&index_number); // version compatible - if (m_index_ver > static_cast(enumIndexInfo::INDEX_INFO_VERSION_INITIAL)) value.read_uint16(&info_len); + if (index_ver_ > static_cast(IndexInfoType::INDEX_INFO_VERSION_INITIAL)) value.read_uint16(&info_len); for (auto &col : cols_) { std::string field; @@ -284,7 +296,7 @@ common::ErrorCode RdbKey::unpack_key(StringReader &key, StringReader &value, std case MYSQL_TYPE_DOUBLE: case MYSQL_TYPE_FLOAT: case MYSQL_TYPE_NEWDECIMAL: { - if (unpack_field_number(key, field, col.col_flag) < 0) { + if (unpack_field_number(key, field, col.col_flag) != common::ErrorCode::SUCCESS) { TIANMU_LOG(LogCtl_Level::ERROR, "unpack numeric field failed!"); return common::ErrorCode::FAILED; } @@ -298,16 +310,18 @@ common::ErrorCode RdbKey::unpack_key(StringReader &key, StringReader &value, std case MYSQL_TYPE_VAR_STRING: case MYSQL_TYPE_STRING: { // case sensitive for character - if (unpack_field_string(key, value, field) < 0) { + if (unpack_field_string(key, value, field) != common::ErrorCode::SUCCESS) { TIANMU_LOG(LogCtl_Level::ERROR, "unpack string field failed!"); return common::ErrorCode::FAILED; } } break; + default: break; } fields.emplace_back(field); } + return common::ErrorCode::SUCCESS; } @@ -317,28 +331,33 @@ RdbTable::RdbTable(const rocksdb::Slice &slice, const size_t &pos) { set_name(std::string(slice.data() + pos, slice.size() - pos)); } -RdbTable::~RdbTable() { m_rdbkeys.clear(); } +RdbTable::~RdbTable() { rdb_keys_.clear(); } // Put table definition DDL entry. Actual write is done at DICTManager::commit void RdbTable::put_dict(DICTManager *dict, rocksdb::WriteBatch *const batch, uchar *const key, size_t keylen) { + StringWriter value; - value.write_uint16(static_cast(enumVersion::DDL_VERSION)); + //write the ddl version firstly. + value.write_uint16(static_cast(VersionType::DDL_VERSION)); - for (auto &kd : m_rdbkeys) { - uchar flags = (kd->m_is_reverse ? REVERSE_CF_FLAG : 0); + for (auto &kd : rdb_keys_) { + uchar flags = (kd->IsInReverseOrder() ? REVERSE_CF_FLAG : 0); const uint cf_id = kd->get_cf()->GetID(); if (!if_exist_cf(dict)) { dict->add_cf_flags(batch, cf_id, flags); } + //write column family id. value.write_uint32(cf_id); - value.write_uint32(kd->m_indexnr); - dict->save_index_info(batch, kd->m_index_ver, kd->m_index_type, kd->m_indexnr, cf_id, kd->cols_); + //index pos. + value.write_uint32(kd->GetIndexPos()); + dict->save_index_info(batch, kd->GetIndexVersion(), kd->GetIndexType(), kd->GetIndexPos(), cf_id, kd->cols_); } + //put the index key. dict->put_key(batch, {(char *)key, keylen}, {(char *)value.ptr(), value.length()}); } bool RdbTable::if_exist_cf(DICTManager *dict) { - for (auto &kd : m_rdbkeys) { + for (auto &kd : rdb_keys_) { uint32_t flags; const uint cf_id = kd->get_cf()->GetID(); if (dict->get_cf_flags(cf_id, flags)) { @@ -350,25 +369,27 @@ bool RdbTable::if_exist_cf(DICTManager *dict) { void RdbTable::set_name(const std::string &name) { // Normalize dbname.tablename. - m_fullname = name; + full_name_ = name; size_t dotpos = name.find('.'); if (dotpos == std::string::npos) { TIANMU_LOG(LogCtl_Level::ERROR, "table name :%s format wrong", name.data()); return; } - m_dbname = name.substr(0, dotpos); - m_tablename = name.substr(++dotpos); + db_name_ = name.substr(0, dotpos); + table_name_ = name.substr(++dotpos); } bool DDLManager::init(DICTManager *const dict, CFManager *const cf_manager_) { - m_dict = dict; - m_cf = cf_manager_; - uchar ddl_entry[INDEX_NUMBER_SIZE]; + + dict_ = dict; + cf_ = cf_manager_; + uchar ddl_entry[INDEX_NUMBER_SIZE] = {0}; + //write meta type : ddl index. be_store_index(ddl_entry, static_cast(MetaType::DDL_INDEX)); - std::shared_ptr it = m_dict->new_iterator(); + std::shared_ptr it = dict_->new_iterator(); uint max_index_id = 0; - m_dict->get_max_index_id(&max_index_id); + dict_->get_max_index_id(&max_index_id); for (it->Seek({(char *)ddl_entry, INDEX_NUMBER_SIZE}); it->Valid(); it->Next()) { const uchar *ptr, *ptr_end; @@ -390,17 +411,18 @@ bool DDLManager::init(DICTManager *const dict, CFManager *const cf_manager_) { return false; } - tdef->m_rdbkeys.resize(real_val_size / sizeof(GlobalId)); + tdef->GetRdbTableKeys().resize(real_val_size / sizeof(GlobalId)); ptr = reinterpret_cast(val.data()); const int version = be_read_uint16(&ptr); - if (version != static_cast(enumVersion::DDL_VERSION)) { + if (version != static_cast(VersionType::DDL_VERSION)) { TIANMU_LOG(LogCtl_Level::ERROR, "RocksDB: DDL ENTRY Version was not expected.Expected: %d, " "Actual: %d", - static_cast(enumVersion::DDL_VERSION), version); + static_cast(VersionType::DDL_VERSION), version); return false; } + ptr_end = ptr + real_val_size; for (uint keyno = 0; ptr < ptr_end; keyno++) { GlobalId gl_index_id; @@ -409,7 +431,7 @@ bool DDLManager::init(DICTManager *const dict, CFManager *const cf_manager_) { uint32_t flags = 0; uchar index_type = 0; std::vector vcols; - if (!m_dict->get_index_info(gl_index_id, index_ver, index_type, vcols)) { + if (!dict_->get_index_info(gl_index_id, index_ver, index_type, vcols)) { TIANMU_LOG(LogCtl_Level::ERROR, "RocksDB: Could not get INDEXINFO for Index Number " "(%u,%u), table %s", @@ -432,23 +454,24 @@ bool DDLManager::init(DICTManager *const dict, CFManager *const cf_manager_) { gl_index_id.cf_id, tdef->fullname().c_str()); gl_index_id.cf_id = 0; } - if (!m_dict->get_cf_flags(gl_index_id.cf_id, flags)) { + if (!dict_->get_cf_flags(gl_index_id.cf_id, flags)) { TIANMU_LOG(LogCtl_Level::ERROR, "RocksDB: Could not get Column Family Flags for CF Number " "%d, table %s", gl_index_id.cf_id, tdef->fullname().c_str()); return false; } + rocksdb::ColumnFamilyHandle *const cfh = cf_manager_->get_cf_by_id(gl_index_id.cf_id); - tdef->m_rdbkeys[keyno] = std::make_shared(gl_index_id.index_id, keyno, cfh, index_ver, index_type, + tdef->GetRdbTableKeys().at(keyno) = std::make_shared(gl_index_id.index_id, keyno, cfh, index_ver, index_type, flags & REVERSE_CF_FLAG, "", vcols); } put(tdef); } // create memory table - std::scoped_lock guard(m_mem_lock); + std::scoped_lock guard(mem_lock_); uchar mem_table_entry[INDEX_NUMBER_SIZE]; be_store_index(mem_table_entry, static_cast(MetaType::DDL_MEMTABLE)); const rocksdb::Slice mem_table_entry_slice((char *)mem_table_entry, INDEX_NUMBER_SIZE); @@ -464,11 +487,11 @@ bool DDLManager::init(DICTManager *const dict, CFManager *const cf_manager_) { const uchar *ptr = reinterpret_cast(val.data()); const int version = be_read_uint16(&ptr); - if (version != static_cast(enumVersion::DDL_VERSION)) { + if (version != static_cast(VersionType::DDL_VERSION)) { TIANMU_LOG(LogCtl_Level::ERROR, "RocksDB: DDL MEMTABLE ENTRY Version was not expected or " "currupt.Expected: %d, Actual: %d", - static_cast(enumVersion::DDL_VERSION), version); + static_cast(VersionType::DDL_VERSION), version); return false; } @@ -481,23 +504,23 @@ bool DDLManager::init(DICTManager *const dict, CFManager *const cf_manager_) { return false; } std::shared_ptr tb_mem = std::make_shared(table_name, memtable_id, cf_id); - m_mem_hash[table_name] = tb_mem; + mem_hash_[table_name] = tb_mem; } if (max_index_id < static_cast(MetaType::END_DICT_INDEX_ID)) { max_index_id = static_cast(MetaType::END_DICT_INDEX_ID); } - m_sequence.init(max_index_id + 1); + seq_gen_.init(max_index_id + 1); return true; } std::shared_ptr DDLManager::find(const std::string &table_name) { std::shared_ptr rec; - std::scoped_lock guard(m_lock); + std::scoped_lock guard(lock_); - auto iter = m_ddl_hash.find(table_name); - if (iter != m_ddl_hash.end()) rec = iter->second; + auto iter = ddl_hash_.find(table_name); + if (iter != ddl_hash_.end()) rec = iter->second; return rec; } @@ -511,19 +534,19 @@ void DDLManager::put_and_write(std::shared_ptr tbl, rocksdb::WriteBatc const std::string &dbname_tablename = tbl->fullname(); key.write((uchar *)dbname_tablename.data(), dbname_tablename.length()); - tbl->put_dict(m_dict, batch, key.ptr(), key.length()); + tbl->put_dict(dict_, batch, key.ptr(), key.length()); put(tbl); } void DDLManager::put(std::shared_ptr tbl) { - std::scoped_lock guard(m_lock); + std::scoped_lock guard(lock_); const std::string &dbname_tablename = tbl->fullname(); - m_ddl_hash[dbname_tablename] = tbl; + ddl_hash_[dbname_tablename] = tbl; } void DDLManager::remove(std::shared_ptr tbl, rocksdb::WriteBatch *const batch) { - std::scoped_lock guard(m_lock); + std::scoped_lock guard(lock_); uchar buf[FN_LEN * 2 + INDEX_NUMBER_SIZE]; uint pos = 0; @@ -535,22 +558,22 @@ void DDLManager::remove(std::shared_ptr tbl, rocksdb::WriteBatch *cons memcpy(buf + pos, dbname_tablename.c_str(), dbname_tablename.size()); pos += dbname_tablename.size(); - m_dict->delete_key(batch, {(const char *)buf, pos}); - auto iter = m_ddl_hash.find(dbname_tablename); - if (iter != m_ddl_hash.end()) { - m_ddl_hash.erase(iter); + dict_->delete_key(batch, {(const char *)buf, pos}); + auto iter = ddl_hash_.find(dbname_tablename); + if (iter != ddl_hash_.end()) { + ddl_hash_.erase(iter); } } bool DDLManager::rename(const std::string &from, const std::string &to, rocksdb::WriteBatch *const batch) { - std::scoped_lock guard(m_lock); + std::scoped_lock guard(lock_); std::shared_ptr rec = find(from); if (!rec) { return false; } std::shared_ptr new_rec = std::make_shared(to); - new_rec->m_rdbkeys = rec->m_rdbkeys; + new_rec->GetRdbTableKeys() = rec->GetRdbTableKeys(); // Create a new key StringWriter key; @@ -558,38 +581,39 @@ bool DDLManager::rename(const std::string &from, const std::string &to, rocksdb: const std::string &dbname_tablename = new_rec->fullname(); key.write((uchar *)dbname_tablename.data(), dbname_tablename.length()); - if (rec->if_exist_cf(m_dict)) { + if (rec->if_exist_cf(dict_)) { remove(rec, batch); - new_rec->put_dict(m_dict, batch, key.ptr(), key.length()); + new_rec->put_dict(dict_, batch, key.ptr(), key.length()); put(new_rec); } else { TIANMU_LOG(LogCtl_Level::WARN, "Rename table:%s have no cf_definition", from.data()); } + return true; } void DDLManager::cleanup() { { - std::scoped_lock rdb_guard(m_lock); - m_ddl_hash.clear(); + std::scoped_lock rdb_guard(lock_); + ddl_hash_.clear(); } { - std::scoped_lock mem_guard(m_mem_lock); - m_mem_hash.clear(); + std::scoped_lock mem_guard(mem_lock_); + mem_hash_.clear(); } } std::shared_ptr DDLManager::find_mem(const std::string &table_name) { - std::scoped_lock guard(m_mem_lock); + std::scoped_lock guard(mem_lock_); - auto iter = m_mem_hash.find(table_name); - if (iter != m_mem_hash.end()) return iter->second; + auto iter = mem_hash_.find(table_name); + if (iter != mem_hash_.end()) return iter->second; return nullptr; } void DDLManager::put_mem(std::shared_ptr tb_mem, rocksdb::WriteBatch *const batch) { - std::scoped_lock guard(m_mem_lock); + std::scoped_lock guard(mem_lock_); StringWriter key; std::string table_name = tb_mem->FullName(); @@ -597,61 +621,62 @@ void DDLManager::put_mem(std::shared_ptr tb_mem, rocksdb::Writ key.write((const uchar *)table_name.c_str(), table_name.size()); StringWriter value; - value.write_uint16(static_cast(enumVersion::DDL_VERSION)); + value.write_uint16(static_cast(VersionType::DDL_VERSION)); value.write_uint32(tb_mem->GetCFHandle()->GetID()); value.write_uint32(tb_mem->GetMemID()); - m_dict->put_key(batch, {(char *)key.ptr(), key.length()}, {(char *)value.ptr(), value.length()}); - m_mem_hash[table_name] = tb_mem; + dict_->put_key(batch, {(char *)key.ptr(), key.length()}, {(char *)value.ptr(), value.length()}); + mem_hash_[table_name] = tb_mem; } void DDLManager::remove_mem(std::shared_ptr tb_mem, rocksdb::WriteBatch *const batch) { - std::scoped_lock guard(m_mem_lock); + std::scoped_lock guard(mem_lock_); StringWriter key; const std::string &table_name = tb_mem->FullName(); key.write_uint32(static_cast(MetaType::DDL_MEMTABLE)); key.write((const uchar *)table_name.c_str(), table_name.size()); - m_dict->delete_key(batch, {(const char *)key.ptr(), key.length()}); + dict_->delete_key(batch, {(const char *)key.ptr(), key.length()}); - auto iter = m_mem_hash.find(table_name); - if (iter != m_mem_hash.end()) { - m_mem_hash.erase(iter); + auto iter = mem_hash_.find(table_name); + if (iter != mem_hash_.end()) { + mem_hash_.erase(iter); } } bool DDLManager::rename_mem(std::string &from, std::string &to, rocksdb::WriteBatch *const batch) { - std::scoped_lock guard(m_mem_lock); + std::scoped_lock guard(mem_lock_); StringWriter skey; skey.write_uint32(static_cast(MetaType::DDL_MEMTABLE)); skey.write((const uchar *)from.c_str(), from.size()); std::string origin_value; - m_dict->get_value({(const char *)skey.ptr(), skey.length()}, &origin_value); - m_dict->delete_key(batch, {(const char *)skey.ptr(), skey.length()}); + dict_->get_value({(const char *)skey.ptr(), skey.length()}, &origin_value); + dict_->delete_key(batch, {(const char *)skey.ptr(), skey.length()}); StringWriter dkey; dkey.write_uint32(static_cast(MetaType::DDL_MEMTABLE)); dkey.write((const uchar *)to.c_str(), to.size()); - m_dict->put_key(batch, {(const char *)dkey.ptr(), dkey.length()}, origin_value); + dict_->put_key(batch, {(const char *)dkey.ptr(), dkey.length()}, origin_value); - auto iter = m_mem_hash.find(from); - if (iter == m_mem_hash.end()) return false; + auto iter = mem_hash_.find(from); + if (iter == mem_hash_.end()) return false; auto tb_mem = iter->second; - m_mem_hash.erase(iter); - m_mem_hash[to] = tb_mem; + mem_hash_.erase(iter); + mem_hash_[to] = tb_mem; return true; } bool DICTManager::init(rocksdb::DB *const rdb_dict, CFManager *const cf_manager_) { - m_db = rdb_dict; - m_system_cfh = cf_manager_->get_or_create_cf(m_db, DEFAULT_SYSTEM_CF_NAME); - if (m_system_cfh == nullptr) return false; + db_ = rdb_dict; + system_cf_ = cf_manager_->get_or_create_cf(db_, DEFAULT_SYSTEM_CF_NAME); + + if (system_cf_ == nullptr) return false; - be_store_index(m_max_index, static_cast(MetaType::MAX_INDEX_ID)); + be_store_index(max_index_, static_cast(MetaType::MAX_INDEX_ID)); return true; } @@ -662,23 +687,23 @@ std::unique_ptr DICTManager::begin() const { void DICTManager::put_key(rocksdb::WriteBatchBase *const batch, const rocksdb::Slice &key, const rocksdb::Slice &value) const { - batch->Put(m_system_cfh, key, value); + batch->Put(system_cf_, key, value); } rocksdb::Status DICTManager::get_value(const rocksdb::Slice &key, std::string *const value) const { rocksdb::ReadOptions options; options.total_order_seek = true; - return m_db->Get(options, m_system_cfh, key, value); + return db_->Get(options, system_cf_, key, value); } void DICTManager::delete_key(rocksdb::WriteBatchBase *batch, const rocksdb::Slice &key) const { - batch->Delete(m_system_cfh, key); + batch->Delete(system_cf_, key); } std::shared_ptr DICTManager::new_iterator() const { rocksdb::ReadOptions read_options; read_options.total_order_seek = true; - return std::shared_ptr(m_db->NewIterator(read_options, m_system_cfh)); + return std::shared_ptr(db_->NewIterator(read_options, system_cf_)); } bool DICTManager::commit(rocksdb::WriteBatch *const batch, const bool &sync) const { @@ -686,11 +711,12 @@ bool DICTManager::commit(rocksdb::WriteBatch *const batch, const bool &sync) con rocksdb::WriteOptions options; options.sync = sync; - rocksdb::Status s = m_db->Write(options, batch); + rocksdb::Status s = db_->Write(options, batch); bool res = s.ok(); if (!res) { TIANMU_LOG(LogCtl_Level::ERROR, "DICTManager::commit error"); } + batch->Clear(); return res; } @@ -719,7 +745,7 @@ void DICTManager::save_index_info(rocksdb::WriteBatch *batch, uint16_t index_ver value.write_uint16(index_ver); value.write_uint8(index_type); - if (index_ver > static_cast(enumIndexInfo::INDEX_INFO_VERSION_INITIAL)) { + if (index_ver > static_cast(IndexInfoType::INDEX_INFO_VERSION_INITIAL)) { value.write_uint32(cols.size()); for (auto &col : cols) { value.write_uint16(col.col_no); @@ -731,7 +757,7 @@ void DICTManager::save_index_info(rocksdb::WriteBatch *batch, uint16_t index_ver value.write_uint8(cols[0].col_flag); } - batch->Put(m_system_cfh, {(char *)dumpid.ptr(), dumpid.length()}, {(char *)value.ptr(), value.length()}); + batch->Put(system_cf_, {(char *)dumpid.ptr(), dumpid.length()}, {(char *)value.ptr(), value.length()}); } void DICTManager::add_cf_flags(rocksdb::WriteBatch *const batch, const uint32_t &cf_id, @@ -742,10 +768,10 @@ void DICTManager::add_cf_flags(rocksdb::WriteBatch *const batch, const uint32_t key.write_uint32(cf_id); StringWriter value; - value.write_uint16(static_cast(enumVersion::CF_VERSION)); + value.write_uint16(static_cast(VersionType::CF_VERSION)); value.write_uint32(cf_flags); - batch->Put(m_system_cfh, {(char *)key.ptr(), key.length()}, {(char *)value.ptr(), value.length()}); + batch->Put(system_cf_, {(char *)key.ptr(), key.length()}, {(char *)value.ptr(), value.length()}); } void DICTManager::delete_index_info(rocksdb::WriteBatch *batch, const GlobalId &gl_index_id) const { @@ -766,8 +792,8 @@ bool DICTManager::get_index_info(const GlobalId &gl_index_id, uint16_t &index_ve StringReader reader(value); reader.read_uint16(&index_ver); - switch (static_cast(index_ver)) { - case enumIndexInfo::INDEX_INFO_VERSION_INITIAL: { + switch (static_cast(index_ver)) { + case IndexInfoType::INDEX_INFO_VERSION_INITIAL: { uchar type; uchar flag; reader.read_uint8(&index_type); @@ -778,7 +804,7 @@ bool DICTManager::get_index_info(const GlobalId &gl_index_id, uint16_t &index_ve found = true; break; } - case enumIndexInfo::INDEX_INFO_VERSION_COLS: { + case IndexInfoType::INDEX_INFO_VERSION_COLS: { uint32_t cols_sz = 0; reader.read_uint8(&index_type); reader.read_uint32(&cols_sz); @@ -796,11 +822,13 @@ bool DICTManager::get_index_info(const GlobalId &gl_index_id, uint16_t &index_ve break; } } + if (error) { TIANMU_LOG(LogCtl_Level::ERROR, "RocksDB: Found invalid key version number (%u, %u) ", index_ver, index_type); } return found; } + bool DICTManager::get_cf_flags(const uint32_t &cf_id, uint32_t &cf_flags) const { bool found = false; std::string value; @@ -814,7 +842,7 @@ bool DICTManager::get_cf_flags(const uint32_t &cf_id, uint32_t &cf_flags) const uint16_t version = 0; StringReader reader(value); reader.read_uint16(&version); - if (version == static_cast(enumVersion::CF_VERSION)) { + if (version == static_cast(VersionType::CF_VERSION)) { reader.read_uint32(&cf_flags); found = true; } @@ -844,12 +872,12 @@ bool DICTManager::get_max_index_id(uint32_t *const index_id) const { bool found = false; std::string value; - const rocksdb::Status status = get_value({(char *)m_max_index, INDEX_NUMBER_SIZE}, &value); + const rocksdb::Status status = get_value({(char *)max_index_, INDEX_NUMBER_SIZE}, &value); if (status.ok()) { uint16_t version = 0; StringReader reader({value.data(), value.length()}); reader.read_uint16(&version); - if (version == static_cast(enumVersion::MAX_INDEX_ID_VERSION)) { + if (version == static_cast(VersionType::MAX_INDEX_ID_VERSION)) { reader.read_uint32(index_id); found = true; } @@ -867,9 +895,9 @@ bool DICTManager::update_max_index_id(rocksdb::WriteBatch *const batch, const ui } } StringWriter value; - value.write_uint16(static_cast(enumVersion::MAX_INDEX_ID_VERSION)); + value.write_uint16(static_cast(VersionType::MAX_INDEX_ID_VERSION)); value.write_uint32(index_id); - batch->Put(m_system_cfh, {(char *)m_max_index, INDEX_NUMBER_SIZE}, {(char *)value.ptr(), value.length()}); + batch->Put(system_cf_, {(char *)max_index_, INDEX_NUMBER_SIZE}, {(char *)value.ptr(), value.length()}); return false; } @@ -884,7 +912,7 @@ void DICTManager::get_ongoing_index(std::vector &ids, MetaType dd_type rocksdb::ReadOptions read_options; read_options.iterate_upper_bound = &upper_slice; - auto it = std::shared_ptr(m_db->NewIterator(read_options, m_system_cfh)); + auto it = std::shared_ptr(db_->NewIterator(read_options, system_cf_)); for (it->Seek({reinterpret_cast(index_buf), INDEX_NUMBER_SIZE}); it->Valid(); it->Next()) { rocksdb::Slice key = it->key(); const uchar *const ptr = (const uchar *)key.data(); @@ -905,12 +933,12 @@ void DICTManager::start_ongoing_index(rocksdb::WriteBatch *const batch, const Gl StringWriter value; // version as needed if (dd_type == MetaType::DDL_DROP_INDEX_ONGOING) { - value.write_uint16(static_cast(enumVersion::DROP_INDEX_ONGOING_VERSION)); + value.write_uint16(static_cast(VersionType::DROP_INDEX_ONGOING_VERSION)); } else { - value.write_uint16(static_cast(enumVersion::CREATE_INDEX_ONGOING_VERSION)); + value.write_uint16(static_cast(VersionType::CREATE_INDEX_ONGOING_VERSION)); } - batch->Put(m_system_cfh, {(char *)dumpid.ptr(), dumpid.length()}, {(char *)value.ptr(), value.length()}); + batch->Put(system_cf_, {(char *)dumpid.ptr(), dumpid.length()}, {(char *)value.ptr(), value.length()}); } void DICTManager::end_ongoing_index(rocksdb::WriteBatch *const batch, const GlobalId &id, MetaType dd_type) const { @@ -936,9 +964,9 @@ bool DICTManager::is_drop_index_ongoing(const GlobalId &gl_index_id, MetaType dd uint SeqGenerator::get_and_update_next_number(DICTManager *const dict) { uint res; - std::scoped_lock guard(m_mutex); - - res = m_next_number++; + //it not global or shared var, why do we use mutext to protect it? non-sense + //std::scoped_lock guard(seq_mutex_); + res = next_number_++; const std::unique_ptr wb = dict->begin(); rocksdb::WriteBatch *const batch = wb.get(); @@ -950,13 +978,13 @@ uint SeqGenerator::get_and_update_next_number(DICTManager *const dict) { void CFManager::init(std::vector &handles) { for (auto cfh : handles) { - m_cf_name_map[cfh->GetName()] = cfh; - m_cf_id_map[cfh->GetID()] = cfh; + cf_name_map_[cfh->GetName()] = cfh; + cf_id_map_[cfh->GetID()] = cfh; } } void CFManager::cleanup() { - for (auto it : m_cf_name_map) { + for (auto it : cf_name_map_) { delete it.second; } } @@ -966,18 +994,18 @@ rocksdb::ColumnFamilyHandle *CFManager::get_or_create_cf(rocksdb::DB *const rdb_ rocksdb::ColumnFamilyHandle *cf_handle = nullptr; const std::string &cf_name = cf_name_arg.empty() ? DEFAULT_CF_NAME : cf_name_arg; - std::scoped_lock guard(m_mutex); + std::scoped_lock guard(cf_mutex_); - auto it = m_cf_name_map.find(cf_name); - if (it != m_cf_name_map.end()) { + auto it = cf_name_map_.find(cf_name); + if (it != cf_name_map_.end()) { cf_handle = it->second; } else { rocksdb::ColumnFamilyOptions opts; if (!IsRowStoreCF(cf_name)) opts.compaction_filter_factory.reset(new index::IndexCompactFilterFactory); const rocksdb::Status s = rdb_->CreateColumnFamily(opts, cf_name, &cf_handle); if (s.ok()) { - m_cf_name_map[cf_handle->GetName()] = cf_handle; - m_cf_id_map[cf_handle->GetID()] = cf_handle; + cf_name_map_[cf_handle->GetName()] = cf_handle; + cf_id_map_[cf_handle->GetID()] = cf_handle; } else { cf_handle = nullptr; } @@ -988,19 +1016,19 @@ rocksdb::ColumnFamilyHandle *CFManager::get_or_create_cf(rocksdb::DB *const rdb_ rocksdb::ColumnFamilyHandle *CFManager::get_cf_by_id(const uint32_t &id) { rocksdb::ColumnFamilyHandle *cf_handle = nullptr; - std::scoped_lock guard(m_mutex); + std::scoped_lock guard(cf_mutex_); - const auto it = m_cf_id_map.find(id); - if (it != m_cf_id_map.end()) cf_handle = it->second; + const auto it = cf_id_map_.find(id); + if (it != cf_id_map_.end()) cf_handle = it->second; return cf_handle; } std::vector CFManager::get_all_cf(void) { std::vector list; - std::scoped_lock guard(m_mutex); + std::scoped_lock guard(cf_mutex_); - for (auto it : m_cf_id_map) { + for (auto it : cf_id_map_) { ASSERT(it.second != nullptr, "it.second is NULL"); list.push_back(it.second); } diff --git a/storage/tianmu/index/rdb_meta_manager.h b/storage/tianmu/index/rdb_meta_manager.h index 1a1573d3f..b65452cca 100644 --- a/storage/tianmu/index/rdb_meta_manager.h +++ b/storage/tianmu/index/rdb_meta_manager.h @@ -46,9 +46,11 @@ const std::string DEFAULT_CF_NAME("default"); const std::string DEFAULT_ROWSTORE_NAME("__rowstore__.default"); const std::string DEFAULT_ROWSTORE_PREFIX("__rowstore__."); const std::string DEFAULT_SYSTEM_CF_NAME("__system__"); + const char QUALIFIER_VALUE_SEP = '='; const char *const CF_NAME_QUALIFIER = "cfname"; const char QUALIFIER_SEP = ';'; +//mem for pack string. const std::string SPACE(CHUNKSIZE, 0x20); constexpr int REVERSE_CF_FLAG = 0x1; constexpr int INDEX_NUMBER_SIZE = 0x4; @@ -64,7 +66,7 @@ enum class MetaType { END_DICT_INDEX_ID = 255 }; -enum class enumVersion { +enum class VersionType { DDL_VERSION = 1, CF_VERSION = 1, MAX_INDEX_ID_VERSION = 1, @@ -72,17 +74,22 @@ enum class enumVersion { CREATE_INDEX_ONGOING_VERSION = 1, }; -enum class enumIndexInfo { +enum class IndexInfoType { INDEX_INFO_VERSION_INITIAL = 1, INDEX_INFO_VERSION_COLS = 2, }; -enum class enumIndexType { INDEX_TYPE_PRIMARY = 1, INDEX_TYPE_SECONDARY, INDEX_TYPE_HIDDEN_PRIMARY }; +enum class IndexType { + INDEX_TYPE_PRIMARY = 1, + INDEX_TYPE_SECONDARY, + INDEX_TYPE_HIDDEN_PRIMARY +}; +//index of rocksdb table for tianmu. class RdbKey { public: RdbKey(const RdbKey &k); - RdbKey(uint indexnr, uint keyno, rocksdb::ColumnFamilyHandle *cf_handle, uint16_t index_ver, uchar index_type, + RdbKey(uint index_pos, uint keyno, rocksdb::ColumnFamilyHandle *cf_handle, uint16_t index_ver, uchar index_type, bool is_reverse_cf, const char *name, std::vector &cols); RdbKey &operator=(const RdbKey &) = delete; @@ -90,127 +97,188 @@ class RdbKey { // Convert a key from KeyTupleFormat to mem-comparable form void pack_key(StringWriter &key, std::vector &fields, StringWriter &info); common::ErrorCode unpack_key(StringReader &key, StringReader &value, std::vector &fields); + + //pack and unpack field num. void pack_field_number(StringWriter &key, std::string_view &field, uchar flag); - int unpack_field_number(StringReader &key, std::string &field, uchar flag); + common::ErrorCode unpack_field_number(StringReader &key, std::string &field, uchar flag); + + //pack and unpack field value. void pack_field_string(StringWriter &info, StringWriter &key, std::string_view &field); - int unpack_field_string(StringReader &key, StringReader &value, std::string &field); + common::ErrorCode unpack_field_string(StringReader &key, StringReader &value, std::string &field); + void get_key_cols(std::vector &cols); + int cmp_full_keys(const rocksdb::Slice &a, const rocksdb::Slice &b) const { ASSERT(covers_key(a), "covers_key fail!"); ASSERT(covers_key(b), "covers_key fail!"); return memcmp(a.data(), b.data(), std::min(a.size(), b.size())); } + // Check if given mem-comparable key belongs to this index bool covers_key(const rocksdb::Slice &slice) const { if (slice.size() < INDEX_NUMBER_SIZE) return false; - if (memcmp(slice.data(), m_index_nr_be, INDEX_NUMBER_SIZE)) return false; + if (memcmp(slice.data(), index_pos_be_, INDEX_NUMBER_SIZE)) return false; return true; } + + //Check if prefix is matched. bool value_matches_prefix(const rocksdb::Slice &value, const rocksdb::Slice &prefix) const { return covers_key(value) && !cmp_full_keys(value, prefix); } + GlobalId get_gl_index_id() const { - const GlobalId gl_index_id = {m_cf_handle->GetID(), m_indexnr}; + const GlobalId gl_index_id = {cf_handle_->GetID(), index_pos_}; return gl_index_id; } - rocksdb::ColumnFamilyHandle *get_cf() const { return m_cf_handle; } + + rocksdb::ColumnFamilyHandle *get_cf() const { return cf_handle_; } + std::string get_boundary_key(bool lower) { uchar index[INDEX_NUMBER_SIZE]; - lower ? be_store_index(index, m_indexnr) : be_store_index(index, m_indexnr + 1); + lower ? be_store_index(index, index_pos_) : be_store_index(index, index_pos_ + 1); return std::string(reinterpret_cast(index), INDEX_NUMBER_SIZE); } + static const std::vector parse_into_tokens(const std::string &s, const char delim); static const std::string parse_comment(const std::string &comment); + //gets index pos + uint32_t GetIndexPos() const { return index_pos_; } + //gets index version. + uint16_t GetIndexVersion() const { return index_ver_;} + //gets index type. + uchar GetIndexType() const {return index_type_;} + //key is in reverse order. + bool IsInReverseOrder() const { return is_reverse_order_; } + //index key name. + std::string GetIndexName() { return key_name_; } + //gets Attrs of Cols. + std::vector GetColAttrs () const { return cols_; } private: friend class RdbTable; - const uint32_t m_indexnr; - uchar m_index_nr_be[INDEX_NUMBER_SIZE]; - rocksdb::ColumnFamilyHandle *m_cf_handle; - - public: - uint16_t m_index_ver; - uchar m_index_type; - bool m_is_reverse; - std::string m_name; - - private: - uint m_keyno; + //ith pos in TABLE_SHARE::key_info[]. + const uint32_t index_pos_; + //pos of index in big endian order. + uchar index_pos_be_[INDEX_NUMBER_SIZE]; + //handler of column family. + rocksdb::ColumnFamilyHandle *cf_handle_; + //index version. + uint16_t index_ver_; + //index type. + uchar index_type_; + //is in reverse order. + bool is_reverse_order_; + //index key name. + std::string key_name_; + + //uint m_keyno; + //atributes of key. std::vector cols_; }; +//rocksdb table for tianmu. class RdbTable { public: RdbTable(const RdbTable &) = delete; RdbTable &operator=(const RdbTable &) = delete; + explicit RdbTable(const std::string &name); explicit RdbTable(const rocksdb::Slice &slice, const size_t &pos = 0); - ~RdbTable(); + virtual ~RdbTable(); + + //does the cf exists in dicts. bool if_exist_cf(DICTManager *dict); + //stores the dicts. void put_dict(DICTManager *dict, rocksdb::WriteBatch *const batch, uchar *const key, size_t keylen); - const std::string &fullname() const { return m_fullname; } - const std::string &dbname() const { return m_dbname; } - const std::string &tablename() const { return m_tablename; } - std::vector> m_rdbkeys; + //gets the mem vars. + const std::string &fullname() const { return full_name_; } + const std::string &dbname() const { return db_name_; } + const std::string &tablename() const { return table_name_; } + std::vector >& GetRdbTableKeys () { return rdb_keys_;} + private: void set_name(const std::string &name); - - private: - std::string m_fullname; - std::string m_dbname; - std::string m_tablename; + //keys of rdb table. + std::vector> rdb_keys_; + //full table name. + std::string full_name_; + //db name. + std::string db_name_; + //table name. + std::string table_name_; }; +//Seq Gen. the step length use default value: 1. class SeqGenerator { public: SeqGenerator(const SeqGenerator &) = delete; SeqGenerator &operator=(const SeqGenerator &) = delete; SeqGenerator() = default; - void init(const uint &initial_number) { m_next_number = initial_number; } + + //do initialization. + void init(const uint &initial_number) { next_number_ = initial_number; } + //get the next seq id. uint get_and_update_next_number(DICTManager *const dict); private: - uint m_next_number = 0; - std::recursive_mutex m_mutex; + //the next seq num. + std::atomic next_number_ {0}; + //the mutex of seq, make sure it's mult-thread safe. not necessary. + //std::recursive_mutex seq_mutex_; }; +//DDL manager. class DDLManager { public: DDLManager(const DDLManager &) = delete; DDLManager &operator=(const DDLManager &) = delete; DDLManager() = default; + //do initialization. bool init(DICTManager *const dict, CFManager *const cf_manager_); + //do cleanup. void cleanup(); + + //find the handler by table name. std::shared_ptr find(const std::string &table_name); - void put_and_write(std::shared_ptr key_descr, rocksdb::WriteBatch *const batch); - void remove(std::shared_ptr tbl, rocksdb::WriteBatch *const batch); - bool rename(const std::string &from, const std::string &to, rocksdb::WriteBatch *const batch); + //find the mem handler by table name. std::shared_ptr find_mem(const std::string &table_name); + //store a rc mem table into DDL mananger. void put_mem(std::shared_ptr tb_mem, rocksdb::WriteBatch *const batch); + + //write dictionary into tbl. + void put_and_write(std::shared_ptr tbl, rocksdb::WriteBatch *const batch); + //rename fromm `from` to `to`. + bool rename(const std::string &from, const std::string &to, rocksdb::WriteBatch *const batch); + + //remove the tbl from dictionary. + void remove(std::shared_ptr tbl, rocksdb::WriteBatch *const batch); + //remove tbl from mem hash table. void remove_mem(std::shared_ptr tb_mem, rocksdb::WriteBatch *const batch); bool rename_mem(std::string &from, std::string &to, rocksdb::WriteBatch *const batch); - - uint get_and_update_next_number(DICTManager *const dict) { return m_sequence.get_and_update_next_number(dict); } + //get the next seq num. + uint get_and_update_next_number(DICTManager *const dict) { return seq_gen_.get_and_update_next_number(dict); } private: // Put the data into in-memory table (only) void put(std::shared_ptr tbl); + //dict mananger handler. + DICTManager *dict_ = nullptr; + //column family manager handler. + CFManager *cf_ = nullptr; - private: - DICTManager *m_dict = nullptr; - CFManager *m_cf = nullptr; // Contains RdbTable elements - std::unordered_map> m_ddl_hash; - std::unordered_map> m_mem_hash; - std::recursive_mutex m_lock; - std::recursive_mutex m_mem_lock; - SeqGenerator m_sequence; + std::unordered_map> ddl_hash_; + std::unordered_map> mem_hash_; + std::recursive_mutex lock_; + std::recursive_mutex mem_lock_; + SeqGenerator seq_gen_; }; + /* Meta description -------------------------------------------------------------- @@ -239,47 +307,71 @@ class DICTManager { DICTManager &operator=(const DICTManager &) = delete; DICTManager() = default; + //do initalization, gets or create a column family. bool init(rocksdb::DB *const rdb_dict, CFManager *const cf_manager_); - inline void lock() { m_mutex.lock(); } - inline void unlock() { m_mutex.unlock(); } + //do cleanup. void cleanup(){}; - inline rocksdb::ColumnFamilyHandle *get_system_cf() const { return m_system_cfh; } + + //locks operations. + inline void lock() { dict_mutex_.lock(); } + inline void unlock() { dict_mutex_.unlock(); } + + inline rocksdb::ColumnFamilyHandle *get_system_cf() const { return system_cf_; } + std::unique_ptr begin() const; bool commit(rocksdb::WriteBatch *const batch, const bool &sync = true) const; + rocksdb::Status get_value(const rocksdb::Slice &key, std::string *const value) const; + + //key operations, such as put and delete key. void put_key(rocksdb::WriteBatchBase *const batch, const rocksdb::Slice &key, const rocksdb::Slice &value) const; void delete_key(rocksdb::WriteBatchBase *batch, const rocksdb::Slice &key) const; + + //get a new interator. std::shared_ptr new_iterator() const; + + //index info operations. void save_index_info(rocksdb::WriteBatch *batch, uint16_t index_ver, uchar index_type, uint index_id, uint cf_id, std::vector &cols) const; void delete_index_info(rocksdb::WriteBatch *batch, const GlobalId &index_id) const; bool get_index_info(const GlobalId &gl_index_id, uint16_t &index_ver, uchar &index_type, std::vector &cols) const; + + //column family flags operations. void add_cf_flags(rocksdb::WriteBatch *const batch, const uint &cf_id, const uint &cf_flags) const; bool get_cf_flags(const uint &cf_id, uint32_t &cf_flags) const; + //table operations. void add_drop_table(std::vector> &keys, rocksdb::WriteBatch *const batch) const; void add_drop_index(const std::vector &gl_index_ids, rocksdb::WriteBatch *const batch) const; + //index id operations. bool get_max_index_id(uint32_t *const index_id) const; bool update_max_index_id(rocksdb::WriteBatch *const batch, const uint32_t &index_id) const; + + //ongoing_index operations. void get_ongoing_index(std::vector &ids, MetaType dd_type) const; void start_ongoing_index(rocksdb::WriteBatch *const batch, const GlobalId &index_id, MetaType dd_type) const; void end_ongoing_index(rocksdb::WriteBatch *const batch, const GlobalId &id, MetaType dd_type) const; - void finish_indexes(const std::vector &ids, MetaType dd_type) const; bool is_drop_index_ongoing(const GlobalId &gl_index_id, MetaType dd_type) const; + void finish_indexes(const std::vector &ids, MetaType dd_type) const; + private: void dump_index_id(StringWriter &id, MetaType dict_type, const GlobalId &gl_index_id) const; void delete_with_prefix(rocksdb::WriteBatch *const batch, MetaType dict_type, const GlobalId &gl_index_id) const; - private: - std::mutex m_mutex; - rocksdb::DB *m_db = nullptr; - rocksdb::ColumnFamilyHandle *m_system_cfh = nullptr; - uchar m_max_index[INDEX_NUMBER_SIZE] = {0}; + //mutex for dict. + std::mutex dict_mutex_; + //handler of db; + rocksdb::DB *db_ = nullptr; + //system column family. + rocksdb::ColumnFamilyHandle *system_cf_ = nullptr; + //max index of ervery INDEX_NUMBER_SIZE. + uchar max_index_[INDEX_NUMBER_SIZE] = {0}; }; +//Column Family Mananger. class CFManager { public: CFManager(const CFManager &) = delete; @@ -288,14 +380,21 @@ class CFManager { void init(std::vector &handles); void cleanup(); + + //Gets the column family if exists, otherwise, create a new one. rocksdb::ColumnFamilyHandle *get_or_create_cf(rocksdb::DB *const rdb_, const std::string &cf_name); + //Gets the column family by cf id. rocksdb::ColumnFamilyHandle *get_cf_by_id(const uint32_t &id); + //Gets all of the column family in this db. std::vector get_all_cf(); private: - std::map m_cf_name_map; - std::map m_cf_id_map; - std::recursive_mutex m_mutex; + //name to column family map. + std::map cf_name_map_; + //id to column family map. + std::map cf_id_map_; + //mutex for the maps. + std::recursive_mutex cf_mutex_; }; inline bool IsRowStoreCF(std::string cf_name) {