From e1b5ef08901a7adda66f5a46c4a5d20296bd1850 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Thu, 25 Nov 2021 22:01:39 -0600 Subject: [PATCH] enable kv interface in 2.0 (#3282) * enable kv interface in 2.0 * rebased, fix SimpleKVVerifyTool and StorageIntegrityTool * address @yixinglu's comments, rename GraphStorageClient to StorageClient --- src/clients/storage/CMakeLists.txt | 10 +- src/clients/storage/GeneralStorageClient.cpp | 90 -------- src/clients/storage/GeneralStorageClient.h | 50 ----- ...aphStorageClient.cpp => StorageClient.cpp} | 145 +++++++++---- .../{GraphStorageClient.h => StorageClient.h} | 26 ++- src/daemons/CMakeLists.txt | 4 +- src/graph/context/QueryContext.cpp | 2 +- src/graph/context/QueryContext.h | 10 +- src/graph/executor/mutate/DeleteExecutor.cpp | 8 +- src/graph/executor/mutate/InsertExecutor.cpp | 6 +- src/graph/executor/mutate/UpdateExecutor.cpp | 6 +- .../executor/query/AppendVerticesExecutor.cpp | 12 +- src/graph/executor/query/GetEdgesExecutor.cpp | 12 +- .../executor/query/GetNeighborsExecutor.cpp | 14 +- .../executor/query/GetVerticesExecutor.cpp | 12 +- .../executor/query/IndexScanExecutor.cpp | 12 +- src/graph/executor/query/IndexScanExecutor.h | 2 +- src/graph/executor/query/TraverseExecutor.cpp | 14 +- src/graph/executor/query/TraverseExecutor.h | 2 +- src/graph/executor/test/CMakeLists.txt | 2 +- src/graph/planner/test/CMakeLists.txt | 2 +- src/graph/service/GraphService.cpp | 2 +- src/graph/service/QueryEngine.cpp | 2 +- src/graph/service/QueryEngine.h | 4 +- src/interface/CMakeLists.txt | 2 +- src/interface/storage.thrift | 101 +++++---- src/meta/CMakeLists.txt | 3 +- src/mock/MockCluster.cpp | 27 +-- src/mock/MockCluster.h | 13 +- src/storage/CMakeLists.txt | 5 - src/storage/CompactionFilter.h | 7 - src/storage/GeneralStorageServiceHandler.cpp | 45 ---- src/storage/GeneralStorageServiceHandler.h | 33 --- src/storage/GraphStorageServiceHandler.cpp | 24 +++ src/storage/GraphStorageServiceHandler.h | 6 + src/storage/StorageServer.cpp | 7 +- src/storage/test/CMakeLists.txt | 4 +- src/storage/test/KVClientTest.cpp | 6 +- src/storage/test/KVTest.cpp | 2 +- src/storage/test/StorageClientTest.cpp | 2 +- src/storage/test/StorageLookupBenchmark.cpp | 2 +- src/storage/test/TossTestExecutor.h | 2 +- src/storage/test/TossTestUtils.h | 2 +- src/tools/CMakeLists.txt | 2 +- src/tools/simple-kv-verify/CMakeLists.txt | 19 +- .../simple-kv-verify/SimpleKVVerifyTool.cpp | 17 +- src/tools/storage-perf/CMakeLists.txt | 2 +- .../storage-perf/StorageIntegrityTool.cpp | 197 ++++++------------ src/tools/storage-perf/StoragePerfTool.cpp | 26 +-- 49 files changed, 387 insertions(+), 618 deletions(-) delete mode 100644 src/clients/storage/GeneralStorageClient.cpp delete mode 100644 src/clients/storage/GeneralStorageClient.h rename src/clients/storage/{GraphStorageClient.cpp => StorageClient.cpp} (83%) rename src/clients/storage/{GraphStorageClient.h => StorageClient.h} (84%) delete mode 100644 src/storage/GeneralStorageServiceHandler.cpp delete mode 100644 src/storage/GeneralStorageServiceHandler.h diff --git a/src/clients/storage/CMakeLists.txt b/src/clients/storage/CMakeLists.txt index 693f412f2cb..c6df1f8dda6 100644 --- a/src/clients/storage/CMakeLists.txt +++ b/src/clients/storage/CMakeLists.txt @@ -3,14 +3,8 @@ # This source code is licensed under Apache 2.0 License. nebula_add_library( - graph_storage_client_obj OBJECT - GraphStorageClient.cpp -) - - -nebula_add_library( - general_storage_client_obj OBJECT - GeneralStorageClient.cpp + storage_client_obj OBJECT + StorageClient.cpp ) diff --git a/src/clients/storage/GeneralStorageClient.cpp b/src/clients/storage/GeneralStorageClient.cpp deleted file mode 100644 index 03a1fbb327f..00000000000 --- a/src/clients/storage/GeneralStorageClient.cpp +++ /dev/null @@ -1,90 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "clients/storage/GeneralStorageClient.h" - -#include "common/base/Base.h" - -namespace nebula { -namespace storage { - -folly::SemiFuture> GeneralStorageClient::get( - GraphSpaceID space, std::vector&& keys, bool returnPartly, folly::EventBase* evb) { - auto status = clusterIdsToHosts( - space, std::move(keys), [](const std::string& v) -> const std::string& { return v; }); - - if (!status.ok()) { - return folly::makeFuture>( - std::runtime_error(status.status().toString())); - } - - auto& clusters = status.value(); - std::unordered_map requests; - for (auto& c : clusters) { - auto& host = c.first; - auto& req = requests[host]; - req.set_space_id(space); - req.set_parts(std::move(c.second)); - req.set_return_partly(returnPartly); - } - - return collectResponse(evb, - std::move(requests), - [](cpp2::GeneralStorageServiceAsyncClient* client, - const cpp2::KVGetRequest& r) { return client->future_get(r); }); -} - -folly::SemiFuture> GeneralStorageClient::put( - GraphSpaceID space, std::vector kvs, folly::EventBase* evb) { - auto status = clusterIdsToHosts( - space, std::move(kvs), [](const KeyValue& v) -> const std::string& { return v.key; }); - - if (!status.ok()) { - return folly::makeFuture>( - std::runtime_error(status.status().toString())); - } - - auto& clusters = status.value(); - std::unordered_map requests; - for (auto& c : clusters) { - auto& host = c.first; - auto& req = requests[host]; - req.set_space_id(space); - req.set_parts(std::move(c.second)); - } - - return collectResponse(evb, - std::move(requests), - [](cpp2::GeneralStorageServiceAsyncClient* client, - const cpp2::KVPutRequest& r) { return client->future_put(r); }); -} - -folly::SemiFuture> GeneralStorageClient::remove( - GraphSpaceID space, std::vector keys, folly::EventBase* evb) { - auto status = clusterIdsToHosts( - space, std::move(keys), [](const std::string& v) -> const std::string& { return v; }); - - if (!status.ok()) { - return folly::makeFuture>( - std::runtime_error(status.status().toString())); - } - - auto& clusters = status.value(); - std::unordered_map requests; - for (auto& c : clusters) { - auto& host = c.first; - auto& req = requests[host]; - req.set_space_id(space); - req.set_parts(std::move(c.second)); - } - - return collectResponse(evb, - std::move(requests), - [](cpp2::GeneralStorageServiceAsyncClient* client, - const cpp2::KVRemoveRequest& r) { return client->future_remove(r); }); -} - -} // namespace storage -} // namespace nebula diff --git a/src/clients/storage/GeneralStorageClient.h b/src/clients/storage/GeneralStorageClient.h deleted file mode 100644 index 77d21608c8c..00000000000 --- a/src/clients/storage/GeneralStorageClient.h +++ /dev/null @@ -1,50 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef CLIENTS_STORAGE_GENERALSTORAGECLIENT_H_ -#define CLIENTS_STORAGE_GENERALSTORAGECLIENT_H_ - -#include - -#include "clients/meta/MetaClient.h" -#include "clients/storage/StorageClientBase.h" -#include "common/base/Base.h" -#include "common/datatypes/KeyValue.h" -#include "interface/gen-cpp2/GeneralStorageServiceAsyncClient.h" - -namespace nebula { -namespace storage { - -/** - * A wrapper class for GeneralStorageServiceAsyncClient thrift API - * - * The class is NOT reentrant - */ -class GeneralStorageClient : public StorageClientBase { - using Parent = StorageClientBase; - - public: - GeneralStorageClient(std::shared_ptr ioThreadPool, - meta::MetaClient* metaClient) - : Parent(ioThreadPool, metaClient) {} - virtual ~GeneralStorageClient() {} - - folly::SemiFuture> get(GraphSpaceID space, - std::vector&& keys, - bool returnPartly = false, - folly::EventBase* evb = nullptr); - - folly::SemiFuture> put(GraphSpaceID space, - std::vector kvs, - folly::EventBase* evb = nullptr); - - folly::SemiFuture> remove(GraphSpaceID space, - std::vector keys, - folly::EventBase* evb = nullptr); -}; - -} // namespace storage -} // namespace nebula -#endif // CLIENTS_STORAGE_GENERALSTORAGECLIENT_H_ diff --git a/src/clients/storage/GraphStorageClient.cpp b/src/clients/storage/StorageClient.cpp similarity index 83% rename from src/clients/storage/GraphStorageClient.cpp rename to src/clients/storage/StorageClient.cpp index 85af874cc19..462dcefc367 100644 --- a/src/clients/storage/GraphStorageClient.cpp +++ b/src/clients/storage/StorageClient.cpp @@ -3,7 +3,7 @@ * This source code is licensed under Apache 2.0 License. */ -#include "clients/storage/GraphStorageClient.h" +#include "clients/storage/StorageClient.h" #include "common/base/Base.h" @@ -15,12 +15,12 @@ using nebula::storage::cpp2::GetPropResponse; namespace nebula { namespace storage { -GraphStorageClient::CommonRequestParam::CommonRequestParam(GraphSpaceID space_, - SessionID sess, - ExecutionPlanID plan_, - bool profile_, - bool experimental, - folly::EventBase* evb_) +StorageClient::CommonRequestParam::CommonRequestParam(GraphSpaceID space_, + SessionID sess, + ExecutionPlanID plan_, + bool profile_, + bool experimental, + folly::EventBase* evb_) : space(space_), session(sess), plan(plan_), @@ -28,7 +28,7 @@ GraphStorageClient::CommonRequestParam::CommonRequestParam(GraphSpaceID space_, useExperimentalFeature(experimental), evb(evb_) {} -cpp2::RequestCommon GraphStorageClient::CommonRequestParam::toReqCommon() const { +cpp2::RequestCommon StorageClient::CommonRequestParam::toReqCommon() const { cpp2::RequestCommon common; common.set_session_id(session); common.set_plan_id(plan); @@ -36,7 +36,7 @@ cpp2::RequestCommon GraphStorageClient::CommonRequestParam::toReqCommon() const return common; } -StorageRpcRespFuture GraphStorageClient::getNeighbors( +StorageRpcRespFuture StorageClient::getNeighbors( const CommonRequestParam& param, std::vector colNames, const std::vector& vertices, @@ -108,7 +108,7 @@ StorageRpcRespFuture GraphStorageClient::getNeighbor }); } -StorageRpcRespFuture GraphStorageClient::addVertices( +StorageRpcRespFuture StorageClient::addVertices( const CommonRequestParam& param, std::vector vertices, std::unordered_map> propNames, @@ -146,11 +146,10 @@ StorageRpcRespFuture GraphStorageClient::addVertices( }); } -StorageRpcRespFuture GraphStorageClient::addEdges( - const CommonRequestParam& param, - std::vector edges, - std::vector propNames, - bool ifNotExists) { +StorageRpcRespFuture StorageClient::addEdges(const CommonRequestParam& param, + std::vector edges, + std::vector propNames, + bool ifNotExists) { auto cbStatus = getIdFromNewEdge(param.space); if (!cbStatus.ok()) { return folly::makeFuture>( @@ -184,7 +183,7 @@ StorageRpcRespFuture GraphStorageClient::addEdges( }); } -StorageRpcRespFuture GraphStorageClient::getProps( +StorageRpcRespFuture StorageClient::getProps( const CommonRequestParam& param, const DataSet& input, const std::vector* vertexProps, @@ -240,7 +239,7 @@ StorageRpcRespFuture GraphStorageClient::getProps( const cpp2::GetPropRequest& r) { return client->future_getProps(r); }); } -StorageRpcRespFuture GraphStorageClient::deleteEdges( +StorageRpcRespFuture StorageClient::deleteEdges( const CommonRequestParam& param, std::vector edges) { auto cbStatus = getIdFromEdgeKey(param.space); if (!cbStatus.ok()) { @@ -273,7 +272,7 @@ StorageRpcRespFuture GraphStorageClient::deleteEdges( }); } -StorageRpcRespFuture GraphStorageClient::deleteVertices( +StorageRpcRespFuture StorageClient::deleteVertices( const CommonRequestParam& param, std::vector ids) { auto cbStatus = getIdFromValue(param.space); if (!cbStatus.ok()) { @@ -306,7 +305,7 @@ StorageRpcRespFuture GraphStorageClient::deleteVertices( }); } -StorageRpcRespFuture GraphStorageClient::deleteTags( +StorageRpcRespFuture StorageClient::deleteTags( const CommonRequestParam& param, std::vector delTags) { auto cbStatus = getIdFromDelTags(param.space); if (!cbStatus.ok()) { @@ -339,7 +338,7 @@ StorageRpcRespFuture GraphStorageClient::deleteTags( }); } -folly::Future> GraphStorageClient::updateVertex( +folly::Future> StorageClient::updateVertex( const CommonRequestParam& param, Value vertexId, TagID tagId, @@ -393,7 +392,7 @@ folly::Future> GraphStorageClient::updat }); } -folly::Future> GraphStorageClient::updateEdge( +folly::Future> StorageClient::updateEdge( const CommonRequestParam& param, storage::cpp2::EdgeKey edgeKey, std::vector updatedProps, @@ -448,9 +447,9 @@ folly::Future> GraphStorageClient::updat }); } -folly::Future> GraphStorageClient::getUUID(GraphSpaceID space, - const std::string& name, - folly::EventBase* evb) { +folly::Future> StorageClient::getUUID(GraphSpaceID space, + const std::string& name, + folly::EventBase* evb) { std::pair request; DCHECK(!!metaClient_); auto status = metaClient_->partsNum(space); @@ -482,7 +481,7 @@ folly::Future> GraphStorageClient::getUUID(GraphSpac }); } -StorageRpcRespFuture GraphStorageClient::lookupIndex( +StorageRpcRespFuture StorageClient::lookupIndex( const CommonRequestParam& param, const std::vector& contexts, bool isEdge, @@ -529,7 +528,7 @@ StorageRpcRespFuture GraphStorageClient::lookupIndex( }); } -StorageRpcRespFuture GraphStorageClient::lookupAndTraverse( +StorageRpcRespFuture StorageClient::lookupAndTraverse( const CommonRequestParam& param, cpp2::IndexSpec indexSpec, cpp2::TraverseSpec traverseSpec) { auto space = param.space; auto status = getHostParts(space); @@ -559,7 +558,7 @@ StorageRpcRespFuture GraphStorageClient::lookupAndTr }); } -StorageRpcRespFuture GraphStorageClient::scanEdge( +StorageRpcRespFuture StorageClient::scanEdge( const CommonRequestParam& param, const cpp2::EdgeProp& edgeProp, int64_t limit, @@ -590,7 +589,7 @@ StorageRpcRespFuture GraphStorageClient::scanEdge( const cpp2::ScanEdgeRequest& r) { return client->future_scanEdge(r); }); } -StorageRpcRespFuture GraphStorageClient::scanVertex( +StorageRpcRespFuture StorageClient::scanVertex( const CommonRequestParam& param, const std::vector& vertexProp, int64_t limit, @@ -623,7 +622,83 @@ StorageRpcRespFuture GraphStorageClient::scanVertex( }); } -StatusOr> GraphStorageClient::getIdFromRow( +folly::SemiFuture> StorageClient::get( + GraphSpaceID space, std::vector&& keys, bool returnPartly, folly::EventBase* evb) { + auto status = clusterIdsToHosts( + space, std::move(keys), [](const std::string& v) -> const std::string& { return v; }); + + if (!status.ok()) { + return folly::makeFuture>( + std::runtime_error(status.status().toString())); + } + + auto& clusters = status.value(); + std::unordered_map requests; + for (auto& c : clusters) { + auto& host = c.first; + auto& req = requests[host]; + req.set_space_id(space); + req.set_parts(std::move(c.second)); + req.set_return_partly(returnPartly); + } + + return collectResponse(evb, + std::move(requests), + [](cpp2::GraphStorageServiceAsyncClient* client, + const cpp2::KVGetRequest& r) { return client->future_get(r); }); +} + +folly::SemiFuture> StorageClient::put( + GraphSpaceID space, std::vector kvs, folly::EventBase* evb) { + auto status = clusterIdsToHosts( + space, std::move(kvs), [](const KeyValue& v) -> const std::string& { return v.key; }); + + if (!status.ok()) { + return folly::makeFuture>( + std::runtime_error(status.status().toString())); + } + + auto& clusters = status.value(); + std::unordered_map requests; + for (auto& c : clusters) { + auto& host = c.first; + auto& req = requests[host]; + req.set_space_id(space); + req.set_parts(std::move(c.second)); + } + + return collectResponse(evb, + std::move(requests), + [](cpp2::GraphStorageServiceAsyncClient* client, + const cpp2::KVPutRequest& r) { return client->future_put(r); }); +} + +folly::SemiFuture> StorageClient::remove( + GraphSpaceID space, std::vector keys, folly::EventBase* evb) { + auto status = clusterIdsToHosts( + space, std::move(keys), [](const std::string& v) -> const std::string& { return v; }); + + if (!status.ok()) { + return folly::makeFuture>( + std::runtime_error(status.status().toString())); + } + + auto& clusters = status.value(); + std::unordered_map requests; + for (auto& c : clusters) { + auto& host = c.first; + auto& req = requests[host]; + req.set_space_id(space); + req.set_parts(std::move(c.second)); + } + + return collectResponse(evb, + std::move(requests), + [](cpp2::GraphStorageServiceAsyncClient* client, + const cpp2::KVRemoveRequest& r) { return client->future_remove(r); }); +} + +StatusOr> StorageClient::getIdFromRow( GraphSpaceID space, bool isEdgeProps) const { auto vidTypeStatus = metaClient_->getSpaceVidType(space); if (!vidTypeStatus) { @@ -669,8 +744,8 @@ StatusOr> GraphStorageClient::getIdFr return cb; } -StatusOr> -GraphStorageClient::getIdFromNewVertex(GraphSpaceID space) const { +StatusOr> StorageClient::getIdFromNewVertex( + GraphSpaceID space) const { auto vidTypeStatus = metaClient_->getSpaceVidType(space); if (!vidTypeStatus) { return vidTypeStatus.status(); @@ -696,7 +771,7 @@ GraphStorageClient::getIdFromNewVertex(GraphSpaceID space) const { return cb; } -StatusOr> GraphStorageClient::getIdFromNewEdge( +StatusOr> StorageClient::getIdFromNewEdge( GraphSpaceID space) const { auto vidTypeStatus = metaClient_->getSpaceVidType(space); if (!vidTypeStatus) { @@ -732,7 +807,7 @@ StatusOr> GraphStorageClien return cb; } -StatusOr> GraphStorageClient::getIdFromEdgeKey( +StatusOr> StorageClient::getIdFromEdgeKey( GraphSpaceID space) const { auto vidTypeStatus = metaClient_->getSpaceVidType(space); if (!vidTypeStatus) { @@ -764,7 +839,7 @@ StatusOr> GraphStorageClien return cb; } -StatusOr> GraphStorageClient::getIdFromValue( +StatusOr> StorageClient::getIdFromValue( GraphSpaceID space) const { auto vidTypeStatus = metaClient_->getSpaceVidType(space); if (!vidTypeStatus) { @@ -791,7 +866,7 @@ StatusOr> GraphStorageClient::getId return cb; } -StatusOr> GraphStorageClient::getIdFromDelTags( +StatusOr> StorageClient::getIdFromDelTags( GraphSpaceID space) const { auto vidTypeStatus = metaClient_->getSpaceVidType(space); if (!vidTypeStatus) { diff --git a/src/clients/storage/GraphStorageClient.h b/src/clients/storage/StorageClient.h similarity index 84% rename from src/clients/storage/GraphStorageClient.h rename to src/clients/storage/StorageClient.h index 9b917b36add..2261f548b53 100644 --- a/src/clients/storage/GraphStorageClient.h +++ b/src/clients/storage/StorageClient.h @@ -3,8 +3,7 @@ * This source code is licensed under Apache 2.0 License. */ -#ifndef CLIENTS_STORAGE_GRAPHSTORAGECLIENT_H_ -#define CLIENTS_STORAGE_GRAPHSTORAGECLIENT_H_ +#pragma once #include @@ -23,7 +22,7 @@ using StorageRpcRespFuture = folly::SemiFuture>; * * The class is NOT reentrant */ -class GraphStorageClient : public StorageClientBase { +class StorageClient : public StorageClientBase { FRIEND_TEST(StorageClientTest, LeaderChangeTest); public: @@ -45,10 +44,10 @@ class GraphStorageClient : public StorageClientBase ioThreadPool, - meta::MetaClient* metaClient) + StorageClient(std::shared_ptr ioThreadPool, + meta::MetaClient* metaClient) : StorageClientBase(ioThreadPool, metaClient) {} - virtual ~GraphStorageClient() {} + virtual ~StorageClient() {} StorageRpcRespFuture getNeighbors( const CommonRequestParam& param, @@ -141,6 +140,19 @@ class GraphStorageClient : public StorageClientBase> get(GraphSpaceID space, + std::vector&& keys, + bool returnPartly = false, + folly::EventBase* evb = nullptr); + + folly::SemiFuture> put(GraphSpaceID space, + std::vector kvs, + folly::EventBase* evb = nullptr); + + folly::SemiFuture> remove(GraphSpaceID space, + std::vector keys, + folly::EventBase* evb = nullptr); + private: StatusOr> getIdFromRow(GraphSpaceID space, bool isEdgeProps) const; @@ -162,5 +174,3 @@ class GraphStorageClient : public StorageClientBase $ $ - $ + $ $ $ ${common_deps} @@ -127,7 +127,7 @@ nebula_add_executable( $ $ $ - $ + $ $ $ $ diff --git a/src/graph/context/QueryContext.cpp b/src/graph/context/QueryContext.cpp index f70727e9ad0..4fc2c6f65be 100644 --- a/src/graph/context/QueryContext.cpp +++ b/src/graph/context/QueryContext.cpp @@ -11,7 +11,7 @@ namespace graph { QueryContext::QueryContext(RequestContextPtr rctx, meta::SchemaManager* sm, meta::IndexManager* im, - storage::GraphStorageClient* storage, + storage::StorageClient* storage, meta::MetaClient* metaClient, CharsetInfo* charsetInfo) : rctx_(std::move(rctx)), diff --git a/src/graph/context/QueryContext.h b/src/graph/context/QueryContext.h index e9cba6cd4a1..00aa8b75336 100644 --- a/src/graph/context/QueryContext.h +++ b/src/graph/context/QueryContext.h @@ -7,7 +7,7 @@ #define GRAPH_CONTEXT_QUERYCONTEXT_H_ #include "clients/meta/MetaClient.h" -#include "clients/storage/GraphStorageClient.h" +#include "clients/storage/StorageClient.h" #include "common/base/ObjectPool.h" #include "common/charset/Charset.h" #include "common/cpp/helpers.h" @@ -45,7 +45,7 @@ class QueryContext { QueryContext(RequestContextPtr rctx, meta::SchemaManager* sm, meta::IndexManager* im, - storage::GraphStorageClient* storage, + storage::StorageClient* storage, meta::MetaClient* metaClient, CharsetInfo* charsetInfo); @@ -57,7 +57,7 @@ class QueryContext { void setIndexManager(meta::IndexManager* im) { im_ = im; } - void setStorageClient(storage::GraphStorageClient* storage) { storageClient_ = storage; } + void setStorageClient(storage::StorageClient* storage) { storageClient_ = storage; } void setMetaClient(meta::MetaClient* metaClient) { metaClient_ = metaClient; } @@ -75,7 +75,7 @@ class QueryContext { meta::IndexManager* indexMng() const { return im_; } - storage::GraphStorageClient* getStorageClient() const { return storageClient_; } + storage::StorageClient* getStorageClient() const { return storageClient_; } meta::MetaClient* getMetaClient() const { return metaClient_; } @@ -105,7 +105,7 @@ class QueryContext { std::unique_ptr ep_; meta::SchemaManager* sm_{nullptr}; meta::IndexManager* im_{nullptr}; - storage::GraphStorageClient* storageClient_{nullptr}; + storage::StorageClient* storageClient_{nullptr}; meta::MetaClient* metaClient_{nullptr}; CharsetInfo* charsetInfo_{nullptr}; diff --git a/src/graph/executor/mutate/DeleteExecutor.cpp b/src/graph/executor/mutate/DeleteExecutor.cpp index 49fcb6a6830..a98ea20afe8 100644 --- a/src/graph/executor/mutate/DeleteExecutor.cpp +++ b/src/graph/executor/mutate/DeleteExecutor.cpp @@ -11,7 +11,7 @@ #include "graph/planner/plan/Mutate.h" #include "graph/util/SchemaUtil.h" -using nebula::storage::GraphStorageClient; +using nebula::storage::StorageClient; namespace nebula { namespace graph { @@ -63,7 +63,7 @@ folly::Future DeleteVerticesExecutor::deleteVertices() { auto spaceId = spaceInfo.id; time::Duration deleteVertTime; auto plan = qctx()->plan(); - GraphStorageClient::CommonRequestParam param( + StorageClient::CommonRequestParam param( spaceId, qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled()); return qctx() ->getStorageClient() @@ -119,7 +119,7 @@ folly::Future DeleteTagsExecutor::deleteTags() { auto spaceId = spaceInfo.id; time::Duration deleteTagTime; auto plan = qctx()->plan(); - GraphStorageClient::CommonRequestParam param( + StorageClient::CommonRequestParam param( spaceId, qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled()); return qctx() ->getStorageClient() @@ -204,7 +204,7 @@ folly::Future DeleteEdgesExecutor::deleteEdges() { auto spaceId = spaceInfo.id; time::Duration deleteEdgeTime; auto plan = qctx()->plan(); - GraphStorageClient::CommonRequestParam param( + StorageClient::CommonRequestParam param( spaceId, qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled()); return qctx() ->getStorageClient() diff --git a/src/graph/executor/mutate/InsertExecutor.cpp b/src/graph/executor/mutate/InsertExecutor.cpp index 94717c0d6c3..bb8806ade8b 100644 --- a/src/graph/executor/mutate/InsertExecutor.cpp +++ b/src/graph/executor/mutate/InsertExecutor.cpp @@ -10,7 +10,7 @@ #include "graph/planner/plan/Mutate.h" #include "graph/service/GraphFlags.h" -using nebula::storage::GraphStorageClient; +using nebula::storage::StorageClient; namespace nebula { namespace graph { @@ -23,7 +23,7 @@ folly::Future InsertVerticesExecutor::insertVertices() { auto *ivNode = asNode(node()); time::Duration addVertTime; auto plan = qctx()->plan(); - GraphStorageClient::CommonRequestParam param( + StorageClient::CommonRequestParam param( ivNode->getSpace(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled()); return qctx() ->getStorageClient() @@ -47,7 +47,7 @@ folly::Future InsertEdgesExecutor::insertEdges() { auto *ieNode = asNode(node()); time::Duration addEdgeTime; auto plan = qctx()->plan(); - GraphStorageClient::CommonRequestParam param( + StorageClient::CommonRequestParam param( ieNode->getSpace(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled()); param.useExperimentalFeature = FLAGS_enable_experimental_feature; return qctx() diff --git a/src/graph/executor/mutate/UpdateExecutor.cpp b/src/graph/executor/mutate/UpdateExecutor.cpp index de748c2c266..3abfd089973 100644 --- a/src/graph/executor/mutate/UpdateExecutor.cpp +++ b/src/graph/executor/mutate/UpdateExecutor.cpp @@ -11,7 +11,7 @@ #include "graph/service/GraphFlags.h" #include "graph/util/SchemaUtil.h" -using nebula::storage::GraphStorageClient; +using nebula::storage::StorageClient; namespace nebula { namespace graph { @@ -49,7 +49,7 @@ folly::Future UpdateVertexExecutor::execute() { time::Duration updateVertTime; auto plan = qctx()->plan(); auto sess = qctx()->rctx()->session(); - GraphStorageClient::CommonRequestParam param( + StorageClient::CommonRequestParam param( uvNode->getSpaceId(), sess->id(), plan->id(), plan->isProfileEnabled()); return qctx() ->getStorageClient() @@ -100,7 +100,7 @@ folly::Future UpdateEdgeExecutor::execute() { time::Duration updateEdgeTime; auto plan = qctx()->plan(); - GraphStorageClient::CommonRequestParam param( + StorageClient::CommonRequestParam param( ueNode->getSpaceId(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled()); param.useExperimentalFeature = FLAGS_enable_experimental_feature; return qctx() diff --git a/src/graph/executor/query/AppendVerticesExecutor.cpp b/src/graph/executor/query/AppendVerticesExecutor.cpp index 94f67efc581..aa6088f1db5 100644 --- a/src/graph/executor/query/AppendVerticesExecutor.cpp +++ b/src/graph/executor/query/AppendVerticesExecutor.cpp @@ -5,7 +5,7 @@ #include "graph/executor/query/AppendVerticesExecutor.h" -using nebula::storage::GraphStorageClient; +using nebula::storage::StorageClient; using nebula::storage::StorageRpcResponse; using nebula::storage::cpp2::GetPropResponse; @@ -25,17 +25,17 @@ folly::Future AppendVerticesExecutor::appendVertices() { SCOPED_TIMER(&execTime_); auto *av = asNode(node()); - GraphStorageClient *storageClient = qctx()->getStorageClient(); + StorageClient *storageClient = qctx()->getStorageClient(); DataSet vertices = buildRequestDataSet(av); if (vertices.rows.empty()) { return finish(ResultBuilder().value(Value(DataSet(av->colNames()))).build()); } - GraphStorageClient::CommonRequestParam param(av->space(), - qctx()->rctx()->session()->id(), - qctx()->plan()->id(), - qctx()->plan()->isProfileEnabled()); + StorageClient::CommonRequestParam param(av->space(), + qctx()->rctx()->session()->id(), + qctx()->plan()->id(), + qctx()->plan()->isProfileEnabled()); time::Duration getPropsTime; return DCHECK_NOTNULL(storageClient) ->getProps(param, diff --git a/src/graph/executor/query/GetEdgesExecutor.cpp b/src/graph/executor/query/GetEdgesExecutor.cpp index 45830738962..4ee5ddace34 100644 --- a/src/graph/executor/query/GetEdgesExecutor.cpp +++ b/src/graph/executor/query/GetEdgesExecutor.cpp @@ -10,7 +10,7 @@ #include "graph/planner/plan/Query.h" #include "graph/util/SchemaUtil.h" -using nebula::storage::GraphStorageClient; +using nebula::storage::StorageClient; using nebula::storage::StorageRpcResponse; using nebula::storage::cpp2::GetPropResponse; @@ -49,7 +49,7 @@ DataSet GetEdgesExecutor::buildRequestDataSet(const GetEdges *ge) { folly::Future GetEdgesExecutor::getEdges() { SCOPED_TIMER(&execTime_); - GraphStorageClient *client = qctx()->getStorageClient(); + StorageClient *client = qctx()->getStorageClient(); auto *ge = asNode(node()); if (ge->src() == nullptr || ge->type() == nullptr || ge->ranking() == nullptr || ge->dst() == nullptr) { @@ -65,10 +65,10 @@ folly::Future GetEdgesExecutor::getEdges() { } time::Duration getPropsTime; - GraphStorageClient::CommonRequestParam param(ge->space(), - qctx()->rctx()->session()->id(), - qctx()->plan()->id(), - qctx()->plan()->isProfileEnabled()); + StorageClient::CommonRequestParam param(ge->space(), + qctx()->rctx()->session()->id(), + qctx()->plan()->id(), + qctx()->plan()->isProfileEnabled()); return DCHECK_NOTNULL(client) ->getProps(param, std::move(edges), diff --git a/src/graph/executor/query/GetNeighborsExecutor.cpp b/src/graph/executor/query/GetNeighborsExecutor.cpp index b8151d0b3e1..c315ce53910 100644 --- a/src/graph/executor/query/GetNeighborsExecutor.cpp +++ b/src/graph/executor/query/GetNeighborsExecutor.cpp @@ -7,14 +7,14 @@ #include -#include "clients/storage/GraphStorageClient.h" +#include "clients/storage/StorageClient.h" #include "common/datatypes/List.h" #include "common/datatypes/Vertex.h" #include "common/time/ScopedTimer.h" #include "graph/context/QueryContext.h" #include "graph/service/GraphFlags.h" -using nebula::storage::GraphStorageClient; +using nebula::storage::StorageClient; using nebula::storage::StorageRpcResponse; using nebula::storage::cpp2::GetNeighborsResponse; @@ -40,12 +40,12 @@ folly::Future GetNeighborsExecutor::execute() { } time::Duration getNbrTime; - GraphStorageClient* storageClient = qctx_->getStorageClient(); + StorageClient* storageClient = qctx_->getStorageClient(); QueryExpressionContext qec(qctx()->ectx()); - GraphStorageClient::CommonRequestParam param(gn_->space(), - qctx()->rctx()->session()->id(), - qctx()->plan()->id(), - qctx()->plan()->isProfileEnabled()); + StorageClient::CommonRequestParam param(gn_->space(), + qctx()->rctx()->session()->id(), + qctx()->plan()->id(), + qctx()->plan()->isProfileEnabled()); return storageClient ->getNeighbors(param, std::move(reqDs.colNames), diff --git a/src/graph/executor/query/GetVerticesExecutor.cpp b/src/graph/executor/query/GetVerticesExecutor.cpp index ec2d9934c37..f3a7bd2dd59 100644 --- a/src/graph/executor/query/GetVerticesExecutor.cpp +++ b/src/graph/executor/query/GetVerticesExecutor.cpp @@ -9,7 +9,7 @@ #include "graph/context/QueryContext.h" #include "graph/util/SchemaUtil.h" -using nebula::storage::GraphStorageClient; +using nebula::storage::StorageClient; using nebula::storage::StorageRpcResponse; using nebula::storage::cpp2::GetPropResponse; @@ -22,7 +22,7 @@ folly::Future GetVerticesExecutor::getVertices() { SCOPED_TIMER(&execTime_); auto *gv = asNode(node()); - GraphStorageClient *storageClient = qctx()->getStorageClient(); + StorageClient *storageClient = qctx()->getStorageClient(); DataSet vertices = buildRequestDataSet(gv); if (vertices.rows.empty()) { @@ -32,10 +32,10 @@ folly::Future GetVerticesExecutor::getVertices() { } time::Duration getPropsTime; - GraphStorageClient::CommonRequestParam param(gv->space(), - qctx()->rctx()->session()->id(), - qctx()->plan()->id(), - qctx()->plan()->isProfileEnabled()); + StorageClient::CommonRequestParam param(gv->space(), + qctx()->rctx()->session()->id(), + qctx()->plan()->id(), + qctx()->plan()->isProfileEnabled()); return DCHECK_NOTNULL(storageClient) ->getProps(param, std::move(vertices), diff --git a/src/graph/executor/query/IndexScanExecutor.cpp b/src/graph/executor/query/IndexScanExecutor.cpp index 13f667be63f..3f3b20ae37e 100644 --- a/src/graph/executor/query/IndexScanExecutor.cpp +++ b/src/graph/executor/query/IndexScanExecutor.cpp @@ -11,7 +11,7 @@ #include "graph/planner/plan/PlanNode.h" #include "graph/service/GraphFlags.h" -using nebula::storage::GraphStorageClient; +using nebula::storage::StorageClient; using nebula::storage::StorageRpcResponse; using nebula::storage::cpp2::LookupIndexResp; @@ -21,7 +21,7 @@ namespace graph { folly::Future IndexScanExecutor::execute() { return indexScan(); } folly::Future IndexScanExecutor::indexScan() { - GraphStorageClient *storageClient = qctx_->getStorageClient(); + StorageClient *storageClient = qctx_->getStorageClient(); auto *lookup = asNode(node()); if (lookup->isEmptyResultSet()) { DataSet dataSet({"dummy"}); @@ -35,10 +35,10 @@ folly::Future IndexScanExecutor::indexScan() { return Status::Error("There is no index to use at runtime"); } - GraphStorageClient::CommonRequestParam param(lookup->space(), - qctx()->rctx()->session()->id(), - qctx()->plan()->id(), - qctx()->plan()->isProfileEnabled()); + StorageClient::CommonRequestParam param(lookup->space(), + qctx()->rctx()->session()->id(), + qctx()->plan()->id(), + qctx()->plan()->isProfileEnabled()); return storageClient ->lookupIndex(param, ictxs, diff --git a/src/graph/executor/query/IndexScanExecutor.h b/src/graph/executor/query/IndexScanExecutor.h index c9823752e26..4d5e489ec20 100644 --- a/src/graph/executor/query/IndexScanExecutor.h +++ b/src/graph/executor/query/IndexScanExecutor.h @@ -6,7 +6,7 @@ #ifndef GRAPH_EXECUTOR_QUERY_INDEXSCANEXECUTOR_H_ #define GRAPH_EXECUTOR_QUERY_INDEXSCANEXECUTOR_H_ -#include "clients/storage/GraphStorageClient.h" +#include "clients/storage/StorageClient.h" #include "graph/executor/StorageAccessExecutor.h" #include "graph/planner/plan/Query.h" #include "interface/gen-cpp2/storage_types.h" diff --git a/src/graph/executor/query/TraverseExecutor.cpp b/src/graph/executor/query/TraverseExecutor.cpp index d2e23e2d458..ee1e638da8f 100644 --- a/src/graph/executor/query/TraverseExecutor.cpp +++ b/src/graph/executor/query/TraverseExecutor.cpp @@ -7,7 +7,7 @@ #include -#include "clients/storage/GraphStorageClient.h" +#include "clients/storage/StorageClient.h" #include "common/datatypes/List.h" #include "common/datatypes/Vertex.h" #include "common/time/ScopedTimer.h" @@ -15,7 +15,7 @@ #include "graph/service/GraphFlags.h" #include "graph/util/SchemaUtil.h" -using nebula::storage::GraphStorageClient; +using nebula::storage::StorageClient; using nebula::storage::StorageRpcResponse; using nebula::storage::cpp2::GetNeighborsResponse; @@ -85,12 +85,12 @@ folly::Future TraverseExecutor::traverse() { void TraverseExecutor::getNeighbors() { currentStep_++; time::Duration getNbrTime; - GraphStorageClient* storageClient = qctx_->getStorageClient(); + StorageClient* storageClient = qctx_->getStorageClient(); bool finalStep = isFinalStep(); - GraphStorageClient::CommonRequestParam param(traverse_->space(), - qctx()->rctx()->session()->id(), - qctx()->plan()->id(), - qctx()->plan()->isProfileEnabled()); + StorageClient::CommonRequestParam param(traverse_->space(), + qctx()->rctx()->session()->id(), + qctx()->plan()->id(), + qctx()->plan()->isProfileEnabled()); storageClient ->getNeighbors(param, reqDs_.colNames, diff --git a/src/graph/executor/query/TraverseExecutor.h b/src/graph/executor/query/TraverseExecutor.h index 4f2802cb68d..d85d7c44247 100644 --- a/src/graph/executor/query/TraverseExecutor.h +++ b/src/graph/executor/query/TraverseExecutor.h @@ -8,7 +8,7 @@ #include -#include "clients/storage/GraphStorageClient.h" +#include "clients/storage/StorageClient.h" #include "common/base/StatusOr.h" #include "common/datatypes/Value.h" #include "common/datatypes/Vertex.h" diff --git a/src/graph/executor/test/CMakeLists.txt b/src/graph/executor/test/CMakeLists.txt index d88f3b299e7..f664ee5f192 100644 --- a/src/graph/executor/test/CMakeLists.txt +++ b/src/graph/executor/test/CMakeLists.txt @@ -8,7 +8,7 @@ SET(EXEC_QUERY_TEST_OBJS $ $ $ - $ + $ $ $ $ diff --git a/src/graph/planner/test/CMakeLists.txt b/src/graph/planner/test/CMakeLists.txt index 355c1cbae60..bb45ebd9201 100644 --- a/src/graph/planner/test/CMakeLists.txt +++ b/src/graph/planner/test/CMakeLists.txt @@ -14,7 +14,7 @@ nebula_add_test( $ $ $ - $ + $ $ $ $ diff --git a/src/graph/service/GraphService.cpp b/src/graph/service/GraphService.cpp index 8058688de71..3b58c3be851 100644 --- a/src/graph/service/GraphService.cpp +++ b/src/graph/service/GraphService.cpp @@ -5,7 +5,7 @@ #include "graph/service/GraphService.h" -#include "clients/storage/GraphStorageClient.h" +#include "clients/storage/StorageClient.h" #include "common/base/Base.h" #include "common/encryption/MD5Utils.h" #include "common/time/Duration.h" diff --git a/src/graph/service/QueryEngine.cpp b/src/graph/service/QueryEngine.cpp index b5d47df8e3a..d3f4f54e609 100644 --- a/src/graph/service/QueryEngine.cpp +++ b/src/graph/service/QueryEngine.cpp @@ -29,7 +29,7 @@ Status QueryEngine::init(std::shared_ptr ioExecutor metaClient_ = metaClient; schemaManager_ = meta::ServerBasedSchemaManager::create(metaClient_); indexManager_ = meta::ServerBasedIndexManager::create(metaClient_); - storage_ = std::make_unique(ioExecutor, metaClient_); + storage_ = std::make_unique(ioExecutor, metaClient_); charsetInfo_ = CharsetInfo::instance(); PlannersRegister::registerPlanners(); diff --git a/src/graph/service/QueryEngine.h b/src/graph/service/QueryEngine.h index a0b777bc35b..b85f5dc0e78 100644 --- a/src/graph/service/QueryEngine.h +++ b/src/graph/service/QueryEngine.h @@ -9,7 +9,7 @@ #include #include "clients/meta/MetaClient.h" -#include "clients/storage/GraphStorageClient.h" +#include "clients/storage/StorageClient.h" #include "common/charset/Charset.h" #include "common/cpp/helpers.h" #include "common/meta/IndexManager.h" @@ -45,7 +45,7 @@ class QueryEngine final : public cpp::NonCopyable, public cpp::NonMovable { std::unique_ptr schemaManager_; std::unique_ptr indexManager_; - std::unique_ptr storage_; + std::unique_ptr storage_; std::unique_ptr optimizer_; std::unique_ptr memoryMonitorThread_; meta::MetaClient* metaClient_{nullptr}; diff --git a/src/interface/CMakeLists.txt b/src/interface/CMakeLists.txt index 21fc56f4336..5dcfefbaabb 100644 --- a/src/interface/CMakeLists.txt +++ b/src/interface/CMakeLists.txt @@ -22,7 +22,7 @@ thrift_generate("raftex" "RaftexService" ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CUR # Target object name : storage_thrift_obj thrift_generate( "storage" - "GraphStorageService;StorageAdminService;GeneralStorageService;InternalStorageService" + "GraphStorageService;StorageAdminService;InternalStorageService" ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR} "interface" diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift index 3451fd8b8e2..98c884b5485 100644 --- a/src/interface/storage.thrift +++ b/src/interface/storage.thrift @@ -630,18 +630,37 @@ struct TaskPara { 3: optional list task_specific_paras } -struct AddAdminTaskRequest { - // rebuild index / flush / compact / stats - 1: meta.AdminCmd cmd - 2: i32 job_id - 3: i32 task_id - 4: TaskPara para - 5: optional i32 concurrency +////////////////////////////////////////////////////////// +// +// Requests, responses for the kv interfaces +// +////////////////////////////////////////////////////////// +struct KVGetRequest { + 1: common.GraphSpaceID space_id, + 2: map>( + cpp.template = "std::unordered_map") parts, + // When return_partly is true and some of the keys not found, will return the keys + // which exist + 3: bool return_partly } -struct StopAdminTaskRequest { - 1: i32 job_id - 2: i32 task_id +struct KVGetResponse { + 1: required ResponseCommon result, + 2: map(cpp.template = "std::unordered_map") key_values, +} + +struct KVPutRequest { + 1: common.GraphSpaceID space_id, + // part -> key/value + 2: map>( + cpp.template = "std::unordered_map") parts, +} + +struct KVRemoveRequest { + 1: common.GraphSpaceID space_id, + // part -> key + 2: map>( + cpp.template = "std::unordered_map") parts, } service GraphStorageService { @@ -672,6 +691,10 @@ service GraphStorageService { UpdateResponse chainUpdateEdge(1: UpdateEdgeRequest req); ExecResponse chainAddEdges(1: AddEdgesRequest req); + + KVGetResponse get(1: KVGetRequest req); + ExecResponse put(1: KVPutRequest req); + ExecResponse remove(1: KVRemoveRequest req); } @@ -790,6 +813,20 @@ struct ListClusterInfoResp { struct ListClusterInfoReq { } +struct AddAdminTaskRequest { + // rebuild index / flush / compact / statis + 1: meta.AdminCmd cmd + 2: i32 job_id + 3: i32 task_id + 4: TaskPara para + 5: optional i32 concurrency +} + +struct StopAdminTaskRequest { + 1: i32 job_id + 2: i32 task_id +} + service StorageAdminService { // Interfaces for admin operations AdminExecResp transLeader(1: TransLeaderReq req); @@ -820,50 +857,6 @@ service StorageAdminService { } -////////////////////////////////////////////////////////// -// -// Requests, responses for the GeneralStorageService -// -////////////////////////////////////////////////////////// -struct KVGetRequest { - 1: common.GraphSpaceID space_id, - 2: map>( - cpp.template = "std::unordered_map") parts, - // When return_partly is true and some of the keys not found, will return the keys - // which exist - 3: bool return_partly -} - - -struct KVGetResponse { - 1: required ResponseCommon result, - 2: map(cpp.template = "std::unordered_map") key_values, -} - - -struct KVPutRequest { - 1: common.GraphSpaceID space_id, - // part -> key/value - 2: map>( - cpp.template = "std::unordered_map") parts, -} - - -struct KVRemoveRequest { - 1: common.GraphSpaceID space_id, - // part -> key - 2: map>( - cpp.template = "std::unordered_map") parts, -} - - -service GeneralStorageService { - // Interfaces for key-value storage - KVGetResponse get(1: KVGetRequest req); - ExecResponse put(1: KVPutRequest req); - ExecResponse remove(1: KVRemoveRequest req); -} - ////////////////////////////////////////////////////////// // // Requests, responses for the InternalStorageService diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index a02e1bd3073..7faf784bc26 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -110,11 +110,10 @@ set(meta_test_deps $ $ $ - $ $ $ $ - $ + $ $ $ $ diff --git a/src/mock/MockCluster.cpp b/src/mock/MockCluster.cpp index 3bc79378900..ad4e91b44f8 100644 --- a/src/mock/MockCluster.cpp +++ b/src/mock/MockCluster.cpp @@ -12,7 +12,6 @@ #include "mock/AdHocSchemaManager.h" #include "mock/MockData.h" #include "storage/CompactionFilter.h" -#include "storage/GeneralStorageServiceHandler.h" #include "storage/GraphStorageServiceHandler.h" #include "storage/StorageAdminServiceHandler.h" #include "storage/transaction/TransactionManager.h" @@ -217,7 +216,6 @@ void MockCluster::initStorageKV(const char* dataPath, void MockCluster::startStorage(HostAddr addr, const std::string& rootPath, - bool isGeneralService, SchemaVer schemaVerCount) { initStorageKV(rootPath.c_str(), addr, schemaVerCount); @@ -227,17 +225,10 @@ void MockCluster::startStorage(HostAddr addr, storageAdminServer_->start("admin-storage", addr.port - 1, adminHandler); LOG(INFO) << "The admin storage daemon started on port " << storageAdminServer_->port_; - if (!isGeneralService) { - graphStorageServer_ = std::make_unique(); - auto graphHandler = std::make_shared(env); - graphStorageServer_->start("graph-storage", addr.port, graphHandler); - LOG(INFO) << "The graph storage daemon started on port " << graphStorageServer_->port_; - } else { - generalStorageServer_ = std::make_unique(); - auto generalHandler = std::make_shared(env); - generalStorageServer_->start("general-storage", addr.port, generalHandler); - LOG(INFO) << "The general storage daemon started on port " << generalStorageServer_->port_; - } + graphStorageServer_ = std::make_unique(); + auto graphHandler = std::make_shared(env); + graphStorageServer_->start("graph-storage", addr.port, graphHandler); + LOG(INFO) << "The graph storage daemon started on port " << graphStorageServer_->port_; } std::unique_ptr MockCluster::memSchemaMan(SchemaVer schemaVerCount, @@ -293,17 +284,11 @@ meta::MetaClient* MockCluster::initMetaClient(meta::MetaClientOptions options) { return metaClient_.get(); } -storage::GraphStorageClient* MockCluster::initGraphStorageClient() { +storage::StorageClient* MockCluster::initGraphStorageClient() { auto threadPool = std::make_shared(1); - storageClient_ = std::make_unique(threadPool, metaClient_.get()); + storageClient_ = std::make_unique(threadPool, metaClient_.get()); return storageClient_.get(); } -storage::GeneralStorageClient* MockCluster::initGeneralStorageClient() { - auto threadPool = std::make_shared(1); - generalClient_ = std::make_unique(threadPool, metaClient_.get()); - return generalClient_.get(); -} - } // namespace mock } // namespace nebula diff --git a/src/mock/MockCluster.h b/src/mock/MockCluster.h index f7ed05a4ba2..8548a0e51f4 100644 --- a/src/mock/MockCluster.h +++ b/src/mock/MockCluster.h @@ -12,8 +12,7 @@ #include #include -#include "clients/storage/GeneralStorageClient.h" -#include "clients/storage/GraphStorageClient.h" +#include "clients/storage/StorageClient.h" #include "common/base/Base.h" #include "common/base/ObjectPool.h" #include "kvstore/KVStore.h" @@ -36,7 +35,6 @@ class MockCluster { stop(); storageAdminServer_.reset(); graphStorageServer_.reset(); - generalStorageServer_.reset(); } void startAll(); @@ -45,7 +43,6 @@ class MockCluster { void startStorage(HostAddr addr, const std::string& rootPath, - bool isGeneralService = false, SchemaVer schemaVerCount = 1); /** @@ -58,9 +55,7 @@ class MockCluster { * Init a storage client connect to graphStorageServer * The meta server, and meta client must started first * */ - storage::GraphStorageClient* initGraphStorageClient(); - - storage::GeneralStorageClient* initGeneralStorageClient(); + storage::StorageClient* initGraphStorageClient(); std::unique_ptr memSchemaMan(SchemaVer schemaVerCount = 1, GraphSpaceID spaceId = 1, @@ -118,13 +113,11 @@ class MockCluster { public: std::unique_ptr metaServer_{nullptr}; std::unique_ptr metaClient_{nullptr}; - std::unique_ptr storageClient_{nullptr}; - std::unique_ptr generalClient_{nullptr}; + std::unique_ptr storageClient_{nullptr}; std::unique_ptr metaKV_{nullptr}; std::unique_ptr storageAdminServer_{nullptr}; std::unique_ptr graphStorageServer_{nullptr}; - std::unique_ptr generalStorageServer_{nullptr}; std::unique_ptr storageKV_{nullptr}; std::unique_ptr storageEnv_{nullptr}; diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index c182f8b4f99..4f263476edc 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -49,11 +49,6 @@ nebula_add_library( exec/IndexScanNode.cpp exec/IndexSelectionNode.cpp exec/IndexVertexScanNode.cpp -) - -nebula_add_library( - general_storage_service_handler OBJECT - GeneralStorageServiceHandler.cpp kv/PutProcessor.cpp kv/GetProcessor.cpp kv/RemoveProcessor.cpp diff --git a/src/storage/CompactionFilter.h b/src/storage/CompactionFilter.h index 6f95e3675fd..1c17b1d3ff4 100644 --- a/src/storage/CompactionFilter.h +++ b/src/storage/CompactionFilter.h @@ -15,8 +15,6 @@ #include "kvstore/CompactionFilter.h" #include "storage/CommonUtils.h" -DEFINE_bool(storage_kv_mode, false, "True for kv mode"); - namespace nebula { namespace storage { @@ -32,11 +30,6 @@ class StorageCompactionFilter final : public kvstore::KVFilter { bool filter(GraphSpaceID spaceId, const folly::StringPiece& key, const folly::StringPiece& val) const override { - if (FLAGS_storage_kv_mode) { - // in kv mode, we don't delete any data - return false; - } - if (NebulaKeyUtils::isTag(vIdLen_, key)) { return !vertexValid(spaceId, key, val); } else if (NebulaKeyUtils::isEdge(vIdLen_, key)) { diff --git a/src/storage/GeneralStorageServiceHandler.cpp b/src/storage/GeneralStorageServiceHandler.cpp deleted file mode 100644 index 308bbd97fd2..00000000000 --- a/src/storage/GeneralStorageServiceHandler.cpp +++ /dev/null @@ -1,45 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "storage/GeneralStorageServiceHandler.h" - -#include "storage/kv/GetProcessor.h" -#include "storage/kv/PutProcessor.h" -#include "storage/kv/RemoveProcessor.h" - -#define RETURN_FUTURE(processor) \ - auto f = processor->getFuture(); \ - processor->process(req); \ - return f; - -namespace nebula { -namespace storage { - -GeneralStorageServiceHandler::GeneralStorageServiceHandler(StorageEnv* env) : env_(env) { - kPutCounters.init("put"); - kGetCounters.init("get"); - kRemoveCounters.init("remove"); -} - -folly::Future GeneralStorageServiceHandler::future_put( - const cpp2::KVPutRequest& req) { - auto* processor = PutProcessor::instance(env_); - RETURN_FUTURE(processor); -} - -folly::Future GeneralStorageServiceHandler::future_get( - const cpp2::KVGetRequest& req) { - auto* processor = GetProcessor::instance(env_); - RETURN_FUTURE(processor); -} - -folly::Future GeneralStorageServiceHandler::future_remove( - const cpp2::KVRemoveRequest& req) { - auto* processor = RemoveProcessor::instance(env_); - RETURN_FUTURE(processor); -} - -} // namespace storage -} // namespace nebula diff --git a/src/storage/GeneralStorageServiceHandler.h b/src/storage/GeneralStorageServiceHandler.h deleted file mode 100644 index de1e4ed89ec..00000000000 --- a/src/storage/GeneralStorageServiceHandler.h +++ /dev/null @@ -1,33 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef STORAGE_GENERALSTORAGESERVICEHANDLER_H_ -#define STORAGE_GENERALSTORAGESERVICEHANDLER_H_ - -#include "common/base/Base.h" -#include "interface/gen-cpp2/GeneralStorageService.h" - -namespace nebula { -namespace storage { - -class StorageEnv; - -class GeneralStorageServiceHandler final : public cpp2::GeneralStorageServiceSvIf { - public: - explicit GeneralStorageServiceHandler(StorageEnv* env); - - folly::Future future_put(const cpp2::KVPutRequest& req) override; - - folly::Future future_get(const cpp2::KVGetRequest& req) override; - - folly::Future future_remove(const cpp2::KVRemoveRequest& req) override; - - private: - StorageEnv* env_{nullptr}; -}; - -} // namespace storage -} // namespace nebula -#endif // STORAGE_GENERALSTORAGESERVICEHANDLER_H_ diff --git a/src/storage/GraphStorageServiceHandler.cpp b/src/storage/GraphStorageServiceHandler.cpp index 2c99894097f..23f9abe18ec 100644 --- a/src/storage/GraphStorageServiceHandler.cpp +++ b/src/storage/GraphStorageServiceHandler.cpp @@ -6,6 +6,9 @@ #include "storage/GraphStorageServiceHandler.h" #include "storage/index/LookupProcessor.h" +#include "storage/kv/GetProcessor.h" +#include "storage/kv/PutProcessor.h" +#include "storage/kv/RemoveProcessor.h" #include "storage/mutate/AddEdgesProcessor.h" #include "storage/mutate/AddVerticesProcessor.h" #include "storage/mutate/DeleteEdgesProcessor.h" @@ -57,6 +60,9 @@ GraphStorageServiceHandler::GraphStorageServiceHandler(StorageEnv* env) : env_(e kLookupCounters.init("lookup"); kScanVertexCounters.init("scan_vertex"); kScanEdgeCounters.init("scan_edge"); + kPutCounters.init("kv_put"); + kGetCounters.init("kv_get"); + kRemoveCounters.init("kv_remove"); } // Vertice section @@ -154,5 +160,23 @@ folly::Future GraphStorageServiceHandler::future_chainAddEdg RETURN_FUTURE(processor); } +folly::Future GraphStorageServiceHandler::future_put( + const cpp2::KVPutRequest& req) { + auto* processor = PutProcessor::instance(env_); + RETURN_FUTURE(processor); +} + +folly::Future GraphStorageServiceHandler::future_get( + const cpp2::KVGetRequest& req) { + auto* processor = GetProcessor::instance(env_); + RETURN_FUTURE(processor); +} + +folly::Future GraphStorageServiceHandler::future_remove( + const cpp2::KVRemoveRequest& req) { + auto* processor = RemoveProcessor::instance(env_); + RETURN_FUTURE(processor); +} + } // namespace storage } // namespace nebula diff --git a/src/storage/GraphStorageServiceHandler.h b/src/storage/GraphStorageServiceHandler.h index 905c3fd67a7..a28b1509cb1 100644 --- a/src/storage/GraphStorageServiceHandler.h +++ b/src/storage/GraphStorageServiceHandler.h @@ -62,6 +62,12 @@ class GraphStorageServiceHandler final : public cpp2::GraphStorageServiceSvIf { folly::Future future_getUUID(const cpp2::GetUUIDReq& req) override; + folly::Future future_put(const cpp2::KVPutRequest& req) override; + + folly::Future future_get(const cpp2::KVGetRequest& req) override; + + folly::Future future_remove(const cpp2::KVRemoveRequest& req) override; + private: StorageEnv* env_{nullptr}; std::shared_ptr readerPool_; diff --git a/src/storage/StorageServer.cpp b/src/storage/StorageServer.cpp index bdc332e2ab7..44e864928b3 100644 --- a/src/storage/StorageServer.cpp +++ b/src/storage/StorageServer.cpp @@ -38,6 +38,7 @@ DEFINE_int32(num_io_threads, 16, "Number of IO threads"); DEFINE_int32(num_worker_threads, 32, "Number of workers"); DEFINE_int32(storage_http_thread_num, 3, "Number of storage daemon's http thread"); DEFINE_bool(local_config, false, "meta client will not retrieve latest configuration from meta"); +DEFINE_bool(storage_kv_mode, false, "True for kv mode"); namespace nebula { namespace storage { @@ -62,8 +63,10 @@ std::unique_ptr StorageServer::getStoreInstance() { options.listenerPath_ = listenerPath_; options.partMan_ = std::make_unique(localHost_, metaClient_.get()); - options.cffBuilder_ = - std::make_unique(schemaMan_.get(), indexMan_.get()); + if (!FLAGS_storage_kv_mode) { + options.cffBuilder_ = + std::make_unique(schemaMan_.get(), indexMan_.get()); + } options.schemaMan_ = schemaMan_.get(); if (FLAGS_store_type == "nebula") { auto nbStore = std::make_unique( diff --git a/src/storage/test/CMakeLists.txt b/src/storage/test/CMakeLists.txt index ceb3809be17..44f27faa41f 100644 --- a/src/storage/test/CMakeLists.txt +++ b/src/storage/test/CMakeLists.txt @@ -7,9 +7,8 @@ set(storage_test_deps $ $ $ - $ $ - $ + $ $ $ $ @@ -583,7 +582,6 @@ nebula_add_test( SOURCES KVClientTest.cpp OBJECTS - $ ${storage_test_deps} LIBRARIES ${ROCKSDB_LIBRARIES} diff --git a/src/storage/test/KVClientTest.cpp b/src/storage/test/KVClientTest.cpp index 7249997686f..06122bf9ca1 100644 --- a/src/storage/test/KVClientTest.cpp +++ b/src/storage/test/KVClientTest.cpp @@ -5,7 +5,7 @@ #include -#include "clients/storage/GeneralStorageClient.h" +#include "clients/storage/StorageClient.h" #include "common/base/Base.h" #include "common/datatypes/KeyValue.h" #include "common/fs/TempDir.h" @@ -44,9 +44,9 @@ TEST(KVClientTest, SimpleTest) { options.localHost_ = storageAddr; options.role_ = meta::cpp2::HostRole::STORAGE; cluster.initMetaClient(options); - cluster.startStorage(storageAddr, storagePath.path(), true); + cluster.startStorage(storageAddr, storagePath.path()); - auto client = cluster.initGeneralStorageClient(); + auto client = cluster.initGraphStorageClient(); // kv interface test { std::vector pairs; diff --git a/src/storage/test/KVTest.cpp b/src/storage/test/KVTest.cpp index d7520bbc903..725ec1d8803 100644 --- a/src/storage/test/KVTest.cpp +++ b/src/storage/test/KVTest.cpp @@ -5,7 +5,7 @@ #include -#include "clients/storage/GeneralStorageClient.h" +#include "clients/storage/StorageClient.h" #include "common/base/Base.h" #include "common/fs/TempDir.h" #include "common/network/NetworkUtils.h" diff --git a/src/storage/test/StorageClientTest.cpp b/src/storage/test/StorageClientTest.cpp index 10f658f2a11..6b7dd2ec136 100644 --- a/src/storage/test/StorageClientTest.cpp +++ b/src/storage/test/StorageClientTest.cpp @@ -5,7 +5,7 @@ #include -#include "clients/storage/GraphStorageClient.h" +#include "clients/storage/StorageClient.h" #include "codec/RowReader.h" #include "common/base/Base.h" #include "common/fs/TempDir.h" diff --git a/src/storage/test/StorageLookupBenchmark.cpp b/src/storage/test/StorageLookupBenchmark.cpp index d421b3910cc..95d229a530c 100644 --- a/src/storage/test/StorageLookupBenchmark.cpp +++ b/src/storage/test/StorageLookupBenchmark.cpp @@ -5,7 +5,7 @@ #include -#include "clients/storage/GraphStorageClient.h" +#include "clients/storage/StorageClient.h" #include "codec/RowWriter.h" #include "common/base/Base.h" #include "common/fs/FileUtils.h" diff --git a/src/storage/test/TossTestExecutor.h b/src/storage/test/TossTestExecutor.h index 85475f7f260..99e81c550a5 100644 --- a/src/storage/test/TossTestExecutor.h +++ b/src/storage/test/TossTestExecutor.h @@ -10,7 +10,7 @@ namespace nebula { namespace storage { -using StorageClient = storage::GraphStorageClient; +using StorageClient = storage::StorageClient; template class StorageResponseReader { diff --git a/src/storage/test/TossTestUtils.h b/src/storage/test/TossTestUtils.h index f7000c47ecc..006cd46753f 100644 --- a/src/storage/test/TossTestUtils.h +++ b/src/storage/test/TossTestUtils.h @@ -14,8 +14,8 @@ #include #include -#include "clients/storage/GraphStorageClient.h" #include "clients/storage/InternalStorageClient.h" +#include "clients/storage/StorageClient.h" #include "codec/RowWriterV2.h" #include "common/base/Base.h" #include "common/expression/ConstantExpression.h" diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt index 23fb03897d2..9c0e31c33f0 100644 --- a/src/tools/CMakeLists.txt +++ b/src/tools/CMakeLists.txt @@ -1,5 +1,5 @@ nebula_add_subdirectory(storage-perf) -#nebula_add_subdirectory(simple-kv-verify) +nebula_add_subdirectory(simple-kv-verify) nebula_add_subdirectory(meta-dump) nebula_add_subdirectory(db-dump) nebula_add_subdirectory(db-upgrade) diff --git a/src/tools/simple-kv-verify/CMakeLists.txt b/src/tools/simple-kv-verify/CMakeLists.txt index cf65e874429..2aff91b1a5b 100644 --- a/src/tools/simple-kv-verify/CMakeLists.txt +++ b/src/tools/simple-kv-verify/CMakeLists.txt @@ -10,6 +10,7 @@ nebula_add_executable( $ $ $ + $ $ $ $ @@ -20,11 +21,12 @@ nebula_add_executable( $ $ $ + $ + $ $ $ $ $ - $ $ $ $ @@ -40,25 +42,20 @@ nebula_add_executable( $ $ $ - $ $ $ $ $ + $ + $ + $ $ + $ $ LIBRARIES ${ROCKSDB_LIBRARIES} ${THRIFT_LIBRARIES} + ${PROXYGEN_LIBRARIES} wangle gtest ) - -#install( -# TARGETS -# simple_kv_verify -# DESTINATION -# bin -# COMPONENT -# tool -#) diff --git a/src/tools/simple-kv-verify/SimpleKVVerifyTool.cpp b/src/tools/simple-kv-verify/SimpleKVVerifyTool.cpp index 8590bf29be6..35179b17c72 100644 --- a/src/tools/simple-kv-verify/SimpleKVVerifyTool.cpp +++ b/src/tools/simple-kv-verify/SimpleKVVerifyTool.cpp @@ -4,9 +4,10 @@ */ #include +#include #include "clients/meta/MetaClient.h" -#include "clients/storage/GeneralStorageClient.h" +#include "clients/storage/StorageClient.h" #include "common/base/Base.h" #include "common/datatypes/KeyValue.h" #include "common/meta/SchemaManager.h" @@ -52,8 +53,7 @@ class SimpleKVVerifyTool { spaceId_ = spaceResult.value(); LOG(INFO) << "Space ID: " << spaceId_; - generalStorageClient_ = - std::make_unique(ioExecutor, metaClient_.get()); + storageClient_ = std::make_unique(ioExecutor, metaClient_.get()); return EXIT_SUCCESS; } @@ -66,7 +66,7 @@ class SimpleKVVerifyTool { keyValues.emplace_back(std::make_pair(key, value)); } - auto future = generalStorageClient_->put(spaceId_, std::move(keyValues)); + auto future = storageClient_->put(spaceId_, std::move(keyValues)); auto resp = std::move(future).get(); if (!resp.succeeded()) { LOG(ERROR) << "Put Failed"; @@ -89,7 +89,7 @@ class SimpleKVVerifyTool { keys.emplace_back(pair.first); } - auto future = generalStorageClient_->get(spaceId_, std::move(keys)); + auto future = storageClient_->get(spaceId_, std::move(keys)); auto resp = std::move(future).get(); if (!resp.succeeded()) { LOG(ERROR) << "Get Failed"; @@ -108,8 +108,9 @@ class SimpleKVVerifyTool { auto key = pair.first; bool found = false; for (const auto& result : resp.responses()) { - auto iter = result.key_values.find(key); - if (iter != result.key_values.end()) { + auto kvs = result.get_key_values(); + auto iter = kvs.find(key); + if (iter != kvs.end()) { if (iter->second != pairs[key]) { LOG(ERROR) << "Check Fail: key = " << key << ", values: " << iter->second << " != " << pairs[key]; @@ -128,7 +129,7 @@ class SimpleKVVerifyTool { } private: - std::unique_ptr generalStorageClient_; + std::unique_ptr storageClient_; std::unique_ptr metaClient_; nebula::GraphSpaceID spaceId_; }; diff --git a/src/tools/storage-perf/CMakeLists.txt b/src/tools/storage-perf/CMakeLists.txt index deb1bd67277..ff7ebc52b71 100644 --- a/src/tools/storage-perf/CMakeLists.txt +++ b/src/tools/storage-perf/CMakeLists.txt @@ -16,7 +16,7 @@ set(perf_test_deps $ $ $ - $ + $ $ $ $ diff --git a/src/tools/storage-perf/StorageIntegrityTool.cpp b/src/tools/storage-perf/StorageIntegrityTool.cpp index e67c96579cc..2e661e462a3 100644 --- a/src/tools/storage-perf/StorageIntegrityTool.cpp +++ b/src/tools/storage-perf/StorageIntegrityTool.cpp @@ -5,7 +5,7 @@ #include -#include "clients/storage/GraphStorageClient.h" +#include "clients/storage/StorageClient.h" #include "codec/RowReader.h" #include "common/base/Base.h" #include "common/time/Duration.h" @@ -13,49 +13,33 @@ DEFINE_string(meta_server_addrs, "", "meta server address"); DEFINE_int32(io_threads, 10, "client io threads"); -DEFINE_int32(partition_num, 1024, "partition for space"); DEFINE_string(space_name, "test_space", "the space name"); -DEFINE_string(tag_name, "test_tag", "the tag name"); -DEFINE_string(prop_name, "test_prop", "the property name"); -DEFINE_string(first_vertex_id, "1", "The smallest vertex id"); -DEFINE_uint64(width, 100, "width of matrix"); -DEFINE_uint64(height, 1000, "height of matrix"); - -DECLARE_int32(heartbeat_interval_secs); +DEFINE_string(first_key, "1", "the smallest key"); +DEFINE_uint32(width, 100, "width of matrix"); +DEFINE_uint32(height, 1000, "height of matrix"); namespace nebula { namespace storage { /** - * We generate a big circle of data, all node is the vertex, and the vertex have - * only one property of the next vertex, so we can validate them by traversing. + * We generate a big circle of data, all node are key/values, the value is the next node's key + * , so we can validate them by traversing. * - * There are some gflags we need to pay attention: - * 1. The space's replica must be 1, because we don't have retry in - * StorageClient, we will update it after we support preheat. The tag must have - * only one int property, which is prop_name. - * 2. If the space and tag doesn't exists, it will try to create one, maybe you - * need to set heartbeat_interval_secs to make sure the storage service has load - * meta. - * 3. The width and height is the size of the big linked list, you can refer to - * the graph below. As expected, we can traverse the big linked list after width - * * height steps starting from any node in the list. + * The width and height is the size of the big linked list, you can refer to the graph below. As + * expected, we can traverse the big linked list after width * height steps starting from any node + * in the list. */ class IntegrityTest { public: - IntegrityTest() - : propName_(FLAGS_prop_name), - width_{FLAGS_width}, - height_{FLAGS_height}, - firstVertexId_{FLAGS_first_vertex_id} {} + IntegrityTest() : width_{FLAGS_width}, height_{FLAGS_height}, firstKey_{FLAGS_first_key} {} int run() { if (!init()) { return EXIT_FAILURE; } prepareData(); - if (!validate(firstVertexId_, width_ * height_)) { + if (!validate(firstKey_, width_ * height_)) { LOG(INFO) << "Integrity test failed"; return EXIT_FAILURE; } @@ -65,7 +49,12 @@ class IntegrityTest { private: bool init() { - FLAGS_heartbeat_interval_secs = 10; + if (static_cast(width_) * static_cast(height_) > + std::numeric_limits::max()) { + LOG(ERROR) << "Width * Height is out of range"; + return false; + } + auto metaAddrsRet = nebula::network::NetworkUtils::toHosts(FLAGS_meta_server_addrs); if (!metaAddrsRet.ok() || metaAddrsRet.value().empty()) { LOG(ERROR) << "Can't get metaServer address, status: " << metaAddrsRet.status() @@ -84,41 +73,13 @@ class IntegrityTest { auto spaceResult = mClient_->getSpaceIdByNameFromCache(FLAGS_space_name); if (!spaceResult.ok()) { - LOG(ERROR) << "Get spaceId failed, try to create one"; - meta::cpp2::SpaceDesc spaceDesc; - spaceDesc.set_space_name(FLAGS_space_name); - spaceDesc.set_partition_num(FLAGS_partition_num); - spaceDesc.set_replica_factor(1); - auto ret = mClient_->createSpace(spaceDesc).get(); - if (!ret.ok()) { - LOG(ERROR) << "Create space failed: " << ret.status(); - return false; - } - spaceId_ = ret.value(); + LOG(ERROR) << "Get spaceId failed"; + return false; } else { spaceId_ = spaceResult.value(); } - auto tagResult = mClient_->getTagIDByNameFromCache(spaceId_, FLAGS_tag_name); - if (!tagResult.ok()) { - sleep(FLAGS_heartbeat_interval_secs + 1); - LOG(ERROR) << "Get tagId failed, try to create one: " << tagResult.status(); - nebula::meta::cpp2::Schema schema; - nebula::meta::cpp2::ColumnDef column; - column.name = FLAGS_prop_name; - column.type.set_type(nebula::cpp2::PropertyType::INT64); - (*schema.columns_ref()).emplace_back(std::move(column)); - auto ret = mClient_->createTagSchema(spaceId_, FLAGS_tag_name, schema).get(); - if (!ret.ok()) { - LOG(ERROR) << "Create tag failed: " << ret.status(); - return false; - } - tagId_ = ret.value(); - } else { - tagId_ = tagResult.value(); - } - - client_ = std::make_unique(threadPool_, mClient_.get()); + client_ = std::make_unique(threadPool_, mClient_.get()); return true; } @@ -145,32 +106,31 @@ class IntegrityTest { * |___________________________| */ void prepareData() { - std::vector first; - std::vector prev; - std::vector cur; + std::vector first; + std::vector prev; + std::vector cur; - LOG(INFO) << "Start insert vertex"; + LOG(INFO) << "Start insert kvs"; for (size_t i = 0; i < width_; i++) { - prev.emplace_back(std::to_string(std::atol(firstVertexId_.c_str()) + i)); + prev.emplace_back(std::to_string(std::atoi(firstKey_.c_str()) + i)); } // leave alone the first line, generate other lines for (size_t i = 1; i < height_; i++) { - addVertex(prev, cur, std::to_string(std::atol(firstVertexId_.c_str() + i * width_))); + insertRow(prev, cur, std::to_string(std::atoi(firstKey_.c_str()) + i * width_)); prev = std::move(cur); } // shift the last line std::rotate(prev.begin(), prev.end() - 1, prev.end()); // generate first line, each node in first line will points to a node in // rotated last line, which will make the matrix a big linked list - addVertex(prev, first, firstVertexId_); + insertRow(prev, first, firstKey_); LOG(INFO) << "Prepare data ok"; } - void addVertex(std::vector& prev, std::vector& cur, VertexID startId) { - std::unordered_map> propNames; - propNames[tagId_].emplace_back(propName_); - GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0); - auto future = client_->addVertices(param, genVertices(prev, cur, startId), propNames, true); + void insertRow(const std::vector& prev, + std::vector& cur, + const std::string& startId) { + auto future = client_->put(spaceId_, genKeyValue(prev, cur, startId)); auto resp = std::move(future).get(); if (!resp.succeeded()) { for (auto& err : resp.failedParts()) { @@ -180,82 +140,45 @@ class IntegrityTest { } } - std::vector genVertices(std::vector& prev, - std::vector& cur, - VertexID startId) { - // We insert add vertices of a row once a time - std::vector newVertices; + std::vector genKeyValue(const std::vector& prev, + std::vector& cur, + const std::string& startId) { + // We insert key-values of a row once a time + std::vector kvs; for (size_t i = 0; i < width_; i++) { - VertexID vId; - vId = std::to_string(std::atol(startId.c_str()) + i); - cur.emplace_back(vId); - - storage::cpp2::NewVertex v; - v.set_id(vId); - std::vector tags; + auto key = std::to_string(std::atoi(startId.c_str()) + i); + cur.emplace_back(key); + kvs.emplace_back(std::make_pair(cur[i], prev[i])); - storage::cpp2::NewTag tag; - tag.set_tag_id(tagId_); - - std::vector props; - Value val(prev[i]); - props.emplace_back(val); - tag.set_props(props); - tags.emplace_back(std::move(tag)); - - v.set_tags(std::move(tags)); - newVertices.emplace_back(std::move(v)); VLOG(2) << "Build " << cur[i] << " -> " << prev[i]; - PLOG_EVERY_N(INFO, 10000) << "We have inserted " - << std::atol(vId.c_str()) - std::atol(firstVertexId_.c_str()) - - width_ - << " vertices so far, total: " << width_ * height_; + LOG_EVERY_N(INFO, 10000) << "We have inserted " + << std::atoi(key.c_str()) - std::atoi(firstKey_.c_str()) - width_ + << " key-value so far, total: " << width_ * height_; } - return newVertices; + return kvs; } - bool validate(VertexID startId, int64_t queryTimes) { + bool validate(const std::string& startId, int64_t queryTimes) { int64_t count = 0; - VertexID nextId = startId; + std::string nextId = startId; while (count < queryTimes) { - PLOG_EVERY_N(INFO, 1000) << "We have gone " << count << " steps so far"; - // TODO support getProps - std::vector props; - cpp2::VertexProp tagProp; - tagProp.set_tag(tagId_); - (*tagProp.props_ref()).emplace_back(propName_); - DataSet dataset({kVid}); - GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0); - auto future = client_->getProps(param, dataset, &props, nullptr, nullptr); + LOG_EVERY_N(INFO, 1000) << "We have gone " << count << " steps so far"; + auto future = client_->get(spaceId_, {nextId}); auto resp = std::move(future).get(); if (!resp.succeeded()) { - LOG(ERROR) << "Failed to fetch props of vertex " << nextId; + LOG(ERROR) << "Failed to get value of " << nextId; return false; } -// TODO -#if 0 - auto& results = resp.responses(); - // get tag schema - auto* vschema = results[0].get_vertex_schema(); - DCHECK(vschema != nullptr); - auto tagIter = vschema->find(tagId_); - DCHECK(tagIter != vschema->end()); - auto tagProvider = std::make_shared(tagIter->second); - for (auto& vdata : results[0].vertices) { - auto iter = std::find_if(vdata.tag_data.begin(), vdata.tag_data.end(), - [this] (const auto& tagData) { - return tagData.tag_id == tagId_; - }); - if (iter == vdata.tag_data.end()) { - return false; - } - auto tagReader = RowReaderWrapper::getRowReader(iter->data, tagProvider); - auto ret = RowReader::getPropByName(tagReader.get(), propName_); - CHECK(ok(ret)); - nextId = boost::get(value(ret)); - } -#endif + const auto& results = resp.responses(); + DCHECK_EQ(results.size(), 1UL); + auto kvs = results[0].get_key_values(); + auto iter = kvs.find(nextId); + if (iter == kvs.end()) { + LOG(ERROR) << "Value of " << nextId << " not found"; + return false; + } + nextId = iter->second; count++; } // after go to next node for width * height times, it should go back to @@ -267,15 +190,13 @@ class IntegrityTest { } private: - std::unique_ptr client_; + std::unique_ptr client_; std::unique_ptr mClient_; std::shared_ptr threadPool_; GraphSpaceID spaceId_; - TagID tagId_; - std::string propName_; size_t width_; size_t height_; - VertexID firstVertexId_; + std::string firstKey_; }; } // namespace storage diff --git a/src/tools/storage-perf/StoragePerfTool.cpp b/src/tools/storage-perf/StoragePerfTool.cpp index b7336097245..dbb2c9ce7ef 100644 --- a/src/tools/storage-perf/StoragePerfTool.cpp +++ b/src/tools/storage-perf/StoragePerfTool.cpp @@ -8,7 +8,7 @@ #include #include -#include "clients/storage/GraphStorageClient.h" +#include "clients/storage/StorageClient.h" #include "common/base/Base.h" #include "common/thread/GenericWorker.h" #include "common/time/Duration.h" @@ -130,7 +130,7 @@ class Perf { edgeProps_.emplace_back(edgeSchema->getFieldName(i)); } - graphStorageClient_ = std::make_unique(threadPool_, mClient_.get()); + storageClient_ = std::make_unique(threadPool_, mClient_.get()); time::Duration duration; std::vector threads; @@ -297,8 +297,8 @@ class Perf { for (auto i = 0; i < tokens; i++) { auto start = time::WallClock::fastNowInMicroSec(); - GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0, false); - graphStorageClient_ + StorageClient::CommonRequestParam param(spaceId_, 0, 0, false); + storageClient_ ->getNeighbors(param, colNames, vertices, @@ -332,8 +332,8 @@ class Perf { auto tokens = tokenBucket_.consumeOrDrain(FLAGS_concurrency, FLAGS_qps, FLAGS_concurrency); for (auto i = 0; i < tokens; i++) { auto start = time::WallClock::fastNowInMicroSec(); - GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0); - graphStorageClient_->addVertices(param, genVertices(), tagProps_, true) + StorageClient::CommonRequestParam param(spaceId_, 0, 0); + storageClient_->addVertices(param, genVertices(), tagProps_, true) .via(evb) .thenValue([this, start](auto&& resps) { if (!resps.succeeded()) { @@ -361,8 +361,8 @@ class Perf { auto tokens = tokenBucket_.consumeOrDrain(FLAGS_concurrency, FLAGS_qps, FLAGS_concurrency); for (auto i = 0; i < tokens; i++) { auto start = time::WallClock::fastNowInMicroSec(); - GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0); - graphStorageClient_->addEdges(param, genEdges(), edgeProps_, true) + StorageClient::CommonRequestParam param(spaceId_, 0, 0); + storageClient_->addEdges(param, genEdges(), edgeProps_, true) .via(evb) .thenValue([this, start](auto&& resps) { if (!resps.succeeded()) { @@ -394,8 +394,8 @@ class Perf { input.emplace_back(std::move(row)); auto vProps = vertexProps(); auto start = time::WallClock::fastNowInMicroSec(); - GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0); - graphStorageClient_->getProps(param, std::move(input), &vProps, nullptr, nullptr) + StorageClient::CommonRequestParam param(spaceId_, 0, 0); + storageClient_->getProps(param, std::move(input), &vProps, nullptr, nullptr) .via(evb) .thenValue([this, start](auto&& resps) { if (!resps.succeeded()) { @@ -419,8 +419,8 @@ class Perf { input.emplace_back(std::move(row)); auto eProps = edgeProps(); auto start = time::WallClock::fastNowInMicroSec(); - GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0); - graphStorageClient_->getProps(param, std::move(input), nullptr, &eProps, nullptr) + StorageClient::CommonRequestParam param(spaceId_, 0, 0); + storageClient_->getProps(param, std::move(input), nullptr, &eProps, nullptr) .via(evb) .thenValue([this, start](auto&& resps) { if (!resps.succeeded()) { @@ -438,7 +438,7 @@ class Perf { private: std::atomic_long finishedRequests_{0}; - std::unique_ptr graphStorageClient_; + std::unique_ptr storageClient_; std::unique_ptr mClient_; std::shared_ptr threadPool_; GraphSpaceID spaceId_;