diff --git a/src/kvstore/Listener.cpp b/src/kvstore/Listener.cpp index 2855ab20f..4b013a7e5 100644 --- a/src/kvstore/Listener.cpp +++ b/src/kvstore/Listener.cpp @@ -13,6 +13,7 @@ DEFINE_int32(listener_commit_interval_secs, 1, "Listener commit interval"); DEFINE_int32(listener_commit_batch_size, 1000, "Max batch size when listener commit"); DEFINE_int32(ft_request_retry_times, 3, "Retry times if fulltext request failed"); DEFINE_int32(ft_bulk_batch_size, 100, "Max batch size when bulk insert"); +DEFINE_int32(listener_pursue_leader_threshold, 1000, "Catch up with the leader's threshold"); namespace nebula { namespace kvstore { @@ -213,6 +214,9 @@ void Listener::doApply() { persist(committedLogId_, term_, lastApplyLogId_); VLOG(1) << idStr_ << "Listener succeeded apply log to " << lastApplyLogId_; lastApplyTime_ = time::WallClock::fastNowInMilliSec(); + VLOG(1) << folly::sformat("Commit snapshot to : committedLogId={}," + "committedLogTerm={}, lastApplyLogId_={}", + committedLogId_, term_, lastApplyLogId_); } }); } @@ -243,9 +247,35 @@ std::pair Listener::commitSnapshot(const std::vector g(raftLock_); + reset(); + VLOG(1) << folly::sformat("The listener has been reset : leaderCommitId={}," + "proposedTerm={}, lastLogTerm={}, term={}," + "lastApplyLogId={}", + leaderCommitId_, proposedTerm_, lastLogTerm_, + term_, lastApplyLogId_); +} +/** Whether the listener is RUNNING and its apply lag behind leaderCommitId_ is within the threshold. */ +bool Listener::pursueLeaderDone() { + std::lock_guard g(raftLock_); + if (status_ != Status::RUNNING) { + return false; + } + // Both leaderCommitId_ and lastApplyLogId_ being 0 means the snapshot gap: not caught up yet.
+ if (leaderCommitId_ == 0 && lastApplyLogId_ == 0) { + return false; + } + VLOG(1) << folly::sformat("pursue leader : leaderCommitId={}, lastApplyLogId_={}", + leaderCommitId_, lastApplyLogId_); + return (leaderCommitId_ - lastApplyLogId_) <= FLAGS_listener_pursue_leader_threshold; +} } // namespace kvstore } // namespace nebula diff --git a/src/kvstore/Listener.h b/src/kvstore/Listener.h index 32cd4740d..e1d6eeb8d 100644 --- a/src/kvstore/Listener.h +++ b/src/kvstore/Listener.h @@ -111,15 +111,15 @@ class Listener : public raftex::RaftPart { } void cleanup() override { + CHECK(!raftLock_.try_lock()); /* raftLock_ must already be held here, so try_lock() has to fail */ leaderCommitId_ = 0; lastApplyLogId_ = 0; persist(0, 0, lastApplyLogId_); } - void resetListener() { - std::lock_guard g(raftLock_); - reset(); - } + void resetListener(); /* locks raftLock_, calls reset() and logs the resulting ids */ + + bool pursueLeaderDone(); /* true when RUNNING and the apply lag is within the pursue threshold */ protected: virtual void init() = 0; diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index be3e0d9e9..392909c09 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -1043,6 +1043,16 @@ NebulaStore::space(GraphSpaceID spaceId) { return it->second; } +ErrorOr> +NebulaStore::spaceListener(GraphSpaceID spaceId) { + folly::RWSpinLock::ReadHolder rh(&lock_); /* the lookup below runs under a read hold on lock_ */ + auto it = spaceListeners_.find(spaceId); + if (UNLIKELY(it == spaceListeners_.end())) { + return nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND; + } + return it->second; +} + int32_t NebulaStore::allLeader( std::unordered_map>& leaderIds) { folly::RWSpinLock::ReadHolder rh(&lock_); diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index a3a7f5dc0..1cc43cbf0 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -256,6 +256,9 @@ class NebulaStore : public KVStore, public Handler { ErrorOr> space(GraphSpaceID spaceId); + ErrorOr> + spaceListener(GraphSpaceID spaceId); /* entry of spaceId in spaceListeners_, or E_SPACE_NOT_FOUND */ + /** * Implement four interfaces in Handler.
* */ diff --git a/src/kvstore/plugins/elasticsearch/ESListener.h b/src/kvstore/plugins/elasticsearch/ESListener.h index a542e3933..ea9ba0b25 100644 --- a/src/kvstore/plugins/elasticsearch/ESListener.h +++ b/src/kvstore/plugins/elasticsearch/ESListener.h @@ -47,9 +47,6 @@ class ESListener : public Listener { LogID lastApplyLogId() override; - void cleanup() override { - } - private: bool writeAppliedId(LogID lastId, TermID lastTerm, LogID lastApplyLogId); diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index 2fe5bb35b..919934c4b 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -72,6 +72,7 @@ nebula_add_library( processors/jobMan/RebuildJobExecutor.cpp processors/jobMan/RebuildTagJobExecutor.cpp processors/jobMan/RebuildEdgeJobExecutor.cpp + processors/jobMan/RebuildFTJobExecutor.cpp processors/jobMan/StatisJobExecutor.cpp processors/jobMan/GetStatisProcessor.cpp processors/jobMan/ListTagIndexStatusProcessor.cpp diff --git a/src/meta/processors/jobMan/MetaJobExecutor.cpp b/src/meta/processors/jobMan/MetaJobExecutor.cpp index a783902fd..e62a729be 100644 --- a/src/meta/processors/jobMan/MetaJobExecutor.cpp +++ b/src/meta/processors/jobMan/MetaJobExecutor.cpp @@ -16,11 +16,13 @@ #include "meta/processors/jobMan/MetaJobExecutor.h" #include "meta/processors/jobMan/RebuildTagJobExecutor.h" #include "meta/processors/jobMan/RebuildEdgeJobExecutor.h" +#include "meta/processors/jobMan/RebuildFTJobExecutor.h" #include "meta/processors/jobMan/StatisJobExecutor.h" #include "meta/processors/jobMan/TaskDescription.h" #include "utils/Utils.h" DECLARE_int32(heartbeat_interval_secs); +DECLARE_uint32(expired_time_factor); namespace nebula { namespace meta { @@ -55,6 +57,12 @@ MetaJobExecutorFactory::createMetaJobExecutor(const JobDescription& jd, client, jd.getParas())); break; + case cpp2::AdminCmd::REBUILD_FULLTEXT_INDEX: + ret.reset(new RebuildFTJobExecutor(jd.getJobId(), + store, + client, + jd.getParas())); + break; case cpp2::AdminCmd::STATS: 
ret.reset(new StatisJobExecutor(jd.getJobId(), store, @@ -138,12 +146,65 @@ ErrOrHosts MetaJobExecutor::getLeaderHost(GraphSpaceID space) { return hosts; } +ErrOrHosts MetaJobExecutor::getListenerHost(GraphSpaceID space, cpp2::ListenerType type) { + const auto& prefix = MetaServiceUtils::listenerPrefix(space, type); + std::unique_ptr iter; + auto ret = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(ERROR) << "Get space " << space << "'s listener failed, error: " + << apache::thrift::util::enumNameSafe(ret); + return ret; + } + + auto activeHostsRet = ActiveHostsMan::getActiveHosts( + kvstore_, + FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor, + cpp2::HostRole::LISTENER); + if (!nebula::ok(activeHostsRet)) { + return nebula::error(activeHostsRet); + } + + auto activeHosts = std::move(nebula::value(activeHostsRet)); + std::vector>> hosts; + + while (iter->valid()) { + auto host = MetaServiceUtils::deserializeHostAddr(iter->val()); + auto part = MetaServiceUtils::parseListenerPart(iter->key()); + if (std::find(activeHosts.begin(), activeHosts.end(), host) == activeHosts.end()) { + LOG(ERROR) << "Invalid host : " << network::NetworkUtils::toHostsStr({host}); + return nebula::cpp2::ErrorCode::E_INVALID_HOST; + } + auto it = std::find_if(hosts.begin(), hosts.end(), [&host](auto& item){ + return item.first == host; + }); + if (it == hosts.end()) { + hosts.emplace_back(std::make_pair(host, std::vector{part})); + } else { + it->second.emplace_back(part); + } + iter->next(); + } + if (hosts.empty()) { + return nebula::cpp2::ErrorCode::E_LISTENER_NOT_FOUND; + } + return hosts; +} + nebula::cpp2::ErrorCode MetaJobExecutor::execute() { ErrOrHosts addressesRet; - if (toLeader_) { - addressesRet = getLeaderHost(space_); - } else { - addressesRet = getTargetHost(space_); + switch (toHost_) { + case TargetHosts::LEADER: { + addressesRet = getLeaderHost(space_); + break; + } + case 
TargetHosts::LISTENER: { + addressesRet = getListenerHost(space_, cpp2::ListenerType::ELASTICSEARCH); /* listener type is fixed to ELASTICSEARCH here */ + break; + } + case TargetHosts::DEFAULT: { + addressesRet = getTargetHost(space_); + break; + } } if (!nebula::ok(addressesRet)) { diff --git a/src/meta/processors/jobMan/MetaJobExecutor.h b/src/meta/processors/jobMan/MetaJobExecutor.h index c08e2519c..5d2268635 100644 --- a/src/meta/processors/jobMan/MetaJobExecutor.h +++ b/src/meta/processors/jobMan/MetaJobExecutor.h @@ -21,6 +21,12 @@ using ErrOrHosts = ErrorOr>; class MetaJobExecutor { public: + enum class TargetHosts { /* which host set execute() resolves the job's targets from */ + LEADER = 0, /* getLeaderHost(space_) */ + LISTENER, /* getListenerHost(space_, ELASTICSEARCH) */ + DEFAULT /* getTargetHost(space_) */ + }; + MetaJobExecutor(JobID jobId, kvstore::KVStore* kvstore, AdminClient* adminClient, @@ -66,6 +72,8 @@ class MetaJobExecutor { ErrOrHosts getLeaderHost(GraphSpaceID space); + ErrOrHosts getListenerHost(GraphSpaceID space, cpp2::ListenerType type); /* listener hosts of `type` for `space`, with their parts */ + virtual folly::Future executeInternal(HostAddr&& address, std::vector&& parts) = 0; @@ -76,7 +84,7 @@ class MetaJobExecutor { AdminClient* adminClient_{nullptr}; GraphSpaceID space_; std::vector paras_; - bool toLeader_{false}; + TargetHosts toHost_{TargetHosts::DEFAULT}; /* read by execute(); subclasses reassign it in their constructors */ int32_t concurrency_{INT_MAX}; bool stopped_{false}; std::mutex muInterrupt_; diff --git a/src/meta/processors/jobMan/RebuildFTJobExecutor.cpp b/src/meta/processors/jobMan/RebuildFTJobExecutor.cpp new file mode 100644 index 000000000..8bf622470 --- /dev/null +++ b/src/meta/processors/jobMan/RebuildFTJobExecutor.cpp @@ -0,0 +1,21 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */ + +#include "meta/processors/jobMan/RebuildFTJobExecutor.h" + +namespace nebula { +namespace meta { +/** Sends the REBUILD_FULLTEXT_INDEX admin task for `parts` to the single host `address`. */ +folly::Future +RebuildFTJobExecutor::executeInternal(HostAddr&& address, + std::vector&& parts) { + return adminClient_->addTask(cpp2::AdminCmd::REBUILD_FULLTEXT_INDEX, jobId_, taskId_++, + space_, {std::move(address)}, taskParameters_, + std::move(parts), concurrency_); +} + +} // namespace meta +} // namespace nebula diff --git a/src/meta/processors/jobMan/RebuildFTJobExecutor.h b/src/meta/processors/jobMan/RebuildFTJobExecutor.h new file mode 100644 index 000000000..deae62543 --- /dev/null +++ b/src/meta/processors/jobMan/RebuildFTJobExecutor.h @@ -0,0 +1,40 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef META_REBUILDFTJOBEXECUTOR_H_ +#define META_REBUILDFTJOBEXECUTOR_H_ + +#include "meta/processors/jobMan/RebuildJobExecutor.h" + +namespace nebula { +namespace meta { + +/** + * Meta-side executor for the REBUILD_FULLTEXT_INDEX job. + * + * Behaves like RebuildJobExecutor, except that tasks are dispatched to the + * listener hosts (TargetHosts::LISTENER) instead of the partition leaders. + */ +class RebuildFTJobExecutor : public RebuildJobExecutor { +public: + RebuildFTJobExecutor(JobID jobId, + kvstore::KVStore* kvstore, + AdminClient* adminClient, + const std::vector& paras) + : RebuildJobExecutor(jobId, kvstore, adminClient, paras) { + // The base constructor selects TargetHosts::LEADER; override it for listeners. + toHost_ = TargetHosts::LISTENER; + } + +protected: + folly::Future + executeInternal(HostAddr&& address, std::vector&& parts) override; +}; + +} // namespace meta +} // namespace nebula + +#endif // META_REBUILDFTJOBEXECUTOR_H_ diff --git a/src/meta/processors/jobMan/RebuildJobExecutor.h b/src/meta/processors/jobMan/RebuildJobExecutor.h index f1ef70220..9fe8e1dd9 100644 --- a/src/meta/processors/jobMan/RebuildJobExecutor.h +++ b/src/meta/processors/jobMan/RebuildJobExecutor.h @@ -21,7 +21,7 @@ class RebuildJobExecutor : public MetaJobExecutor { AdminClient* adminClient, const std::vector& paras) : MetaJobExecutor(jobId, kvstore, adminClient, paras) { - toLeader_ = 
true; + toHost_ = TargetHosts::LEADER; } bool check() override; diff --git a/src/meta/processors/jobMan/StatisJobExecutor.h b/src/meta/processors/jobMan/StatisJobExecutor.h index 24882bf71..29c04248a 100644 --- a/src/meta/processors/jobMan/StatisJobExecutor.h +++ b/src/meta/processors/jobMan/StatisJobExecutor.h @@ -21,7 +21,7 @@ class StatisJobExecutor : public MetaJobExecutor { AdminClient* adminClient, const std::vector& paras) : MetaJobExecutor(jobId, kvstore, adminClient, paras) { - toLeader_ = true; + toHost_ = TargetHosts::LEADER; } bool check() override; diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index 1ecc56a1d..5180202ea 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -19,6 +19,7 @@ nebula_add_library( admin/RebuildIndexTask.cpp admin/RebuildTagIndexTask.cpp admin/RebuildEdgeIndexTask.cpp + admin/RebuildFTIndexTask.cpp admin/StatisTask.cpp admin/ListClusterInfoProcessor.cpp ) diff --git a/src/storage/admin/AdminTask.cpp b/src/storage/admin/AdminTask.cpp index ede6714ec..9353e888b 100644 --- a/src/storage/admin/AdminTask.cpp +++ b/src/storage/admin/AdminTask.cpp @@ -9,6 +9,7 @@ #include "storage/admin/FlushTask.h" #include "storage/admin/RebuildTagIndexTask.h" #include "storage/admin/RebuildEdgeIndexTask.h" +#include "storage/admin/RebuildFTIndexTask.h" #include "storage/admin/StatisTask.h" namespace nebula { @@ -31,6 +32,9 @@ AdminTaskFactory::createAdminTask(StorageEnv* env, TaskContext&& ctx) { case meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX: ret = std::make_shared(env, std::move(ctx)); break; + case meta::cpp2::AdminCmd::REBUILD_FULLTEXT_INDEX: + ret = std::make_shared(env, std::move(ctx)); + break; case meta::cpp2::AdminCmd::STATS: ret = std::make_shared(env, std::move(ctx)); break; diff --git a/src/storage/admin/RebuildFTIndexTask.cpp b/src/storage/admin/RebuildFTIndexTask.cpp new file mode 100644 index 000000000..ea6255efd --- /dev/null +++ b/src/storage/admin/RebuildFTIndexTask.cpp @@ -0,0 +1,97 @@ 
+/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "storage/admin/RebuildFTIndexTask.h" +#include "common/base/Logging.h" + +DECLARE_uint32(raft_heartbeat_interval_secs); + +namespace nebula { +namespace storage { + +/** + * Creates one sub-task per requested part; each sub-task resets the part's + * ELASTICSEARCH listener and waits for it to catch up (see taskByPart()). + * + * @return the sub-tasks on success; the error from NebulaStore::spaceListener(), or + * E_LISTENER_NOT_FOUND when the kvstore is not a NebulaStore, a part has no + * ELASTICSEARCH listener, or that listener is not running yet. + */ +ErrorOr> +RebuildFTIndexTask::genSubTasks() { + std::vector tasks; + VLOG(3) << "Begin rebuild fulltext indexes, space : " << *ctx_.parameters_.space_id_ref(); + const auto& parts = *ctx_.parameters_.parts_ref(); + auto* store = dynamic_cast(env_->kvstore_); + // dynamic_cast yields nullptr when env_->kvstore_ is not a NebulaStore. + if (store == nullptr) { + LOG(ERROR) << "kvstore has no listener support, can't rebuild fulltext index"; + return nebula::cpp2::ErrorCode::E_LISTENER_NOT_FOUND; + } + auto listenerRet = store->spaceListener(*ctx_.parameters_.space_id_ref()); + if (!ok(listenerRet)) { + return error(listenerRet); + } + auto space = nebula::value(listenerRet); + for (const auto& part : parts) { + // Find the ELASTICSEARCH listener registered for this part, if any. + nebula::kvstore::Listener *listener = nullptr; + for (auto& lMap : space->listeners_) { + if (part != lMap.first) { + continue; + } + for (auto& l : lMap.second) { + if (l.first != meta::cpp2::ListenerType::ELASTICSEARCH) { + continue; + } + listener = l.second.get(); + break; + } + } + if (listener == nullptr) { + return nebula::cpp2::ErrorCode::E_LISTENER_NOT_FOUND; + } + if (!listener->isRunning()) { + LOG(ERROR) << "listener not ready, may be starting or waiting snapshot"; + // TODO : add ErrorCode for listener not ready.
+ return nebula::cpp2::ErrorCode::E_LISTENER_NOT_FOUND; + } + VLOG(3) << folly::sformat("Processing fulltext rebuild subtask, space={}, part={}", + *ctx_.parameters_.space_id_ref(), part); + // NOTE(review): the sub-task keeps a raw Listener*; presumably the listener + // outlives the task - confirm. + std::function task = + std::bind(&RebuildFTIndexTask::taskByPart, this, listener); + tasks.emplace_back(std::move(task)); + } + return tasks; +} + +/** + * Resets `listener` and then blocks, polling every FLAGS_raft_heartbeat_interval_secs + * seconds, until Listener::pursueLeaderDone() returns true. + * + * NOTE(review): the loop has no timeout or cancellation check. + * @return always SUCCEEDED, once the listener has caught up. + */ +nebula::cpp2::ErrorCode +RebuildFTIndexTask::taskByPart(nebula::kvstore::Listener* listener) { + const auto part = listener->partitionId(); + listener->resetListener(); + while (true) { + sleep(FLAGS_raft_heartbeat_interval_secs); + if (listener->pursueLeaderDone()) { + break; + } + VLOG(1) << folly::sformat( + "Processing fulltext rebuild subtask, part={}, rebuild_log={}", + part, listener->getApplyId()); + } + return nebula::cpp2::ErrorCode::SUCCEEDED; +} + +} // namespace storage +} // namespace nebula diff --git a/src/storage/admin/RebuildFTIndexTask.h b/src/storage/admin/RebuildFTIndexTask.h new file mode 100644 index 000000000..128263aca --- /dev/null +++ b/src/storage/admin/RebuildFTIndexTask.h @@ -0,0 +1,36 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef STORAGE_ADMIN_REBUILDFTINDEXTASK_H_ +#define STORAGE_ADMIN_REBUILDFTINDEXTASK_H_ + +#include "common/thrift/ThriftTypes.h" +#include "kvstore/KVEngine.h" +#include "kvstore/NebulaStore.h" +#include "storage/admin/AdminTask.h" + +namespace nebula { +namespace storage { + +/** + * Storage-side admin task for REBUILD_FULLTEXT_INDEX: resets the fulltext listener of + * every requested part and waits until each one has caught up with its leader again. + */ +class RebuildFTIndexTask : public AdminTask { +public: + RebuildFTIndexTask(StorageEnv* env, TaskContext&& ctx) : AdminTask(env, std::move(ctx)) {} + + /** Creates one sub-task per part listed in the task parameters. */ + ErrorOr> genSubTasks() override; + +protected: + /** Resets `listener`, then blocks until Listener::pursueLeaderDone() reports true. */ + nebula::cpp2::ErrorCode taskByPart(nebula::kvstore::Listener* listener); +}; + +} // namespace storage +} // namespace nebula +#endif // STORAGE_ADMIN_REBUILDFTINDEXTASK_H_