Skip to content

Commit

Permalink
Bidirectional traverse. (vesoft-inc#1740)
Browse files Browse the repository at this point in the history
* Support bidirectional traverse.

* Add basic bidirectional traverse test.

* Fix ut.

* Add ut for multi edge traverse.

* Fix clang compile error.

* Address comment.

* Adddress @dangleptr's comment.

* Address comment.

Co-authored-by: laura-ding <48548375+laura-ding@users.noreply.github.com>
  • Loading branch information
CPWstatic and laura-ding authored Feb 28, 2020
1 parent 99bab8a commit 3afb5fd
Show file tree
Hide file tree
Showing 11 changed files with 372 additions and 82 deletions.
10 changes: 10 additions & 0 deletions src/common/filter/Expressions.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ class ExpressionContext final {
dstTagProps_.emplace(tag, prop);
}

std::unordered_map<std::string, EdgeType>& getEdgeMap() {
return edgeMap_;
}

std::unordered_map<std::string, TagID>& getTagMap() {
return tagMap_;
}
Expand Down Expand Up @@ -81,6 +85,7 @@ class ExpressionContext final {
return false;
}
edgeMap_.emplace(alias, edgeType);
edgeAlias_.emplace_back(alias);
return true;
}

Expand All @@ -94,6 +99,10 @@ class ExpressionContext final {
return true;
}

std::vector<std::string>& getEdgeAlias() {
return edgeAlias_;
}

using PropPair = std::pair<std::string, std::string>;

std::vector<PropPair> srcTagProps() const {
Expand Down Expand Up @@ -175,6 +184,7 @@ class ExpressionContext final {
// alias => edgeType
std::unordered_map<std::string, EdgeType> edgeMap_;
std::unordered_map<std::string, TagID> tagMap_;
std::vector<std::string> edgeAlias_;
bool overAll_{false};
GraphSpaceID space_;
nebula::storage::StorageClient *storageClient_{nullptr};
Expand Down
9 changes: 7 additions & 2 deletions src/dataman/RowReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,12 @@ class RowReader {
static StatusOr<VariantType> getDefaultProp(const meta::SchemaProviderIf* schema,
const std::string& prop) {
auto& vType = schema->getFieldType(prop);
return getDefaultProp(vType.type);
auto defaultVal = getDefaultProp(vType.type);
if (!defaultVal.ok()) {
LOG(ERROR) << "Get default value for `" << prop << "' failed: " << defaultVal.status();
}

return defaultVal;
}

static StatusOr<VariantType> getDefaultProp(const nebula::cpp2::SupportedType& type) {
Expand All @@ -114,7 +119,7 @@ class RowReader {
}
default:
auto msg = folly::sformat("Unknown type: {}", static_cast<int32_t>(type));
LOG(ERROR) << "Unknown type: " << msg;
LOG(ERROR) << msg;
return Status::Error(msg);
}
}
Expand Down
121 changes: 77 additions & 44 deletions src/graph/GoExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,12 @@ Status GoExecutor::prepareOverAll() {
}

auto v = edgeStatus.value();
if (isReversely()) {
v = -v;
auto status = addToEdgeTypes(v);
if (!status.ok()) {
return status;
}

edgeTypes_.push_back(v);

if (!expCtx_->addEdge(e, v)) {
if (!expCtx_->addEdge(e, std::abs(v))) {
return Status::Error(folly::sformat("edge alias({}) was dup", e));
}
}
Expand All @@ -251,7 +250,7 @@ Status GoExecutor::prepareOver() {
return Status::Error("Over clause shall never be null");
}

isReversely_ = clause->isReversely();
direction_ = clause->direction();

auto edges = clause->edges();
for (auto e : edges) {
Expand All @@ -267,17 +266,17 @@ Status GoExecutor::prepareOver() {
}

auto v = edgeStatus.value();
if (isReversely()) {
v = -v;
status = addToEdgeTypes(v);
if (!status.ok()) {
return status;
}
edgeTypes_.push_back(v);

if (e->alias() != nullptr) {
if (!expCtx_->addEdge(*e->alias(), v)) {
if (!expCtx_->addEdge(*e->alias(), std::abs(v))) {
return Status::Error(folly::sformat("edge alias({}) was dup", *e->alias()));
}
} else {
if (!expCtx_->addEdge(*e->edge(), v)) {
if (!expCtx_->addEdge(*e->edge(), std::abs(v))) {
return Status::Error(folly::sformat("edge alias({}) was dup", *e->edge()));
}
}
Expand All @@ -286,6 +285,29 @@ Status GoExecutor::prepareOver() {
return status;
}

Status GoExecutor::addToEdgeTypes(EdgeType type) {
switch (direction_) {
case OverClause::Direction::kForward: {
edgeTypes_.push_back(type);
break;
}
case OverClause::Direction::kBackward: {
type = -type;
edgeTypes_.push_back(type);
break;
}
case OverClause::Direction::kBidirect: {
edgeTypes_.push_back(type);
type = -type;
edgeTypes_.push_back(type);
break;
}
default: {
return Status::Error("Unkown direction type: %ld", static_cast<int64_t>(direction_));
}
}
return Status::OK();
}

Status GoExecutor::prepareWhere() {
auto *clause = sentence_->whereClause();
Expand Down Expand Up @@ -442,10 +464,13 @@ void GoExecutor::stepOut() {
}
auto returns = status.value();
std::string filterPushdown = "";
if (FLAGS_filter_pushdown && isFinalStep() && !isReversely()) {
if (FLAGS_filter_pushdown && isFinalStep()
&& direction_ == OverClause::Direction::kForward) {
// TODO: not support filter pushdown in reversely traversal now.
filterPushdown = whereWrapper_->filterPushdown_;
}
VLOG(1) << "edge type size: " << edgeTypes_.size()
<< " return cols: " << returns.size();
auto future = ectx()->getStorageClient()->getNeighbors(spaceId,
starts_,
edgeTypes_,
Expand Down Expand Up @@ -547,19 +572,6 @@ void GoExecutor::onVertexProps(RpcResponse &&rpcResp) {
UNUSED(rpcResp);
}

std::vector<std::string> GoExecutor::getEdgeNames() const {
std::vector<std::string> names;
auto spaceId = ectx()->rctx()->session()->space();
for (auto edgeType : edgeTypes_) {
auto status = ectx()->schemaManager()->toEdgeName(spaceId, std::abs(edgeType));
DCHECK(status.ok());
auto edgeName = status.value();
names.emplace_back(std::move(edgeName));
}

return names;
}

StatusOr<std::vector<VertexID>> GoExecutor::getDstIdsFromResp(RpcResponse &rpcResp) const {
std::unordered_set<VertexID> set;
for (auto &resp : rpcResp.responses()) {
Expand Down Expand Up @@ -587,13 +599,8 @@ void GoExecutor::finishExecution(RpcResponse &&rpcResp) {
// MayBe we can do better.
std::vector<std::unique_ptr<YieldColumn>> yc;
if (expCtx_->isOverAllEdge() && yields_.empty()) {
auto edgeNames = getEdgeNames();
if (edgeNames.empty()) {
doError(Status::Error("get edge name failed"));
return;
}
for (const auto &name : edgeNames) {
auto dummy = new std::string(name);
for (const auto &alias : expCtx_->getEdgeAlias()) {
auto dummy = new std::string(alias);
auto dummy_exp = new EdgeDstIdExpression(dummy);
auto ptr = std::make_unique<YieldColumn>(dummy_exp);
dummy_exp->setContext(expCtx_.get());
Expand Down Expand Up @@ -721,6 +728,7 @@ StatusOr<std::vector<storage::cpp2::PropDef>> GoExecutor::getStepOutProps() {
pd.name = _DST;
pd.id.set_edge_type(e);
props.emplace_back(std::move(pd));
VLOG(3) << "Need edge props: " << e << ", _dst";
}
auto spaceId = ectx()->rctx()->session()->space();
for (auto &tagProp : expCtx_->srcTagProps()) {
Expand All @@ -734,6 +742,7 @@ StatusOr<std::vector<storage::cpp2::PropDef>> GoExecutor::getStepOutProps() {
auto tagId = status.value();
pd.id.set_tag_id(tagId);
props.emplace_back(std::move(pd));
VLOG(3) << "Need tag src props: " << tagProp.first << ", " << tagProp.second;
}
for (auto &prop : expCtx_->aliasProps()) {
if (prop.second == _DST) {
Expand All @@ -749,7 +758,29 @@ StatusOr<std::vector<storage::cpp2::PropDef>> GoExecutor::getStepOutProps() {
return Status::Error("the edge was not found '%s'", prop.first.c_str());
}
pd.id.set_edge_type(edgeType);
props.emplace_back(std::move(pd));
switch (direction_) {
case OverClause::Direction::kForward: {
props.emplace_back(std::move(pd));
break;
}
case OverClause::Direction::kBackward: {
edgeType = -edgeType;
pd.id.set_edge_type(edgeType);
props.emplace_back(std::move(pd));
break;
}
case OverClause::Direction::kBidirect: {
props.emplace_back(pd);
edgeType = -edgeType;
pd.id.set_edge_type(edgeType);
props.emplace_back(std::move(pd));
break;
}
default:
return Status::Error(
"Unknown direction: %ld", static_cast<int64_t>(direction_));
}
VLOG(3) << "Need edge props: " << prop.first << ", " << prop.second;
}
return props;
}
Expand All @@ -770,6 +801,7 @@ StatusOr<std::vector<storage::cpp2::PropDef>> GoExecutor::getDstProps() {
auto tagId = status.value();
pd.id.set_tag_id(tagId);
props.emplace_back(std::move(pd));
VLOG(3) << "Need dst tag props: " << tagProp.first << ", " << tagProp.second;
}
return props;
}
Expand Down Expand Up @@ -976,9 +1008,7 @@ bool GoExecutor::processFinalResult(RpcResponse &rpcResp, Callback cb) const {
for (auto& edge : edata.edges) {
auto dstId = edge.get_dst();
Getters getters;
// In reverse mode, _dst will return the srcId.
getters.getEdgeDstId = [this,
&srcId,
&dstId,
&edgeType] (const std::string& edgeName)
-> OptVariantType {
Expand All @@ -990,13 +1020,12 @@ bool GoExecutor::processFinalResult(RpcResponse &rpcResp, Callback cb) const {
"Get edge type for `%s' failed in getters.",
edgeName.c_str());
}
if (type != edgeType) {
if (type != std::abs(edgeType)) {
return 0L;
}
}
return isReversely() ? srcId : dstId;
return dstId;
};
// In reverse mode, it is used to get the dst props.
getters.getSrcTagProp = [&spaceId,
&tagData,
&tagSchema,
Expand Down Expand Up @@ -1061,7 +1090,6 @@ bool GoExecutor::processFinalResult(RpcResponse &rpcResp, Callback cb) const {

// In reverse mode, we should handle _src
getters.getAliasProp = [&reader,
&dstId,
&srcId,
&edgeType,
&edgeSchema,
Expand All @@ -1074,16 +1102,21 @@ bool GoExecutor::processFinalResult(RpcResponse &rpcResp, Callback cb) const {
return Status::Error(
"Get edge type for `%s' failed in getters.", edgeName.c_str());
}
if (edgeType != type) {
auto sit = edgeSchema.find(type);
if (std::abs(edgeType) != type) {
auto sit = edgeSchema.find(
direction_ == OverClause::Direction::kBackward ? -type : type);
if (sit == edgeSchema.end()) {
LOG(ERROR) << "Can't find schema for " << edgeName;
return Status::Error("get schema failed");
std::string errMsg = folly::stringPrintf(
"Can't find shcema for %s when get default.",
edgeName.c_str());
LOG(ERROR) << errMsg;
return Status::Error(errMsg);
}
return RowReader::getDefaultProp(sit->second.get(), prop);
}

if (prop == _SRC) {
return isReversely() ? dstId : srcId;
return srcId;
}
DCHECK(reader != nullptr);
auto res = RowReader::getPropByName(reader.get(), prop);
Expand Down
8 changes: 3 additions & 5 deletions src/graph/GoExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class GoExecutor final : public TraverseExecutor {

Status prepareOverAll();

Status addToEdgeTypes(EdgeType type);

/**
* To check if this is the final step.
*/
Expand All @@ -76,10 +78,6 @@ class GoExecutor final : public TraverseExecutor {
return upto_;
}

bool isReversely() const {
return isReversely_;
}

/**
* To obtain the source ids from various places,
* such as the literal id list, inputs from the pipeline or results of variable.
Expand Down Expand Up @@ -216,7 +214,7 @@ class GoExecutor final : public TraverseExecutor {
uint32_t steps_{1};
uint32_t curStep_{1};
bool upto_{false};
bool isReversely_{false};
OverClause::Direction direction_{OverClause::Direction::kForward};
std::vector<EdgeType> edgeTypes_;
std::string *varname_{nullptr};
std::string *colname_{nullptr};
Expand Down
Loading

0 comments on commit 3afb5fd

Please sign in to comment.