Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

rebuild fulltext indexes via listener #482

Merged
30 changes: 30 additions & 0 deletions src/kvstore/Listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ DEFINE_int32(listener_commit_interval_secs, 1, "Listener commit interval");
DEFINE_int32(listener_commit_batch_size, 1000, "Max batch size when listener commit");
DEFINE_int32(ft_request_retry_times, 3, "Retry times if fulltext request failed");
DEFINE_int32(ft_bulk_batch_size, 100, "Max batch size when bulk insert");
DEFINE_int32(listener_pursue_leader_threshold, 1000, "Catch up with the leader's threshold");

namespace nebula {
namespace kvstore {
Expand Down Expand Up @@ -213,6 +214,9 @@ void Listener::doApply() {
persist(committedLogId_, term_, lastApplyLogId_);
VLOG(1) << idStr_ << "Listener succeeded apply log to " << lastApplyLogId_;
lastApplyTime_ = time::WallClock::fastNowInMilliSec();
VLOG(1) << folly::sformat("Commit snapshot to : committedLogId={},"
"committedLogTerm={}, lastApplyLogId_={}",
committedLogId_, term_, lastApplyLogId_);
}
});
}
Expand Down Expand Up @@ -243,9 +247,35 @@ std::pair<int64_t, int64_t> Listener::commitSnapshot(const std::vector<std::stri
persist(committedLogId, committedLogTerm, lastApplyLogId_);
LOG(INFO) << idStr_ << "Listener succeeded apply log to " << lastApplyLogId_;
lastApplyTime_ = time::WallClock::fastNowInMilliSec();
VLOG(3) << folly::sformat("Commit snapshot to : committedLogId={},"
"committedLogTerm={}, lastApplyLogId_={}",
committedLogId, committedLogTerm, lastApplyLogId_);
}
return std::make_pair(count, size);
}

void Listener::resetListener() {
bright-starry-sky marked this conversation as resolved.
Show resolved Hide resolved
std::lock_guard<std::mutex> g(raftLock_);
reset();
VLOG(1) << folly::sformat("The listener has been reset : leaderCommitId={},"
"proposedTerm={}, lastLogTerm={}, term={},"
"lastApplyLogId={}",
leaderCommitId_, proposedTerm_, lastLogTerm_,
term_, lastApplyLogId_);
}

bool Listener::pursueLeaderDone() {
std::lock_guard<std::mutex> g(raftLock_);
if (status_ != Status::RUNNING) {
return false;
}
// if the leaderCommitId_ and lastApplyLogId_ all are 0. means the snapshot gap.
if (leaderCommitId_ == 0 && lastApplyLogId_ == 0) {
return false;
}
VLOG(1) << folly::sformat("pursue leader : leaderCommitId={}, lastApplyLogId_={}",
leaderCommitId_, lastApplyLogId_);
return (leaderCommitId_ - lastApplyLogId_) <= FLAGS_listener_pursue_leader_threshold;
}
} // namespace kvstore
} // namespace nebula
8 changes: 4 additions & 4 deletions src/kvstore/Listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,15 @@ class Listener : public raftex::RaftPart {
}

void cleanup() override {
CHECK(!raftLock_.try_lock());
leaderCommitId_ = 0;
lastApplyLogId_ = 0;
persist(0, 0, lastApplyLogId_);
}

void resetListener() {
std::lock_guard<std::mutex> g(raftLock_);
reset();
}
void resetListener();

bool pursueLeaderDone();

protected:
virtual void init() = 0;
Expand Down
10 changes: 10 additions & 0 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,16 @@ NebulaStore::space(GraphSpaceID spaceId) {
return it->second;
}

ErrorOr<nebula::cpp2::ErrorCode, std::shared_ptr<SpaceListenerInfo>>
NebulaStore::spaceListener(GraphSpaceID spaceId) {
folly::RWSpinLock::ReadHolder rh(&lock_);
auto it = spaceListeners_.find(spaceId);
if (UNLIKELY(it == spaceListeners_.end())) {
return nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND;
}
return it->second;
}

int32_t NebulaStore::allLeader(
std::unordered_map<GraphSpaceID, std::vector<meta::cpp2::LeaderInfo>>& leaderIds) {
folly::RWSpinLock::ReadHolder rh(&lock_);
Expand Down
3 changes: 3 additions & 0 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ class NebulaStore : public KVStore, public Handler {
ErrorOr<nebula::cpp2::ErrorCode, std::shared_ptr<SpacePartInfo>>
space(GraphSpaceID spaceId);

ErrorOr<nebula::cpp2::ErrorCode, std::shared_ptr<SpaceListenerInfo>>
spaceListener(GraphSpaceID spaceId);

/**
* Implement four interfaces in Handler.
* */
Expand Down
3 changes: 0 additions & 3 deletions src/kvstore/plugins/elasticsearch/ESListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ class ESListener : public Listener {

LogID lastApplyLogId() override;

void cleanup() override {
}

private:
bool writeAppliedId(LogID lastId, TermID lastTerm, LogID lastApplyLogId);

Expand Down
1 change: 1 addition & 0 deletions src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ nebula_add_library(
processors/jobMan/RebuildJobExecutor.cpp
processors/jobMan/RebuildTagJobExecutor.cpp
processors/jobMan/RebuildEdgeJobExecutor.cpp
processors/jobMan/RebuildFTJobExecutor.cpp
processors/jobMan/StatisJobExecutor.cpp
processors/jobMan/GetStatisProcessor.cpp
processors/jobMan/ListTagIndexStatusProcessor.cpp
Expand Down
69 changes: 65 additions & 4 deletions src/meta/processors/jobMan/MetaJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
#include "meta/processors/jobMan/MetaJobExecutor.h"
#include "meta/processors/jobMan/RebuildTagJobExecutor.h"
#include "meta/processors/jobMan/RebuildEdgeJobExecutor.h"
#include "meta/processors/jobMan/RebuildFTJobExecutor.h"
#include "meta/processors/jobMan/StatisJobExecutor.h"
#include "meta/processors/jobMan/TaskDescription.h"
#include "utils/Utils.h"

DECLARE_int32(heartbeat_interval_secs);
DECLARE_uint32(expired_time_factor);

namespace nebula {
namespace meta {
Expand Down Expand Up @@ -55,6 +57,12 @@ MetaJobExecutorFactory::createMetaJobExecutor(const JobDescription& jd,
client,
jd.getParas()));
break;
case cpp2::AdminCmd::REBUILD_FULLTEXT_INDEX:
ret.reset(new RebuildFTJobExecutor(jd.getJobId(),
store,
client,
jd.getParas()));
break;
case cpp2::AdminCmd::STATS:
ret.reset(new StatisJobExecutor(jd.getJobId(),
store,
Expand Down Expand Up @@ -138,12 +146,65 @@ ErrOrHosts MetaJobExecutor::getLeaderHost(GraphSpaceID space) {
return hosts;
}

ErrOrHosts MetaJobExecutor::getListenerHost(GraphSpaceID space, cpp2::ListenerType type) {
const auto& prefix = MetaServiceUtils::listenerPrefix(space, type);
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get space " << space << "'s listener failed, error: "
<< apache::thrift::util::enumNameSafe(ret);
return ret;
}

auto activeHostsRet = ActiveHostsMan::getActiveHosts(
kvstore_,
FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor,
cpp2::HostRole::LISTENER);
if (!nebula::ok(activeHostsRet)) {
return nebula::error(activeHostsRet);
}

auto activeHosts = std::move(nebula::value(activeHostsRet));
std::vector<std::pair<HostAddr, std::vector<PartitionID>>> hosts;

while (iter->valid()) {
auto host = MetaServiceUtils::deserializeHostAddr(iter->val());
auto part = MetaServiceUtils::parseListenerPart(iter->key());
if (std::find(activeHosts.begin(), activeHosts.end(), host) == activeHosts.end()) {
LOG(ERROR) << "Invalid host : " << network::NetworkUtils::toHostsStr({host});
return nebula::cpp2::ErrorCode::E_INVALID_HOST;
}
auto it = std::find_if(hosts.begin(), hosts.end(), [&host](auto& item){
return item.first == host;
});
if (it == hosts.end()) {
hosts.emplace_back(std::make_pair(host, std::vector<PartitionID>{part}));
} else {
it->second.emplace_back(part);
}
iter->next();
}
if (hosts.empty()) {
return nebula::cpp2::ErrorCode::E_LISTENER_NOT_FOUND;
}
return hosts;
}

nebula::cpp2::ErrorCode MetaJobExecutor::execute() {
ErrOrHosts addressesRet;
if (toLeader_) {
addressesRet = getLeaderHost(space_);
} else {
addressesRet = getTargetHost(space_);
switch (toHost_) {
case TargetHosts::LEADER: {
addressesRet = getLeaderHost(space_);
break;
}
case TargetHosts::LISTENER: {
addressesRet = getListenerHost(space_, cpp2::ListenerType::ELASTICSEARCH);
break;
}
case TargetHosts::DEFAULT: {
addressesRet = getTargetHost(space_);
break;
}
}

if (!nebula::ok(addressesRet)) {
Expand Down
10 changes: 9 additions & 1 deletion src/meta/processors/jobMan/MetaJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ using ErrOrHosts = ErrorOr<nebula::cpp2::ErrorCode, std::vector<PartsOfHost>>;

class MetaJobExecutor {
public:
enum class TargetHosts {
LEADER = 0,
LISTENER,
DEFAULT
};

MetaJobExecutor(JobID jobId,
kvstore::KVStore* kvstore,
AdminClient* adminClient,
Expand Down Expand Up @@ -66,6 +72,8 @@ class MetaJobExecutor {

ErrOrHosts getLeaderHost(GraphSpaceID space);

ErrOrHosts getListenerHost(GraphSpaceID space, cpp2::ListenerType type);

virtual folly::Future<Status>
executeInternal(HostAddr&& address, std::vector<PartitionID>&& parts) = 0;

Expand All @@ -76,7 +84,7 @@ class MetaJobExecutor {
AdminClient* adminClient_{nullptr};
GraphSpaceID space_;
std::vector<std::string> paras_;
bool toLeader_{false};
TargetHosts toHost_{TargetHosts::DEFAULT};
int32_t concurrency_{INT_MAX};
bool stopped_{false};
std::mutex muInterrupt_;
Expand Down
21 changes: 21 additions & 0 deletions src/meta/processors/jobMan/RebuildFTJobExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "meta/processors/jobMan/RebuildFTJobExecutor.h"

namespace nebula {
namespace meta {

folly::Future<Status>
RebuildFTJobExecutor::executeInternal(HostAddr&& address,
std::vector<PartitionID>&& parts) {
return adminClient_->addTask(cpp2::AdminCmd::REBUILD_FULLTEXT_INDEX, jobId_, taskId_++,
space_, {std::move(address)}, taskParameters_,
std::move(parts), concurrency_);
}

} // namespace meta
} // namespace nebula
33 changes: 33 additions & 0 deletions src/meta/processors/jobMan/RebuildFTJobExecutor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef META_REBUILDFTJOBEXECUTOR_H_
#define META_REBUILDFTJOBEXECUTOR_H_

#include "meta/processors/jobMan/RebuildJobExecutor.h"

namespace nebula {
namespace meta {

class RebuildFTJobExecutor : public RebuildJobExecutor {
public:
RebuildFTJobExecutor(JobID jobId,
kvstore::KVStore* kvstore,
AdminClient* adminClient,
const std::vector<std::string>& paras)
: RebuildJobExecutor(jobId, kvstore, adminClient, std::move(paras)) {
toHost_ = TargetHosts::LISTENER;
}

protected:
folly::Future<Status>
executeInternal(HostAddr&& address, std::vector<PartitionID>&& parts) override;
};

} // namespace meta
} // namespace nebula

#endif // META_REBUILDFTJOBEXECUTOR_H_
2 changes: 1 addition & 1 deletion src/meta/processors/jobMan/RebuildJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class RebuildJobExecutor : public MetaJobExecutor {
AdminClient* adminClient,
const std::vector<std::string>& paras)
: MetaJobExecutor(jobId, kvstore, adminClient, paras) {
toLeader_ = true;
toHost_ = TargetHosts::LEADER;
}

bool check() override;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/jobMan/StatisJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class StatisJobExecutor : public MetaJobExecutor {
AdminClient* adminClient,
const std::vector<std::string>& paras)
: MetaJobExecutor(jobId, kvstore, adminClient, paras) {
toLeader_ = true;
toHost_ = TargetHosts::LEADER;
}

bool check() override;
Expand Down
1 change: 1 addition & 0 deletions src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ nebula_add_library(
admin/RebuildIndexTask.cpp
admin/RebuildTagIndexTask.cpp
admin/RebuildEdgeIndexTask.cpp
admin/RebuildFTIndexTask.cpp
admin/StatisTask.cpp
admin/ListClusterInfoProcessor.cpp
)
Expand Down
4 changes: 4 additions & 0 deletions src/storage/admin/AdminTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "storage/admin/FlushTask.h"
#include "storage/admin/RebuildTagIndexTask.h"
#include "storage/admin/RebuildEdgeIndexTask.h"
#include "storage/admin/RebuildFTIndexTask.h"
#include "storage/admin/StatisTask.h"

namespace nebula {
Expand All @@ -31,6 +32,9 @@ AdminTaskFactory::createAdminTask(StorageEnv* env, TaskContext&& ctx) {
case meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX:
ret = std::make_shared<RebuildEdgeIndexTask>(env, std::move(ctx));
break;
case meta::cpp2::AdminCmd::REBUILD_FULLTEXT_INDEX:
ret = std::make_shared<RebuildFTIndexTask>(env, std::move(ctx));
break;
case meta::cpp2::AdminCmd::STATS:
ret = std::make_shared<StatisTask>(env, std::move(ctx));
break;
Expand Down
Loading