Skip to content

Commit

Permalink
[bugfix](paimon) add support for 'in' and 'not in' (apache#38390)
Browse files Browse the repository at this point in the history
## Proposed changes

add support for `in` and `not in`:

```
select * from tb where partition_column in ('a','b','c');
select * from tb where partition_column not in ('a','b','c');

```
  • Loading branch information
wuwenchi authored Jul 30, 2024
1 parent 15eac46 commit 65c24f1
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.IsNullPredicate;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.Subquery;
import org.apache.doris.thrift.TExprOpcode;

import org.apache.paimon.data.BinaryString;
Expand Down Expand Up @@ -85,11 +87,48 @@ private Predicate convertToPaimonExpr(Expr dorisExpr) {
default:
return null;
}
} else if (dorisExpr instanceof InPredicate) {
return doInPredicate((InPredicate) dorisExpr);
} else {
return binaryExprDesc(dorisExpr);
}
}

/**
 * Converts a Doris {@code IN} / {@code NOT IN} predicate into a Paimon
 * {@link Predicate} for pushdown.
 *
 * <p>Only the literal-list form is supported, e.g. {@code a IN (1, 2, 3)};
 * subqueries and non-literal elements cannot be pushed down.
 *
 * @param predicate the Doris in-predicate to convert
 * @return the equivalent Paimon predicate, or {@code null} when the
 *         predicate cannot be pushed down (subquery, unknown column,
 *         non-literal element, or unconvertible value)
 */
private Predicate doInPredicate(InPredicate predicate) {
    // `a IN (select ...)` cannot be evaluated by Paimon at scan time.
    if (predicate.contains(Subquery.class)) {
        return null;
    }

    // The first child must resolve to a plain column reference.
    SlotRef slotRef = convertDorisExprToSlotRef(predicate.getChild(0));
    if (slotRef == null) {
        return null;
    }
    String colName = slotRef.getColumnName();
    int idx = fieldNames.indexOf(colName);
    if (idx < 0) {
        // Column is not part of the Paimon schema: decline pushdown instead
        // of letting paimonFieldTypes.get(-1) throw IndexOutOfBoundsException.
        return null;
    }
    DataType dataType = paimonFieldTypes.get(idx);

    // Convert every list element; bail out on the first one that is not a
    // literal or cannot be mapped to a Paimon value.
    List<Object> valueList = new ArrayList<>();
    for (int i = 1; i < predicate.getChildren().size(); i++) {
        if (!(predicate.getChild(i) instanceof LiteralExpr)) {
            return null;
        }
        LiteralExpr literalExpr = convertDorisExprToLiteralExpr(predicate.getChild(i));
        Object value = dataType.accept(new PaimonValueConverter(literalExpr));
        if (value == null) {
            return null;
        }
        valueList.add(value);
    }

    return predicate.isNotIn()
            ? builder.notIn(idx, valueList)
            : builder.in(idx, valueList);
}

private Predicate binaryExprDesc(Expr dorisExpr) {
TExprOpcode opcode = dorisExpr.getOpcode();
// Make sure the col slot is always first
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,15 +317,26 @@ public Map<String, String> getLocationProperties() throws MetaNotFoundException,

/**
 * Extends the parent explain output with Paimon-specific details:
 * native-read split counts, the predicates pushed down to Paimon, and
 * (at VERBOSE level) per-split statistics.
 */
@Override
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
    StringBuilder out = new StringBuilder(super.getNodeExplainString(prefix, detailLevel));

    // Ratio of splits readable by the native reader over all splits.
    int totalSplits = paimonSplitNum + rawFileSplitNum;
    out.append(prefix)
            .append("paimonNativeReadSplits=")
            .append(rawFileSplitNum)
            .append('/')
            .append(totalSplits)
            .append('\n');

    // List each pushed-down predicate on its own (double-indented) line,
    // or "NONE" when nothing was pushed down.
    out.append(prefix).append("predicatesFromPaimon:");
    if (!predicates.isEmpty()) {
        out.append('\n');
        for (Predicate pushed : predicates) {
            out.append(prefix).append(prefix).append(pushed).append('\n');
        }
    } else {
        out.append(" NONE\n");
    }

    // Per-split statistics are only worth printing at VERBOSE level.
    if (detailLevel == TExplainLevel.VERBOSE) {
        out.append(prefix).append("PaimonSplitStats: \n");
        for (SplitStat stat : splitStats) {
            out.append(prefix).append(' ').append(stat).append('\n');
        }
    }
    return out.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression test: partition pruning for IN / NOT IN predicates pushed down
// from Doris to Paimon. The table predict_for_in (see the seed-data comment
// at the bottom of this file) has 3 x 3 partitions (dt, hh in 'a','b','c'),
// one split per partition, 9 splits total — so the expected inputSplitNum is
// 3 * (number of partition values that match the predicate).
suite("test_paimon_predict", "p0,external,doris,external_docker,external_docker_doris") {
    // Skip unless the Paimon docker environment is enabled for this run.
    String enabled = context.config.otherConfigs.get("enablePaimonTest")
    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
        logger.info("disable paimon test")
        return
    }

    // The Paimon warehouse lives on the shared MinIO instance
    // (same endpoint/port as the iceberg docker setup).
    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
    String catalog_name = "test_paimon_predict"
    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")

    // Recreate the catalog from scratch so the test is idempotent.
    sql """drop catalog if exists ${catalog_name}"""
    sql """
        CREATE CATALOG ${catalog_name} PROPERTIES (
            'type' = 'paimon',
            'warehouse' = 's3://warehouse/wh',
            's3.endpoint' = 'http://${externalEnvIp}:${minio_port}',
            's3.access_key' = 'admin',
            's3.secret_key' = 'password',
            's3.path.style.access' = 'true'
        );
    """
    sql """use `${catalog_name}`.`spark_paimon`"""

    // Baseline: with no predicate every split is scanned.
    explain {
        sql("select * from predict_for_in")
        contains("inputSplitNum=9")
    }

    // Run the same pruning checks against one partition column.
    def explain_one_column = { col_name ->

        // Single matching value -> 3 of 9 splits.
        explain {
            sql("select * from predict_for_in where ${col_name} in ('a')")
            contains("inputSplitNum=3")
        }

        explain {
            sql("select * from predict_for_in where ${col_name} in ('b')")
            contains("inputSplitNum=3")
        }

        // Two matching values -> 6 splits.
        explain {
            sql("select * from predict_for_in where ${col_name} in ('a','b')")
            contains("inputSplitNum=6")
        }

        // Non-existent values ('x') contribute nothing.
        explain {
            sql("select * from predict_for_in where ${col_name} in ('a','x')")
            contains("inputSplitNum=3")
        }

        // No matching value -> all splits pruned.
        explain {
            sql("select * from predict_for_in where ${col_name} in ('x','y')")
            contains("inputSplitNum=0")
        }

        // All partition values listed -> no pruning.
        explain {
            sql("select * from predict_for_in where ${col_name} in ('a','b','c')")
            contains("inputSplitNum=9")
        }

        // Mix of existing and non-existing values.
        explain {
            sql("select * from predict_for_in where ${col_name} in ('y','x','a','c')")
            contains("inputSplitNum=6")
        }

        // NOT IN excluding 'a' and 'c' leaves only 'b' -> 3 splits.
        explain {
            sql("select * from predict_for_in where ${col_name} not in ('y','x','a','c')")
            contains("inputSplitNum=3")
        }

        // NOT IN a single existing value -> the other two remain.
        explain {
            sql("select * from predict_for_in where ${col_name} not in ('a')")
            contains("inputSplitNum=6")
        }

        // NOT IN a non-existent value excludes nothing.
        explain {
            sql("select * from predict_for_in where ${col_name} not in ('x')")
            contains("inputSplitNum=9")
        }
    }

    // Both partition columns must prune identically.
    explain_one_column('dt')
    explain_one_column('hh')


    sql """ drop catalog if exists ${catalog_name} """
}


/*
for spark:
create table predict_for_in(id int, dt string, hh string) partitioned by(dt,hh);
insert into predict_for_in values (1, 'a', 'a');
insert into predict_for_in values (2, 'a', 'b');
insert into predict_for_in values (3, 'a', 'c');
insert into predict_for_in values (4, 'b', 'a');
insert into predict_for_in values (5, 'b', 'b');
insert into predict_for_in values (6, 'b', 'c');
insert into predict_for_in values (7, 'c', 'a');
insert into predict_for_in values (8, 'c', 'b');
insert into predict_for_in values (9, 'c', 'c');
*/

0 comments on commit 65c24f1

Please sign in to comment.