Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-659] Make Region Mapped To Only One Table #318

Merged
merged 12 commits into from
Nov 13, 2019
2 changes: 1 addition & 1 deletion dbms/src/Debug/ClusterManage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ void ClusterManage::findRegionByRange(Context & context, const ASTs & args, Prin

void ClusterManage::checkTableOptimize(DB::Context & context, const DB::ASTs & args, DB::Printer)
{
if (args.size() < 3)
if (args.size() < 2)
throw Exception("Args not matched, should be: table-id, threshold", ErrorCodes::BAD_ARGUMENTS);

auto & tmt = context.getTMTContext();
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Debug/DBGInvoker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ DBGInvoker::DBGInvoker()
// TODO: remove this, use sleep in bash script
regSchemalessFunc("sleep", dbgFuncSleep);

regSchemalessFunc("clean_up_region", MockTiDBTable::dbgFuncCleanUpRegions);
regSchemalessFunc("mock_tidb_table", MockTiDBTable::dbgFuncMockTiDBTable);
regSchemalessFunc("mock_tidb_db", MockTiDBTable::dbgFuncMockTiDBDB);
regSchemalessFunc("mock_tidb_partition", MockTiDBTable::dbgFuncMockTiDBPartition);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Debug/MockTiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ class MockTiDB : public ext::singleton<MockTiDB>
using TablePtr = std::shared_ptr<Table>;

public:
TableID newTable(const String & database_name, const String & table_name,
const ColumnsDescription & columns, Timestamp tso, const String & handle_pk_name);
TableID newTable(const String & database_name, const String & table_name, const ColumnsDescription & columns, Timestamp tso,
const String & handle_pk_name);

DatabaseID newDataBase(const String & database_name);

Expand Down Expand Up @@ -109,7 +109,7 @@ class MockTiDB : public ext::singleton<MockTiDB>

std::unordered_map<Int64, SchemaDiff> version_diff;

std::atomic<TableID> table_id_allocator = MaxSystemTableID + 1;
std::atomic<TableID> table_id_allocator = 30;

Int64 version = 0;
};
Expand Down
20 changes: 19 additions & 1 deletion dbms/src/Debug/dbgFuncMockTiDBTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ extern const int LOGICAL_ERROR;
void MockTiDBTable::dbgFuncMockTiDBTable(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() != 3 && args.size() != 4)
throw Exception("Args not matched, should be: database-name, table-name, schema-string [, handle_pk_name]", ErrorCodes::BAD_ARGUMENTS);
throw Exception(
"Args not matched, should be: database-name, table-name, schema-string [, handle_pk_name]", ErrorCodes::BAD_ARGUMENTS);

const String & database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
const String & table_name = typeid_cast<const ASTIdentifier &>(*args[1]).name;
Expand Down Expand Up @@ -287,4 +288,21 @@ void MockTiDBTable::dbgFuncTruncateTiDBTable(Context & /*context*/, const ASTs &
output(ss.str());
}

void MockTiDBTable::dbgFuncCleanUpRegions(DB::Context & context, const DB::ASTs &, DB::DBGInvoker::Printer output)
{
std::vector<RegionID> regions;
auto & kvstore = context.getTMTContext().getKVStore();
auto & region_table = context.getTMTContext().getRegionTable();
{
auto lock = kvstore->genTaskLock();

for (const auto & e : kvstore->regions())
regions.emplace_back(e.first);

for (const auto & region_id : regions)
kvstore->removeRegion(region_id, &region_table, lock);
}
output("all regions have been cleaned");
}

} // namespace DB
5 changes: 5 additions & 0 deletions dbms/src/Debug/dbgFuncMockTiDBTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ struct MockTiDBTable
// ./storages-client.sh "DBGInvoke reset_syncer()"
static void dbgFuncResetSyncer(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Remove All Region.
// Usage:
// ./storages-client.sh "DBGInvoke clean_up_region()"
static void dbgFuncCleanUpRegions(Context & context, const ASTs & args, DBGInvoker::Printer output);

private:
static void dbgFuncDropTiDBTableImpl(
Context & context, String database_name, String table_name, bool drop_regions, bool is_drop_db, DBGInvoker::Printer output);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncRegion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ void dbgFuncRegionSnapshotWithData(Context & context, const ASTs & args, DBGInvo
MockTiKV::instance().getRaftIndex(region_id);
}

applySnapshot(tmt.getKVStore(), region, &context);
applySnapshot(tmt.getKVStore(), region, &context, false);

std::stringstream ss;
ss << "put region #" << region_id << ", range[" << start << ", " << end << ")"
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
{
std::vector<RegionID> region_ids;
region_ids.push_back(info.region_id);
throw RegionException(std::move(region_ids), RegionTable::RegionReadStatus::NOT_FOUND);
throw RegionException(std::move(region_ids), RegionException::RegionReadStatus::NOT_FOUND);
}
if (!checkKeyRanges(dag.getKeyRanges(), table_id, storage->pkIsUInt64(), current_region->getRange()))
throw Exception("Cop request only support full range scan for given region", ErrorCodes::COP_BAD_DAG_REQUEST);
Expand Down
9 changes: 4 additions & 5 deletions dbms/src/Flash/CoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ CoprocessorHandler::CoprocessorHandler(
: cop_context(cop_context_), cop_request(cop_request_), cop_response(cop_response_), log(&Logger::get("CoprocessorHandler"))
{}

grpc::Status CoprocessorHandler::execute()
try
grpc::Status CoprocessorHandler::execute() try
{
switch (cop_request->tp())
{
Expand Down Expand Up @@ -81,12 +80,12 @@ catch (const RegionException & e)
errorpb::Error * region_err;
switch (e.status)
{
case RegionTable::RegionReadStatus::NOT_FOUND:
case RegionTable::RegionReadStatus::PENDING_REMOVE:
case RegionException::RegionReadStatus::NOT_FOUND:
case RegionException::RegionReadStatus::PENDING_REMOVE:
region_err = cop_response->mutable_region_error();
region_err->mutable_region_not_found()->set_region_id(cop_request->context().region_id());
break;
case RegionTable::RegionReadStatus::VERSION_ERROR:
case RegionException::RegionReadStatus::VERSION_ERROR:
region_err = cop_response->mutable_region_error();
region_err->mutable_epoch_not_match();
break;
Expand Down
29 changes: 11 additions & 18 deletions dbms/src/Raft/RaftService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,18 @@ RaftService::RaftService(DB::Context & db_context_)
{
LOG_INFO(log, "try to final optimize table " << table_id);
tryOptimizeStorageFinal(db_context, table_id);
LOG_INFO(log, "finish final optimize table " << table_id);
}
return region_table.tryFlushRegions();
});

region_flush_handle = background_pool.addTask([this] {
RegionID region_id;
data_reclaim_handle = background_pool.addTask([this] {
std::list<RegionDataReadInfoList> tmp;
{
std::lock_guard<std::mutex> lock(region_mutex);
if (regions_to_flush.empty())
return false;
region_id = regions_to_flush.front();
regions_to_flush.pop();
tmp = std::move(data_to_reclaim);
zanmato1984 marked this conversation as resolved.
Show resolved Hide resolved
}
RegionTable & region_table = db_context.getTMTContext().getRegionTable();
region_table.tryFlushRegion(region_id);
return true;
return false;
});

region_decode_handle = background_pool.addTask([this] {
Expand Down Expand Up @@ -83,13 +79,10 @@ RaftService::RaftService(DB::Context & db_context_)
}
}

void RaftService::addRegionToFlush(const Region & region)
void RaftService::dataMemReclaim(DB::RegionDataReadInfoList && data)
{
{
std::lock_guard<std::mutex> lock(region_mutex);
regions_to_flush.push(region.id());
}
region_flush_handle->wake();
std::lock_guard<std::mutex> lock(reclaim_mutex);
data_to_reclaim.emplace_back(std::move(data));
}

void RaftService::addRegionToDecode(const RegionPtr & region)
Expand All @@ -114,10 +107,10 @@ RaftService::~RaftService()
table_flush_handle = nullptr;
}

if (region_flush_handle)
if (data_reclaim_handle)
{
background_pool.removeTask(region_flush_handle);
region_flush_handle = nullptr;
background_pool.removeTask(data_reclaim_handle);
data_reclaim_handle = nullptr;
}

if (region_decode_handle)
Expand Down
12 changes: 9 additions & 3 deletions dbms/src/Raft/RaftService.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <Raft/RaftContext.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Storages/Transaction/RegionDataRead.h>
#include <Storages/Transaction/Types.h>
#include <common/logger_useful.h>
#include <boost/noncopyable.hpp>
Expand All @@ -27,7 +28,7 @@ class RaftService final : public enginepb::Engine::Service, public std::enable_s

~RaftService() final;

void addRegionToFlush(const Region & region);
void dataMemReclaim(RegionDataReadInfoList &&);
void addRegionToDecode(const RegionPtr & region);

private:
Expand All @@ -45,12 +46,17 @@ class RaftService final : public enginepb::Engine::Service, public std::enable_s
Logger * log;

std::mutex region_mutex;
std::queue<RegionID> regions_to_flush;
RegionMap regions_to_decode;

std::mutex reclaim_mutex;
std::list<RegionDataReadInfoList> data_to_reclaim;

BackgroundProcessingPool::TaskHandle single_thread_task_handle;
BackgroundProcessingPool::TaskHandle table_flush_handle;
BackgroundProcessingPool::TaskHandle region_flush_handle;

// kvstore will try to flush data into ch when handling raft cmd CompactLog in order to reduce the size of region.
// use this task to reclaim data in another thread.
BackgroundProcessingPool::TaskHandle data_reclaim_handle;
BackgroundProcessingPool::TaskHandle region_decode_handle;
};

Expand Down
28 changes: 13 additions & 15 deletions dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
// the index of column is constant after MergeTreeBlockInputStream is constructed. exception will be thrown if not found.
const size_t handle_column_index = 0, version_column_index = 1, delmark_column_index = 2;

const auto func_throw_retry_region = [&](RegionTable::RegionReadStatus status) {
const auto func_throw_retry_region = [&](RegionException::RegionReadStatus status) {
std::vector<RegionID> region_ids;
region_ids.reserve(regions_executor_data.size());
for (const auto & query_info : regions_executor_data)
Expand Down Expand Up @@ -302,8 +302,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
regions_executor_data.reserve(regions.size());
for (const auto & [id, region] : regions)
{
if (region == nullptr)
continue;
regions_executor_data.emplace_back(RegionQueryInfo{id, region->version(), region->confVer(), {0, 0}});
}
}
Expand All @@ -315,7 +313,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
if (region == nullptr)
{
LOG_WARNING(log, "[region " << query_info.info.region_id << "] is not found in KVStore, try again");
func_throw_retry_region(RegionTable::RegionReadStatus::NOT_FOUND);
func_throw_retry_region(RegionException::RegionReadStatus::NOT_FOUND);
}
kvstore_region.emplace(query_info.info.region_id, std::move(region));
}
Expand All @@ -332,13 +330,13 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
auto start_time = Clock::now();
const size_t mem_region_num = regions_executor_data.size();
const size_t batch_size = mem_region_num / concurrent_num;
std::atomic_uint8_t region_status = RegionTable::RegionReadStatus::OK;
std::atomic_uint8_t region_status = RegionException::RegionReadStatus::OK;

const auto func_run_learner_read = [&](const size_t region_begin) {
const size_t region_end = std::min(region_begin + batch_size, mem_region_num);
for (size_t region_index = region_begin; region_index < region_end; ++region_index)
{
if (region_status != RegionTable::RegionReadStatus::OK)
if (region_status != RegionException::RegionReadStatus::OK)
return;

RegionQueryInfo & region_query_info = regions_executor_data[region_index].info;
Expand All @@ -353,13 +351,13 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
kvstore_region[region_query_info.region_id], region_query_info.version, region_query_info.conf_version,
mvcc_query_info.resolve_locks, mvcc_query_info.read_tso, region_query_info.range_in_table);

if (status != RegionTable::OK)
if (status != RegionException::RegionReadStatus::OK)
{
LOG_WARNING(log,
"Check memory cache, region " << region_query_info.region_id << ", version " << region_query_info.version
<< ", handle range [" << region_query_info.range_in_table.first.toString() << ", "
<< region_query_info.range_in_table.second.toString() << ") , status "
<< RegionTable::RegionReadStatusString(status));
<< RegionException::RegionReadStatusString(status));
region_status = status;
}
else if (block)
Expand All @@ -380,8 +378,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
func_run_learner_read(0);
}

if (region_status != RegionTable::RegionReadStatus::OK)
func_throw_retry_region(static_cast<RegionTable::RegionReadStatus>(region_status.load()));
if (region_status != RegionException::RegionReadStatus::OK)
func_throw_retry_region(static_cast<RegionException::RegionReadStatus>(region_status.load()));

auto end_time = Clock::now();
LOG_DEBUG(log,
Expand Down Expand Up @@ -846,13 +844,13 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
// check all data, include special region.
{
auto region = tmt.getKVStore()->getRegion(region_query_info.region_id);
RegionTable::RegionReadStatus status = RegionTable::OK;
RegionException::RegionReadStatus status = RegionException::RegionReadStatus::OK;
if (region != kvstore_region[region_query_info.region_id])
status = RegionTable::NOT_FOUND;
status = RegionException::RegionReadStatus::NOT_FOUND;
else if (region->version() != region_query_info.version)
status = RegionTable::VERSION_ERROR;
status = RegionException::RegionReadStatus::VERSION_ERROR;

if (status != RegionTable::OK)
if (status != RegionException::RegionReadStatus::OK)
{
// ABA problem may cause because one region is removed and inserted back.
// if the version of region is changed, the part may has less data because of compaction.
Expand All @@ -861,7 +859,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
<< region_query_info.version << ", handle range ["
<< region_query_info.range_in_table.first.toString() << ", "
<< region_query_info.range_in_table.second.toString() << ") , status "
<< RegionTable::RegionReadStatusString(status));
<< RegionException::RegionReadStatusString(status));
// throw exception and exit.
func_throw_retry_region(status);
}
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/StorageMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ void StorageMergeTree::alterInternal(
else if (param.type == AlterCommand::RENAME_COLUMN)
{
rename_column = true;
if (params.size() != 1)
if (params.size() != 1)
{
throw Exception("There is an internal error for rename columns", ErrorCodes::LOGICAL_ERROR);
}
Expand Down Expand Up @@ -717,7 +717,7 @@ void StorageMergeTree::dropPartition(const ASTPtr & /*query*/, const ASTPtr & pa
void StorageMergeTree::truncate(const ASTPtr & /*query*/, const Context & /*context*/)
{
auto lock = lockForAlter(__PRETTY_FUNCTION__);

MergeTreeData::DataParts parts = data.getDataParts();

for (const auto & part : parts)
Expand Down
29 changes: 1 addition & 28 deletions dbms/src/Storages/Transaction/ExtraCFData.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,12 @@ struct ExtraCFData
template <>
struct ExtraCFData<RegionDefaultCFDataTrait>
{
mutable std::mutex default_cf_decode_mutex;

ExtraCFData() = default;

void add(const std::shared_ptr<const TiKVValue> & e)
{
std::lock_guard<std::mutex> lock(default_cf_decode_mutex);
queue.push_back(e);
}
void add(const std::shared_ptr<const TiKVValue> & e) { queue.push_back(e); }

std::optional<ExtraCFDataQueue> popAll()
{
std::lock_guard<std::mutex> lock(default_cf_decode_mutex);
if (queue.empty())
return {};

Expand All @@ -42,26 +35,6 @@ struct ExtraCFData<RegionDefaultCFDataTrait>

ExtraCFData(const ExtraCFData & src) = delete;

ExtraCFData(ExtraCFData && src) { mergeFrom(src); }

ExtraCFData & operator=(ExtraCFData && src)
{
mergeFrom(src);
return *this;
}

private:
void mergeFrom(ExtraCFData & src)
{
auto res = src.popAll();
if (res)
{
std::lock_guard<std::mutex> lock(default_cf_decode_mutex);
for (auto && e : *res)
queue.emplace_back(std::move(e));
}
}

private:
ExtraCFDataQueue queue;
};
Expand Down
Loading