This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Commit

Merge branch 'master' into fulltext_rebuild
yixinglu authored Jul 2, 2021
2 parents c0f0bdb + da5dde0 commit 8cfc17c
Showing 89 changed files with 1,826 additions and 1,107 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -22,7 +22,7 @@
# ENABLE_TESTING -- Build unit test
# ENABLE_PACK_ONE -- Package to one or multi packages
#
cmake_minimum_required(VERSION 3.5.0)
cmake_minimum_required(VERSION 3.9.0)

project("Nebula Storage" C CXX)

8 changes: 4 additions & 4 deletions conf/nebula-storaged-listener.conf.production
@@ -26,17 +26,17 @@

########## networking ##########
# Meta server address
--meta_server_addrs=192.168.2.1:45500
--meta_server_addrs=192.168.2.1:9559
# Local ip
--local_ip=192.168.2.4
# Storage daemon listening port
--port=44510
--port=9789
# HTTP service ip
--ws_ip=192.168.2.4
# HTTP service port
--ws_http_port=12021
--ws_http_port=19789
# HTTP2 service port
--ws_h2_port=12031
--ws_h2_port=19790
# heartbeat with meta service
--heartbeat_interval_secs=10

3 changes: 0 additions & 3 deletions conf/nebula-storaged.conf.default
@@ -91,9 +91,6 @@
--enable_rocksdb_prefix_filtering=false
# Whether or not to enable the whole key filtering.
--enable_rocksdb_whole_key_filtering=true
# The prefix length for each key to use as the filter value.
# can be 12 bytes(PartitionId + VertexID), or 16 bytes(PartitionId + VertexID + TagID/EdgeType).
--rocksdb_filtering_prefix_length=12

############## rocksdb Options ##############
# rocksdb DBOptions in json, each name and value of option is a string, given as "option_name":"option_value" separated by comma
3 changes: 0 additions & 3 deletions conf/nebula-storaged.conf.production
@@ -97,9 +97,6 @@
--enable_rocksdb_prefix_filtering=false
# Whether or not to enable the whole key filtering.
--enable_rocksdb_whole_key_filtering=true
# The prefix length for each key to use as the filter value.
# can be 12 bytes(PartitionId + VertexID), or 16 bytes(PartitionId + VertexID + TagID/EdgeType).
--rocksdb_filtering_prefix_length=12

############### misc ####################
--max_handlers_per_req=1
2 changes: 1 addition & 1 deletion src/codec/RowWriterV2.cpp
@@ -828,7 +828,7 @@ WriteResult RowWriterV2::checkUnsetFields() noexcept {
WriteResult r = WriteResult::SUCCEEDED;
if (field->hasDefault()) {
auto expr = field->defaultValue()->clone();
auto defVal = Expression::eval(expr.get(), expCtx);
auto defVal = Expression::eval(expr, expCtx);
switch (defVal.type()) {
case Value::Type::NULLVALUE:
setNullBit(field->nullFlagPos());
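Default-value expressions are now allocated from an ObjectPool instead of being held in a std::unique_ptr, so eval() receives the raw pointer directly. A minimal sketch of the new ownership pattern, assuming the nebula-common ObjectPool/Expression headers; the helper function and literal value are illustrative and not part of this commit:

    // Sketch only: pool-owned default value evaluated with the new signature.
    // The ObjectPool frees every expression it created when it goes out of scope,
    // so call sites never hold a std::unique_ptr<Expression> and never call delete.
    Value evalDefault(ObjectPool* pool, ExpressionContext& expCtx) {
        auto* def  = ConstantExpression::make(pool, 12345);  // allocated in the pool
        auto* expr = def->clone();                           // clone lives in the same pool
        return Expression::eval(expr, expCtx);               // raw pointer, no .get()
    }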
2 changes: 1 addition & 1 deletion src/codec/test/ResultSchemaProvider.cpp
@@ -55,7 +55,7 @@ bool ResultSchemaProvider::ResultSchemaField::nullable() const {


Expression* ResultSchemaProvider::ResultSchemaField::defaultValue() const {
return defaultValue_.get();
return defaultValue_;
}


2 changes: 1 addition & 1 deletion src/codec/test/ResultSchemaProvider.h
@@ -40,7 +40,7 @@ class ResultSchemaProvider : public meta::SchemaProviderIf {
bool nullable_;
int32_t offset_;
size_t nullFlagPos_;
std::unique_ptr<Expression> defaultValue_;
Expression* defaultValue_;
};


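With defaultValue_ now a plain Expression*, the accessor hands out a borrowed pointer whose lifetime is managed by the ObjectPool that built the expression. A short hedged sketch of what this means for callers (the field variable is illustrative):

    // Sketch: no ownership is transferred any more; never delete the returned pointer.
    Expression* def = field.defaultValue();   // borrowed, pool-owned
    if (def != nullptr) {
        // evaluate or clone it through the pool; do not wrap it in a smart pointer
    }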
12 changes: 7 additions & 5 deletions src/codec/test/RowWriterV2Test.cpp
@@ -195,13 +195,16 @@ TEST(RowWriterV2, NoDefaultValue) {
EXPECT_EQ(v1, v2);
}


TEST(RowWriterV2, WithDefaultValue) {
ObjectPool objPool;
auto pool = &objPool;

SchemaWriter schema(7 /*Schema version*/);
schema.appendCol("Col01", PropertyType::BOOL, 0, true);
schema.appendCol("Col02", PropertyType::INT64, 0, false, new ConstantExpression(12345));
schema.appendCol("Col03", PropertyType::STRING, 0, true, new ConstantExpression(str));
schema.appendCol("Col04", PropertyType::FIXED_STRING, 12, false, new ConstantExpression(fixed));
schema.appendCol("Col02", PropertyType::INT64, 0, false, ConstantExpression::make(pool, 12345));
schema.appendCol("Col03", PropertyType::STRING, 0, true, ConstantExpression::make(pool, str));
schema.appendCol(
"Col04", PropertyType::FIXED_STRING, 12, false, ConstantExpression::make(pool, fixed));

RowWriterV2 writer(&schema);
ASSERT_EQ(WriteResult::SUCCEEDED, writer.finish());
@@ -237,7 +240,6 @@ TEST(RowWriterV2, WithDefaultValue) {
EXPECT_EQ(v1, v2);
}


TEST(RowWriterV2, DoubleSet) {
SchemaWriter schema(3 /*Schema version*/);
schema.appendCol("Col01", PropertyType::BOOL, 0, true);
4 changes: 2 additions & 2 deletions src/daemons/MetaDaemon.cpp
@@ -13,7 +13,7 @@
#include "common/hdfs/HdfsHelper.h"
#include "common/hdfs/HdfsCommandHelper.h"
#include "common/thread/GenericThreadPool.h"
#include "common/time/TimeUtils.h"
#include "common/time/TimezoneInfo.h"
#include "common/version/Version.h"
#include <thrift/lib/cpp2/server/ThriftServer.h>
#include "kvstore/PartManager.h"
@@ -302,7 +302,7 @@ int main(int argc, char *argv[]) {

// Initialize the global timezone, it's only used for datetime type compute
// won't affect the process timezone.
status = nebula::time::TimeUtils::initializeGlobalTimezone();
status = nebula::time::Timezone::initializeGlobalTimezone();
if (!status.ok()) {
LOG(ERROR) << status;
return EXIT_FAILURE;
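The global-timezone helper moved from common/time/TimeUtils.h to a dedicated Timezone class in common/time/TimezoneInfo.h; only the include and the class name change at the call site. A hedged sketch of the updated call, mirroring the diff above:

    #include "common/time/TimezoneInfo.h"

    // Sketch: initialize the timezone used for datetime computation once at
    // startup; the process timezone itself is not touched.
    auto status = nebula::time::Timezone::initializeGlobalTimezone();
    if (!status.ok()) {
        LOG(ERROR) << status;
        return EXIT_FAILURE;
    }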
7 changes: 5 additions & 2 deletions src/daemons/StorageDaemon.cpp
@@ -8,7 +8,7 @@
#include "common/base/SignalHandler.h"
#include "common/network/NetworkUtils.h"
#include "common/process/ProcessUtils.h"
#include "common/time/TimeUtils.h"
#include "common/time/TimezoneInfo.h"
#include "common/version/Version.h"
#include "storage/StorageServer.h"
#include <thrift/lib/cpp2/server/ThriftServer.h>
@@ -17,6 +17,8 @@
DEFINE_string(local_ip, "", "IP address which is used to identify this server");
DEFINE_string(data_path, "", "Root data path, multi paths should be split by comma."
"For rocksdb engine, one path one instance.");
DEFINE_string(wal_path, "",
"Nebula wal path. By default, wal will be stored as a sibling of rocksdb data.");
DEFINE_string(listener_path, "", "Path for listener, only wal will be saved."
"if it is not empty, data_path will not take effect.");
DEFINE_bool(daemonize, true, "Whether to run the process as a daemon");
@@ -115,7 +117,7 @@ int main(int argc, char *argv[]) {

// Initialize the global timezone, it's only used for datetime type compute
// won't affect the process timezone.
status = nebula::time::TimeUtils::initializeGlobalTimezone();
status = nebula::time::Timezone::initializeGlobalTimezone();
if (!status.ok()) {
LOG(ERROR) << status;
return EXIT_FAILURE;
@@ -124,6 +126,7 @@
gStorageServer = std::make_unique<nebula::storage::StorageServer>(host,
metaAddrsRet.value(),
paths,
FLAGS_wal_path,
FLAGS_listener_path);
if (!gStorageServer->start()) {
LOG(ERROR) << "Storage server start failed";
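The new --wal_path flag lets the write-ahead log live on a different device from the RocksDB data; when it is left empty, the wal stays next to the data as before. A hedged configuration sketch in the same gflags style as the conf files above (the paths themselves are hypothetical):

    # Root data path(s), comma separated, one RocksDB instance per path
    --data_path=/data1/nebula,/data2/nebula
    # New in this commit: put the wal on a separate (e.g. faster) disk.
    # Leave empty to keep the wal beside the rocksdb data.
    --wal_path=/wal/nebula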
30 changes: 15 additions & 15 deletions src/kvstore/DiskManager.cpp
@@ -21,19 +21,19 @@ DiskManager::DiskManager(const std::vector<std::string>& dataPaths,
std::vector<std::atomic_uint64_t> freeBytes(dataPaths.size() + 1);
size_t index = 0;
for (const auto& path : dataPaths) {
auto absolute = std::filesystem::absolute(path);
if (!std::filesystem::exists(absolute)) {
std::filesystem::create_directories(absolute);
} else if (!std::filesystem::is_directory(absolute)) {
auto absolute = boost::filesystem::absolute(path);
if (!boost::filesystem::exists(absolute)) {
boost::filesystem::create_directories(absolute);
} else if (!boost::filesystem::is_directory(absolute)) {
LOG(FATAL) << "DataPath is not a valid directory: " << path;
}
auto canonical = std::filesystem::canonical(path);
auto info = std::filesystem::space(canonical);
auto canonical = boost::filesystem::canonical(path);
auto info = boost::filesystem::space(canonical);
dataPaths_.emplace_back(std::move(canonical));
freeBytes[index++] = info.available;
}
freeBytes_ = std::move(freeBytes);
} catch (std::filesystem::filesystem_error& e) {
} catch (boost::filesystem::filesystem_error& e) {
LOG(FATAL) << "DataPath invalid: " << e.what();
}
if (bgThread_) {
@@ -71,13 +71,13 @@ void DiskManager::addPartToPath(GraphSpaceID spaceId,
PartitionID partId,
const std::string& path) {
try {
auto canonical = std::filesystem::canonical(path);
auto canonical = boost::filesystem::canonical(path);
auto dataPath = canonical.parent_path().parent_path();
auto iter = std::find(dataPaths_.begin(), dataPaths_.end(), dataPath);
CHECK(iter != dataPaths_.end());
partIndex_[spaceId][partId] = iter - dataPaths_.begin();
partPath_[spaceId][canonical].emplace(partId);
} catch (std::filesystem::filesystem_error& e) {
partPath_[spaceId][canonical.string()].emplace(partId);
} catch (boost::filesystem::filesystem_error& e) {
LOG(FATAL) << "Invalid path: " << e.what();
}
}
@@ -86,13 +86,13 @@ void DiskManager::removePartFromPath(GraphSpaceID spaceId,
PartitionID partId,
const std::string& path) {
try {
auto canonical = std::filesystem::canonical(path);
auto canonical = boost::filesystem::canonical(path);
auto dataPath = canonical.parent_path().parent_path();
auto iter = std::find(dataPaths_.begin(), dataPaths_.end(), dataPath);
CHECK(iter != dataPaths_.end());
partIndex_[spaceId].erase(partId);
partPath_[spaceId][canonical].erase(partId);
} catch (std::filesystem::filesystem_error& e) {
partPath_[spaceId][canonical.string()].erase(partId);
} catch (boost::filesystem::filesystem_error& e) {
LOG(FATAL) << "Invalid path: " << e.what();
}
}
@@ -121,8 +121,8 @@ bool DiskManager::hasEnoughSpace(GraphSpaceID spaceId, PartitionID partId) {
void DiskManager::refresh() {
// refresh the available bytes of each data path, skip the dummy path
for (size_t i = 0; i < dataPaths_.size(); i++) {
std::error_code ec;
auto info = std::filesystem::space(dataPaths_[i], ec);
boost::system::error_code ec;
auto info = boost::filesystem::space(dataPaths_[i], ec);
if (!ec) {
VLOG(1) << "Refresh filesystem info of " << dataPaths_[i];
freeBytes_[i] = info.available;
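DiskManager switches from std::filesystem to boost::filesystem; note the boost::system::error_code overload of space() used in refresh(), and the explicit .string() when a canonical path is used as a map key. A standalone hedged sketch of the same calls, with a hypothetical path:

    #include <boost/filesystem.hpp>
    #include <boost/system/error_code.hpp>
    #include <iostream>

    int main() {
        namespace fs = boost::filesystem;
        auto absolute = fs::absolute("data/storage");          // hypothetical path
        if (!fs::exists(absolute)) {
            fs::create_directories(absolute);
        }
        auto canonical = fs::canonical(absolute);
        boost::system::error_code ec;
        auto info = fs::space(canonical, ec);                  // non-throwing overload
        if (!ec) {
            std::cout << canonical.string() << ": "
                      << info.available << " bytes available\n";
        }
        return 0;
    }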
5 changes: 3 additions & 2 deletions src/kvstore/DiskManager.h
@@ -12,7 +12,7 @@
#include "common/thread/GenericWorker.h"
#include "common/thrift/ThriftTypes.h"
#include <gtest/gtest_prod.h>
#include <filesystem>
#include <boost/filesystem.hpp>
#include <boost/system/error_code.hpp>

namespace nebula {
namespace kvstore {
@@ -55,7 +56,7 @@ class DiskManager {
std::shared_ptr<thread::GenericWorker> bgThread_;

// canonical path of data_path flag
std::vector<std::filesystem::path> dataPaths_;
std::vector<boost::filesystem::path> dataPaths_;
// free space available to a non-privileged process, in bytes
std::vector<std::atomic_uint64_t> freeBytes_;

13 changes: 9 additions & 4 deletions src/kvstore/KVEngine.h
@@ -45,12 +45,14 @@ class KVEngine {
// Otherwise, nullptr will be returned
virtual const char* getDataRoot() const = 0;

virtual const char* getWalRoot() const = 0;

virtual std::unique_ptr<WriteBatch> startBatchWrite() = 0;

virtual nebula::cpp2::ErrorCode
commitBatchWrite(std::unique_ptr<WriteBatch> batch,
bool disableWAL = true,
bool sync = false) = 0;
virtual nebula::cpp2::ErrorCode commitBatchWrite(std::unique_ptr<WriteBatch> batch,
bool disableWAL,
bool sync,
bool wait) = 0;

// Read a single key
virtual nebula::cpp2::ErrorCode get(const std::string& key, std::string* value) = 0;
@@ -129,6 +131,9 @@ class KVEngine {
const std::string& tablePrefix,
std::function<bool(const folly::StringPiece& key)> filter) = 0;

virtual nebula::cpp2::ErrorCode backup() = 0;


protected:
GraphSpaceID spaceId_;
};
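commitBatchWrite() drops its default arguments and gains an explicit wait flag, and engines now also expose getWalRoot() and a backup() hook. A hedged sketch of a call site against the revised interface (the WriteBatch::put call and the flag values are assumptions for illustration, not taken from this diff):

    // Sketch only: every flag is now spelled out at the call site.
    nebula::cpp2::ErrorCode writeOne(KVEngine* engine) {
        auto batch = engine->startBatchWrite();
        batch->put("key", "value");   // assumes a WriteBatch::put(key, value) API
        // disableWAL = false, sync = true, wait = true
        return engine->commitBatchWrite(std::move(batch), false, true, true);
    }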
2 changes: 2 additions & 0 deletions src/kvstore/KVStore.h
@@ -34,6 +34,8 @@ struct KVOptions {
// otherwise it would mix up the data on disk.
std::vector<std::string> dataPaths_;

std::string walPath_;

// Path for listener, only wal is stored, the structure would be spaceId/partId/wal
std::string listenerPath_;

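KVOptions grows a walPath_ field that carries the new --wal_path flag down into the kvstore. A hedged sketch of filling it in (the paths are hypothetical; an empty walPath_ keeps the wal next to the data):

    KVOptions options;
    options.dataPaths_    = {"/data1/nebula", "/data2/nebula"};
    options.walPath_      = "/wal/nebula";   // new in this commit
    options.listenerPath_ = "";              // only used on listener nodes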
4 changes: 2 additions & 2 deletions src/kvstore/Listener.cpp
@@ -114,7 +114,7 @@ bool Listener::preProcessLog(LogID logId,
return true;
}

bool Listener::commitLogs(std::unique_ptr<LogIterator> iter) {
cpp2::ErrorCode Listener::commitLogs(std::unique_ptr<LogIterator> iter, bool) {
LogID lastId = -1;
while (iter->valid()) {
lastId = iter->logId();
@@ -123,7 +123,7 @@ bool Listener::commitLogs(std::unique_ptr<LogIterator> iter) {
if (lastId > 0) {
leaderCommitId_ = lastId;
}
return true;
return cpp2::ErrorCode::SUCCEEDED;
}

void Listener::doApply() {
4 changes: 2 additions & 2 deletions src/kvstore/Listener.h
@@ -52,7 +52,7 @@ derived class.
// For listener, we just return true directly. Another background thread trigger the actual
// apply work, and do it in worker thread, and update lastApplyLogId_
bool commitLogs(std::unique_ptr<LogIterator> iter)
cpp2::ErrorCode commitLogs(std::unique_ptr<LogIterator> iter, bool)
// For most of the listeners, just return true is enough. However, if listener need to be aware
// of membership change, some log type of wal need to be pre-processed, could do it here.
@@ -154,7 +154,7 @@ class Listener : public raftex::RaftPart {

// For listener, we just return true directly. Another background thread trigger the actual
// apply work, and do it in worker thread, and update lastApplyLogId_
bool commitLogs(std::unique_ptr<LogIterator>) override;
cpp2::ErrorCode commitLogs(std::unique_ptr<LogIterator>, bool) override;

// For most of the listeners, just return true is enough. However, if listener need to be aware
// of membership change, some log type of wal need to be pre-processed, could do it here.
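commitLogs() on listeners now returns cpp2::ErrorCode and takes an extra bool instead of returning a bare bool. A hedged sketch of the override in a hypothetical derived listener (other required overrides are omitted, so this is a signature illustration rather than a compilable class):

    // Sketch: DummyListener is illustrative, not part of the codebase.
    class DummyListener : public Listener {
     protected:
        cpp2::ErrorCode commitLogs(std::unique_ptr<LogIterator> iter, bool) override {
            while (iter->valid()) {
                iter->next();   // real listeners only record the id; doApply() does the work
            }
            return cpp2::ErrorCode::SUCCEEDED;
        }
    };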
[Diff for the remaining changed files is not shown.]
