Skip to content

Commit

Permalink
Merge pull request #2 from pingcap/region-flush-refactor
Browse files Browse the repository at this point in the history
Refactor TMTTableFlusher
  • Loading branch information
flowbehappy authored Feb 22, 2019
2 parents 263a1a6 + fb67fcd commit d6c407b
Show file tree
Hide file tree
Showing 23 changed files with 686 additions and 799 deletions.
3 changes: 1 addition & 2 deletions dbms/src/Debug/DBGInvoker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ DBGInvoker::DBGInvoker()
regFunc("mock_tidb_table", dbgFuncMockTiDBTable);
regFunc("drop_tidb_table", dbgFuncDropTiDBTable);

regFunc("set_flush_rows", dbgFuncSetFlushRows);
regFunc("set_deadline_seconds", dbgFuncSetDeadlineSeconds);
regFunc("set_flush_threshold", dbgFuncSetFlushThreshold);

regFunc("raft_insert_row", dbgFuncRaftInsertRow);
regFunc("raft_insert_rows", dbgFuncRaftInsertRows);
Expand Down
27 changes: 7 additions & 20 deletions dbms/src/Debug/dbgFuncMockTiDBData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,19 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}

void dbgFuncSetFlushRows(Context & context, const ASTs & args, DBGInvoker::Printer output)
void dbgFuncSetFlushThreshold(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() != 1)
throw Exception("Args not matched, should be: threshold-rows", ErrorCodes::BAD_ARGUMENTS);
if (args.size() != 2)
throw Exception("Args not matched, should be: bytes, seconds", ErrorCodes::BAD_ARGUMENTS);

auto rows = safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[0]).value);
TMTContext & tmt = context.getTMTContext();
tmt.table_flushers.setFlushThresholdRows(rows);

std::stringstream ss;
ss << "set flush threshold to " << rows << " rows";
output(ss.str());
}

void dbgFuncSetDeadlineSeconds(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() != 1)
throw Exception("Args not matched, should be: second-uint", ErrorCodes::BAD_ARGUMENTS);

const UInt64 second = safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[0]).value);
auto bytes = safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[0]).value);
auto seconds = safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[1]).value);

TMTContext & tmt = context.getTMTContext();
tmt.table_flushers.setDeadlineSeconds(second);
tmt.region_partition.setFlushThresholds({{bytes, Seconds(seconds)}});

std::stringstream ss;
ss << "set deadline seconds to " << second << "s";
ss << "set flush threshold to (" << bytes << " bytes, " << seconds << " seconds)";
output(ss.str());
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncMockTiDBData.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace DB
// Change flush threshold rows
// Usage:
// ./storages-client.sh "DBGInvoke set_flush_rows(threshold_rows)"
void dbgFuncSetFlushRows(Context & context, const ASTs & args, DBGInvoker::Printer output);
void dbgFuncSetFlushThreshold(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Change flush deadline seconds
// Usage:
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Debug/dbgFuncMockTiDBTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ void dbgFuncDropTiDBTable(Context & context, const ASTs & args, DBGInvoker::Prin

TMTContext & tmt = context.getTMTContext();
tmt.region_partition.dropRegionsInTable(table_id);
tmt.table_flushers.dropRegionsInTable(table_id);

MockTiDB::instance().dropTable(database_name, table_name);

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgFuncRegion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ std::vector<std::tuple<HandleID, HandleID, RegionID>> getPartitionRegionRanges(
};

TMTContext & tmt = context.getTMTContext();
tmt.region_partition.traverseRegionsByTablePartition(table_id, partition_id, context, callback);
tmt.region_partition.traverseRegionsByTablePartition(table_id, partition_id, callback);
return handle_ranges;
}

Expand Down Expand Up @@ -494,7 +494,7 @@ void dbgFuncCheckRegionCorrect(Context & context, const ASTs & args, DBGInvoker:
for (UInt64 partition_id = 0; partition_id < partition_number; ++partition_id)
{
std::unordered_map<RegionID, RegionPtr> partition_regions;
tmt.region_partition.traverseRegionsByTablePartition(table_id, partition_id, context, [&](Regions regions)
tmt.region_partition.traverseRegionsByTablePartition(table_id, partition_id, [&](Regions regions)
{
for (auto region : regions)
partition_regions[region->id()] = region;
Expand Down
6 changes: 1 addition & 5 deletions dbms/src/Debug/dbgTools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,6 @@ void insert(const TiDB::TableInfo & table_info, RegionID region_id, HandleID han
addRequestsToRaftCmd(cmds.add_requests(), region_id, key, value, prewrite_ts, commit_ts, false);
tmt.kvstore->onServiceCommand(cmds, raft_ctx);
}

tmt.table_flushers.onPutTryFlush(region);
}

void remove(const TiDB::TableInfo & table_info, RegionID region_id, HandleID handle_id, Context & context)
Expand All @@ -167,7 +165,6 @@ void remove(const TiDB::TableInfo & table_info, RegionID region_id, HandleID han
addRequestsToRaftCmd(cmds.add_requests(), region_id, key, value, prewrite_ts, commit_ts, true);

tmt.kvstore->onServiceCommand(cmds, raft_ctx);
tmt.table_flushers.onPutTryFlush(region);
}

struct BatchCtrl
Expand Down Expand Up @@ -271,7 +268,6 @@ void batchInsert(const TiDB::TableInfo & table_info, std::unique_ptr<BatchCtrl>
}

tmt.kvstore->onServiceCommand(cmds, raft_ctx);
tmt.table_flushers.onPutTryFlush(region);
}
}

Expand Down Expand Up @@ -322,7 +318,7 @@ Int64 concurrentRangeOperate(const TiDB::TableInfo & table_info, HandleID start_
for (UInt64 partition_id = 0; partition_id < partition_number; ++partition_id)
{
TMTContext & tmt = context.getTMTContext();
tmt.region_partition.traverseRegionsByTablePartition(table_info.id, partition_id, context, [&](Regions d){
tmt.region_partition.traverseRegionsByTablePartition(table_info.id, partition_id, [&](Regions d){
regions.insert(regions.end(), d.begin(), d.end());
});
}
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Raft/RaftService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,16 @@ grpc::Status RaftService::ApplyCommandBatch(grpc::ServerContext * grpc_context,
{
RaftContext rctx(&db_context, grpc_context, stream);
BackgroundProcessingPool::TaskHandle persist_handle;
BackgroundProcessingPool::TaskHandle flush_handle;

RegionPartition & region_partition = db_context.getTMTContext().region_partition;

try
{
kvstore->report(rctx);

persist_handle = background_pool.addTask([&, this] { return kvstore->tryPersistAndReport(rctx); });
flush_handle = background_pool.addTask([&] { return region_partition.tryFlushRegions(); });

enginepb::CommandRequestBatch request;
while (stream->Read(&request))
Expand All @@ -53,6 +58,8 @@ grpc::Status RaftService::ApplyCommandBatch(grpc::ServerContext * grpc_context,

if (persist_handle)
background_pool.removeTask(persist_handle);
if (flush_handle)
background_pool.removeTask(flush_handle);

return grpc::Status::CANCELLED;
}
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
continue;
auto region_input_stream = tmt.region_partition.getBlockInputStreamByPartition(
data.table_info.id, partition_id, data.table_info, data.getColumns(), column_names_to_read,
const_cast<Context &>(context), false, true, query_info.resolve_locks, query_info.read_tso);
false, true, query_info.resolve_locks, query_info.read_tso);
if (region_input_stream)
res.emplace_back(region_input_stream);
}
Expand Down Expand Up @@ -671,7 +671,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
{
auto region_input_stream = tmt.region_partition.getBlockInputStreamByPartition(
data.table_info.id, partition_id, data.table_info, data.getColumns(), column_names_to_read,
const_cast<Context &>(context), false, true, query_info.resolve_locks, query_info.read_tso);
false, true, query_info.resolve_locks, query_info.read_tso);
if (region_input_stream)
{
BlockInputStreamPtr version_filtered_stream = std::make_shared<VersionFilterBlockInputStream>(
Expand Down
39 changes: 19 additions & 20 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ extern const int LOGICAL_ERROR;
}

// TODO move to Settings.h
//static constexpr Int64 REGION_PERSIST_PERIOD = 60 * 1000 * 1000; // 1 minutes
//static constexpr Int64 KVSTORE_TRY_PERSIST_PERIOD = 10 * 1000 * 1000; // 10 seconds
static Seconds REGION_PERSIST_PERIOD(60); // 1 minutes
static Seconds KVSTORE_TRY_PERSIST_PERIOD(10); // 10 seconds

KVStore::KVStore(const std::string & data_dir, Context * context) : region_persister(data_dir), log(&Logger::get("KVStore"))
{
Expand Down Expand Up @@ -65,7 +65,7 @@ void KVStore::onSnapshot(const RegionPtr & region, Context * context)
}

if (tmt_ctx && old_region)
tmt_ctx->region_partition.removeRegion(old_region, *context);
tmt_ctx->region_partition.removeRegion(old_region);

region_persister.persist(region);

Expand All @@ -75,7 +75,7 @@ void KVStore::onSnapshot(const RegionPtr & region, Context * context)
}

if (tmt_ctx)
tmt_ctx->table_flushers.onPutTryFlush(region);
tmt_ctx->region_partition.applySnapshotRegion(region);
}

void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftContext & raft_ctx)
Expand Down Expand Up @@ -134,7 +134,9 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC
continue;
}

auto [new_region, split_regions, sync] = curr_region->onCommand(cmd, callback);
auto before_cache_bytes = curr_region->dataSize();

auto [new_region, split_regions, table_ids, sync] = curr_region->onCommand(cmd, callback);

if (curr_region->isPendingRemove())
{
Expand All @@ -147,10 +149,11 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC
continue;
}

// Persist current region and split regions, and mange data in partition
// Add to regions map so that queries can see them.

if (!split_regions.empty())
{
// Persist current region and split regions, and mange data in partition
// Add to regions map so that queries can see them.
// TODO: support atomic or idempotent operation.
{
std::lock_guard<std::mutex> lock(mutex);
Expand All @@ -168,23 +171,19 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC
}

if (tmt_ctx)
{
tmt_ctx->region_partition.splitRegion(curr_region, split_regions, *context);

tmt_ctx->table_flushers.onPutTryFlush(curr_region);
for (const auto & region : split_regions)
tmt_ctx->table_flushers.onPutTryFlush(region);
}
tmt_ctx->region_partition.splitRegion(curr_region, split_regions);

region_persister.persist(curr_region);
for (const auto & region : split_regions)
region_persister.persist(region);
}
else if (sync)
else
{
if (tmt_ctx)
tmt_ctx->table_flushers.onPutTryFlush(curr_region);
region_persister.persist(curr_region);
tmt_ctx->region_partition.updateRegion(curr_region, before_cache_bytes, table_ids);

if (sync)
region_persister.persist(curr_region);
}

if (sync)
Expand Down Expand Up @@ -218,7 +217,7 @@ bool KVStore::tryPersistAndReport(RaftContext & context)
{
std::lock_guard<std::mutex> lock(mutex);

Poco::Timestamp now;
Timepoint now = Clock::now();
if (now < (last_try_persist_time + KVSTORE_TRY_PERSIST_PERIOD))
return false;
last_try_persist_time = now;
Expand All @@ -229,7 +228,7 @@ bool KVStore::tryPersistAndReport(RaftContext & context)
for (const auto & p : regions)
{
const auto region = p.second;
if (Poco::Timestamp() < (region->lastPersistTime() + REGION_PERSIST_PERIOD))
if (now < (region->lastPersistTime() + REGION_PERSIST_PERIOD))
continue;

persist_job = true;
Expand Down Expand Up @@ -263,7 +262,7 @@ void KVStore::removeRegion(RegionID region_id, Context * context)

region_persister.drop(region_id);
if (context)
context->getTMTContext().region_partition.removeRegion(region, *context);
context->getTMTContext().region_partition.removeRegion(region);
}

} // namespace DB
7 changes: 1 addition & 6 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,11 @@
#include <Storages/Transaction/Consistency.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionPersister.h>
#include <Storages/Transaction/TMTTableFlusher.h>
#include <Storages/Transaction/TiKVKeyValue.h>


namespace DB
{
// TODO move to Settings.h
static constexpr Int64 REGION_PERSIST_PERIOD = 60 * 1000 * 1000; // 1 minutes
static constexpr Int64 KVSTORE_TRY_PERSIST_PERIOD = 10 * 1000 * 1000; // 10 seconds

/// TODO: brief design document.
class KVStore final : private boost::noncopyable
Expand All @@ -40,7 +36,6 @@ class KVStore final : private boost::noncopyable
// Currently we also trigger region files GC in it.
bool tryPersistAndReport(RaftContext & context);

// TODO: Value copy instead of value ref
// For test, please do NOT remove.
RegionMap & _regions() { return regions; }

Expand All @@ -54,7 +49,7 @@ class KVStore final : private boost::noncopyable
std::mutex mutex;

Consistency consistency;
Poco::Timestamp last_try_persist_time{};
Timepoint last_try_persist_time = Clock::now();

Logger * log;
};
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/Transaction/LockException.h
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
#pragma once

#include <Common/Exception.h>
#include <Storages/Transaction/Region.h>

namespace DB
{

class LockException : public Exception
{
public:
explicit LockException(Region::LockInfos && lock_infos_) : lock_infos(std::move(lock_infos_))
{}
explicit LockException(Region::LockInfos && lock_infos_) : lock_infos(std::move(lock_infos_)) {}

Region::LockInfos lock_infos;
};

}
} // namespace DB
7 changes: 5 additions & 2 deletions dbms/src/Storages/Transaction/PartitionDataMover.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#pragma once

#include "Storages/Transaction/Region.h"
#include "Storages/Transaction/TiKVKeyValue.h"
#include <Interpreters/Context.h>

#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/TiKVKeyValue.h>
#include <Storages/StorageMergeTree.h>

namespace DB
{
Expand Down
Loading

0 comments on commit d6c407b

Please sign in to comment.