Skip to content

Commit

Permalink
[FLASH-463]Flush regions into DM as soon as raft commands processed (#…
Browse files Browse the repository at this point in the history
…316)

* add configuration to disable background flush

* Add log to trace disable_bg_flush

* Add log for debug

* enable background KVStore persist

* judge whether if a region is dirty with result

* add time count for snapshot

* enable background flush task in tests

* Apply suggestions from code review

Co-Authored-By: JaySon <jayson.hjs@gmail.com>

* add disable_bg_flush to example configs
  • Loading branch information
leiysky authored Nov 12, 2019
1 parent 05fb49c commit 674317a
Show file tree
Hide file tree
Showing 12 changed files with 153 additions and 63 deletions.
5 changes: 3 additions & 2 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1411,12 +1411,13 @@ void Context::createTMTContext(const std::vector<std::string> & pd_addrs,
const std::unordered_set<std::string> & ignore_databases,
const std::string & kvstore_path,
const std::string & flash_service_address,
::TiDB::StorageEngine engine)
::TiDB::StorageEngine engine,
bool disable_bg_flush)
{
auto lock = getLock();
if (shared->tmt_context)
throw Exception("TMTContext has already existed", ErrorCodes::LOGICAL_ERROR);
shared->tmt_context = std::make_shared<TMTContext>(*this, pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, flash_service_address, engine);
shared->tmt_context = std::make_shared<TMTContext>(*this, pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, flash_service_address, engine, disable_bg_flush);
}

void Context::initializePartPathSelector(std::vector<std::string> && all_normal_path, std::vector<std::string> && all_fast_path)
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,8 @@ class Context
const std::unordered_set<std::string> & ignore_databases,
const std::string & kvstore_path,
const std::string & flash_service_address,
::TiDB::StorageEngine engine);
::TiDB::StorageEngine engine,
bool disable_bg_tasks);
RaftService & getRaftService();

void initializeSchemaSyncService();
Expand Down
70 changes: 43 additions & 27 deletions dbms/src/Raft/RaftService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,32 +30,40 @@ RaftService::RaftService(DB::Context & db_context_)
},
false);

table_flush_handle = background_pool.addTask([this] {
auto & tmt = db_context.getTMTContext();
RegionTable & region_table = tmt.getRegionTable();
if (!db_context.getTMTContext().disableBgFlush())
{

// if all regions of table is removed, try to optimize data.
if (auto table_id = region_table.popOneTableToOptimize(); table_id != InvalidTableID)
{
LOG_INFO(log, "try to final optimize table " << table_id);
tryOptimizeStorageFinal(db_context, table_id);
}
return region_table.tryFlushRegions();
});
table_flush_handle = background_pool.addTask([this] {
auto & tmt = db_context.getTMTContext();
RegionTable & region_table = tmt.getRegionTable();

region_flush_handle = background_pool.addTask([this] {
RegionID region_id;
{
std::lock_guard<std::mutex> lock(region_mutex);
if (regions_to_flush.empty())
return false;
region_id = regions_to_flush.front();
regions_to_flush.pop();
}
RegionTable & region_table = db_context.getTMTContext().getRegionTable();
region_table.tryFlushRegion(region_id);
return true;
});
// if all regions of table is removed, try to optimize data.
if (auto table_id = region_table.popOneTableToOptimize(); table_id != InvalidTableID)
{
LOG_INFO(log, "try to final optimize table " << table_id);
tryOptimizeStorageFinal(db_context, table_id);
}
return region_table.tryFlushRegions();
});

region_flush_handle = background_pool.addTask([this] {
RegionID region_id;
{
std::lock_guard<std::mutex> lock(region_mutex);
if (regions_to_flush.empty())
return false;
region_id = regions_to_flush.front();
regions_to_flush.pop();
}
RegionTable & region_table = db_context.getTMTContext().getRegionTable();
region_table.tryFlushRegion(region_id);
return true;
});
}
else
{
LOG_INFO(log, "Configuration raft.disable_bg_flush is set to true, background flush tasks are disabled.");
}

region_decode_handle = background_pool.addTask([this] {
RegionPtr region;
Expand Down Expand Up @@ -85,11 +93,19 @@ RaftService::RaftService(DB::Context & db_context_)

void RaftService::addRegionToFlush(const Region & region)
{
if (!db_context.getTMTContext().disableBgFlush())
{
std::lock_guard<std::mutex> lock(region_mutex);
regions_to_flush.push(region.id());
{
std::lock_guard<std::mutex> lock(region_mutex);
regions_to_flush.push(region.id());
}
region_flush_handle->wake();
}
else
{
auto & region_table = db_context.getTMTContext().getRegionTable();
region_table.tryFlushRegion(region.id());
}
region_flush_handle->wake();
}

void RaftService::addRegionToDecode(const RegionPtr & region)
Expand Down
11 changes: 10 additions & 1 deletion dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
std::string kvstore_path = path + "kvstore/";
String flash_server_addr = config().getString("flash.service_addr", "0.0.0.0:3930");

bool disable_bg_flush = false;

::TiDB::StorageEngine engine_if_empty = ::TiDB::StorageEngine::TMT;
::TiDB::StorageEngine engine = engine_if_empty;

Expand Down Expand Up @@ -422,12 +424,19 @@ int Server::main(const std::vector<std::string> & /*args*/)
else
engine = engine_if_empty;
}

if (config().has("raft.disable_bg_flush"))
{
bool disable = config().getBool("raft.disable_bg_flush");
if (disable)
disable_bg_flush = true;
}
}

{
LOG_DEBUG(log, "Default storage engine: " << static_cast<Int64>(engine));
/// create TMTContext
global_context->createTMTContext(pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, flash_server_addr, engine);
global_context->createTMTContext(pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, flash_server_addr, engine, disable_bg_flush);
global_context->getTMTContext().getRegionTable().setTableCheckerThreshold(config().getDouble("flash.overlap_threshold", 0.9));
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Server/config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@
<kvstore_path>/var/lib/clickhouse/kvstore</kvstore_path>
<regmap>/var/lib/clickhouse/regmap</regmap>
<pd_addr>http://127.0.0.1:13579</pd_addr>
<disable_bg_flush>false</disable_bg_flush>
</raft>

<flash>
Expand Down
60 changes: 32 additions & 28 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,40 +365,44 @@ inline void doLearnerRead(const TiDB::TableID table_id, //
LOG_DEBUG(log,
"[Learner Read] wait index cost " << std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count() << " ms");

// After raft index is satisfied, we flush region to StorageDeltaMerge so that we can read all data
start_time = Clock::now();
std::set<RegionID> regions_flushing_in_bg_threads;
auto & region_table = tmt.getRegionTable();
for (auto && [region_id, region] : kvstore_region)
{
(void)region;
bool is_flushed = region_table.tryFlushRegion(region_id, table_id, false);
// If region is flushing by other bg threads, we should mark those regions to wait.
if (!is_flushed)
{
regions_flushing_in_bg_threads.insert(region_id);
LOG_DEBUG(log, "[Learner Read] region " << region_id << " is flushing by other thread.");
}
}
end_time = Clock::now();
LOG_DEBUG(log,
"[Learner Read] flush " << kvstore_region.size() - regions_flushing_in_bg_threads.size() << " regions of " << kvstore_region.size()
<< " to StorageDeltaMerge cost "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count() << " ms");

// Maybe there is some data not flush to store yet, we should wait till all regions is flushed.
if (!regions_flushing_in_bg_threads.empty())
/// If `disable_bg_flush` is true, we don't need to do both flushing KVStore or waiting for background tasks.
if (!tmt.disableBgFlush())
{
// After raft index is satisfied, we flush region to StorageDeltaMerge so that we can read all data
start_time = Clock::now();
for (const auto & region_id : regions_flushing_in_bg_threads)
std::set<RegionID> regions_flushing_in_bg_threads;
auto & region_table = tmt.getRegionTable();
for (auto && [region_id, region] : kvstore_region)
{
region_table.waitTillRegionFlushed(region_id);
(void)region;
bool is_flushed = region_table.tryFlushRegion(region_id, table_id, false);
// If region is flushing by other bg threads, we should mark those regions to wait.
if (!is_flushed)
{
regions_flushing_in_bg_threads.insert(region_id);
LOG_DEBUG(log, "[Learner Read] region " << region_id << " is flushing by other thread.");
}
}
end_time = Clock::now();
LOG_DEBUG(log,
"[Learner Read] wait bg flush " << regions_flushing_in_bg_threads.size() << " regions to StorageDeltaMerge cost "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count()
<< " ms");
"[Learner Read] flush " << kvstore_region.size() - regions_flushing_in_bg_threads.size() << " regions of "
<< kvstore_region.size() << " to StorageDeltaMerge cost "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count() << " ms");

// Maybe there is some data not flush to store yet, we should wait till all regions is flushed.
if (!regions_flushing_in_bg_threads.empty())
{
start_time = Clock::now();
for (const auto & region_id : regions_flushing_in_bg_threads)
{
region_table.waitTillRegionFlushed(region_id);
}
end_time = Clock::now();
LOG_DEBUG(log,
"[Learner Read] wait bg flush " << regions_flushing_in_bg_threads.size() << " regions to StorageDeltaMerge cost "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count()
<< " ms");
}
}
}

Expand Down
49 changes: 49 additions & 0 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#include <chrono>

#include <Interpreters/Context.h>
#include <Raft/RaftContext.h>
#include <Raft/RaftService.h>
#include <Storages/StorageDeltaMerge.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/RaftCommandResult.h>
#include <Storages/Transaction/Region.h>
Expand Down Expand Up @@ -148,6 +151,16 @@ bool KVStore::onSnapshot(RegionPtr new_region, Context * context, const RegionsA
{
context->getRaftService().addRegionToDecode(new_region);
context->getTMTContext().getRegionTable().applySnapshotRegion(*new_region);
if (context->getTMTContext().disableBgFlush())
{
auto s_time = Clock::now();
context->getTMTContext().getRegionTable().tryFlushRegion(new_region->id());
auto e_time = Clock::now();
LOG_DEBUG(log,
"[syncFlush] Apply snapshot for region " << new_region->id() << ", cost "
<< std::chrono::duration_cast<std::chrono::milliseconds>(e_time - s_time).count()
<< "ms");
}
}

return true;
Expand All @@ -170,6 +183,9 @@ void KVStore::onServiceCommand(enginepb::CommandRequestBatch && cmds, RaftContex

auto task_lock = genTaskLock();

TableIDSet tables_to_flush;
std::unordered_set<RegionID> dirty_regions;

for (auto && cmd : *cmds.mutable_requests())
{
const auto & header = cmd.header();
Expand Down Expand Up @@ -200,6 +216,17 @@ void KVStore::onServiceCommand(enginepb::CommandRequestBatch && cmds, RaftContex
curr_region.makeRaftCommandDelegate(task_lock).onCommand(std::move(cmd), *this, region_table, *raft_cmd_res);
RaftCommandResult & result = *raft_cmd_res;

if (tmt_context != nullptr && tmt_context->disableBgFlush())
{
for (auto id : result.table_ids)
{
tables_to_flush.emplace(id);
}

if (!result.table_ids.empty())
dirty_regions.emplace(curr_region_id);
}

const auto region_report = [&]() { *(responseBatch.add_responses()) = curr_region.toCommandResponse(); };

const auto report_sync_log = [&]() {
Expand Down Expand Up @@ -314,6 +341,28 @@ void KVStore::onServiceCommand(enginepb::CommandRequestBatch && cmds, RaftContex
}
}

if (tmt_context != nullptr && tmt_context->disableBgFlush())
{
auto & region_table = tmt_context->getRegionTable();
for (auto table_id : tables_to_flush)
{
auto s_time = Clock::now();
auto regions_to_flush = region_table.getRegionsByTable(table_id);
for (auto region : regions_to_flush)
{
if (auto && itr = dirty_regions.find(region.first); itr != dirty_regions.end())
{
region_table.tryFlushRegion(region.first, table_id, false);
}
}
auto e_time = Clock::now();
LOG_DEBUG(log,
"[syncFlush]"
<< " table_id " << table_id << ", cost "
<< std::chrono::duration_cast<std::chrono::milliseconds>(e_time - s_time).count() << "ms");
}
}

if (responseBatch.responses_size())
raft_ctx.send(responseBatch);
}
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Storages/Transaction/TMTContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ namespace DB
TMTContext::TMTContext(Context & context, const std::vector<std::string> & addrs, const std::string & learner_key,
const std::string & learner_value, const std::unordered_set<std::string> & ignore_databases_, const std::string & kvstore_path,
const std::string & flash_service_address_,
::TiDB::StorageEngine engine_)
::TiDB::StorageEngine engine_,
bool disable_bg_flush_)
: kvstore(std::make_shared<KVStore>(kvstore_path)),
region_table(context),
pd_client(addrs.size() == 0 ? static_cast<pingcap::pd::IClient *>(new pingcap::pd::MockPDClient())
Expand All @@ -26,7 +27,8 @@ TMTContext::TMTContext(Context & context, const std::vector<std::string> & addrs
? std::static_pointer_cast<SchemaSyncer>(std::make_shared<TiDBSchemaSyncer<true>>(pd_client, region_cache, rpc_client))
: std::static_pointer_cast<SchemaSyncer>(std::make_shared<TiDBSchemaSyncer<false>>(pd_client, region_cache, rpc_client))),
flash_service_address(flash_service_address_),
engine(engine_)
engine(engine_),
disable_bg_flush(disable_bg_flush_)
{}

void TMTContext::restore()
Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Storages/Transaction/TMTContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ class TMTContext : private boost::noncopyable

bool isInitialized() const;

bool disableBgFlush() const { return disable_bg_flush; }

// TODO: get flusher args from config file
explicit TMTContext(Context & context, const std::vector<std::string> & addrs, const std::string & learner_key,
const std::string & learner_value, const std::unordered_set<std::string> & ignore_databases_, const std::string & kv_store_path,
const std::string & flash_service_address_,
TiDB::StorageEngine engine_);
TiDB::StorageEngine engine_,
bool disable_bg_flush_);

SchemaSyncerPtr getSchemaSyncer() const;
void setSchemaSyncer(SchemaSyncerPtr);
Expand Down Expand Up @@ -68,6 +71,8 @@ class TMTContext : private boost::noncopyable

String flash_service_address;
::TiDB::StorageEngine engine;

bool disable_bg_flush;
};

} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/test_utils/TiflashTestBasic.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class TiFlashTestEnv
}
catch (Exception & e)
{
context.createTMTContext({}, "", "", {"default"}, "./__tmp_data/kvstore", "", TiDB::StorageEngine::TMT);
context.createTMTContext({}, "", "", {"default"}, "./__tmp_data/kvstore", "", TiDB::StorageEngine::TMT, false);
context.getTMTContext().restore();
}
context.getSettingsRef() = settings;
Expand Down
1 change: 1 addition & 0 deletions tests/docker/config/config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<ignore_databases>system</ignore_databases>
<learner_key>engine</learner_key>
<learner_value>tiflash</learner_value>
<disable_bg_flush>false</disable_bg_flush>
</raft>

<flash>
Expand Down
1 change: 1 addition & 0 deletions tests/docker/config/tiflash.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<learner_value>tiflash</learner_value>
<!--specify what engine we use. tmt or dm -->
<storage_engine>tmt</storage_engine>
<disable_bg_flush>false</disable_bg_flush>
</raft>

<flash>
Expand Down

0 comments on commit 674317a

Please sign in to comment.