Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-298] Import delegate mode for Region and RegionMeta #220

Merged
merged 4 commits into from
Sep 4, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

KVStore::KVStore(const std::string & data_dir) : region_persister(data_dir, region_manager), log(&Logger::get("KVStore")) {}
KVStore::KVStore(const std::string & data_dir)
: region_persister(data_dir, region_manager), raft_cmd_res(std::make_unique<RaftCommandResult>()), log(&Logger::get("KVStore"))
{}

void KVStore::restore(const RegionClientCreateFunc & region_client_create)
{
Expand Down Expand Up @@ -46,15 +48,20 @@ RegionPtr KVStore::getRegion(const RegionID region_id) const
return nullptr;
}

RegionManager::RegionTaskElementPtr RegionManager::getRegionTaskCtrl(const RegionID region_id) const
const RegionManager::RegionTaskElement & RegionManager::getRegionTaskCtrl(const RegionID region_id) const
{
std::lock_guard<std::mutex> lock(mutex);

auto & p = regions_ctrl[region_id];
return p ? p : (p = std::make_shared<RegionTaskElement>());
if (auto it = regions_ctrl.find(region_id); it != regions_ctrl.end())
return it->second;

return regions_ctrl.try_emplace(region_id).first->second;
}

RegionTaskLock RegionManager::genRegionTaskLock(const RegionID region_id) const { return RegionTaskLock(*getRegionTaskCtrl(region_id)); }
RegionTaskLock RegionManager::genRegionTaskLock(const RegionID region_id) const
{
return RegionTaskLock(getRegionTaskCtrl(region_id).mutex);
}

size_t KVStore::regionSize() const
{
Expand All @@ -77,16 +84,16 @@ bool KVStore::onSnapshot(RegionPtr new_region, Context * context)
region_persister.persist(*new_region, region_lock);
}
{
std::lock_guard<std::mutex> lock(task_mutex);
auto task_lock = genTaskLock();
auto region_lock = region_manager.genRegionTaskLock(region_id);

RegionPtr old_region = getRegion(region_id);
if (old_region != nullptr)
{
UInt64 old_index = old_region->getProbableIndex();
UInt64 old_index = old_region->appliedIndex();

LOG_DEBUG(log, "KVStore::onSnapshot previous " << old_region->toString(true) << " ; new " << new_region->toString(true));
if (old_index >= new_region->getProbableIndex())
if (old_index >= new_region->appliedIndex())
{
LOG_INFO(log, "KVStore::onSnapshot discard new region because of index is outdated");
return false;
Expand Down Expand Up @@ -126,7 +133,7 @@ void KVStore::onServiceCommand(enginepb::CommandRequestBatch && cmds, RaftContex
LOG_INFO(log, "Report [region " << region_id << "] destroyed");
};

std::lock_guard<std::mutex> lock(task_mutex);
auto task_lock = genTaskLock();

for (auto && cmd : *cmds.mutable_requests())
{
Expand Down Expand Up @@ -155,7 +162,8 @@ void KVStore::onServiceCommand(enginepb::CommandRequestBatch && cmds, RaftContex
continue;
}

RaftCommandResult result = curr_region.onCommand(std::move(cmd));
curr_region.makeRaftCommandDelegate(task_lock).onCommand(std::move(cmd), *this, region_table, *raft_cmd_res);
RaftCommandResult & result = *raft_cmd_res;

const auto region_report = [&]() { *(responseBatch.add_responses()) = curr_region.toCommandResponse(); };

Expand Down Expand Up @@ -269,7 +277,7 @@ void KVStore::onServiceCommand(enginepb::CommandRequestBatch && cmds, RaftContex

void KVStore::report(RaftContext & raft_ctx)
{
std::lock_guard<std::mutex> lock(task_mutex);
auto lock = genTaskLock();

enginepb::CommandResponseBatch responseBatch;
{
Expand Down Expand Up @@ -364,6 +372,7 @@ void KVStore::updateRegionTableBySnapshot(RegionTable & region_table)
LOG_INFO(log, "update RegionTable done");
}

KVStoreTaskLock KVStore::genTaskLock() const { return KVStoreTaskLock(task_mutex); }
RegionMap & KVStore::regions() { return region_manager.regions; }
const RegionMap & KVStore::regions() const { return region_manager.regions; }
std::mutex & KVStore::mutex() const { return region_manager.mutex; }
Expand Down
17 changes: 17 additions & 0 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ namespace DB
static const Seconds REGION_PERSIST_PERIOD(300); // 5 minutes
static const Seconds KVSTORE_TRY_PERSIST_PERIOD(180); // 3 minutes

class Context;

class KVStore;
using KVStorePtr = std::shared_ptr<KVStore>;

Expand All @@ -19,6 +21,8 @@ struct RaftContext;

class Region;
using RegionPtr = std::shared_ptr<Region>;
struct RaftCommandResult;
class KVStoreTaskLock;

struct MockTiDBTable;

Expand Down Expand Up @@ -51,6 +55,8 @@ class KVStore final : private boost::noncopyable

void updateRegionTableBySnapshot(RegionTable & region_table);

KVStoreTaskLock genTaskLock() const;

private:
friend class MockTiDB;
friend struct MockTiDBTable;
Expand All @@ -70,7 +76,18 @@ class KVStore final : private boost::noncopyable
// onServiceCommand and onSnapshot should not be called concurrently
mutable std::mutex task_mutex;

// raft_cmd_res stores the result of applying raft cmd. It must be protected by task_mutex.
std::unique_ptr<RaftCommandResult> raft_cmd_res;

Logger * log;
};

/// Encapsulation of lock guard of task mutex in KVStore
class KVStoreTaskLock : private boost::noncopyable
{
friend KVStoreTaskLock KVStore::genTaskLock() const;
KVStoreTaskLock(std::mutex & mutex_) : lock(mutex_) {}
std::lock_guard<std::mutex> lock;
};

} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/RaftCommandResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace DB
class Region;
using RegionPtr = std::shared_ptr<Region>;

struct RaftCommandResult
struct RaftCommandResult : private boost::noncopyable
{
enum Type
{
Expand Down
50 changes: 26 additions & 24 deletions dbms/src/Storages/Transaction/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ RegionDataReadInfo Region::readDataByWriteIt(const TableID & table_id, const Reg

LockInfoPtr Region::getLockInfo(TableID expected_table_id, UInt64 start_ts) const { return data.getLockInfo(expected_table_id, start_ts); }

TableID Region::insert(const std::string & cf, TiKVKey key, TiKVValue value)
TableID Region::insert(const std::string & cf, TiKVKey && key, TiKVValue && value)
{
std::unique_lock<std::shared_mutex> lock(mutex);
return doInsert(cf, std::move(key), std::move(value));
Expand Down Expand Up @@ -86,13 +86,7 @@ TableID Region::doRemove(const std::string & cf, const TiKVKey & key)
return table_id;
}

UInt64 Region::getIndex() const
{
std::shared_lock<std::shared_mutex> lock(mutex);
return meta.appliedIndex();
}

UInt64 Region::getProbableIndex() const { return meta.appliedIndex(); }
UInt64 Region::appliedIndex() const { return meta.appliedIndex(); }

RegionPtr Region::splitInto(RegionMeta meta)
{
Expand All @@ -111,17 +105,17 @@ RegionPtr Region::splitInto(RegionMeta meta)
return new_region;
}

void Region::execChangePeer(
void RegionRaftCommandDelegate::execChangePeer(
const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, const UInt64 index, const UInt64 term)
{
const auto & change_peer_request = request.change_peer();

LOG_INFO(log, toString(false) << " execute change peer type: " << eraftpb::ConfChangeType_Name(change_peer_request.change_type()));

meta.execChangePeer(request, response, index, term);
meta.makeRaftCommandDelegate().execChangePeer(request, response, index, term);
}

Regions Region::execBatchSplit(
Regions RegionRaftCommandDelegate::execBatchSplit(
const raft_cmdpb::AdminRequest &, const raft_cmdpb::AdminResponse & response, const UInt64 index, const UInt64 term)
{
const auto & new_region_infos = response.splits().regions();
Expand Down Expand Up @@ -174,30 +168,30 @@ Regions Region::execBatchSplit(
return split_regions;
}

void Region::execCompactLog(
void RegionRaftCommandDelegate::execCompactLog(
const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, const UInt64 index, const UInt64 term)
{
const auto & compact_log_request = request.compact_log();
LOG_INFO(log,
toString(false) << " execute compact log, compact_term: " << compact_log_request.compact_term()
<< ", compact_index: " << compact_log_request.compact_index());
const auto compact_index = compact_log_request.compact_index();
const auto compact_term = compact_log_request.compact_term();

LOG_INFO(log, toString(false) << " execute compact log, compact_term: " << compact_term << ", compact_index: " << compact_index);

meta.execCompactLog(request, response, index, term);
meta.makeRaftCommandDelegate().execCompactLog(request, response, index, term);
}

RaftCommandResult Region::onCommand(enginepb::CommandRequest && cmd)
void RegionRaftCommandDelegate::onCommand(enginepb::CommandRequest && cmd, const KVStore &, RegionTable *, RaftCommandResult & result)
{
const auto & header = cmd.header();
UInt64 term = header.term();
UInt64 index = header.index();
bool sync_log = header.sync_log();

RaftCommandResult result;
result.type = RaftCommandResult::Type::Default;
result.sync_log = sync_log;

{
auto applied_index = meta.appliedIndex();
if (index <= applied_index)
if (index <= appliedIndex())
{
result.type = RaftCommandResult::Type::IndexError;
if (term == 0 && index == 0)
Expand All @@ -206,7 +200,7 @@ RaftCommandResult Region::onCommand(enginepb::CommandRequest && cmd)
}
else
LOG_WARNING(log, toString() << " ignore outdated raft log [term: " << term << ", index: " << index << "]");
return result;
return;
}
}

Expand Down Expand Up @@ -361,8 +355,6 @@ RaftCommandResult Region::onCommand(enginepb::CommandRequest && cmd)

if (is_dirty)
incDirtyFlag();

return result;
}

std::tuple<size_t, UInt64> Region::serialize(WriteBuffer & buf) const
Expand Down Expand Up @@ -417,14 +409,16 @@ ColumnFamilyType Region::getCf(const std::string & cf)

RegionID Region::id() const { return meta.regionId(); }

bool Region::isPendingRemove() const { return meta.peerState() == raft_serverpb::PeerState::Tombstone; }
bool Region::isPendingRemove() const { return peerState() == raft_serverpb::PeerState::Tombstone; }

void Region::setPendingRemove()
{
meta.setPeerState(raft_serverpb::PeerState::Tombstone);
meta.notifyAll();
}

raft_serverpb::PeerState Region::peerState() const { return meta.peerState(); }

size_t Region::dataSize() const { return data.dataSize(); }

size_t Region::writeCFCount() const
Expand Down Expand Up @@ -591,6 +585,14 @@ void Region::compareAndCompleteSnapshot(HandleMap & handle_map, const TableID ta
LOG_INFO(log, "[compareAndCompleteSnapshot] add deleted gc: " << deleted_gc_cnt);
}

RegionRaftCommandDelegate & Region::makeRaftCommandDelegate(const KVStoreTaskLock & lock)
{
static_assert(sizeof(RegionRaftCommandDelegate) == sizeof(Region));
// lock is useless, just to make sure the task mutex of KVStore is locked
std::ignore = lock;
return static_cast<RegionRaftCommandDelegate &>(*this);
}

void Region::compareAndCompleteSnapshot(const Timestamp safe_point, const Region & source_region)
{
const auto & [start_key, end_key] = getRange();
Expand Down
37 changes: 26 additions & 11 deletions dbms/src/Storages/Transaction/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ using RegionPtr = std::shared_ptr<Region>;
using Regions = std::vector<RegionPtr>;

struct RaftCommandResult;
class KVStore;
class RegionTable;
class RegionRaftCommandDelegate;
class KVStoreTaskLock;

/// Store all kv data of one region. Including 'write', 'data' and 'lock' column families.
/// TODO: currently the synchronize mechanism is broken and need to fix.
Expand Down Expand Up @@ -98,11 +102,9 @@ class Region : public std::enable_shared_from_this<Region>
: meta(std::move(meta_)), client(region_client_create(meta.getRegionVerID())), log(&Logger::get(log_name))
{}

TableID insert(const std::string & cf, TiKVKey key, TiKVValue value);
TableID insert(const std::string & cf, TiKVKey && key, TiKVValue && value);
TableID remove(const std::string & cf, const TiKVKey & key);

RaftCommandResult onCommand(enginepb::CommandRequest && cmd);

CommittedScanner createCommittedScanner(TableID expected_table_id);
CommittedRemover createCommittedRemover(TableID expected_table_id);

Expand All @@ -118,6 +120,7 @@ class Region : public std::enable_shared_from_this<Region>
bool isPendingRemove() const;
void setPendingRemove();
bool isPeerRemoved() const;
raft_serverpb::PeerState peerState() const;

size_t dataSize() const;
size_t writeCFCount() const;
Expand All @@ -141,8 +144,7 @@ class Region : public std::enable_shared_from_this<Region>

void waitIndex(UInt64 index);

UInt64 getIndex() const;
UInt64 getProbableIndex() const;
UInt64 appliedIndex() const;

RegionVersion version() const;
RegionVersion confVer() const;
Expand All @@ -163,11 +165,13 @@ class Region : public std::enable_shared_from_this<Region>
void compareAndCompleteSnapshot(const Timestamp safe_point, const Region & source_region);

static ColumnFamilyType getCf(const std::string & cf);
RegionRaftCommandDelegate & makeRaftCommandDelegate(const KVStoreTaskLock &);

void tryPreDecodeTiKVValue();

private:
Region() = delete;
friend class RegionRaftCommandDelegate;

// Private methods no need to lock mutex, normally

Expand All @@ -182,12 +186,6 @@ class Region : public std::enable_shared_from_this<Region>
LockInfoPtr getLockInfo(TableID expected_table_id, UInt64 start_ts) const;

RegionPtr splitInto(RegionMeta meta);
Regions execBatchSplit(
const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, const UInt64 index, const UInt64 term);
void execChangePeer(
const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, const UInt64 index, const UInt64 term);
void execCompactLog(
const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, const UInt64 index, const UInt64 term);

private:
RegionData data;
Expand All @@ -205,4 +203,21 @@ class Region : public std::enable_shared_from_this<Region>
Logger * log;
};

class RegionRaftCommandDelegate : public Region, private boost::noncopyable
{
public:
/// Only after the task mutex of KVStore is locked, region can apply raft command.
void onCommand(enginepb::CommandRequest &&, const KVStore &, RegionTable *, RaftCommandResult &);

private:
RegionRaftCommandDelegate() = delete;

Regions execBatchSplit(
const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, const UInt64 index, const UInt64 term);
void execChangePeer(
const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, const UInt64 index, const UInt64 term);
void execCompactLog(
const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, const UInt64 index, const UInt64 term);
};

} // namespace DB
Loading