From 5590f02f2cf512c702771a5cd4b5e44dde6974f1 Mon Sep 17 00:00:00 2001 From: Xiaoli Zhou Date: Fri, 7 Jun 2024 11:28:13 +0800 Subject: [PATCH] fix(interactive): Support Nested Map in Compiler & Runtime to Facilitate Queries like `select('a', 'b').by(valueMap())` (#3839) Co-authored-by: BingqingLyu Co-authored-by: xiaolei.zl Co-authored-by: Longbin Lai --- flex/codegen/src/hqps/hqps_project_builder.h | 14 +- flex/engines/hqps_db/core/operator/sink.h | 2 +- .../ir/runtime/proto/RexToProtoConverter.java | 25 ++- .../cypher/result/CypherRecordParser.java | 32 ++- .../gremlin/result/ParserUtils.java | 18 +- .../gremlin/resultx/GremlinRecordParser.java | 28 ++- .../executor/ir/core/src/plan/logical.rs | 9 +- .../ir/graph_proxy/src/utils/expr/eval.rs | 46 ++-- .../executor/ir/proto/expr.proto | 11 +- .../executor/ir/proto/results.proto | 5 +- .../executor/ir/runtime/src/process/entry.rs | 26 ++- .../src/process/operator/map/project.rs | 211 +++++++++++++++--- .../ir/runtime/src/process/operator/mod.rs | 2 +- .../runtime/src/process/operator/sink/sink.rs | 24 +- 14 files changed, 359 insertions(+), 94 deletions(-) diff --git a/flex/codegen/src/hqps/hqps_project_builder.h b/flex/codegen/src/hqps/hqps_project_builder.h index e4027090925e..ca15b7b5ecab 100644 --- a/flex/codegen/src/hqps/hqps_project_builder.h +++ b/flex/codegen/src/hqps/hqps_project_builder.h @@ -254,10 +254,16 @@ std::string project_key_values_to_string( auto& key_value = mappings[i]; auto& key = key_value.key(); CHECK(key.item_case() == common::Value::kStr); - auto& value = key_value.value(); - auto key_value_str = project_key_value_to_string(ctx, key.str(), value); - if (!key_value_str.empty()) { - key_value_strs.emplace_back(key_value_str); + if (key_value.has_val()) { + auto& value = key_value.val(); + auto key_value_str = project_key_value_to_string(ctx, key.str(), value); + if (!key_value_str.empty()) { + key_value_strs.emplace_back(key_value_str); + } + } else if (key_value.has_nested()) { + LOG(FATAL) << "Nested key value not supported yet"; + } else { + LOG(FATAL) << "Unknown key value type"; } } diff --git a/flex/engines/hqps_db/core/operator/sink.h b/flex/engines/hqps_db/core/operator/sink.h index 36b6816b86d8..f935cdf71756 100644 --- a/flex/engines/hqps_db/core/operator/sink.h +++ b/flex/engines/hqps_db/core/operator/sink.h @@ -234,7 +234,7 @@ void template_set_key_value(results::KeyValues* map, for (auto& kv : key_value) { auto cur_kv = map->add_key_values(); cur_kv->mutable_key()->set_str(kv.first); - auto value = cur_kv->mutable_value(); + auto value = cur_kv->mutable_element(); set_any_to_element(kv.second, value); } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/RexToProtoConverter.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/RexToProtoConverter.java index b44d67f794f0..63e6d5696f19 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/RexToProtoConverter.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/RexToProtoConverter.java @@ -174,15 +174,24 @@ private OuterExpression.Expression visitMapValueConstructor(RexCall call) { key instanceof RexLiteral, "key type of 'MAP_VALUE_CONSTRUCTOR' should be 'literal', but is " + key.getClass()); - Preconditions.checkArgument( - value instanceof RexGraphVariable, - "value type of 'MAP_VALUE_CONSTRUCTOR' should be 'variable', but is " - + value.getClass()); - varMapBuilder.addKeyVals( + OuterExpression.ExprOpr valueOpr = value.accept(this).getOperators(0); + OuterExpression.VariableKeyValue.Builder keyValueBuilder = OuterExpression.VariableKeyValue.newBuilder() - .setKey(key.accept(this).getOperators(0).getConst()) - .setValue(value.accept(this).getOperators(0).getVar()) - .build()); + .setKey(key.accept(this).getOperators(0).getConst()); + switch (valueOpr.getItemCase()) { + case VAR: + keyValueBuilder.setVal(valueOpr.getVar()); + break; + case MAP: + keyValueBuilder.setNested(valueOpr.getMap()); + break; + default: + throw new IllegalArgumentException( + "can not convert value [" + + value + + "] of 'MAP_VALUE_CONSTRUCTOR' to physical plan"); + } + varMapBuilder.addKeyVals(keyValueBuilder); } return OuterExpression.Expression.newBuilder() .addOperators( diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordParser.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordParser.java index cfb75c3aa219..ae2b90497d86 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordParser.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordParser.java @@ -24,6 +24,7 @@ import com.alibaba.graphscope.common.result.Utils; import com.alibaba.graphscope.gaia.proto.Common; import com.alibaba.graphscope.gaia.proto.IrResult; +import com.alibaba.graphscope.gremlin.exception.GremlinResultParserException; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -119,6 +120,30 @@ protected AnyValue parseEntry(IrResult.Entry entry, @Nullable RelDataType dataTy } } + protected AnyValue parseElement(IrResult.KeyValues.KeyValue value, RelDataType type) { + switch (value.getValueCase()) { + case ELEMENT: + return parseElement(value.getElement(), type); + case NESTED: + if (type instanceof ArbitraryMapType) { + return parseKeyValues( + value.getNested(), + ((ArbitraryMapType) type).getKeyTypes(), + ((ArbitraryMapType) type).getValueTypes()); + } else { + return parseKeyValues( + value.getNested(), type.getKeyType(), type.getValueType()); + } + default: + throw new GremlinResultParserException( + "keyValue [" + + value + + "] has invalid value type [" + + value.getValueCase() + + "]"); + } + } + protected AnyValue parseElement(IrResult.Element element, @Nullable RelDataType dataType) { switch (element.getInnerCase()) { case VERTEX: @@ -195,9 +220,7 @@ protected AnyValue parseKeyValues( .getKeyValuesList() .forEach( entry -> { - valueMap.put( - entry.getKey().getStr(), - parseElement(entry.getValue(), valueType)); + valueMap.put(entry.getKey().getStr(), parseElement(entry, valueType)); }); return VirtualValues.fromMap(valueMap, valueMap.size(), 0); } @@ -216,8 +239,7 @@ protected AnyValue parseKeyValues( Map valueMap = Maps.newLinkedHashMap(); for (int i = 0; i < entries.size(); ++i) { IrResult.KeyValues.KeyValue entry = entries.get(i); - valueMap.put( - entry.getKey().getStr(), parseElement(entry.getValue(), valueTypes.get(i))); + valueMap.put(entry.getKey().getStr(), parseElement(entry, valueTypes.get(i))); } return VirtualValues.fromMap(valueMap, valueMap.size(), 0); } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/ParserUtils.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/ParserUtils.java index efa51fa1df8f..9484adaf88a1 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/ParserUtils.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/ParserUtils.java @@ -217,11 +217,27 @@ public static Object parseEntry(IrResult.Entry entry) { k -> { valueMap.put( ParserUtils.parseCommonValue(k.getKey()), - ParserUtils.parseElement(k.getValue())); + ParserUtils.parseElement(k)); }); return valueMap; default: throw new GremlinResultParserException("invalid " + entry.getInnerCase().name()); } } + + public static Object parseElement(IrResult.KeyValues.KeyValue value) { + switch (value.getValueCase()) { + case ELEMENT: + return parseElement(value.getElement()); + case NESTED: + return parseEntry(IrResult.Entry.newBuilder().setMap(value.getNested()).build()); + default: + throw new GremlinResultParserException( + "keyValue [" + + value + + "] has invalid value type [" + + value.getValueCase() + + "]"); + } + } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinRecordParser.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinRecordParser.java index 5068e2d7e540..fe490220539d 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinRecordParser.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinRecordParser.java @@ -161,7 +161,7 @@ private Map parseKeyValues( .getKeyValuesList() .forEach( entry -> { - Object value = parseElement(entry.getValue(), valueType); + Object value = parseElement(entry, valueType); if (value != null) { valueMap.put(parseMapKey(entry.getKey().getStr()), value); } @@ -183,7 +183,7 @@ private Map parseKeyValues( Map valueMap = Maps.newLinkedHashMap(); for (int i = 0; i < entries.size(); ++i) { IrResult.KeyValues.KeyValue entry = entries.get(i); - Object value = parseElement(entry.getValue(), valueTypes.get(i)); + Object value = parseElement(entry, valueTypes.get(i)); if (value != null) { valueMap.put(parseMapKey(entry.getKey().getStr()), value); } @@ -219,6 +219,30 @@ private Object parseElement(IrResult.Element element, RelDataType type) { } } + private Object parseElement(IrResult.KeyValues.KeyValue value, RelDataType type) { + switch (value.getValueCase()) { + case ELEMENT: + return parseElement(value.getElement(), type); + case NESTED: + if (type instanceof ArbitraryMapType) { + return parseKeyValues( + value.getNested(), + ((ArbitraryMapType) type).getKeyTypes(), + ((ArbitraryMapType) type).getValueTypes()); + } else { + return parseKeyValues( + value.getNested(), type.getKeyType(), type.getValueType()); + } + default: + throw new GremlinResultParserException( + "keyValue [" + + value + + "] has invalid value type [" + + value.getValueCase() + + "]"); + } + } + private Vertex parseVertex(IrResult.Vertex vertex, RelDataType type) { return new DetachedVertex( vertex.getId(), diff --git a/interactive_engine/executor/ir/core/src/plan/logical.rs b/interactive_engine/executor/ir/core/src/plan/logical.rs index 5f21cfd40c54..e05cf343b95b 100644 --- a/interactive_engine/executor/ir/core/src/plan/logical.rs +++ b/interactive_engine/executor/ir/core/src/plan/logical.rs @@ -1110,7 +1110,14 @@ fn preprocess_expression( common_pb::expr_opr::Item::Map(key_values) => { for key_val in &mut key_values.key_vals { if let Some(value) = key_val.value.as_mut() { - preprocess_var(value, meta, plan_meta, false)?; + match value { + common_pb::variable_key_value::Value::Val(val) => { + preprocess_var(val, meta, plan_meta, false)?; + } + common_pb::variable_key_value::Value::Nested(_) => { + return Err(IrError::Unsupported("nested value in Map".to_string())); + } + } } } count = 0; diff --git a/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval.rs b/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval.rs index da8ba4bfad87..5767d14e38ac 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval.rs @@ -582,6 +582,32 @@ impl TryFrom for Operand { } } +impl TryFrom for Operand { + type Error = ParsePbError; + + fn try_from(key_vals: common_pb::VariableKeyValues) -> Result { + let mut vec = Vec::with_capacity(key_vals.key_vals.len()); + for key_val in key_vals.key_vals { + let (_key, _value) = (key_val.key, key_val.value); + let key = if let Some(key) = _key { + Object::try_from(key)? + } else { + return Err(ParsePbError::from("empty key provided in Map")); + }; + let value = if let Some(value) = _value { + match value { + common_pb::variable_key_value::Value::Val(val) => Operand::try_from(val)?, + common_pb::variable_key_value::Value::Nested(nested) => Operand::try_from(nested)?, + } + } else { + return Err(ParsePbError::from("empty value provided in Map")); + }; + vec.push((key, value)); + } + Ok(Self::Map(vec)) + } +} + impl TryFrom for Operand { type Error = ParsePbError; @@ -605,25 +631,7 @@ impl TryFrom for Operand { } Ok(Self::VarMap(vec)) } - - Map(key_vals) => { - let mut vec = Vec::with_capacity(key_vals.key_vals.len()); - for key_val in key_vals.key_vals { - let (_key, _value) = (key_val.key, key_val.value); - let key = if let Some(key) = _key { - Object::try_from(key)? - } else { - return Err(ParsePbError::from("empty key provided in Map")); - }; - let value = if let Some(value) = _value { - Operand::try_from(value)? - } else { - return Err(ParsePbError::from("empty value provided in Map")); - }; - vec.push((key, value)); - } - Ok(Self::Map(vec)) - } + Map(key_vals) => Operand::try_from(key_vals), _ => Err(ParsePbError::ParseError("invalid operators for an Operand".to_string())), } } else { diff --git a/interactive_engine/executor/ir/proto/expr.proto b/interactive_engine/executor/ir/proto/expr.proto index b3c9c53bd8a5..c4a5979a5b65 100644 --- a/interactive_engine/executor/ir/proto/expr.proto +++ b/interactive_engine/executor/ir/proto/expr.proto @@ -123,9 +123,16 @@ message VariableKeys { message VariableKeyValue { common.Value key = 1; - Variable value = 2; + oneof value { + Variable val = 2; + VariableKeyValues nested = 3; + } } +// A nested kv projection, which is used to project a nested structure of key-value pairs +// e.g., to project [value(a.name), value(a.age)], or project [key_value('name', a.name), key_value('age', a.age)], +// or more complex nested projection, +// e.g., to project [key_value('tagA', nested{[key_value('name', a.name), key_value('age', a.age)]}), key_value('tagB', nested{key_value(b.name), key_value(b.age)})] message VariableKeyValues { repeated VariableKeyValue key_vals = 1; } @@ -203,7 +210,7 @@ message ExprOpr { DynamicParam param = 9; Case case = 10; Extract extract = 11; - // TODO: the new definition for var_map, that allows user-given key name. Will remove the old var_map soon. + // TODO: the new definition for var_map, that allows user-given key name, and nested maps. Will remove the old var_map finally. VariableKeyValues map = 13; TimeInterval time_interval = 14; DateTimeMinus date_time_minus = 15; diff --git a/interactive_engine/executor/ir/proto/results.proto b/interactive_engine/executor/ir/proto/results.proto index bb19f8e6c261..9938fefc4792 100644 --- a/interactive_engine/executor/ir/proto/results.proto +++ b/interactive_engine/executor/ir/proto/results.proto @@ -68,7 +68,10 @@ message Collection { message KeyValues { message KeyValue { common.Value key = 1; - Element value = 2; + oneof value { + Element element = 2; + KeyValues nested = 3; + } } repeated KeyValue key_values = 1; } diff --git a/interactive_engine/executor/ir/runtime/src/process/entry.rs b/interactive_engine/executor/ir/runtime/src/process/entry.rs index a1dedfb57ab8..168c66404cd0 100644 --- a/interactive_engine/executor/ir/runtime/src/process/entry.rs +++ b/interactive_engine/executor/ir/runtime/src/process/entry.rs @@ -597,17 +597,21 @@ impl TryFrom for DynEntry { let mut map = BTreeMap::new(); for key_val in kv.key_values { let key = key_val.key.unwrap(); - let val_inner = key_val.value.unwrap().inner.unwrap(); - // currently, convert kv into Object::KV - let key_obj: Object = Object::try_from(key)?; - let val_obj: Object = match val_inner { - result_pb::element::Inner::Object(obj) => Object::try_from(obj)?, - _ => Err(ParsePbError::Unsupported(format!( - "unsupported kvs value inner {:?}", - val_inner, - )))?, - }; - map.insert(key_obj, val_obj); + let value_inner = key_val.value.unwrap(); + match value_inner { + result_pb::key_values::key_value::Value::Element(val) => { + let val_inner = val.inner.unwrap(); + let val_obj: Object = match val_inner { + result_pb::element::Inner::Object(obj) => Object::try_from(obj)?, + _ => Err(ParsePbError::Unsupported(format!( + "unsupported kvs value inner {:?}", + val_inner, + )))?, + }; + map.insert(Object::try_from(key)?, val_obj); + } + result_pb::key_values::key_value::Value::Nested(_) => todo!(), + } } Ok(DynEntry::new(Object::KV(map))) } diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/project.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/project.rs index fc8bae3a1f32..7eace8d6acf3 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/project.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/project.rs @@ -43,13 +43,46 @@ struct ProjectOperator { } #[derive(Debug)] -pub enum Projector { +enum VariableValue { + Value(TagKey), + Nest(VariableKeyValues), +} + +#[derive(Debug)] +struct VariableKeyValue { + key: Object, + value: VariableValue, +} + +#[derive(Debug)] +struct VariableKeyValues { + key_vals: Vec, +} + +impl VariableKeyValues { + fn exec_projector(&self, input: &Record) -> FnExecResult { + let mut map_collection = Vec::with_capacity(self.key_vals.len()); + for kv in self.key_vals.iter() { + let key = kv.key.clone(); + let value = match &kv.value { + VariableValue::Value(tag_key) => tag_key.get_arc_entry(input)?, + VariableValue::Nest(nest) => nest.exec_projector(input)?, + }; + map_collection.push(PairEntry::new(key.into(), value).into()); + } + Ok(DynEntry::new(CollectionEntry { inner: map_collection })) + } +} + +#[derive(Debug)] +enum Projector { ExprProjector(Evaluator), GraphElementProjector(TagKey), - /// MultiGraphElementProject will output a collection entry. - /// If the key is given, it is a collection of PairEntry with user-given key, and value of projected graph elements (computed via TagKey); - /// If the key is None, it is a collection of projected graph elements (computed via TagKey). - MultiGraphElementProjector(Vec<(Option, TagKey)>), + /// VecProjector will output a collection entry, which is a collection of projected graph elements (computed via TagKey). + VecProjector(Vec), + /// MapProjector will output a collection entry, which is a collection of key-value pairs. The key is a Object (preserve the user-given key), and the value is a projected graph element (computed via TagKey). + /// Besides, MapProjector supports nested map. + MapProjector(VariableKeyValues), /// A simple concatenation of multiple entries. ConcatProjector(Vec), } @@ -69,18 +102,15 @@ fn exec_projector(input: &Record, projector: &Projector) -> FnExecResult tag_key.get_arc_entry(input)?, - Projector::MultiGraphElementProjector(key_vals) => { - let mut collection = Vec::with_capacity(key_vals.len()); - for (key, tag_key) in key_vals.iter() { + Projector::VecProjector(vec) => { + let mut collection = Vec::with_capacity(vec.len()); + for tag_key in vec.iter() { let entry = tag_key.get_arc_entry(input)?; - if let Some(key) = key { - collection.push(PairEntry::new(key.clone().into(), entry).into()); - } else { - collection.push(entry); - } + collection.push(entry); } DynEntry::new(CollectionEntry { inner: collection }) } + Projector::MapProjector(map) => map.exec_projector(input)?, Projector::ConcatProjector(concat_vars) => { if concat_vars.len() != 2 { Err(FnExecError::unsupported_error("Only support concatenated 2 entries now"))? @@ -182,27 +212,13 @@ impl FilterMapFuncGen for pb::Project { let tag_keys = vars .keys .iter() - .map(|var| match TagKey::try_from(var.clone()) { - Ok(tag_key) => Ok((None, tag_key)), - Err(err) => Err(err), - }) - .collect::, TagKey)>, _>>()?; - Projector::MultiGraphElementProjector(tag_keys) + .map(|var| TagKey::try_from(var.clone())) + .collect::, _>>()?; + Projector::VecProjector(tag_keys) } common_pb::ExprOpr { item: Some(common_pb::expr_opr::Item::Map(key_vals)), .. } => { - let mut key_value_vec = Vec::with_capacity(key_vals.key_vals.len()); - for key_val in key_vals.key_vals.iter() { - let key = key_val.key.as_ref().ok_or_else(|| { - ParsePbError::EmptyFieldError(format!("key in Map Expr {:?}", key_val)) - })?; - let key_obj = Object::try_from(key.clone())?; - let val = key_val.value.as_ref().ok_or_else(|| { - ParsePbError::EmptyFieldError(format!("value in Map Expr {:?}", key_val)) - })?; - let tag_key = TagKey::try_from(val.clone())?; - key_value_vec.push((Some(key_obj), tag_key)); - } - Projector::MultiGraphElementProjector(key_value_vec) + let variable_key_values = VariableKeyValues::try_from(key_vals.clone())?; + Projector::MapProjector(variable_key_values) } common_pb::ExprOpr { item: Some(common_pb::expr_opr::Item::Concat(concat_vars)), @@ -234,6 +250,37 @@ impl FilterMapFuncGen for pb::Project { } } +impl TryFrom for VariableKeyValues { + type Error = ParsePbError; + + fn try_from(key_vals: common_pb::VariableKeyValues) -> Result { + let mut vec = Vec::with_capacity(key_vals.key_vals.len()); + for key_val in key_vals.key_vals { + let (_key, _value) = (key_val.key, key_val.value); + let key = if let Some(key) = _key { + Object::try_from(key.clone())? + } else { + return Err(ParsePbError::from("empty key provided in Map")); + }; + let value = if let Some(val) = _value { + match val { + common_pb::variable_key_value::Value::Val(val) => { + VariableValue::Value(TagKey::try_from(val.clone())?) + } + common_pb::variable_key_value::Value::Nested(nested_vals) => { + let nested = VariableKeyValues::try_from(nested_vals)?; + VariableValue::Nest(nested) + } + } + } else { + return Err(ParsePbError::from("empty value provided in Map")); + }; + vec.push(VariableKeyValue { key, value }); + } + Ok(VariableKeyValues { key_vals: vec }) + } +} + #[cfg(test)] mod tests { use ahash::HashMap; @@ -685,7 +732,7 @@ mod tests { } // g.V().valueMap('age', 'name') with alias of 'age' as 'newAge' and 'name' as 'newName', by map - // this is projected by MultiGraphElementProjector + // this is projected by MapProjector #[test] fn simple_project_map_mapping_test() { let project_opr_pb = pb::Project { @@ -816,7 +863,7 @@ mod tests { } // g.V().as("a").select("a").by(valueMap("age", "name")),with alias of 'a.age' as 'newAge' and 'a.tname' as 'newName', by map - // this is projected by MultiGraphElementProjector + // this is projected by MapProjector #[test] fn simple_project_tag_map_mapping_test() { let project_opr_pb = pb::Project { @@ -866,6 +913,102 @@ mod tests { assert_eq!(object_result, expected_result); } + // g.V().as("a").select("a").by(valueMap("age", "name")), with expr_opr as a nested VariableKeyValues + // this is projected by MapProjector (nested) + #[test] + fn project_nested_map_test() { + // project a.{name, age} + let variable_key_value = common_pb::VariableKeyValues { + key_vals: vec![ + common_pb::VariableKeyValue { + key: Some(common_pb::Value::from("name".to_string())), + value: Some(common_pb::variable_key_value::Value::Val(to_var_pb( + None, + Some("name".into()), + ))), + }, + common_pb::VariableKeyValue { + key: Some(common_pb::Value::from("age".to_string())), + value: Some(common_pb::variable_key_value::Value::Val(to_var_pb( + None, + Some("age".into()), + ))), + }, + ], + }; + let nested_variable_key_vals = common_pb::VariableKeyValues { + key_vals: vec![common_pb::VariableKeyValue { + key: Some(common_pb::Value::from("a".to_string())), + value: Some(common_pb::variable_key_value::Value::Nested(variable_key_value)), + }], + }; + let project_opr_pb = pb::Project { + mappings: vec![pb::project::ExprAlias { + expr: Some(common_pb::Expression { + operators: vec![common_pb::ExprOpr { + node_type: None, + item: Some(common_pb::expr_opr::Item::Map(nested_variable_key_vals)), + }], + }), + alias: None, + }], + is_append: false, + }; + let mut result = project_test(init_source_with_tag(), project_opr_pb); + + let mut object_result = vec![]; + while let Some(Ok(res)) = result.next() { + let collection = res + .get(None) + .unwrap() + .as_any_ref() + .downcast_ref::() + .unwrap(); + let mut result = vec![]; + for entry in collection.inner.iter() { + let pair_entry = entry + .as_any_ref() + .downcast_ref::() + .unwrap(); + let key = pair_entry + .get_left() + .as_any_ref() + .downcast_ref::() + .unwrap(); + let value = pair_entry + .get_right() + .as_any_ref() + .downcast_ref::() + .unwrap(); + let mut result_value = vec![]; + for entry in value.inner.iter() { + let inner_pair = entry + .as_any_ref() + .downcast_ref::() + .unwrap(); + let inner_key = inner_pair + .get_left() + .as_any_ref() + .downcast_ref::() + .unwrap(); + let inner_val = inner_pair + .get_right() + .as_any_ref() + .downcast_ref::() + .unwrap(); + result_value.push((inner_key.clone(), inner_val.clone())); + } + result.push((key.clone(), result_value.clone())); + } + object_result.push(result); + } + let expected_result = vec![ + vec![(object!("a"), vec![(object!("name"), object!("marko")), (object!("age"), object!(29))])], + vec![(object!("a"), vec![(object!("name"), object!("vadas")), (object!("age"), object!(27))])], + ]; + assert_eq!(object_result, expected_result); + } + #[test] fn project_multi_mapping_tags() { let project_opr_pb = pb::Project { diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/mod.rs b/interactive_engine/executor/ir/runtime/src/process/operator/mod.rs index 00db86cf7cd9..9428cdb72cda 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/mod.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/mod.rs @@ -289,7 +289,7 @@ pub(crate) mod tests { .into_iter() .map(|(key, value)| common_pb::VariableKeyValue { key: Some(common_pb::Value::from(key)), - value: Some(to_var_pb(value.0, value.1)), + value: Some(common_pb::variable_key_value::Value::Val(to_var_pb(value.0, value.1))), }) .collect(); common_pb::Expression { diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs b/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs index 03e63494c38a..4e3227c5820d 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs @@ -140,9 +140,9 @@ impl RecordSinkEncoder { let val_pb: common_pb::Value = val.clone().into(); key_values.push(result_pb::key_values::KeyValue { key: Some(key_pb), - value: Some(result_pb::Element { + value: Some(result_pb::key_values::key_value::Value::Element(result_pb::Element { inner: Some(result_pb::element::Inner::Object(val_pb)), - }), + })), }) } return Some(result_pb::KeyValues { key_values }); @@ -161,8 +161,24 @@ impl RecordSinkEncoder { .unwrap(); if let Some(key_obj) = pair.get_left().as_object() { let key_pb: common_pb::Value = key_obj.clone().into(); - let val_pb = self.element_to_pb(pair.get_right()); - key_values.push(result_pb::key_values::KeyValue { key: Some(key_pb), value: Some(val_pb) }) + let val = pair.get_right(); + if val.get_type() == EntryType::Collection { + let inner_collection = val + .as_any_ref() + .downcast_ref::() + .unwrap(); + let inner_collection_pb = self.collection_map_to_pb(inner_collection.clone())?; + key_values.push(result_pb::key_values::KeyValue { + key: Some(key_pb), + value: Some(result_pb::key_values::key_value::Value::Nested(inner_collection_pb)), + }) + } else { + let val_pb = self.element_to_pb(pair.get_right()); + key_values.push(result_pb::key_values::KeyValue { + key: Some(key_pb), + value: Some(result_pb::key_values::key_value::Value::Element(val_pb)), + }) + } } else { Err(FnExecError::unsupported_error(&format!( "only support map result with object key, while it is {:?}",