Commit

[fix](paimon) Remove the static attribute of the source for paimon for branch_2.0 #29032 (#29054)
wuwenchi authored Dec 27, 2023
1 parent 1dbc220 commit 3edac2f
Showing 6 changed files with 58 additions and 23 deletions.
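
Before the per-file diffs, here is a minimal, self-contained sketch of the pattern this change moves to. The class and type names below (ScanParamsHookSketch, RangeDesc, BaseFileScanNode, PaimonLikeScanNode) are simplified stand-ins for illustration, not the real Doris or Thrift classes. The base scan node exposes a protected setScanParams hook that defaults to a no-op and is overridden by each format-specific scan node, replacing the static instanceof dispatch that previously lived in the base class, and the Paimon-side state becomes per-instance instead of static.

import java.util.ArrayList;
import java.util.List;

public class ScanParamsHookSketch {
    // Stand-ins for TFileRangeDesc and the Split hierarchy; stubs for illustration
    // only, not the real Doris/Thrift classes.
    static class RangeDesc {
        String tableFormat;
    }

    interface Split {
    }

    static class PaimonSplit implements Split {
    }

    // Plays the role of the base FileQueryScanNode: a no-op hook that the
    // scan-range builder calls once per file range.
    static class BaseFileScanNode {
        protected void setScanParams(RangeDesc rangeDesc, Split split) {
        }

        RangeDesc createRange(Split split) {
            RangeDesc desc = new RangeDesc();
            setScanParams(desc, split); // the subclass attaches its table-format params
            return desc;
        }
    }

    // Plays the role of PaimonScanNode: per-instance state (no static fields) and an
    // override that only reacts to its own split type.
    static class PaimonLikeScanNode extends BaseFileScanNode {
        private final List<String> predicates = new ArrayList<>(); // formerly static state

        @Override
        protected void setScanParams(RangeDesc rangeDesc, Split split) {
            if (split instanceof PaimonSplit) {
                rangeDesc.tableFormat = "paimon";
            }
        }
    }

    public static void main(String[] args) {
        RangeDesc desc = new PaimonLikeScanNode().createRange(new PaimonSplit());
        System.out.println(desc.tableFormat); // prints "paimon"
    }
}

The actual commit keeps the existing format-specific helpers (setPaimonParams, setHudiParams, setIcebergParams, and MaxCompute's setScanParams overload) but turns them, together with PaimonScanNode's source and predicates fields, into instance members, as the diffs below show.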
FileQueryScanNode.java
@@ -39,12 +39,6 @@
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.planner.external.hudi.HudiScanNode;
-import org.apache.doris.planner.external.hudi.HudiSplit;
-import org.apache.doris.planner.external.iceberg.IcebergScanNode;
-import org.apache.doris.planner.external.iceberg.IcebergSplit;
-import org.apache.doris.planner.external.paimon.PaimonScanNode;
-import org.apache.doris.planner.external.paimon.PaimonSplit;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
@@ -261,6 +255,10 @@ public TFileScanRangeParams getFileScanRangeParams() {
return params;
}

+// Set some scan parameters to support different types of file data sources
+protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
+}
+
@Override
public void createScanRangeLocations() throws UserException {
long start = System.currentTimeMillis();
@@ -330,17 +328,7 @@ public void createScanRangeLocations() throws UserException {
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}

-// external data lake table
-if (fileSplit instanceof IcebergSplit) {
-// TODO: extract all data lake split to factory
-IcebergScanNode.setIcebergParams(rangeDesc, (IcebergSplit) fileSplit);
-} else if (fileSplit instanceof PaimonSplit) {
-PaimonScanNode.setPaimonParams(rangeDesc, (PaimonSplit) fileSplit);
-} else if (fileSplit instanceof HudiSplit) {
-HudiScanNode.setHudiParams(rangeDesc, (HudiSplit) fileSplit);
-} else if (fileSplit instanceof MaxComputeSplit) {
-MaxComputeScanNode.setScanParams(rangeDesc, (MaxComputeSplit) fileSplit);
-}
+setScanParams(rangeDesc, fileSplit);

curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
TScanRangeLocation location = new TScanRangeLocation();
@@ -450,6 +438,7 @@ private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> col

protected abstract TFileFormatType getFileFormatType() throws UserException;


protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {
return Util.inferFileCompressTypeByPath(fileSplit.getPath().toString());
}
MaxComputeScanNode.java
@@ -63,7 +63,14 @@ public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeNa
catalog = (MaxComputeExternalCatalog) table.getCatalog();
}

-public static void setScanParams(TFileRangeDesc rangeDesc, MaxComputeSplit maxComputeSplit) {
+@Override
+protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
+if (split instanceof MaxComputeSplit) {
+setScanParams(rangeDesc, (MaxComputeSplit) split);
+}
+}
+
+public void setScanParams(TFileRangeDesc rangeDesc, MaxComputeSplit maxComputeSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(TableFormatType.MAX_COMPUTE.value());
TMaxComputeFileDesc fileDesc = new TMaxComputeFileDesc();
HudiScanNode.java
@@ -133,7 +133,14 @@ protected Map<String, String> getLocationProperties() throws UserException {
}
}

-public static void setHudiParams(TFileRangeDesc rangeDesc, HudiSplit hudiSplit) {
+@Override
+protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
+if (split instanceof HudiSplit) {
+setHudiParams(rangeDesc, (HudiSplit) split);
+}
+}
+
+public void setHudiParams(TFileRangeDesc rangeDesc, HudiSplit hudiSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(hudiSplit.getTableFormatType().value());
THudiFileDesc fileDesc = new THudiFileDesc();
IcebergScanNode.java
@@ -129,7 +129,14 @@ protected void doInitialize() throws UserException {
super.doInitialize();
}

-public static void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit) {
+@Override
+protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
+if (split instanceof IcebergSplit) {
+setIcebergParams(rangeDesc, (IcebergSplit) split);
+}
+}
+
+public void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
TIcebergFileDesc fileDesc = new TIcebergFileDesc();
PaimonScanNode.java
@@ -54,8 +54,8 @@
import java.util.stream.Collectors;

public class PaimonScanNode extends FileQueryScanNode {
-private static PaimonSource source = null;
-private static List<Predicate> predicates;
+private PaimonSource source = null;
+private List<Predicate> predicates;

public PaimonScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, needCheckColumnPriv);
@@ -91,7 +91,14 @@ public static <T> String encodeObjectToString(T t) {
}
}

-public static void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) {
+@Override
+protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
+if (split instanceof PaimonSplit) {
+setPaimonParams(rangeDesc, (PaimonSplit) split);
+}
+}
+
+public void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value());
TPaimonFileDesc fileDesc = new TPaimonFileDesc();
test_paimon_catalog.groovy
@@ -284,5 +284,23 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_
qt_c98 c98
qt_c99 c99
qt_c100 c100

+// test views joining paimon tables
+sql """ switch internal """
+String view_db = "test_view_for_paimon"
+sql """ drop database if exists ${view_db}"""
+sql """ create database if not exists ${view_db}"""
+sql """use ${view_db}"""
+sql """ create view test_tst_1 as select * from ${catalog_name}.`db1`.all_table; """
+sql """ create view test_tst_2 as select * from ${catalog_name}.`db1`.all_table_with_parquet; """
+sql """ create view test_tst_5 as select * from ${catalog_name}.`db1`.array_nested; """
+sql """ create table test_tst_6 properties ("replication_num" = "1") as
+select f.c2,f.c3,c.c1 from
+(select a.c2,b.c3 from test_tst_1 a inner join test_tst_2 b on a.c2=b.c2) f
+inner join test_tst_5 c on f.c2=c.c1;
+"""
+def view1 = """select * from test_tst_6 order by c1"""
+
+// qt_view1 view1
}
}
