fix(interactive): Support Intersect with PathExpand in GIE Runtime (#3600)


## What do these changes do?


As titled.
Support the `Intersect` operator with `PathExpand` sub-plans in the GIE Runtime, so that the runtime stays compatible with the new compilation output.
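
For context, a minimal sketch of the plan shape this change targets, mirroring the test added in this PR (shuffle steps and the operator payloads `source_opr`, `expand_opr1`, `path_opr`, `end_v`, `expand_opr3`, and `unfold_opr` are elided; see the full definitions in `expand_test.rs` below): one intersect branch runs a `PathExpand` followed by `GetV`, the other a plain `EdgeExpand`, and both branches are intersected on the same tag.

```rust
// Sketch only; operator payloads are defined in the new test in expand_test.rs.
let mut job_builder = JobBuilder::default();
job_builder.add_scan_source(source_opr); // marko (A)
job_builder.edge_expand(expand_opr1);    // (A) -> lop (B)

// Branch 1: path-expand from A, then GetV to tag the path's end vertex as C.
let mut plan_builder_1 = PlanBuilder::new(1);
plan_builder_1.path_expand(path_opr);
plan_builder_1.get_v(end_v);

// Branch 2: expand from B to C with a plain EdgeExpand.
let mut plan_builder_2 = PlanBuilder::new(2);
plan_builder_2.edge_expand(expand_opr3);

// Intersect both branches on tag C, then unfold the intersected entries.
job_builder.intersect(vec![plan_builder_1, plan_builder_2], TAG_C.into());
job_builder.unfold(unfold_opr);
job_builder.sink(default_sink_pb());
```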

## Related issue number


#3685
BingqingLyu committed Apr 1, 2024
1 parent 0b8e42f commit 2904c64
Showing 4 changed files with 338 additions and 58 deletions.
7 changes: 7 additions & 0 deletions interactive_engine/executor/ir/common/src/utils.rs
@@ -834,6 +834,13 @@ impl From<physical_pb::Scan> for physical_pb::PhysicalOpr {
    }
}

impl From<physical_pb::PathExpand> for physical_pb::PhysicalOpr {
    fn from(path: physical_pb::PathExpand) -> Self {
        let op_kind = physical_pb::physical_opr::operator::OpKind::Path(path);
        op_kind.into()
    }
}

impl From<pb::Project> for physical_pb::Project {
    fn from(project: pb::Project) -> Self {
        let mappings = project
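The new `From<physical_pb::PathExpand>` conversion lets a physical `PathExpand` be appended to a plan like any other operator. A hypothetical call site (not part of this diff, shown only as an assumption about usage) might look like:

```rust
// Assumption: `plan` is simply a list of physical operators being assembled.
fn append_path_expand(plan: &mut Vec<physical_pb::PhysicalOpr>, path: physical_pb::PathExpand) {
    // Relies on the From impl added above, which wraps the operator as OpKind::Path.
    plan.push(path.into());
}
```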
173 changes: 172 additions & 1 deletion interactive_engine/executor/ir/integrated/tests/expand_test.rs
@@ -27,12 +27,14 @@ mod test {
    use graph_store::ldbc::LDBCVertexParser;
    use graph_store::prelude::DefaultId;
    use ir_common::expr_parse::str_to_expr_pb;
    use ir_common::generated::physical as pb;
    use ir_common::generated::{algebra as algebra_pb, physical as pb};
    use ir_common::KeyId;
    use ir_physical_client::physical_builder::{JobBuilder, PlanBuilder};
    use pegasus::api::{Map, Sink};
    use pegasus::result::ResultStream;
    use pegasus::JobConf;
    use pegasus_common::downcast::AsAny;
    use pegasus_server::JobRequest;
    use runtime::process::entry::Entry;
    use runtime::process::operator::flatmap::FlatMapFuncGen;
    use runtime::process::operator::map::{FilterMapFuncGen, IntersectionEntry};
@@ -1075,4 +1077,173 @@ mod test {
        result_ids.sort();
        assert_eq!(result_ids, expected_ids)
    }

    // marko (A) -> lop (B); marko (A) <-(path: range)-> (C); lop (B) <- (C)
    fn init_intersect_with_path_job_request(lower: i32, upper: i32) -> JobRequest {
        // marko (A)
        let source_opr = algebra_pb::Scan {
            scan_opt: 0,
            alias: Some(TAG_A.into()),
            params: None,
            idx_predicate: Some(vec![1].into()),
            is_count_only: false,
            meta_data: None,
        };

        // marko (A) -> lop (B);
        let expand_opr1 = algebra_pb::EdgeExpand {
            v_tag: Some(TAG_A.into()),
            direction: 0, // out
            params: Some(query_params(vec![CREATED_LABEL.into()], vec![], None)),
            expand_opt: 0,
            alias: Some(TAG_B.into()),
            meta_data: None,
        };

        // marko (A) -> (C): path expand C;
        let expand_opr2 = algebra_pb::EdgeExpand {
            v_tag: None,
            direction: 2, // both
            params: Some(query_params(vec![], vec![], None)),
            expand_opt: 0,
            alias: None,
            meta_data: None,
        };
        let path_opr = algebra_pb::PathExpand {
            alias: None,
            base: Some(algebra_pb::path_expand::ExpandBase {
                edge_expand: Some(expand_opr2.clone()),
                get_v: None,
            }),
            start_tag: Some(TAG_A.into()),
            hop_range: Some(algebra_pb::Range { lower, upper }),
            path_opt: 1,   // simple
            result_opt: 0, // endv
            condition: None,
        };

        let end_v = algebra_pb::GetV {
            tag: None,
            opt: 1, // EndV
            params: Some(query_params(vec![], vec![], None)),
            alias: Some(TAG_C.into()),
            meta_data: None,
        };

        // lop (B) <- josh (C): expand C and intersect on C;
        let expand_opr3 = algebra_pb::EdgeExpand {
            v_tag: Some(TAG_B.into()),
            direction: 1, // in
            params: Some(query_params(vec![CREATED_LABEL.into()], vec![], None)),
            expand_opt: 0,
            alias: Some(TAG_C.into()),
            meta_data: None,
        };
        let unfold_opr =
            algebra_pb::Unfold { tag: Some(TAG_C.into()), alias: Some(TAG_C.into()), meta_data: None };

        let mut job_builder = JobBuilder::default();
        job_builder.add_scan_source(source_opr.clone());
        job_builder.shuffle(None);
        job_builder.edge_expand(expand_opr1.clone());
        let mut plan_builder_1 = PlanBuilder::new(1);
        plan_builder_1.shuffle(None);
        plan_builder_1.path_expand(path_opr);
        plan_builder_1.get_v(end_v);
        let mut plan_builder_2 = PlanBuilder::new(2);
        plan_builder_2.shuffle(None);
        plan_builder_2.edge_expand(expand_opr3.clone());
        job_builder.intersect(vec![plan_builder_1, plan_builder_2], TAG_C.into());
        job_builder.unfold(unfold_opr.clone());
        job_builder.sink(default_sink_pb());
        job_builder.build().unwrap()
    }

    #[test]
    fn expand_one_hop_path_and_intersect_test() {
        initialize();
        // intersect with a one-hop path
        let request_1 = init_intersect_with_path_job_request(1, 2);
        let mut results = submit_query(request_1, 1);
        let mut result_collection = vec![];
        let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0);
        let mut expected_result_ids = vec![v4];
        while let Some(result) = results.next() {
            match result {
                Ok(res) => {
                    let record = parse_result(res).unwrap();
                    if let Some(vertex) = record.get(None).unwrap().as_vertex() {
                        result_collection.push(vertex.id() as DefaultId);
                    }
                }
                Err(e) => {
                    panic!("err result {:?}", e);
                }
            }
        }

        result_collection.sort();
        expected_result_ids.sort();
        assert_eq!(result_collection, expected_result_ids);
    }

    #[test]
    fn expand_two_hop_path_and_intersect_test() {
        initialize();
        let request_2 = init_intersect_with_path_job_request(2, 3);

        let mut results = submit_query(request_2, 1);
        let mut result_collection = vec![];
        let v1: DefaultId = LDBCVertexParser::to_global_id(1, 0);
        let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0);
        let v6: DefaultId = LDBCVertexParser::to_global_id(6, 0);
        let mut expected_result_ids = vec![v1, v1, v1, v4, v6];
        while let Some(result) = results.next() {
            match result {
                Ok(res) => {
                    let record = parse_result(res).unwrap();
                    if let Some(vertex) = record.get(None).unwrap().as_vertex() {
                        result_collection.push(vertex.id() as DefaultId);
                    }
                }
                Err(e) => {
                    panic!("err result {:?}", e);
                }
            }
        }

        result_collection.sort();
        expected_result_ids.sort();
        assert_eq!(result_collection, expected_result_ids);
    }

    #[test]
    fn expand_multi_hop_path_and_intersect_test() {
        initialize();
        let request = init_intersect_with_path_job_request(1, 3);

        let mut results = submit_query(request, 1);
        let mut result_collection = vec![];
        let v1: DefaultId = LDBCVertexParser::to_global_id(1, 0);
        let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0);
        let v6: DefaultId = LDBCVertexParser::to_global_id(6, 0);
        let mut expected_result_ids = vec![v1, v1, v1, v4, v4, v6];
        while let Some(result) = results.next() {
            match result {
                Ok(res) => {
                    let record = parse_result(res).unwrap();
                    if let Some(vertex) = record.get(None).unwrap().as_vertex() {
                        result_collection.push(vertex.id() as DefaultId);
                    }
                }
                Err(e) => {
                    panic!("err result {:?}", e);
                }
            }
        }

        result_collection.sort();
        expected_result_ids.sort();
        assert_eq!(result_collection, expected_result_ids);
    }
}