Skip to content

Commit

Permalink
Merge branch 'master' into cop
Browse files Browse the repository at this point in the history
  • Loading branch information
zhexuany committed Sep 12, 2019
2 parents a9f9b48 + c92b972 commit 1ccfbd4
Show file tree
Hide file tree
Showing 36 changed files with 957 additions and 215 deletions.
4 changes: 2 additions & 2 deletions dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ TablePtr MockTiDB::dropTableInternal(Context & context, const String & database_
if (drop_regions)
{
for (auto & e : region_table.getRegionsByTable(partition.id))
kvstore->removeRegion(e.first, &region_table);
kvstore->removeRegion(e.first, &region_table, kvstore->genTaskLock());
region_table.mockDropRegionsInTable(partition.id);
}
}
Expand All @@ -62,7 +62,7 @@ TablePtr MockTiDB::dropTableInternal(Context & context, const String & database_
if (drop_regions)
{
for (auto & e : region_table.getRegionsByTable(table->id()))
kvstore->removeRegion(e.first, &region_table);
kvstore->removeRegion(e.first, &region_table, kvstore->genTaskLock());
region_table.mockDropRegionsInTable(table->id());
}

Expand Down
109 changes: 78 additions & 31 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,47 @@ KVStore::KVStore(const std::string & data_dir)

void KVStore::restore(const RegionClientCreateFunc & region_client_create)
{
std::lock_guard<std::mutex> lock(mutex());
auto task_lock = genTaskLock();
auto manage_lock = genRegionManageLock();

LOG_INFO(log, "start to restore regions");
region_persister.restore(regions(), const_cast<RegionClientCreateFunc *>(&region_client_create));
regionsMut() = region_persister.restore(const_cast<RegionClientCreateFunc *>(&region_client_create));

// init range index
for (const auto & region : regions())
region_range_index.add(region.second);

LOG_INFO(log, "restore regions done");

// Remove regions whose state = Tombstone, those regions still exist because progress crash after persisted and before removal.
{
std::vector<RegionID> regions_to_remove;
for (auto & p : regions())
{
RegionPtr & region = p.second;
const RegionPtr & region = p.second;
if (region->isPendingRemove())
regions_to_remove.push_back(region->id());
}
for (const auto region_id : regions_to_remove)
removeRegion(region_id, nullptr);
removeRegion(region_id, nullptr, task_lock);
}
}

RegionPtr KVStore::getRegion(const RegionID region_id) const
{
std::lock_guard<std::mutex> lock(mutex());
auto manage_lock = genRegionManageLock();
if (auto it = regions().find(region_id); it != regions().end())
return it->second;
return nullptr;
}

void KVStore::handleRegionsByRangeOverlap(
const RegionRange & range, std::function<void(RegionMap, const KVStoreTaskLock &)> && callback) const
{
auto task_lock = genTaskLock();
callback(region_range_index.findByRangeOverlap(range), task_lock);
}

const RegionManager::RegionTaskElement & RegionManager::getRegionTaskCtrl(const RegionID region_id) const
{
std::lock_guard<std::mutex> lock(mutex);
Expand All @@ -65,55 +79,76 @@ RegionTaskLock RegionManager::genRegionTaskLock(const RegionID region_id) const

size_t KVStore::regionSize() const
{
std::lock_guard<std::mutex> lock(mutex());
auto manage_lock = genRegionManageLock();
return regions().size();
}

void KVStore::traverseRegions(std::function<void(RegionID region_id, const RegionPtr & region)> && callback) const
void KVStore::traverseRegions(std::function<void(RegionID, const RegionPtr &)> && callback) const
{
std::lock_guard<std::mutex> lock(mutex());
auto manage_lock = genRegionManageLock();
for (auto it = regions().begin(); it != regions().end(); ++it)
callback(it->first, it->second);
}

bool KVStore::onSnapshot(RegionPtr new_region, Context * context)
bool KVStore::onSnapshot(RegionPtr new_region, Context * context, const RegionsAppliedindexMap & regions_to_check)
{
RegionID region_id = new_region->id();
{
auto region_lock = region_manager.genRegionTaskLock(region_id);
region_persister.persist(*new_region, region_lock);
}

{
auto task_lock = genTaskLock();
auto region_lock = region_manager.genRegionTaskLock(region_id);

RegionPtr old_region = getRegion(region_id);
if (old_region != nullptr)
for (const auto & region_info : regions_to_check)
{
UInt64 old_index = old_region->appliedIndex();
const auto & region = region_info.second.first;

LOG_DEBUG(log, "KVStore::onSnapshot previous " << old_region->toString(true) << " ; new " << new_region->toString(true));
if (old_index >= new_region->appliedIndex())
if (auto it = regions().find(region_info.first); it != regions().end())
{
if (it->second != region)
{
LOG_WARNING(log, "[onSnapshot] " << it->second->toString() << " instance changed");
return false;
}
if (region->appliedIndex() != region_info.second.second)
{
LOG_WARNING(log, "[onSnapshot] " << it->second->toString() << " instance changed");
return false;
}
}
else
{
LOG_INFO(log, "KVStore::onSnapshot discard new region because of index is outdated");
LOG_WARNING(log, "[onSnapshot] " << region->toString(false) << " not found");
return false;
}
}

RegionPtr old_region = getRegion(region_id);
if (old_region != nullptr)
{
LOG_DEBUG(log, "[onSnapshot] previous " << old_region->toString(true) << " ; new " << new_region->toString(true));
region_range_index.remove(old_region->makeRaftCommandDelegate(task_lock).getRange().comparableKeys(), region_id);
old_region->assignRegion(std::move(*new_region));
new_region = old_region;
}
else
{
std::lock_guard<std::mutex> lock(mutex());
regions().emplace(region_id, new_region);
auto manage_lock = genRegionManageLock();
regionsMut().emplace(region_id, new_region);
}

if (context)
context->getRaftService().addRegionToDecode(new_region);
region_range_index.add(new_region);
}

// if the operation about RegionTable is out of the protection of task_mutex, we should make sure that it can't delete any mapping relation.
if (context)
{
context->getRaftService().addRegionToDecode(new_region);
context->getTMTContext().getRegionTable().applySnapshotRegion(*new_region);
}

return true;
}
Expand Down Expand Up @@ -143,7 +178,7 @@ void KVStore::onServiceCommand(enginepb::CommandRequestBatch && cmds, RaftContex
const RegionPtr curr_region_ptr = getRegion(curr_region_id);
if (curr_region_ptr == nullptr)
{
LOG_WARNING(log, "[KVStore::onServiceCommand] [region " << curr_region_id << "] is not found, might be removed already");
LOG_WARNING(log, "[onServiceCommand] [region " << curr_region_id << "] is not found, might be removed already");
report_region_destroy(curr_region_id);
continue;
}
Expand All @@ -155,7 +190,7 @@ void KVStore::onServiceCommand(enginepb::CommandRequestBatch && cmds, RaftContex
{
LOG_INFO(log, "Try to remove " << curr_region.toString() << " because of tombstone.");
curr_region.setPendingRemove();
removeRegion(curr_region_id, region_table);
removeRegion(curr_region_id, region_table, task_lock);

report_region_destroy(curr_region_id);

Expand Down Expand Up @@ -189,11 +224,11 @@ void KVStore::onServiceCommand(enginepb::CommandRequestBatch && cmds, RaftContex

const auto handle_batch_split = [&](Regions & split_regions) {
{
std::lock_guard<std::mutex> lock(mutex());
auto manage_lock = genRegionManageLock();

for (auto & new_region : split_regions)
{
auto [it, ok] = regions().emplace(new_region->id(), new_region);
auto [it, ok] = regionsMut().emplace(new_region->id(), new_region);
if (!ok)
{
// definitely, any region's index is greater or equal than the initial one.
Expand All @@ -205,6 +240,14 @@ void KVStore::onServiceCommand(enginepb::CommandRequestBatch && cmds, RaftContex
}
}

{
region_range_index.remove(result.range_before_split->comparableKeys(), curr_region_id);
region_range_index.add(curr_region_ptr);

for (auto & new_region : split_regions)
region_range_index.add(new_region);
}

{
// update region_table first is safe, because the core rule is established: the range in RegionTable
// is always >= range in KVStore.
Expand Down Expand Up @@ -241,7 +284,7 @@ void KVStore::onServiceCommand(enginepb::CommandRequestBatch && cmds, RaftContex
const auto handle_change_peer = [&]() {
if (curr_region.isPendingRemove())
{
removeRegion(curr_region_id, region_table);
removeRegion(curr_region_id, region_table, task_lock);
report_sync_log();
}
else
Expand Down Expand Up @@ -281,7 +324,7 @@ void KVStore::report(RaftContext & raft_ctx)

enginepb::CommandResponseBatch responseBatch;
{
std::lock_guard<std::mutex> lock(mutex());
auto manage_lock = genRegionManageLock();

if (regions().empty())
return;
Expand Down Expand Up @@ -344,16 +387,20 @@ bool KVStore::tryPersist(const Seconds kvstore_try_persist_period, const Seconds
return persist_job || gc_job;
}

void KVStore::removeRegion(const RegionID region_id, RegionTable * region_table)
void KVStore::removeRegion(const RegionID region_id, RegionTable * region_table, const KVStoreTaskLock & task_lock)
{
LOG_INFO(log, "Start to remove [region " << region_id << "]");

RegionPtr region;
{
std::lock_guard<std::mutex> lock(mutex());
auto manage_lock = genRegionManageLock();
auto it = regions().find(region_id);
region = it->second;
regions().erase(it);
regionsMut().erase(it);
}
{
// remove index
region_range_index.remove(region->makeRaftCommandDelegate(task_lock).getRange().comparableKeys(), region_id);
}

region_persister.drop(region_id);
Expand All @@ -366,15 +413,15 @@ void KVStore::removeRegion(const RegionID region_id, RegionTable * region_table)

void KVStore::updateRegionTableBySnapshot(RegionTable & region_table)
{
std::lock_guard<std::mutex> lock(mutex());
auto manage_lock = genRegionManageLock();
LOG_INFO(log, "start to update RegionTable by snapshot");
region_table.applySnapshotRegions(regions());
LOG_INFO(log, "update RegionTable done");
}

KVStoreTaskLock KVStore::genTaskLock() const { return KVStoreTaskLock(task_mutex); }
RegionMap & KVStore::regions() { return region_manager.regions; }
RegionMap & KVStore::regionsMut() { return region_manager.regions; }
const RegionMap & KVStore::regions() const { return region_manager.regions; }
std::mutex & KVStore::mutex() const { return region_manager.mutex; }
KVStore::RegionManageLock KVStore::genRegionManageLock() const { return RegionManageLock(region_manager.mutex); }

} // namespace DB
23 changes: 18 additions & 5 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <Storages/Transaction/RegionClientCreate.h>
#include <Storages/Transaction/RegionManager.h>
#include <Storages/Transaction/RegionPersister.h>
#include <Storages/Transaction/RegionsRangeIndex.h>

namespace DB
{
Expand All @@ -25,6 +26,7 @@ struct RaftCommandResult;
class KVStoreTaskLock;

struct MockTiDBTable;
struct TiKVRangeKey;

/// TODO: brief design document.
class KVStore final : private boost::noncopyable
Expand All @@ -35,9 +37,15 @@ class KVStore final : private boost::noncopyable

RegionPtr getRegion(const RegionID region_id) const;

void traverseRegions(std::function<void(RegionID region_id, const RegionPtr & region)> && callback) const;
using RegionsAppliedindexMap = std::unordered_map<RegionID, std::pair<RegionPtr, UInt64>>;
using RegionRange = std::pair<TiKVRangeKey, TiKVRangeKey>;
/// Get and callback all regions whose range overlapped with start/end key.
void handleRegionsByRangeOverlap(const RegionRange & range, std::function<void(RegionMap, const KVStoreTaskLock &)> && callback) const;

void traverseRegions(std::function<void(RegionID, const RegionPtr &)> && callback) const;

bool onSnapshot(RegionPtr new_region, Context * context, const RegionsAppliedindexMap & regions_to_check = {});

bool onSnapshot(RegionPtr new_region, Context * context);
// TODO: remove RaftContext and use Context + CommandServerReaderWriter
void onServiceCommand(enginepb::CommandRequestBatch && cmds, RaftContext & context);

Expand All @@ -58,12 +66,14 @@ class KVStore final : private boost::noncopyable
private:
friend class MockTiDB;
friend struct MockTiDBTable;
void removeRegion(const RegionID region_id, RegionTable * region_table);
void removeRegion(const RegionID region_id, RegionTable * region_table, const KVStoreTaskLock & task_lock);
KVStoreTaskLock genTaskLock() const;

RegionMap & regions();
using RegionManageLock = std::lock_guard<std::mutex>;
RegionManageLock genRegionManageLock() const;

RegionMap & regionsMut();
const RegionMap & regions() const;
std::mutex & mutex() const;

private:
RegionManager region_manager;
Expand All @@ -74,6 +84,9 @@ class KVStore final : private boost::noncopyable

// onServiceCommand and onSnapshot should not be called concurrently
mutable std::mutex task_mutex;
// region_range_index must be protected by task_mutex. It's used to search for region by range.
// region merge/split/apply-snapshot/remove will change the range.
RegionsRangeIndex region_range_index;

// raft_cmd_res stores the result of applying raft cmd. It must be protected by task_mutex.
std::unique_ptr<RaftCommandResult> raft_cmd_res;
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/Transaction/RaftCommandResult.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <memory>

#include <Storages/Transaction/Types.h>

namespace DB
Expand All @@ -8,6 +10,9 @@ namespace DB
class Region;
using RegionPtr = std::shared_ptr<Region>;

class RegionRangeKeys;
using ImutRegionRangePtr = std::shared_ptr<const RegionRangeKeys>;

struct RaftCommandResult : private boost::noncopyable
{
enum Type
Expand All @@ -24,6 +29,7 @@ struct RaftCommandResult : private boost::noncopyable
Type type = Type::Default;
std::vector<RegionPtr> split_regions{};
TableIDSet table_ids{};
ImutRegionRangePtr range_before_split;
};

} // namespace DB
Loading

0 comments on commit 1ccfbd4

Please sign in to comment.