Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flash-556] choose learner address by config address hint #284

Merged
merged 11 commits into from
Oct 18, 2019
5 changes: 3 additions & 2 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1409,12 +1409,13 @@ void Context::createTMTContext(const std::vector<std::string> & pd_addrs,
const std::string & learner_key,
const std::string & learner_value,
const std::unordered_set<std::string> & ignore_databases,
const std::string & kvstore_path)
const std::string & kvstore_path,
const std::string & raft_service_address)
{
auto lock = getLock();
if (shared->tmt_context)
throw Exception("TMTContext has already existed", ErrorCodes::LOGICAL_ERROR);
shared->tmt_context = std::make_shared<TMTContext>(*this, pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path);
shared->tmt_context = std::make_shared<TMTContext>(*this, pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, raft_service_address);
}

void Context::initializePartPathSelector(std::vector<std::string> && all_normal_path, std::vector<std::string> && all_fast_path)
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,8 @@ class Context
const std::string & learner_key,
const std::string & learner_value,
const std::unordered_set<std::string> & ignore_databases,
const std::string & kvstore_path);
const std::string & kvstore_path,
const std::string & raft_service_address);
RaftService & getRaftService();

void initializeSchemaSyncService();
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
std::string learner_value;
std::unordered_set<std::string> ignore_databases{"system"};
std::string kvstore_path = path + "kvstore/";
/// Then, startup grpc server to serve raft and/or flash services.
hanfei1991 marked this conversation as resolved.
Show resolved Hide resolved
String flash_server_addr = config().getString("flash.service_addr", "0.0.0.0:3930");

if (config().has("raft"))
{
Expand Down Expand Up @@ -408,7 +410,7 @@ int Server::main(const std::vector<std::string> & /*args*/)

{
/// create TMTContext
global_context->createTMTContext(pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path);
global_context->createTMTContext(pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, flash_server_addr);
}

/// Then, load remaining databases
Expand Down Expand Up @@ -450,8 +452,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->getTMTContext().restore();
}

/// Then, startup grpc server to serve raft and/or flash services.
String flash_server_addr = config().getString("flash.service_addr", "0.0.0.0:3930");
std::unique_ptr<FlashService> flash_service = nullptr;
std::unique_ptr<grpc::Server> flash_grpc_server = nullptr;
{
Expand Down
12 changes: 12 additions & 0 deletions dbms/src/Storages/Transaction/IndexReaderCreate.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#pragma once

#include <functional>

#include <Storages/Transaction/PDTiKVClient.h>

namespace DB
{

using IndexReaderCreateFunc = std::function<IndexReaderPtr(pingcap::kv::RegionVerID)>;

} // namespace DB
4 changes: 2 additions & 2 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ KVStore::KVStore(const std::string & data_dir)
: region_persister(data_dir, region_manager), raft_cmd_res(std::make_unique<RaftCommandResult>()), log(&Logger::get("KVStore"))
{}

void KVStore::restore(const RegionClientCreateFunc & region_client_create)
void KVStore::restore(const IndexReaderCreateFunc & index_reader_create)
{
auto task_lock = genTaskLock();
auto manage_lock = genRegionManageLock();

LOG_INFO(log, "start to restore regions");
regionsMut() = region_persister.restore(const_cast<RegionClientCreateFunc *>(&region_client_create));
regionsMut() = region_persister.restore(const_cast<IndexReaderCreateFunc *>(&index_reader_create));

// init range index
for (const auto & region : regions())
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include <Storages/Transaction/RegionClientCreate.h>
#include <Storages/Transaction/IndexReaderCreate.h>
#include <Storages/Transaction/RegionManager.h>
#include <Storages/Transaction/RegionPersister.h>
#include <Storages/Transaction/RegionsRangeIndex.h>
Expand Down Expand Up @@ -33,7 +33,7 @@ class KVStore final : private boost::noncopyable
{
public:
KVStore(const std::string & data_dir);
void restore(const RegionClientCreateFunc & region_client_create);
void restore(const IndexReaderCreateFunc & index_reader_create);

RegionPtr getRegion(const RegionID region_id) const;

Expand Down
118 changes: 118 additions & 0 deletions dbms/src/Storages/Transaction/PDTiKVClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#include <Storages/Transaction/PDTiKVClient.h>

#include <Common/Exception.h>

namespace DB
{

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}

std::string getIP(const std::string & address)
{
if (address == "")
return "";
size_t idx = address.find(":");
if (idx == std::string::npos)
return "";
auto ip = address.substr(0, idx);
return ip;
}


int64_t IndexReader::getReadIndex()
{
auto request = new kvrpcpb::ReadIndexRequest();
pingcap::kv::Backoffer bo(pingcap::kv::readIndexMaxBackoff);
auto rpc_call = std::make_shared<pingcap::kv::RpcCall<kvrpcpb::ReadIndexRequest>>(request);

auto region = cache->getRegionByID(bo, region_id);
const auto & learners = region->learners;
const std::string suggested_ip = getIP(suggested_address);
std::vector<metapb::Peer> candidate_learners;
// By default, we should config true ip in our config file.
// And we make sure that old config can also work.
if (suggested_ip.size() == 0 || suggested_ip == "0.0.0.0")
zanmato1984 marked this conversation as resolved.
Show resolved Hide resolved
candidate_learners = std::move(learners);
zanmato1984 marked this conversation as resolved.
Show resolved Hide resolved
else
{
for (const auto learner : learners)
zanmato1984 marked this conversation as resolved.
Show resolved Hide resolved
{
std::string addr = cache->getStoreAddr(bo, learner.store_id());
if (addr.size() > 0 && getIP(addr) == suggested_ip)
{
candidate_learners.push_back(learner);
break;
}
}
}

// If we don't find a local learner, we should not send request to a remote learner.
zanmato1984 marked this conversation as resolved.
Show resolved Hide resolved
if (candidate_learners.size() == 0)
zanmato1984 marked this conversation as resolved.
Show resolved Hide resolved
throw Exception("Cannot find store ip " + suggested_ip + " in region peers, region_id is " + std::to_string(region_id.id)
+ ", maybe rngine is down",
zanmato1984 marked this conversation as resolved.
Show resolved Hide resolved
ErrorCodes::LOGICAL_ERROR);

for (;;)
{
try
{
getReadIndexFromLearners(bo, region->meta, candidate_learners, rpc_call);
return rpc_call->getResp()->read_index();
}
catch (const pingcap::Exception & e)
{
// all stores are failed, so we need drop the region.
cache->dropRegion(region_id);
bo.backoff(pingcap::kv::boTiKVRPC, e);
}
}
}

void IndexReader::getReadIndexFromLearners(pingcap::kv::Backoffer & bo,
const metapb::Region & meta,
const std::vector<metapb::Peer> & learners,
pingcap::kv::RpcCallPtr<kvrpcpb::ReadIndexRequest>
rpc)
{
for (const auto learner : learners)
zanmato1984 marked this conversation as resolved.
Show resolved Hide resolved
{
std::string addr = cache->getStoreAddr(bo, learner.store_id());
if (addr == "")
zanmato1984 marked this conversation as resolved.
Show resolved Hide resolved
{
bo.backoff(pingcap::kv::boRegionMiss,
pingcap::Exception(
"miss store, region id is: " + std::to_string(region_id.id) + " store id is: " + std::to_string(learner.store_id()),
pingcap::ErrorCodes::StoreNotReady));
cache->dropStore(learner.store_id());
continue;
}
auto ctx = std::make_shared<pingcap::kv::RPCContext>(region_id, meta, learner, addr);
rpc->setCtx(ctx);
try
{
client->sendRequest(addr, rpc);
}
catch (const pingcap::Exception & e)
{
// only drop this store. and retry again!
cache->dropStore(learner.store_id());
continue;
}

auto resp = rpc->getResp();
if (resp->has_region_error())
{
onRegionError(bo, ctx, resp->region_error());
}
else
{
return;
}
}
}


} // namespace DB
34 changes: 32 additions & 2 deletions dbms/src/Storages/Transaction/PDTiKVClient.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,40 @@
#pragma once

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"

#include <pingcap/kv/RegionClient.h>
#include <pingcap/pd/IClient.h>
#include <pingcap/kv/Backoff.h>

#pragma GCC diagnostic pop


namespace DB
{

// TODO:: Implement Read Index Client;

struct IndexReader : public pingcap::kv::RegionClient
{
const std::string & suggested_address;

IndexReader(pingcap::kv::RegionCachePtr cache_,
pingcap::kv::RpcClientPtr client_,
const pingcap::kv::RegionVerID & id,
const std::string & suggested_address_)
: pingcap::kv::RegionClient(cache_, client_, id), suggested_address(suggested_address_)
{}

int64_t getReadIndex();

private:
void getReadIndexFromLearners(pingcap::kv::Backoffer & bo,
const metapb::Region & meta,
const std::vector<metapb::Peer> & learners,
pingcap::kv::RpcCallPtr<kvrpcpb::ReadIndexRequest>
rpc);
};

using IndexReaderPtr = std::shared_ptr<IndexReader>;

struct PDClientHelper
{
Expand Down
18 changes: 9 additions & 9 deletions dbms/src/Storages/Transaction/Region.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#include <memory>

#include <Storages/Transaction/PDTiKVClient.h>
#include <Storages/Transaction/RaftCommandResult.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/TiKVRange.h>
#include <pingcap/kv/RegionClient.h>
#include <Storages/Transaction/RegionHelper.hpp>

namespace DB
Expand Down Expand Up @@ -91,10 +91,10 @@ UInt64 Region::appliedIndex() const { return meta.appliedIndex(); }
RegionPtr Region::splitInto(RegionMeta meta)
{
RegionPtr new_region;
if (client != nullptr)
if (index_reader != nullptr)
{
new_region = std::make_shared<Region>(std::move(meta), [&](pingcap::kv::RegionVerID ver_id) {
return std::make_shared<pingcap::kv::RegionClient>(client->cache, client->client, ver_id);
return std::make_shared<IndexReader>(index_reader->cache, index_reader->client, ver_id, index_reader->suggested_address);
});
}
else
Expand Down Expand Up @@ -379,16 +379,16 @@ std::tuple<size_t, UInt64> Region::serialize(WriteBuffer & buf) const
return {total_size, applied_index};
}

RegionPtr Region::deserialize(ReadBuffer & buf, const RegionClientCreateFunc * region_client_create)
RegionPtr Region::deserialize(ReadBuffer & buf, const IndexReaderCreateFunc * index_reader_create)
{
auto version = readBinary2<UInt32>(buf);
if (version != Region::CURRENT_VERSION)
throw Exception(
"[Region::deserialize] unexpected version: " + DB::toString(version) + ", expected: " + DB::toString(CURRENT_VERSION),
ErrorCodes::UNKNOWN_FORMAT_VERSION);

auto region = region_client_create == nullptr ? std::make_shared<Region>(RegionMeta::deserialize(buf))
: std::make_shared<Region>(RegionMeta::deserialize(buf), *region_client_create);
auto region = index_reader_create == nullptr ? std::make_shared<Region>(RegionMeta::deserialize(buf))
: std::make_shared<Region>(RegionMeta::deserialize(buf), *index_reader_create);

RegionData::deserialize(buf, region->data);

Expand Down Expand Up @@ -474,14 +474,14 @@ ImutRegionRangePtr Region::getRange() const { return meta.getRange(); }

UInt64 Region::learnerRead()
{
if (client != nullptr)
return client->getReadIndex();
if (index_reader != nullptr)
return index_reader->getReadIndex();
return 0;
}

void Region::waitIndex(UInt64 index)
{
if (client != nullptr)
if (index_reader != nullptr)
{
if (!meta.checkIndex(index))
{
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/Transaction/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include <shared_mutex>

#include <Storages/Transaction/RegionClientCreate.h>
#include <Storages/Transaction/IndexReaderCreate.h>
#include <Storages/Transaction/RegionData.h>
#include <Storages/Transaction/RegionMeta.h>
#include <Storages/Transaction/TiKVKeyValue.h>
Expand Down Expand Up @@ -96,10 +96,10 @@ class Region : public std::enable_shared_from_this<Region>
};

public:
explicit Region(RegionMeta meta_) : meta(std::move(meta_)), client(nullptr), log(&Logger::get(log_name)) {}
explicit Region(RegionMeta meta_) : meta(std::move(meta_)), index_reader(nullptr), log(&Logger::get(log_name)) {}

explicit Region(RegionMeta meta_, const RegionClientCreateFunc & region_client_create)
: meta(std::move(meta_)), client(region_client_create(meta.getRegionVerID())), log(&Logger::get(log_name))
explicit Region(RegionMeta meta_, const IndexReaderCreateFunc & index_reader_create)
: meta(std::move(meta_)), index_reader(index_reader_create(meta.getRegionVerID())), log(&Logger::get(log_name))
{}

TableID insert(const std::string & cf, TiKVKey && key, TiKVValue && value);
Expand All @@ -109,7 +109,7 @@ class Region : public std::enable_shared_from_this<Region>
CommittedRemover createCommittedRemover(TableID expected_table_id);

std::tuple<size_t, UInt64> serialize(WriteBuffer & buf) const;
static RegionPtr deserialize(ReadBuffer & buf, const RegionClientCreateFunc * region_client_create = nullptr);
static RegionPtr deserialize(ReadBuffer & buf, const IndexReaderCreateFunc * index_reader_create = nullptr);

RegionID id() const;
ImutRegionRangePtr getRange() const;
Expand Down Expand Up @@ -197,7 +197,7 @@ class Region : public std::enable_shared_from_this<Region>

RegionMeta meta;

pingcap::kv::RegionClientPtr client;
IndexReaderPtr index_reader;

mutable std::atomic<Timepoint> last_persist_time = Clock::now();

Expand Down
15 changes: 0 additions & 15 deletions dbms/src/Storages/Transaction/RegionClientCreate.h

This file was deleted.

2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/RegionPersister.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void RegionPersister::doPersist(RegionCacheWriteElement & region_write_buffer, c
page_storage.write(wb);
}

RegionMap RegionPersister::restore(RegionClientCreateFunc * func)
RegionMap RegionPersister::restore(IndexReaderCreateFunc * func)
{
RegionMap regions;
auto acceptor = [&](const Page & page) {
Expand Down
Loading