diff --git a/interactive_engine/executor/ir/common/src/utils.rs b/interactive_engine/executor/ir/common/src/utils.rs index ea646339e706..c718adb83ab8 100644 --- a/interactive_engine/executor/ir/common/src/utils.rs +++ b/interactive_engine/executor/ir/common/src/utils.rs @@ -834,6 +834,13 @@ impl From for physical_pb::PhysicalOpr { } } +impl From for physical_pb::PhysicalOpr { + fn from(path: physical_pb::PathExpand) -> Self { + let op_kind = physical_pb::physical_opr::operator::OpKind::Path(path); + op_kind.into() + } +} + impl From for physical_pb::Project { fn from(project: pb::Project) -> Self { let mappings = project diff --git a/interactive_engine/executor/ir/integrated/tests/expand_test.rs b/interactive_engine/executor/ir/integrated/tests/expand_test.rs index 7c71e9b03f1f..e90257820bc5 100644 --- a/interactive_engine/executor/ir/integrated/tests/expand_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/expand_test.rs @@ -27,12 +27,14 @@ mod test { use graph_store::ldbc::LDBCVertexParser; use graph_store::prelude::DefaultId; use ir_common::expr_parse::str_to_expr_pb; - use ir_common::generated::physical as pb; + use ir_common::generated::{algebra as algebra_pb, physical as pb}; use ir_common::KeyId; + use ir_physical_client::physical_builder::{JobBuilder, PlanBuilder}; use pegasus::api::{Map, Sink}; use pegasus::result::ResultStream; use pegasus::JobConf; use pegasus_common::downcast::AsAny; + use pegasus_server::JobRequest; use runtime::process::entry::Entry; use runtime::process::operator::flatmap::FlatMapFuncGen; use runtime::process::operator::map::{FilterMapFuncGen, IntersectionEntry}; @@ -1075,4 +1077,173 @@ mod test { result_ids.sort(); assert_eq!(result_ids, expected_ids) } + + // marko (A) -> lop (B); marko (A) <-(path: range)-> (C); lop (B) <- (C) + fn init_intersect_with_path_job_request(lower: i32, upper: i32) -> JobRequest { + // marko (A) + let source_opr = algebra_pb::Scan { + scan_opt: 0, + alias: Some(TAG_A.into()), + params: None, + idx_predicate: Some(vec![1].into()), + is_count_only: false, + meta_data: None, + }; + + // marko (A) -> lop (B); + let expand_opr1 = algebra_pb::EdgeExpand { + v_tag: Some(TAG_A.into()), + direction: 0, // out + params: Some(query_params(vec![CREATED_LABEL.into()], vec![], None)), + expand_opt: 0, + alias: Some(TAG_B.into()), + meta_data: None, + }; + + // marko (A) -> (C): path expand C; + let expand_opr2 = algebra_pb::EdgeExpand { + v_tag: None, + direction: 2, // both + params: Some(query_params(vec![], vec![], None)), + expand_opt: 0, + alias: None, + meta_data: None, + }; + let path_opr = algebra_pb::PathExpand { + alias: None, + base: Some(algebra_pb::path_expand::ExpandBase { + edge_expand: Some(expand_opr2.clone()), + get_v: None, + }), + start_tag: Some(TAG_A.into()), + hop_range: Some(algebra_pb::Range { lower, upper }), + path_opt: 1, // simple + result_opt: 0, // endv + condition: None, + }; + + let end_v = algebra_pb::GetV { + tag: None, + opt: 1, // EndV + params: Some(query_params(vec![], vec![], None)), + alias: Some(TAG_C.into()), + meta_data: None, + }; + + // lop (B) <- josh (C): expand C and intersect on C; + let expand_opr3 = algebra_pb::EdgeExpand { + v_tag: Some(TAG_B.into()), + direction: 1, // in + params: Some(query_params(vec![CREATED_LABEL.into()], vec![], None)), + expand_opt: 0, + alias: Some(TAG_C.into()), + meta_data: None, + }; + let unfold_opr = + algebra_pb::Unfold { tag: Some(TAG_C.into()), alias: Some(TAG_C.into()), meta_data: None }; + + let mut job_builder = JobBuilder::default(); + job_builder.add_scan_source(source_opr.clone()); + job_builder.shuffle(None); + job_builder.edge_expand(expand_opr1.clone()); + let mut plan_builder_1 = PlanBuilder::new(1); + plan_builder_1.shuffle(None); + plan_builder_1.path_expand(path_opr); + plan_builder_1.get_v(end_v); + let mut plan_builder_2 = PlanBuilder::new(2); + plan_builder_2.shuffle(None); + plan_builder_2.edge_expand(expand_opr3.clone()); + job_builder.intersect(vec![plan_builder_1, plan_builder_2], TAG_C.into()); + job_builder.unfold(unfold_opr.clone()); + job_builder.sink(default_sink_pb()); + job_builder.build().unwrap() + } + + #[test] + fn expand_one_hop_path_and_intersect_test() { + initialize(); + // intersect with a one-hop path + let request_1 = init_intersect_with_path_job_request(1, 2); + let mut results = submit_query(request_1, 1); + let mut result_collection = vec![]; + let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0); + let mut expected_result_ids = vec![v4]; + while let Some(result) = results.next() { + match result { + Ok(res) => { + let record = parse_result(res).unwrap(); + if let Some(vertex) = record.get(None).unwrap().as_vertex() { + result_collection.push(vertex.id() as DefaultId); + } + } + Err(e) => { + panic!("err result {:?}", e); + } + } + } + + result_collection.sort(); + expected_result_ids.sort(); + assert_eq!(result_collection, expected_result_ids); + } + + #[test] + fn expand_two_hop_path_and_intersect_test() { + initialize(); + let request_2 = init_intersect_with_path_job_request(2, 3); + + let mut results = submit_query(request_2, 1); + let mut result_collection = vec![]; + let v1: DefaultId = LDBCVertexParser::to_global_id(1, 0); + let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0); + let v6: DefaultId = LDBCVertexParser::to_global_id(6, 0); + let mut expected_result_ids = vec![v1, v1, v1, v4, v6]; + while let Some(result) = results.next() { + match result { + Ok(res) => { + let record = parse_result(res).unwrap(); + if let Some(vertex) = record.get(None).unwrap().as_vertex() { + result_collection.push(vertex.id() as DefaultId); + } + } + Err(e) => { + panic!("err result {:?}", e); + } + } + } + + result_collection.sort(); + expected_result_ids.sort(); + assert_eq!(result_collection, expected_result_ids); + } + + #[test] + fn expand_multi_hop_path_and_intersect_test() { + initialize(); + let request = init_intersect_with_path_job_request(1, 3); + + let mut results = submit_query(request, 1); + let mut result_collection = vec![]; + let v1: DefaultId = LDBCVertexParser::to_global_id(1, 0); + let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0); + let v6: DefaultId = LDBCVertexParser::to_global_id(6, 0); + let mut expected_result_ids = vec![v1, v1, v1, v4, v4, v6]; + while let Some(result) = results.next() { + match result { + Ok(res) => { + let record = parse_result(res).unwrap(); + if let Some(vertex) = record.get(None).unwrap().as_vertex() { + result_collection.push(vertex.id() as DefaultId); + } + } + Err(e) => { + panic!("err result {:?}", e); + } + } + } + + result_collection.sort(); + expected_result_ids.sort(); + assert_eq!(result_collection, expected_result_ids); + } } diff --git a/interactive_engine/executor/ir/runtime/src/assembly.rs b/interactive_engine/executor/ir/runtime/src/assembly.rs index 0500ce7947e4..435e58ff1cf5 100644 --- a/interactive_engine/executor/ir/runtime/src/assembly.rs +++ b/interactive_engine/executor/ir/runtime/src/assembly.rs @@ -192,7 +192,7 @@ impl IRJobAssembly { ) -> Result, BuildJobError> { let mut prev_op_kind = pb::physical_opr::operator::OpKind::Root(pb::Root {}); for op in &plan[..] { - let op_kind = op.try_into().map_err(|e| FnGenError::from(e))?; + let op_kind = to_op_kind(op)?; match op_kind { OpKind::Repartition(repartition) => { let repartition_strategy = repartition.strategy.as_ref().ok_or_else(|| { @@ -453,14 +453,19 @@ impl IRJobAssembly { OpKind::Intersect(intersect) => { // The intersect op can be: // 1) EdgeExpand with Opt = ExpandV, which is to expand and intersect on id-only vertices; - // 2) EdgeExpand with Opt = ExpandE, which is to expand and intersect on edges (not supported yet); + // 2) EdgeExpand (ExpandE) + GetV(Adj), which is to expand and intersect on vertices. + // In this case, EdgeExpand and GetV are not fused, usually with alias in EdgeExpand; Not supported yet; + // 3) EdgeExpand with Opt = ExpandE, which is to expand and intersect on edges + // (not supported yet, and it won't happen in current plan); + // 4) PathExpand + GetV(EndV), which is to expand paths and intersect on the end vertices. // Specifically, // 1) if we want to expand and intersect on vertices, while there are some further filters on the intersected vertices, // this would be translated into Intersect(EdgeExpand(V), EdgeExpand(V)) + Unfold + Select in physical plan for now. // 2) on distributed graph database, the intersect op exists together with the `Repartition` op in subplans. let mut intersected_expands = vec![]; + let mut pre_expands = vec![]; for mut subplan in intersect.sub_plans { - if subplan.plan.len() > 2 { + if subplan.plan.len() > 3 { Err(FnGenError::unsupported_error(&format!( "subplan in pb::Intersect::plan {:?}", subplan, @@ -471,16 +476,12 @@ impl IRJobAssembly { "subplan in pb::Intersect::plan".to_string(), )) })?; - let last_op_kind = last_op - .try_into() - .map_err(|e| FnGenError::from(e))?; + let last_op_kind = to_op_kind(&last_op)?; match last_op_kind { OpKind::Edge(expand) => { // the case of expand id-only vertex let repartition = if let Some(prev) = subplan.plan.last() { - if let OpKind::Repartition(edge_expand_repartition) = prev - .try_into() - .map_err(|e| FnGenError::from(e))? + if let OpKind::Repartition(edge_expand_repartition) = to_op_kind(prev)? { subplan.plan.pop(); Some(edge_expand_repartition) @@ -495,12 +496,126 @@ impl IRJobAssembly { }; intersected_expands.push((repartition, expand)); } + OpKind::Vertex(mut end_v) => { + let prev_opr = subplan.plan.pop().ok_or_else(|| { + FnGenError::unsupported_error(&format!( + "subplan with only getV in pb::Intersect::plan {:?}", + end_v, + )) + })?; + let prev_opr_kind = to_op_kind(&prev_opr)?; + match prev_opr_kind { + OpKind::Path(mut path_expand) => { + // the case of PathExpand + EndV + if end_v.opt != pb::get_v::VOpt::End as i32 { + Err(FnGenError::unsupported_error(&format!( + "Subplan in Intersection {:?}", + subplan, + )))? + } + let repartition = if let Some(prev) = subplan.plan.last() { + if let OpKind::Repartition(path_expand_repartition) = + to_op_kind(prev)? + { + subplan.plan.pop(); + Some(path_expand_repartition) + } else { + Err(FnGenError::unsupported_error(&format!( + "subplan in pb::Intersect::plan {:?}", + subplan, + )))? + } + } else { + None + }; + // the case of expand paths and intersect on the end vertices + // Process path_expand as follows: + // 1. If path_expand range from 0, it is unsupported; + // 2. If it is path_expand(1,2), optimized as edge_expand; + // 3. Otherwise, translate path_expand(l,h) to path_expand(l-1, h-1) + endV() + edge_expand, + // and the last edge_expand is the one to intersect. + // Notice that if we have predicates for vertices in path_expand, or for the last vertex of path_expand, + // do the filtering after intersection. + // TODO: there might be a bug here: + // if path_expand has an alias which indicates that the path would be referred later, it may not as expected. + let path_expand_base = + path_expand.base.as_ref().ok_or_else(|| { + FnGenError::ParseError( + "PathExpand::base in Pattern is empty".into(), + ) + })?; + let base_edge_expand = path_expand_base + .edge_expand + .as_ref() + .ok_or_else(|| { + FnGenError::ParseError( + "PathExpand::base::edge_expand is empty".into(), + ) + })?; + // current only support expand_opt = ExpandV + if base_edge_expand.expand_opt + != pb::edge_expand::ExpandOpt::Vertex as i32 + { + Err(FnGenError::unsupported_error(&format!( + "PathExpand in Intersection with expand {:?}", + base_edge_expand + )))? + } + // pick the last edge expand out from the path expand + let hop_range = + path_expand.hop_range.as_mut().ok_or_else(|| { + FnGenError::ParseError( + "pb::PathExpand::hop_range is empty".into(), + ) + })?; + if hop_range.lower < 1 { + Err(FnGenError::unsupported_error(&format!( + "PathExpand in Intersection with lower range of {:?}", + hop_range.lower + )))? + } + if hop_range.lower == 1 && hop_range.upper == 2 { + // optimized Path(1..2) to as EdgeExpand + let mut edge_expand = base_edge_expand.clone(); + edge_expand.v_tag = path_expand.start_tag; + edge_expand.alias = end_v.alias; + intersected_expands.push((repartition, edge_expand)); + } else { + // translate path_expand(l,h) to path_expand(l-1, h-1) + endV() + edge_expand, + let mut edge_expand = base_edge_expand.clone(); + edge_expand.v_tag = None; + // edge expand should carry endv's alias, which is the intersect key. + edge_expand.alias = end_v.alias.clone(); + end_v.alias.take(); + hop_range.lower -= 1; + hop_range.upper -= 1; + // pre expand path_expand(l-1, h-1) + if let Some(repartition) = repartition.clone() { + pre_expands.push(repartition.into()); + } + pre_expands.push(path_expand.into()); + pre_expands.push(end_v.into()); + // and then expand and intersect on the last edge_expand + intersected_expands.push((repartition, edge_expand)); + } + } + _ => Err(FnGenError::unsupported_error(&format!( + "Subplan in Intersection to intersect: {:?}", + subplan + )))?, + } + } + _ => Err(FnGenError::unsupported_error(&format!( "Opr in Intersection to intersect: {:?}", last_op_kind )))?, } } + // pre-expanding for the path_expand case + if !pre_expands.is_empty() { + stream = self.install(stream, &pre_expands)?; + } // intersect of edge_expands for (repartition, expand_intersect_opr) in intersected_expands { if let Some(repartition) = repartition { @@ -685,7 +800,7 @@ impl IRJobAssembly { } } - prev_op_kind = op.try_into().map_err(|e| FnGenError::from(e))?; + prev_op_kind = to_op_kind(op)?; } Ok(stream) } @@ -733,3 +848,8 @@ impl JobAssembly for IRJobAssembly(binary: &[u8]) -> FnGenResult { Ok(T::decode(binary)?) } + +#[inline] +fn to_op_kind(opr: &pb::PhysicalOpr) -> FnGenResult { + Ok(opr.try_into()?) +} diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs index 55a3613b5c85..c8eee6a06f12 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs @@ -73,12 +73,19 @@ impl FilterMapFunction for GetVertexOperator { Ok(None) } } else if let Some(graph_path) = entry.as_graph_path() { - // we check VOpt here: - // for `Other`, we treat it as to get_other_id() in the Edge within the Path (in which case is expanding the path with a adj vertex) - // for `End`, we treat it as to get EndV() in the Path (in which case is getting the end vertex from the path) - match self.opt { - VOpt::Other => { - let graph_path = input + // Specifically, when dealing with `GetV` on a path entry, we need to consider the following cases: + // 1. if the last entry is an edge, we expand the path with the other vertex of the edge; (in which case is expanding the path with a adj vertex) + // 2. if the last entry is a vertex, we get the vertex. (in which case is getting the end vertex from the path) + let path_end = graph_path.get_path_end(); + if let Some(path_end_edge) = path_end.as_edge() { + let label = path_end_edge.get_other_label(); + if self.contains_label(label)? { + let vertex = Vertex::new( + path_end_edge.get_other_id(), + label.cloned(), + DynDetails::default(), + ); + let mut_graph_path = input .get_mut(self.start_tag) .ok_or_else(|| { FnExecError::unexpected_data_error(&format!( @@ -91,54 +98,29 @@ impl FilterMapFunction for GetVertexOperator { .ok_or_else(|| { FnExecError::unexpected_data_error(&format!("entry is not a path in GetV")) })?; - let path_end_edge = graph_path - .get_path_end() - .as_edge() - .ok_or_else(|| { - FnExecError::unexpected_data_error(&format!( - "GetOtherVertex on a path entry with input: {:?}", - graph_path.get_path_end() - )) - })?; - let label = path_end_edge.get_other_label(); - if self.contains_label(label)? { - let vertex = Vertex::new( - path_end_edge.get_other_id(), - label.cloned(), - DynDetails::default(), - ); - if graph_path.append(vertex) { - Ok(Some(input)) - } else { - Ok(None) - } - } else { - Ok(None) - } - } - VOpt::End => { - let path_end_vertex = graph_path - .get_path_end() - .as_vertex() - .ok_or_else(|| FnExecError::unsupported_error("Get end edge on a path entry"))? - .clone(); - let label = path_end_vertex.label(); - if self.contains_label(label.as_ref())? { - input.append(path_end_vertex, self.alias.clone()); + if mut_graph_path.append(vertex) { Ok(Some(input)) } else { Ok(None) } + } else { + Ok(None) + } + } else if let Some(path_end_vertex) = path_end.as_vertex() { + let label = path_end_vertex.label(); + if self.contains_label(label.as_ref())? { + input.append(path_end_vertex.clone(), self.alias.clone()); + Ok(Some(input)) + } else { + Ok(None) } - _ => Err(FnExecError::unsupported_error(&format!( - "Wired opt in GetVertexOperator for GraphPath: {:?}", - self.opt - )))?, + } else { + Err(FnExecError::unexpected_data_error("unreachable path end entry in GetV"))? } } else { - Err(FnExecError::unexpected_data_error( - "Can only apply `GetV` (`Auxilia` instead) on an edge or path entry", - ))? + Err(FnExecError::unexpected_data_error( &format!( + "Can only apply `GetV` (`Auxilia` instead) on an edge or path entry, while the entry is {:?}", entry + )))? } } else { Ok(None)