diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index a6a529f5000..2159b923008 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -1410,12 +1410,12 @@ void Context::createTMTContext(const std::vector & pd_addrs, const std::string & learner_value, const std::unordered_set & ignore_databases, const std::string & kvstore_path, - const std::string & raft_service_address) + const std::string & flash_service_address) { auto lock = getLock(); if (shared->tmt_context) throw Exception("TMTContext has already existed", ErrorCodes::LOGICAL_ERROR); - shared->tmt_context = std::make_shared(*this, pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, raft_service_address); + shared->tmt_context = std::make_shared(*this, pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, flash_service_address); } void Context::initializePartPathSelector(std::vector && all_normal_path, std::vector && all_fast_path) diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 4cd89c64d17..2c88ddfc9b1 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -365,7 +365,7 @@ class Context const std::string & learner_value, const std::unordered_set & ignore_databases, const std::string & kvstore_path, - const std::string & raft_service_address); + const std::string & flash_service_address); RaftService & getRaftService(); void initializeSchemaSyncService(); diff --git a/dbms/src/Storages/Transaction/PDTiKVClient.cpp b/dbms/src/Storages/Transaction/PDTiKVClient.cpp index a66d87defd4..89f9807b22b 100644 --- a/dbms/src/Storages/Transaction/PDTiKVClient.cpp +++ b/dbms/src/Storages/Transaction/PDTiKVClient.cpp @@ -11,35 +11,15 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -std::string getIP(const std::string & address) -{ - if (address.size() == 0) - return ""; - size_t idx = address.find(":"); - if (idx == std::string::npos) - return ""; - auto ip = address.substr(0, idx); - return ip; -} - -// convertAddr converts host name to network address. -// We assume the converted net type is AF_NET. -std::string IndexReader::convertAddr(const std::string & address) -{ - if (address.size() == 0) - return ""; - auto socket_addr = DNSCache::instance().resolveHostAndPort(address); - std::string ip = socket_addr.host().toString(); - if (ip.size() == 0) - LOG_ERROR(log, "cannot resolve address: " << address); - return ip; -} - IndexReader::IndexReader(pingcap::kv::RegionCachePtr cache_, pingcap::kv::RpcClientPtr client_, const pingcap::kv::RegionVerID & id, - const std::string & suggested_address_) - : pingcap::kv::RegionClient(cache_, client_, id), suggested_address(suggested_address_), log(&Logger::get("pingcap.index_read")) + const std::string & suggested_ip_, + UInt16 suggested_port_) + : pingcap::kv::RegionClient(cache_, client_, id), + suggested_ip(suggested_ip_), + suggested_port(suggested_port_), + log(&Logger::get("pingcap.index_read")) {} int64_t IndexReader::getReadIndex() @@ -52,11 +32,10 @@ int64_t IndexReader::getReadIndex() { auto region = cache->getRegionByID(bo, region_id); const auto & learners = region->learners; - const std::string suggested_ip = getIP(suggested_address); std::vector candidate_learners; // By default, we should config true ip in our config file. // And we make sure that old config can also work. - if (suggested_ip.size() == 0 || suggested_ip == "0.0.0.0") + if (suggested_ip == "0.0.0.0") candidate_learners = learners; else { @@ -64,7 +43,21 @@ int64_t IndexReader::getReadIndex() for (const auto & learner : learners) { std::string addr = cache->getStore(bo, learner.store_id()).addr; - if (addr.size() > 0 && convertAddr(addr) == suggested_ip) + if (addr.empty()) + { + LOG_DEBUG(log, "learner address empty."); + continue; + } + // Assume net type of addr is AF_NET. + auto socket_addr = DNSCache::instance().resolveHostAndPort(addr); + std::string ip = socket_addr.host().toString(); + UInt16 port = socket_addr.port(); + if (ip.empty()) + { + LOG_WARNING(log, "cannot resolve address: " << addr); + continue; + } + if (ip == suggested_ip && port == suggested_port) { candidate_learners.push_back(learner); break; diff --git a/dbms/src/Storages/Transaction/PDTiKVClient.h b/dbms/src/Storages/Transaction/PDTiKVClient.h index 3e4e3947d14..b06a59108e8 100644 --- a/dbms/src/Storages/Transaction/PDTiKVClient.h +++ b/dbms/src/Storages/Transaction/PDTiKVClient.h @@ -2,7 +2,7 @@ #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" - +#include #include #include @@ -16,14 +16,16 @@ namespace DB struct IndexReader : public pingcap::kv::RegionClient { - std::string suggested_address; + std::string suggested_ip; + UInt16 suggested_port; Logger * log; IndexReader(pingcap::kv::RegionCachePtr cache_, pingcap::kv::RpcClientPtr client_, const pingcap::kv::RegionVerID & id, - const std::string & suggested_address_); + const std::string & suggested_ip, + UInt16 suggested_port); int64_t getReadIndex(); @@ -33,8 +35,6 @@ struct IndexReader : public pingcap::kv::RegionClient const std::vector & learners, pingcap::kv::RpcCallPtr rpc); - - std::string convertAddr(const std::string & address); }; using IndexReaderPtr = std::shared_ptr; diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index f0f5639a7cd..086b089a7d3 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -94,7 +94,8 @@ RegionPtr Region::splitInto(RegionMeta meta) if (index_reader != nullptr) { new_region = std::make_shared(std::move(meta), [&](pingcap::kv::RegionVerID ver_id) { - return std::make_shared(index_reader->cache, index_reader->client, ver_id, index_reader->suggested_address); + return std::make_shared( + index_reader->cache, index_reader->client, ver_id, index_reader->suggested_ip, index_reader->suggested_port); }); } else diff --git a/dbms/src/Storages/Transaction/TMTContext.cpp b/dbms/src/Storages/Transaction/TMTContext.cpp index 88c2f440901..f1dd4e29606 100644 --- a/dbms/src/Storages/Transaction/TMTContext.cpp +++ b/dbms/src/Storages/Transaction/TMTContext.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -12,7 +13,7 @@ namespace DB TMTContext::TMTContext(Context & context, const std::vector & addrs, const std::string & learner_key, const std::string & learner_value, const std::unordered_set & ignore_databases_, const std::string & kvstore_path, - const std::string & raft_service_address_) + const std::string & flash_service_address_) : kvstore(std::make_shared(kvstore_path)), region_table(context), pd_client(addrs.size() == 0 ? static_cast(new pingcap::pd::MockPDClient()) @@ -23,7 +24,7 @@ TMTContext::TMTContext(Context & context, const std::vector & addrs schema_syncer(addrs.size() == 0 ? std::static_pointer_cast(std::make_shared>(pd_client, region_cache, rpc_client)) : std::static_pointer_cast(std::make_shared>(pd_client, region_cache, rpc_client))), - raft_service_address(raft_service_address_) + flash_service_address(flash_service_address_) {} void TMTContext::restore() @@ -64,8 +65,19 @@ pingcap::pd::ClientPtr TMTContext::getPDClient() const { return pd_client; } IndexReaderPtr TMTContext::createIndexReader(pingcap::kv::RegionVerID region_version_id) const { std::lock_guard lock(mutex); - return pd_client->isMock() ? nullptr - : std::make_shared(region_cache, rpc_client, region_version_id, raft_service_address); + if (pd_client->isMock()) + { + return nullptr; + } + // Assume net type of flash_service_address is AF_NET. + auto socket_addr = DNSCache::instance().resolveHostAndPort(flash_service_address); + std::string flash_service_ip = socket_addr.host().toString(); + UInt16 flash_service_port = socket_addr.port(); + if (flash_service_ip.empty()) + { + throw Exception("Cannot resolve flash service address " + flash_service_address, ErrorCodes::LOGICAL_ERROR); + } + return std::make_shared(region_cache, rpc_client, region_version_id, flash_service_ip, flash_service_port); } const std::unordered_set & TMTContext::getIgnoreDatabases() const { return ignore_databases; } diff --git a/dbms/src/Storages/Transaction/TMTContext.h b/dbms/src/Storages/Transaction/TMTContext.h index 60f506f80d5..571b0cceb0d 100644 --- a/dbms/src/Storages/Transaction/TMTContext.h +++ b/dbms/src/Storages/Transaction/TMTContext.h @@ -34,7 +34,7 @@ class TMTContext : private boost::noncopyable // TODO: get flusher args from config file explicit TMTContext(Context & context, const std::vector & addrs, const std::string & learner_key, const std::string & learner_value, const std::unordered_set & ignore_databases_, const std::string & kv_store_path, - const std::string & regine_addr); + const std::string & flash_service_address_); SchemaSyncerPtr getSchemaSyncer() const; void setSchemaSyncer(SchemaSyncerPtr); @@ -63,7 +63,7 @@ class TMTContext : private boost::noncopyable const std::unordered_set ignore_databases; SchemaSyncerPtr schema_syncer; - String raft_service_address; + String flash_service_address; }; } // namespace DB