Feature/sample go #2853

Merged: 44 commits, Sep 28, 2021
Commits
cc1293f
Add sample executor.
Shylock-Hg Sep 9, 2021
9023f13
Merge branch 'master' into feature/sample-executor
Shylock-Hg Sep 10, 2021
8ae0630
Correct the node name.
Shylock-Hg Sep 13, 2021
170c250
Support sample/limit clause for go sentence.
Shylock-Hg Sep 13, 2021
0e83421
Merge branch 'master' into feature/sample-go
Shylock-Hg Sep 13, 2021
bb75185
Merge branch 'master' into feature/sample-go
Shylock-Hg Sep 14, 2021
f9fc464
Merge branch 'master' into feature/sample-executor
Shylock-Hg Sep 14, 2021
2fccbd0
Sample the result of GetNeighbors
Shylock-Hg Sep 14, 2021
1ae0c03
Merge branch 'feature/sample-executor' into feature/sample-go
Shylock-Hg Sep 15, 2021
a7aec1e
Merge branch 'master' of https://github.com/vesoft-inc/nebula into fe…
Shylock-Hg Sep 15, 2021
3e1fb53
Sample/Limit the GetNeighbors directly.
Shylock-Hg Sep 16, 2021
557c946
Merge branch 'master' into feature/sample-go
Shylock-Hg Sep 16, 2021
85d625a
Change limit to expression.
Shylock-Hg Sep 16, 2021
1908e8a
Merge branch 'master' into feature/limit-expr
Shylock-Hg Sep 16, 2021
43a114f
Merge branch 'master' into feature/limit-expr
Shylock-Hg Sep 16, 2021
992f63b
Use method.
Shylock-Hg Sep 16, 2021
d939c46
Merge branch 'feature/limit-expr' of github.com:Shylock-Hg/nebula int…
Shylock-Hg Sep 16, 2021
eb611d3
Merge branch 'master' into feature/sample-executor
Shylock-Hg Sep 17, 2021
22323fb
Change sample number to expression.
Shylock-Hg Sep 17, 2021
09aa4b6
Merge branch 'feature/sample-executor' into feature/sample-go
Shylock-Hg Sep 17, 2021
9663a30
Merge branch 'feature/limit-expr' into feature/sample-go
Shylock-Hg Sep 17, 2021
12b23eb
Add setter getter.
Shylock-Hg Sep 17, 2021
bf81cac
Merge branch 'feature/sample-executor' into feature/sample-go
Shylock-Hg Sep 17, 2021
a416118
Merge branch 'master' of https://github.com/vesoft-inc/nebula into fe…
Shylock-Hg Sep 17, 2021
0f74fe7
Eval with context in runtime.
Shylock-Hg Sep 17, 2021
65158e1
Eval with context in runtime.
Shylock-Hg Sep 17, 2021
cdc1003
Merge branch 'feature/sample-executor' into feature/sample-go
Shylock-Hg Sep 17, 2021
9449260
Merge branch 'feature/limit-expr' into feature/sample-go
Shylock-Hg Sep 18, 2021
bd54397
Merge branch 'master' into feature/sample-go
Shylock-Hg Sep 22, 2021
7afb71e
Change expr to typed.
Shylock-Hg Sep 22, 2021
be05c6b
Merge branch 'master' into feature/sample-go
Shylock-Hg Sep 22, 2021
255a3ac
Merge branch 'master' into feature/sample-go
Shylock-Hg Sep 24, 2021
1676b15
Merge branch 'master' into feature/sample-go
Shylock-Hg Sep 24, 2021
fef57da
Merge branch 'master' into feature/sample-go
Shylock-Hg Sep 26, 2021
0be4a5b
Add the case with properties.
Shylock-Hg Sep 26, 2021
326ccd9
Add input test case.
Shylock-Hg Sep 26, 2021
654dac1
Merge branch 'master' into feature/sample-go
Shylock-Hg Sep 26, 2021
ff66206
Remove the nondetermined result.
Shylock-Hg Sep 27, 2021
69d0277
Merge branch 'master' into feature/sample-go
Shylock-Hg Sep 27, 2021
93a9bdd
Merge branch 'master' into feature/sample-go
nevermore3 Sep 27, 2021
9de4f50
Remove unused include.
Shylock-Hg Sep 27, 2021
8b5812e
Merge branch 'feature/sample-go' of github.com:Shylock-Hg/nebula into…
Shylock-Hg Sep 27, 2021
eed0664
Remove out of date comment.
Shylock-Hg Sep 28, 2021
1cd43ba
Add some cases.
Shylock-Hg Sep 28, 2021
64 changes: 64 additions & 0 deletions src/graph/context/Iterator.cpp
@@ -259,6 +259,25 @@ void GetNeighborsIter::erase() {
next();
}

void GetNeighborsIter::sample(const int64_t count) {
algorithm::ReservoirSampling<std::tuple<List*, List>> sampler_(count);
doReset(0);
for (; valid(); next()) {
// <current List of Edge, value of Edge>
std::tuple<List*, List> t =
std::make_tuple(const_cast<List*>(currentCol_), std::move(*currentEdge_));
sampler_.sampling(std::move(t));
}
doReset(0);
clearEdges();
auto samples = std::move(sampler_).samples();
for (auto& sample : samples) {
auto* col = std::get<0>(sample);
col->emplace_back(std::move(std::get<1>(sample)));
}
doReset(0);
}

const Value& GetNeighborsIter::getColumn(const std::string& col) const {
if (!valid()) {
return Value::kNullValue;
@@ -454,6 +473,51 @@ List GetNeighborsIter::getEdges() {
return edges;
}

void GetNeighborsIter::nextCol() {
if (!valid()) {
return;
}
DCHECK(!noEdge_);

// go to next column
while (++colIdx_) {
if (colIdx_ < currentDs_->colUpperBound) {
const auto& currentCol = currentRow_->operator[](colIdx_);
if (!currentCol.isList() || currentCol.getList().empty()) {
continue;
}

currentCol_ = &currentCol.getList();
// edgeIdxUpperBound_ = currentCol_->size();
// edgeIdx_ = -1;
break;
}
// go to next row
if (++currentRow_ < rowsUpperBound_) {
colIdx_ = currentDs_->colLowerBound;
continue;
}

// go to next dataset
if (++currentDs_ < dsIndices_.end()) {
colIdx_ = currentDs_->colLowerBound;
currentRow_ = currentDs_->ds->begin();
rowsUpperBound_ = currentDs_->ds->end();
continue;
}
break;
}
if (currentDs_ == dsIndices_.end()) {
return;
}
}

void GetNeighborsIter::clearEdges() {
for (; colValid(); nextCol()) {
const_cast<List*>(currentCol_)->clear();
}
}

SequentialIter::SequentialIter(std::shared_ptr<Value> value) : Iterator(value, Kind::kSequential) {
DCHECK(value->isDataSet());
auto& ds = value->mutableDataSet();
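
GetNeighborsIter::sample above feeds every edge through algorithm::ReservoirSampling, whose implementation is not part of this diff. As a rough illustration of the general technique only (classic reservoir sampling, "Algorithm R"), here is a minimal sketch. The class name `ReservoirSampler`, the seed parameter, and the bool return value are assumptions made for the example, not the Nebula API.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <random>
#include <utility>
#include <vector>

// Hypothetical stand-in for algorithm::ReservoirSampling; the real class is
// not shown in this diff, so this only illustrates the general technique.
template <typename T>
class ReservoirSampler {
 public:
  explicit ReservoirSampler(std::size_t capacity,
                            std::uint64_t seed = std::random_device{}())
      : capacity_(capacity), rng_(seed) {}

  // Offer one element; returns true if it was stored in the reservoir.
  bool sampling(T&& item) {
    ++seen_;
    if (reservoir_.size() < capacity_) {
      // The first `capacity_` elements are always kept.
      reservoir_.emplace_back(std::move(item));
      return true;
    }
    if (capacity_ == 0) {
      return false;
    }
    // Keep the new element with probability capacity_ / seen_ by drawing a
    // slot in [0, seen_ - 1] and replacing only if it falls in the reservoir.
    std::uniform_int_distribution<std::size_t> dist(0, seen_ - 1);
    const std::size_t slot = dist(rng_);
    if (slot < capacity_) {
      reservoir_[slot] = std::move(item);
      return true;
    }
    return false;
  }

  // Consume the sampler and hand back the sampled elements.
  std::vector<T> samples() && {
    assert(reservoir_.size() <= capacity_);
    return std::move(reservoir_);
  }

 private:
  std::size_t capacity_;
  std::size_t seen_{0};
  std::mt19937_64 rng_;
  std::vector<T> reservoir_;
};
```

The appeal of this approach for an iterator is that it needs a single pass and never holds more than `capacity` elements, which matches the shape of the loop in GetNeighborsIter::sample.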
63 changes: 60 additions & 3 deletions src/graph/context/Iterator.h
@@ -11,6 +11,7 @@

#include <boost/dynamic_bitset.hpp>

#include "common/algorithm/ReservoirSampling.h"
#include "common/datatypes/DataSet.h"
#include "common/datatypes/List.h"
#include "common/datatypes/Value.h"
@@ -60,6 +61,12 @@ class Iterator {
// Warning: this will break the original order of elements!
virtual void unstableErase() = 0;

// Keep only the elements in the range [offset, offset + count)
virtual void select(std::size_t offset, std::size_t count) = 0;

// Sample the elements
virtual void sample(int64_t count) = 0;

virtual const Row* row() const = 0;

// erase range, excluding the last position; if last > size(), erase to the end
@@ -142,6 +149,12 @@ class DefaultIter final : public Iterator {

void eraseRange(size_t, size_t) override { return; }

void select(std::size_t, std::size_t) override {
DLOG(FATAL) << "Unimplemented method for default iterator.";
}

void sample(int64_t) override { DLOG(FATAL) << "Unimplemented method for default iterator."; }

void clear() override { reset(); }

size_t size() const override { return 1; }
@@ -193,12 +206,27 @@ class GetNeighborsIter final : public Iterator {

void unstableErase() override { erase(); }

+ // erase [first, last)
void eraseRange(size_t first, size_t last) override {
- UNUSED(first);
- UNUSED(last);
- DCHECK(false);
+ for (std::size_t i = 0; valid() && i < last; next(), ++i) {
+ if (i >= first && i < last) {
+ erase();
+ }
+ }
+ doReset(0);
}

void select(std::size_t offset, std::size_t count) override {
for (std::size_t i = 0; valid(); next(), ++i) {
// Keep only [offset, offset + count); this form avoids unsigned underflow when count == 0.
if (i < offset || i >= offset + count) {
erase();
}
}
doReset(0);
}

void sample(int64_t count) override;

size_t size() const override { return 0; }

const Value& getColumn(const std::string& col) const override;
@@ -236,6 +264,13 @@ class GetNeighborsIter final : public Iterator {
return currentDs_->tagEdgeNameIndices.find(colIdx_)->second;
}

bool colValid() { return !noEdge_ && valid(); }

// move to next List of Edge data
void nextCol();

void clearEdges();

struct PropIndex {
size_t colIdx;
std::vector<std::string> propList;
@@ -320,6 +355,28 @@ class SequentialIter : public Iterator {

void eraseRange(size_t first, size_t last) override;

void select(std::size_t offset, std::size_t count) override {
auto size = this->size();
if (size <= static_cast<size_t>(offset)) {
clear();
} else if (size > static_cast<size_t>(offset + count)) {
eraseRange(0, offset);
eraseRange(count, size - offset);
} else if (size > static_cast<size_t>(offset) && size <= static_cast<size_t>(offset + count)) {
eraseRange(0, offset);
}
}

void sample(int64_t count) override {
DCHECK_GE(count, 0);
algorithm::ReservoirSampling<Row> sampler(count);
for (auto& row : *rows_) {
sampler.sampling(std::move(row));
}
*rows_ = std::move(sampler).samples();
iter_ = rows_->begin();
}

void clear() override {
rows_->clear();
reset();
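
The new select(offset, count) contract keeps only the elements in [offset, offset + count) and drops the rest. To make the edge cases concrete (offset past the end, count larger than what remains, count of zero), here is a small sketch on a plain std::vector. The helper name `selectRange` is hypothetical and the code is not taken from the PR; it only mirrors the intended semantics.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Keep only rows[offset, offset + count); everything else is dropped.
// Hypothetical helper, written against std::vector rather than an Iterator.
template <typename Row>
void selectRange(std::vector<Row>& rows, std::size_t offset, std::size_t count) {
  // Clamp the start so an offset past the end selects nothing.
  const std::size_t first = std::min(offset, rows.size());
  // Clamp the length without computing offset + count, which could overflow.
  const std::size_t len = std::min(count, rows.size() - first);
  assert(first + len <= rows.size());
  // Drop the tail first so the indices of the head stay valid.
  rows.erase(rows.begin() + first + len, rows.end());
  rows.erase(rows.begin(), rows.begin() + first);
}
```

For example, calling `selectRange(rows, 2, 3)` on ten rows numbered 0 to 9 leaves rows 2, 3 and 4, which is the same outcome the three-branch SequentialIter::select above is aiming for.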
2 changes: 2 additions & 0 deletions src/graph/context/ast/QueryAstContext.h
@@ -84,6 +84,8 @@ struct GoContext final : AstContext {
bool distinct{false};
// true: sample, false: limit
bool random{false};
// step limit value
std::vector<int64_t> limits;
std::vector<std::string> colNames;

std::string vidsVar;
1 change: 1 addition & 0 deletions src/graph/executor/CMakeLists.txt
@@ -19,6 +19,7 @@ nebula_add_library(
query/GetVerticesExecutor.cpp
query/IntersectExecutor.cpp
query/LimitExecutor.cpp
query/SampleExecutor.cpp
query/MinusExecutor.cpp
query/ProjectExecutor.cpp
query/UnwindExecutor.cpp
4 changes: 4 additions & 0 deletions src/graph/executor/Executor.cpp
@@ -84,6 +84,7 @@
#include "graph/executor/query/LimitExecutor.h"
#include "graph/executor/query/MinusExecutor.h"
#include "graph/executor/query/ProjectExecutor.h"
#include "graph/executor/query/SampleExecutor.h"
#include "graph/executor/query/SortExecutor.h"
#include "graph/executor/query/TopNExecutor.h"
#include "graph/executor/query/UnionAllVersionVarExecutor.h"
@@ -178,6 +179,9 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kLimit: {
return pool->add(new LimitExecutor(node, qctx));
}
case PlanNode::Kind::kSample: {
return pool->add(new SampleExecutor(node, qctx));
}
case PlanNode::Kind::kProject: {
return pool->add(new ProjectExecutor(node, qctx));
}
3 changes: 2 additions & 1 deletion src/graph/executor/query/GetNeighborsExecutor.cpp
@@ -42,6 +42,7 @@ folly::Future<Status> GetNeighborsExecutor::execute() {

time::Duration getNbrTime;
GraphStorageClient* storageClient = qctx_->getStorageClient();
QueryExpressionContext qec(qctx()->ectx());
return storageClient
->getNeighbors(gn_->space(),
qctx()->rctx()->session()->id(),
@@ -57,7 +58,7 @@ folly::Future<Status> GetNeighborsExecutor::execute() {
gn_->dedup(),
gn_->random(),
gn_->orderBy(),
- gn_->limit(),
+ gn_->limit(qec),
gn_->filter())
.via(runner())
.ensure([this, getNbrTime]() {
13 changes: 3 additions & 10 deletions src/graph/executor/query/LimitExecutor.cpp
@@ -21,16 +21,9 @@ folly::Future<Status> LimitExecutor::execute() {
ResultBuilder builder;
builder.value(result.valuePtr());
auto offset = limit->offset();
- auto count = limit->count();
- auto size = iter->size();
- if (size <= static_cast<size_t>(offset)) {
- iter->clear();
- } else if (size > static_cast<size_t>(offset + count)) {
- iter->eraseRange(0, offset);
- iter->eraseRange(count, size - offset);
- } else if (size > static_cast<size_t>(offset) && size <= static_cast<size_t>(offset + count)) {
- iter->eraseRange(0, offset);
- }
+ QueryExpressionContext qec(ectx_);
+ auto count = limit->count(qec);
+ iter->select(offset, count);
builder.iter(std::move(result).iter());
return finish(builder.build());
}
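
Both this hunk and the GetNeighborsExecutor change switch from a stored constant (`limit->count()`) to a value evaluated against an expression context at execution time (`limit->count(qec)`). The sketch below shows that pattern in isolation. `EvalContext`, `LimitNode` and the use of std::function as the "expression" are hypothetical simplifications for illustration, not the Nebula types.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for a query expression context: named integer variables.
struct EvalContext {
  std::unordered_map<std::string, std::int64_t> vars;
};

// Hypothetical plan node whose count is an expression instead of a constant.
class LimitNode {
 public:
  using CountExpr = std::function<std::int64_t(const EvalContext&)>;

  LimitNode(std::int64_t offset, CountExpr count)
      : offset_(offset), count_(std::move(count)) {}

  std::int64_t offset() const { return offset_; }

  // Evaluated when the executor runs, mirroring the shape of limit->count(qec).
  std::int64_t count(const EvalContext& ctx) const {
    const std::int64_t value = count_(ctx);
    assert(value >= 0);  // a negative limit makes no sense in this sketch
    return value;
  }

 private:
  std::int64_t offset_;
  CountExpr count_;
};
```

Deferring evaluation this way means the same plan node can yield a different count on each execution, depending on what the context holds at that point.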
36 changes: 36 additions & 0 deletions src/graph/executor/query/SampleExecutor.cpp
@@ -0,0 +1,36 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "graph/executor/query/SampleExecutor.h"

#include "graph/planner/plan/Query.h"
#include "graph/util/ScopedTimer.h"

namespace nebula {
namespace graph {

folly::Future<Status> SampleExecutor::execute() {
SCOPED_TIMER(&execTime_);

auto* sample = asNode<Sample>(node());
Result result = ectx_->getResult(sample->inputVar());
auto* iter = result.iterRef();
DCHECK_NE(iter->kind(), Iterator::Kind::kDefault);
ResultBuilder builder;
builder.value(result.valuePtr());
QueryExpressionContext qec(ectx_);
auto count = sample->count(qec);
if (iter->kind() == Iterator::Kind::kGetNeighbors ||
iter->size() > static_cast<std::size_t>(count)) {
// Sampling
iter->sample(count);
}
builder.iter(std::move(result).iter());
return finish(builder.build());
}

} // namespace graph
} // namespace nebula
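
SampleExecutor::execute only samples when the input holds more rows than requested, except for GetNeighbors iterators, which are always sampled (their size() returns 0 in Iterator.h, so the size comparison cannot be used for them). The guard for the ordinary case can be sketched on a plain vector as follows. This is an illustration only: it uses the standard library's std::sample (C++17) rather than the ReservoirSampling class from the PR, and the function name `sampleRows` is hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <random>
#include <utility>
#include <vector>

// Reduce `rows` to at most `count` randomly chosen elements.
// Hypothetical helper; std::sample stands in for the PR's reservoir sampler.
template <typename Row>
void sampleRows(std::vector<Row>& rows, std::int64_t count, std::mt19937_64& rng) {
  assert(count >= 0);
  const auto want = static_cast<std::size_t>(count);
  if (rows.size() <= want) {
    return;  // already small enough: leave the input untouched
  }
  std::vector<Row> picked;
  picked.reserve(want);
  std::sample(rows.begin(), rows.end(), std::back_inserter(picked),
              static_cast<std::ptrdiff_t>(want), rng);
  rows = std::move(picked);
}
```

Skipping the sampling step when the input is already within the limit avoids rebuilding the row container for no change in the result.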
23 changes: 23 additions & 0 deletions src/graph/executor/query/SampleExecutor.h
@@ -0,0 +1,23 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#pragma once

#include "graph/executor/Executor.h"

namespace nebula {
namespace graph {

class SampleExecutor final : public Executor {
public:
SampleExecutor(const PlanNode *node, QueryContext *qctx)
: Executor("SampleExecutor", node, qctx) {}

folly::Future<Status> execute() override;
};

} // namespace graph
} // namespace nebula
1 change: 1 addition & 0 deletions src/graph/executor/test/CMakeLists.txt
@@ -71,6 +71,7 @@ nebula_add_test(
FilterTest.cpp
DedupTest.cpp
LimitTest.cpp
SampleTest.cpp
SortTest.cpp
TopNTest.cpp
AggregateTest.cpp