Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve hostname in tics config #311

Merged
merged 8 commits into from
Nov 1, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1410,12 +1410,12 @@ void Context::createTMTContext(const std::vector<std::string> & pd_addrs,
const std::string & learner_value,
const std::unordered_set<std::string> & ignore_databases,
const std::string & kvstore_path,
const std::string & raft_service_address)
const std::string & flash_service_address)
{
auto lock = getLock();
if (shared->tmt_context)
throw Exception("TMTContext has already existed", ErrorCodes::LOGICAL_ERROR);
shared->tmt_context = std::make_shared<TMTContext>(*this, pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, raft_service_address);
shared->tmt_context = std::make_shared<TMTContext>(*this, pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, flash_service_address);
}

void Context::initializePartPathSelector(std::vector<std::string> && all_normal_path, std::vector<std::string> && all_fast_path)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ class Context
const std::string & learner_value,
const std::unordered_set<std::string> & ignore_databases,
const std::string & kvstore_path,
const std::string & raft_service_address);
const std::string & flash_service_address);
RaftService & getRaftService();

void initializeSchemaSyncService();
Expand Down
51 changes: 22 additions & 29 deletions dbms/src/Storages/Transaction/PDTiKVClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,15 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

std::string getIP(const std::string & address)
{
if (address.size() == 0)
return "";
size_t idx = address.find(":");
if (idx == std::string::npos)
return "";
auto ip = address.substr(0, idx);
return ip;
}

// convertAddr converts host name to network address.
// We assume the converted net type is AF_NET.
std::string IndexReader::convertAddr(const std::string & address)
{
if (address.size() == 0)
return "";
auto socket_addr = DNSCache::instance().resolveHostAndPort(address);
std::string ip = socket_addr.host().toString();
if (ip.size() == 0)
LOG_ERROR(log, "cannot resolve address: " << address);
return ip;
}

IndexReader::IndexReader(pingcap::kv::RegionCachePtr cache_,
pingcap::kv::RpcClientPtr client_,
const pingcap::kv::RegionVerID & id,
const std::string & suggested_address_)
: pingcap::kv::RegionClient(cache_, client_, id), suggested_address(suggested_address_), log(&Logger::get("pingcap.index_read"))
const std::string & suggested_ip_,
UInt16 suggested_port_)
: pingcap::kv::RegionClient(cache_, client_, id),
suggested_ip(suggested_ip_),
suggested_port(suggested_port_),
log(&Logger::get("pingcap.index_read"))
{}

int64_t IndexReader::getReadIndex()
Expand All @@ -52,19 +32,32 @@ int64_t IndexReader::getReadIndex()
{
auto region = cache->getRegionByID(bo, region_id);
const auto & learners = region->learners;
const std::string suggested_ip = getIP(suggested_address);
std::vector<metapb::Peer> candidate_learners;
// By default, we should config true ip in our config file.
// And we make sure that old config can also work.
if (suggested_ip.size() == 0 || suggested_ip == "0.0.0.0")
if (suggested_ip == "0.0.0.0")
candidate_learners = learners;
else
{
// Try to iterate all learners in pd as no accurate IP specified in config thus I don't know who 'I' am, otherwise only try 'myself'
for (const auto & learner : learners)
{
std::string addr = cache->getStore(bo, learner.store_id()).addr;
if (addr.size() > 0 && convertAddr(addr) == suggested_ip)
if (addr.empty())
{
LOG_DEBUG(log, "learner address empty.");
continue;
}
// Assume net type of addr is AF_NET.
auto socket_addr = DNSCache::instance().resolveHostAndPort(addr);
std::string ip = socket_addr.host().toString();
UInt16 port = socket_addr.port();
if (ip.empty())
{
LOG_WARNING(log, "cannot resolve address: " << addr);
continue;
}
if (ip == suggested_ip && port == suggested_port)
{
candidate_learners.push_back(learner);
break;
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/Transaction/PDTiKVClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"

#include <Core/Types.h>
#include <pingcap/kv/RegionClient.h>
#include <pingcap/pd/IClient.h>

Expand All @@ -16,14 +16,16 @@ namespace DB

struct IndexReader : public pingcap::kv::RegionClient
{
std::string suggested_address;
std::string suggested_ip;
UInt16 suggested_port;

Logger * log;

IndexReader(pingcap::kv::RegionCachePtr cache_,
pingcap::kv::RpcClientPtr client_,
const pingcap::kv::RegionVerID & id,
const std::string & suggested_address_);
const std::string & suggested_ip,
UInt16 suggested_port);

int64_t getReadIndex();

Expand All @@ -33,8 +35,6 @@ struct IndexReader : public pingcap::kv::RegionClient
const std::vector<metapb::Peer> & learners,
pingcap::kv::RpcCallPtr<kvrpcpb::ReadIndexRequest>
rpc);

std::string convertAddr(const std::string & address);
};

using IndexReaderPtr = std::shared_ptr<IndexReader>;
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/Transaction/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ RegionPtr Region::splitInto(RegionMeta meta)
if (index_reader != nullptr)
{
new_region = std::make_shared<Region>(std::move(meta), [&](pingcap::kv::RegionVerID ver_id) {
return std::make_shared<IndexReader>(index_reader->cache, index_reader->client, ver_id, index_reader->suggested_address);
return std::make_shared<IndexReader>(
index_reader->cache, index_reader->client, ver_id, index_reader->suggested_ip, index_reader->suggested_port);
});
}
else
Expand Down
20 changes: 16 additions & 4 deletions dbms/src/Storages/Transaction/TMTContext.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <Common/DNSCache.h>
#include <Interpreters/Context.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/RaftCommandResult.h>
Expand All @@ -12,7 +13,7 @@ namespace DB

TMTContext::TMTContext(Context & context, const std::vector<std::string> & addrs, const std::string & learner_key,
const std::string & learner_value, const std::unordered_set<std::string> & ignore_databases_, const std::string & kvstore_path,
const std::string & raft_service_address_)
const std::string & flash_service_address_)
: kvstore(std::make_shared<KVStore>(kvstore_path)),
region_table(context),
pd_client(addrs.size() == 0 ? static_cast<pingcap::pd::IClient *>(new pingcap::pd::MockPDClient())
Expand All @@ -23,7 +24,7 @@ TMTContext::TMTContext(Context & context, const std::vector<std::string> & addrs
schema_syncer(addrs.size() == 0
? std::static_pointer_cast<SchemaSyncer>(std::make_shared<TiDBSchemaSyncer<true>>(pd_client, region_cache, rpc_client))
: std::static_pointer_cast<SchemaSyncer>(std::make_shared<TiDBSchemaSyncer<false>>(pd_client, region_cache, rpc_client))),
raft_service_address(raft_service_address_)
flash_service_address(flash_service_address_)
{}

void TMTContext::restore()
Expand Down Expand Up @@ -64,8 +65,19 @@ pingcap::pd::ClientPtr TMTContext::getPDClient() const { return pd_client; }
IndexReaderPtr TMTContext::createIndexReader(pingcap::kv::RegionVerID region_version_id) const
{
std::lock_guard<std::mutex> lock(mutex);
return pd_client->isMock() ? nullptr
: std::make_shared<IndexReader>(region_cache, rpc_client, region_version_id, raft_service_address);
if (pd_client->isMock())
{
return nullptr;
}
// Assume net type of flash_service_address is AF_NET.
auto socket_addr = DNSCache::instance().resolveHostAndPort(flash_service_address);
std::string flash_service_ip = socket_addr.host().toString();
UInt16 flash_service_port = socket_addr.port();
if (flash_service_ip.empty())
{
throw Exception("Cannot resolve flash service address " + flash_service_address, ErrorCodes::LOGICAL_ERROR);
}
return std::make_shared<IndexReader>(region_cache, rpc_client, region_version_id, flash_service_ip, flash_service_port);
}

const std::unordered_set<std::string> & TMTContext::getIgnoreDatabases() const { return ignore_databases; }
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Transaction/TMTContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class TMTContext : private boost::noncopyable
// TODO: get flusher args from config file
explicit TMTContext(Context & context, const std::vector<std::string> & addrs, const std::string & learner_key,
const std::string & learner_value, const std::unordered_set<std::string> & ignore_databases_, const std::string & kv_store_path,
const std::string & regine_addr);
const std::string & flash_service_address_);

SchemaSyncerPtr getSchemaSyncer() const;
void setSchemaSyncer(SchemaSyncerPtr);
Expand Down Expand Up @@ -63,7 +63,7 @@ class TMTContext : private boost::noncopyable
const std::unordered_set<std::string> ignore_databases;
SchemaSyncerPtr schema_syncer;

String raft_service_address;
String flash_service_address;
};

} // namespace DB