Skip to content

Commit

Permalink
ha_rocksdb.cc cleanups to minimize sysvar split diff
Browse files Browse the repository at this point in the history
- Move #include directives to the right places.
- Factor out repeated THDVAR and ha_thd calls, and current_thd reads.
- Use the C++11 range for loop in one place, use auto more for local variable
  declarations, add assert(false) to one can't-happen branch.
- Reformat the source file.
  • Loading branch information
laurynas-biveinis committed Jun 19, 2024
1 parent 4f31d90 commit 9a6bc71
Showing 1 changed file with 57 additions and 52 deletions.
109 changes: 57 additions & 52 deletions storage/rocksdb/ha_rocksdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */

#include <cassert>
#include "mysqld_error.h"
#ifdef USE_PRAGMA_IMPLEMENTATION
#pragma implementation // gcc: Class implementation
#endif
Expand All @@ -35,9 +33,11 @@
/* C++ standard header files */
#include <inttypes.h>
#include <algorithm>
#include <cassert>
#include <deque>
#include <limits>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <string_view>
Expand All @@ -49,13 +49,14 @@
#include <mysql/thread_pool_priv.h>
#include <mysys_err.h>
#include "my_sys.h"
#include "mysqld_error.h"
#include "scope_guard.h"
#include "sql-common/json_dom.h"
#include "sql/binlog.h"
#include "sql/dd/cache/dictionary_client.h" // dd::cache::Dictionary_client
#include "sql/dd/dd.h" // dd::get_dictionary
#include "sql/dd/dd.h" // dd::get_dictionary
#include "sql/dd/dictionary.h" // dd::Dictionary
#include "sql/debug_sync.h"
#include "sql-common/json_dom.h"
#include "sql/rpl_rli.h"
#include "sql/sql_audit.h"
#include "sql/sql_class.h"
Expand Down Expand Up @@ -4017,7 +4018,8 @@ class Rdb_transaction {

assert(!is_ac_nl_ro_rc_transaction());

if (THDVAR(m_thd, trace_sst_api)) {
const auto trace_sst_api = THDVAR(m_thd, trace_sst_api);
if (trace_sst_api) {
// NO_LINT_DEBUG
LogPluginErrMsg(
INFORMATION_LEVEL, ER_LOG_PRINTF_MSG,
Expand All @@ -4040,7 +4042,7 @@ class Rdb_transaction {

// PREPARE phase: finish all on-going bulk loading Rdb_sst_info and
// collect all Rdb_sst_commit_info containing (SST files, cf)
if (THDVAR(m_thd, trace_sst_api)) {
if (trace_sst_api) {
// NO_LINT_DEBUG
LogPluginErrMsg(INFORMATION_LEVEL, ER_LOG_PRINTF_MSG,
"SST Tracing : Finishing '%zu' active SST files",
Expand Down Expand Up @@ -4069,7 +4071,7 @@ class Rdb_transaction {
}
}

if (THDVAR(m_thd, trace_sst_api)) {
if (trace_sst_api) {
// NO_LINT_DEBUG
LogPluginErrMsg(INFORMATION_LEVEL, ER_LOG_PRINTF_MSG,
"SST Tracing : All active SST files finished");
Expand All @@ -4083,7 +4085,7 @@ class Rdb_transaction {
// Rdb_sst_info and collect all Rdb_sst_commit_info containing
// (SST files, cf)
if (!m_key_merge.empty()) {
if (THDVAR(m_thd, trace_sst_api)) {
if (trace_sst_api) {
// NO_LINT_DEBUG
LogPluginErrMsg(
INFORMATION_LEVEL, ER_LOG_PRINTF_MSG,
Expand Down Expand Up @@ -4141,7 +4143,7 @@ class Rdb_transaction {
}
const std::string &index_name = keydef->get_name();

if (THDVAR(m_thd, trace_sst_api)) {
if (trace_sst_api) {
std::string full_name;
int err = rdb_normalize_tablename(table_name, &full_name);
if (err != HA_EXIT_SUCCESS) {
Expand All @@ -4160,9 +4162,12 @@ class Rdb_transaction {

auto sst_info = std::make_shared<Rdb_sst_info>(
rdb, table_name, index_name, rdb_merge.get_cf(),
*rocksdb_db_options, THDVAR(get_thd(), trace_sst_api),
*rocksdb_db_options, trace_sst_api,
THDVAR(get_thd(), bulk_load_compression_parallel_threads));

const auto enable_unique_key_check =
THDVAR(m_thd, bulk_load_enable_unique_key_check);

if (keydef->is_partial_index()) {
if (!THDVAR(m_thd, bulk_load_partial_index)) continue;
// For partial indexes, we only want to materialize groups that reach
Expand Down Expand Up @@ -4277,7 +4282,7 @@ class Rdb_transaction {
#endif
} else {
// skip unique index check if the unique key check flag is disabled
if (!THDVAR(m_thd, bulk_load_enable_unique_key_check)) {
if (!enable_unique_key_check) {
check_unique_index = false;
}
}
Expand Down Expand Up @@ -4317,7 +4322,7 @@ class Rdb_transaction {
*is_critical_error = false;
}

if (THDVAR(m_thd, bulk_load_enable_unique_key_check)) {
if (enable_unique_key_check) {
my_printf_error(
ER_DUP_ENTRY,
"Duplicate entry found for key name: %s, key:%s", MYF(0),
Expand Down Expand Up @@ -4364,7 +4369,7 @@ class Rdb_transaction {
}
}

if (THDVAR(m_thd, trace_sst_api)) {
if (trace_sst_api) {
// NO_LINT_DEBUG
LogPluginErrMsg(
INFORMATION_LEVEL, ER_LOG_PRINTF_MSG,
Expand Down Expand Up @@ -4410,7 +4415,7 @@ class Rdb_transaction {
file_count += cf_files_pair.second.external_files.size();
}

if (THDVAR(m_thd, trace_sst_api)) {
if (trace_sst_api) {
// NO_LINT_DEBUG
LogPluginErrMsg(
INFORMATION_LEVEL, ER_LOG_PRINTF_MSG,
Expand All @@ -4419,7 +4424,7 @@ class Rdb_transaction {
}

const rocksdb::Status s = ingest_bulk_load_files(args);
if (THDVAR(m_thd, trace_sst_api)) {
if (trace_sst_api) {
// NO_LINT_DEBUG
LogPluginErrMsg(INFORMATION_LEVEL, ER_LOG_PRINTF_MSG,
"SST Tracing: IngestExternalFile '%zu' files returned %s",
Expand Down Expand Up @@ -4448,7 +4453,7 @@ class Rdb_transaction {
commit_info.commit();
}

if (THDVAR(m_thd, trace_sst_api)) {
if (trace_sst_api) {
// NO_LINT_DEBUG
LogPluginErrMsg(
INFORMATION_LEVEL, ER_LOG_PRINTF_MSG,
Expand Down Expand Up @@ -4659,11 +4664,11 @@ class Rdb_transaction {
if (create_snapshot) acquire_snapshot(true, table_type);

rocksdb::ReadOptions options = m_read_opts[table_type];
const bool fill_cache = !THDVAR(get_thd(), skip_fill_cache);
auto *const thd = get_thd();
const auto fill_cache = !THDVAR(thd, skip_fill_cache);

if (skip_bloom_filter) {
const bool enable_iterate_bounds =
THDVAR(get_thd(), enable_iterate_bounds);
const auto enable_iterate_bounds = THDVAR(thd, enable_iterate_bounds);
options.total_order_seek = true;
options.iterate_lower_bound =
enable_iterate_bounds ? &eq_cond_lower_bound : nullptr;
Expand Down Expand Up @@ -6123,8 +6128,7 @@ static bool rocksdb_is_supported_system_table(const char *db_name,
replication progress.
*/
static int rocksdb_prepare(handlerton *const hton MY_ATTRIBUTE((__unused__)),
THD *const thd,
bool prepare_tx) {
THD *const thd, bool prepare_tx) {
Rdb_transaction *tx = get_tx_from_thd(thd);
if (!tx->can_prepare()) {
return HA_EXIT_FAILURE;
Expand Down Expand Up @@ -7730,7 +7734,8 @@ static int rocksdb_init_internal(void *const p) {

if (rdb_has_rocksdb_corruption()) {
// NO_LINT_DEBUG
LogPluginErrMsg(ERROR_LEVEL, ER_LOG_PRINTF_MSG,
LogPluginErrMsg(
ERROR_LEVEL, ER_LOG_PRINTF_MSG,
"RocksDB: There was a corruption detected in RocksDB files. "
"Check error log emitted earlier for more details.");
if (rocksdb_allow_to_start_after_corruption) {
Expand Down Expand Up @@ -9320,8 +9325,8 @@ int ha_rocksdb::alloc_key_buffers(const TABLE &table_arg,
const auto m_pack_buffer_offset = buf_size;
buf_size += max_packed_sk_len;

buffers.reset(static_cast<uchar *>(
my_malloc(PSI_NOT_INSTRUMENTED, buf_size, MYF(0))));
buffers.reset(
static_cast<uchar *>(my_malloc(PSI_NOT_INSTRUMENTED, buf_size, MYF(0))));
if (buffers == nullptr) {
free_key_buffers();

Expand Down Expand Up @@ -11399,7 +11404,7 @@ int ha_rocksdb::check(THD *const thd MY_ATTRIBUTE((__unused__)),
rows++;
continue;

print_and_error : {
print_and_error: {
std::string buf;
buf = rdb_hexdump(rowkey_copy.ptr(), rowkey_copy.length());
// NO_LINT_DEBUG
Expand Down Expand Up @@ -11659,8 +11664,7 @@ int ha_rocksdb::get_row_by_sk(uchar *buf, const Rdb_key_def &kd,
Rdb_transaction *const tx = get_tx_from_thd(thd);
assert(tx != nullptr);

tx->acquire_snapshot(true /* acquire_now */,
m_tbl_def->get_table_type());
tx->acquire_snapshot(true /* acquire_now */, m_tbl_def->get_table_type());

int rc = m_iterator->get(key, &m_retrieved_record, RDB_LOCK_NONE);
if (rc) DBUG_RETURN(rc);
Expand Down Expand Up @@ -12562,9 +12566,9 @@ int ha_rocksdb::acquire_prefix_lock(const Rdb_key_def &kd, Rdb_transaction *tx,
HA_EXIT_SUCCESS OK
other HA_ERR error code (can be SE-specific)
*/
int ha_rocksdb::check_and_lock_sk(
const uint key_id, const struct update_row_info &row_info,
bool *const found) {
int ha_rocksdb::check_and_lock_sk(const uint key_id,
const struct update_row_info &row_info,
bool *const found) {
assert(
(row_info.old_data == table->record[1] &&
row_info.new_data == table->record[0]) ||
Expand Down Expand Up @@ -15107,13 +15111,13 @@ int ha_rocksdb::extra(enum ha_extra_function operation) {
ha_rows ha_rocksdb::records_in_range(uint inx, key_range *const min_key,
key_range *const max_key) {
DBUG_ENTER_FUNC();

ha_rows ret = THDVAR(ha_thd(), records_in_range);
auto *const thd = ha_thd();
ha_rows ret = THDVAR(thd, records_in_range);
if (ret) {
DBUG_RETURN(ret);
}
if (table->force_index) {
const ha_rows force_rows = THDVAR(ha_thd(), force_index_records_in_range);
const ha_rows force_rows = THDVAR(thd, force_index_records_in_range);
if (force_rows) {
DBUG_RETURN(force_rows);
}
Expand Down Expand Up @@ -16544,12 +16548,13 @@ int ha_rocksdb::inplace_populate_sk(
dict_manager.get_dict_manager_selector_non_const(table_default_cf_id);
const std::unique_ptr<rocksdb::WriteBatch> wb = local_dict_manager->begin();
rocksdb::WriteBatch *const batch = wb.get();
auto *const thd = ha_thd();

DBUG_EXECUTE_IF("rocksdb_inplace_populate_sk", {
const char act[] =
"now signal ready_to_mark_cf_dropped_in_populate_sk "
"wait_for mark_cf_dropped_done_in_populate_sk";
assert(!debug_sync_set_action(ha_thd(), STRING_WITH_LEN(act)));
assert(!debug_sync_set_action(thd, STRING_WITH_LEN(act)));
});

{
Expand Down Expand Up @@ -16635,7 +16640,7 @@ int ha_rocksdb::inplace_populate_sk(

for (const auto &index : indexes) {
// Skip populating partial indexes.
if (index->is_partial_index() && !THDVAR(ha_thd(), bulk_load_partial_index))
if (index->is_partial_index() && !THDVAR(thd, bulk_load_partial_index))
continue;

/*
Expand Down Expand Up @@ -16704,7 +16709,7 @@ int ha_rocksdb::inplace_populate_sk(
assert(new_table_arg->key_info[index->get_keyno()].flags & HA_NOSAME);
print_keydup_error(new_table_arg,
&new_table_arg->key_info[index->get_keyno()], MYF(0),
ha_thd());
thd);
}

if (res && is_critical_error) {
Expand Down Expand Up @@ -17083,13 +17088,13 @@ bool ha_rocksdb::is_dd_update() const {
}

#define DEF_STATUS_VAR(name) \
{ "rocksdb_" #name, (char *)&SHOW_FNAME(name), SHOW_FUNC, SHOW_SCOPE_GLOBAL }
{"rocksdb_" #name, (char *)&SHOW_FNAME(name), SHOW_FUNC, SHOW_SCOPE_GLOBAL}

#define DEF_STATUS_VAR_PTR(name, ptr, option) \
{ "rocksdb_" name, (char *)ptr, option, SHOW_SCOPE_GLOBAL }
{"rocksdb_" name, (char *)ptr, option, SHOW_SCOPE_GLOBAL}

#define DEF_STATUS_VAR_FUNC(name, ptr, option) \
{ name, reinterpret_cast<char *>(ptr), option, SHOW_SCOPE_GLOBAL }
{name, reinterpret_cast<char *>(ptr), option, SHOW_SCOPE_GLOBAL}

struct rocksdb_status_counters_t {
uint64_t block_cache_miss;
Expand Down Expand Up @@ -18893,12 +18898,11 @@ static int rocksdb_validate_update_cf_options(THD * /* unused */,
}

// Loop through option_map and create missing column families
for (Rdb_cf_options::Name_to_config_t::iterator it = option_map.begin();
it != option_map.end(); ++it) {
for (const auto &option : option_map) {
// If the CF is removed at this point, i.e., cf_manager.drop_cf() has
// been called, it is OK to create a new CF.

const auto &cf_name = it->first;
const auto &cf_name = option.first;
{
auto local_dict_manager =
dict_manager.get_dict_manager_selector_non_const(cf_name);
Expand Down Expand Up @@ -18944,15 +18948,15 @@ static void rocksdb_set_update_cf_options(THD *const /* unused */,

// This should never fail, because of rocksdb_validate_update_cf_options
if (!Rdb_cf_options::parse_cf_options(val, &option_map)) {
assert(false);
return;
}

// For each CF we have, see if we need to update any settings.
for (const auto &cf_name : cf_manager.get_cf_names()) {
assert(!cf_name.empty());

std::shared_ptr<rocksdb::ColumnFamilyHandle> cfh =
cf_manager.get_cf(cf_name);
auto cfh = cf_manager.get_cf(cf_name);

if (!cfh) {
// NO_LINT_DEBUG
Expand All @@ -18964,11 +18968,11 @@ static void rocksdb_set_update_cf_options(THD *const /* unused */,
}

const auto it = option_map.find(cf_name);
std::string per_cf_options = (it != option_map.end()) ? it->second : "";
const auto per_cf_options = (it != option_map.end()) ? it->second : "";

if (!per_cf_options.empty()) {
Rdb_cf_options::Name_to_config_t opt_map;
rocksdb::Status s = rocksdb::StringToMap(per_cf_options, &opt_map);
auto s = rocksdb::StringToMap(per_cf_options, &opt_map);

if (s != rocksdb::Status::OK()) {
// NO_LINT_DEBUG
Expand Down Expand Up @@ -19002,7 +19006,7 @@ static void rocksdb_set_update_cf_options(THD *const /* unused */,
// the CF options. This is necessary also to make sure that the CF
// options will be correctly reflected in the relevant table:
// ROCKSDB_CF_OPTIONS in INFORMATION_SCHEMA.
rocksdb::ColumnFamilyOptions cf_options = rdb->GetOptions(cfh.get());
const auto cf_options = rdb->GetOptions(cfh.get());
std::string updated_options;

s = rocksdb::GetStringFromColumnFamilyOptions(&updated_options,
Expand Down Expand Up @@ -19338,9 +19342,9 @@ rocksdb::Status rdb_tx_get_for_update(Rdb_transaction *tx,
rocksdb::PinnableSlice *const value,
TABLE_TYPE table_type, bool exclusive,
bool skip_wait) {
bool do_validate =
!(my_core::thd_tx_isolation(tx->get_thd()) <= ISO_READ_COMMITTED ||
THDVAR(tx->get_thd(), skip_snapshot_validation));
auto *const thd = tx->get_thd();
const auto do_validate = !(thd_tx_isolation(thd) <= ISO_READ_COMMITTED ||
THDVAR(thd, skip_snapshot_validation));
rocksdb::Status s = tx->get_for_update(kd, key, value, table_type, exclusive,
do_validate, skip_wait);

Expand Down Expand Up @@ -19600,13 +19604,14 @@ int ha_rocksdb::multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param,
uint n_ranges, uint mode,
HANDLER_BUFFER *buf) {
m_need_build_decoder = true;
auto &thd = *ha_thd();

int res;

if (!current_thd->optimizer_switch_flag(OPTIMIZER_SWITCH_MRR) ||
if (!thd.optimizer_switch_flag(OPTIMIZER_SWITCH_MRR) ||
(mode & HA_MRR_USE_DEFAULT_IMPL) != 0 ||
(buf->buffer_end - buf->buffer < mrr_get_length_per_rec()) ||
(THDVAR(current_thd, mrr_batch_size) == 0)) {
(THDVAR(&thd, mrr_batch_size) == 0)) {
mrr_uses_default_impl = true;
res = handler::multi_range_read_init(seq, seq_init_param, n_ranges, mode,
buf);
Expand Down

0 comments on commit 9a6bc71

Please sign in to comment.