From b8bb38a73ebc96aeee24d9a4dc7f365339df7f42 Mon Sep 17 00:00:00 2001 From: chenxu Date: Mon, 17 Jun 2024 18:30:03 +0800 Subject: [PATCH] fix flink package Signed-off-by: chenxu --- lakesoul-flink/pom.xml | 6 +++++- .../apache/flink/lakesoul/tool/FlinkUtil.java | 17 +++-------------- 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/lakesoul-flink/pom.xml b/lakesoul-flink/pom.xml index d75cfb52d..c292107e7 100644 --- a/lakesoul-flink/pom.xml +++ b/lakesoul-flink/pom.xml @@ -554,6 +554,10 @@ SPDX-License-Identifier: Apache-2.0 shaded.parquet com.lakesoul.shaded.shaded.parquet + + org.yaml.snakeyaml + com.lakesoul.shaded.org.yaml.snakeyaml + - \ No newline at end of file + diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java index 3e632f36b..6bf4c6510 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java @@ -75,19 +75,7 @@ import static org.apache.flink.lakesoul.tool.JobOptions.S3_ENDPOINT; import static org.apache.flink.lakesoul.tool.JobOptions.S3_PATH_STYLE_ACCESS; import static org.apache.flink.lakesoul.tool.JobOptions.S3_SECRET_KEY; -import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.BUCKET_PARALLELISM; -import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.CDC_CHANGE_COLUMN; -import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.CDC_CHANGE_COLUMN_DEFAULT; -import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.COMPUTE_COLUMN_JSON; -import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.HASH_BUCKET_NUM; -import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.LAKESOUL_VIEW; -import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.LAKESOUL_VIEW_KIND; -import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.LAKESOUL_VIEW_TYPE; -import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.SORT_FIELD; -import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.USE_CDC; -import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.VIEW_EXPANDED_QUERY; -import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.VIEW_ORIGINAL_QUERY; -import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.WATERMARK_SPEC_JSON; +import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.*; import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM; import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isCompositeType; @@ -235,6 +223,7 @@ public static boolean isTable(TableInfo tableInfo) { public static CatalogBaseTable toFlinkCatalog(TableInfo tableInfo) { String tableSchema = tableInfo.getTableSchema(); JSONObject properties = JSON.parseObject(tableInfo.getProperties()); + properties.put(CATALOG_PATH.key(), tableInfo.getTablePath()); org.apache.arrow.vector.types.pojo.Schema arrowSchema = null; if (TableInfoDao.isArrowKindSchema(tableSchema)) { @@ -441,7 +430,7 @@ public static Object convertStringToInternalValue(String valStr, LogicalType typ } public static DataFileInfo[] getTargetDataFileInfo(TableInfo tif, List> remainingPartitions) { - if (remainingPartitions == null || remainingPartitions.size() == 0) { + if (remainingPartitions == null) { return DataOperation.getTableDataInfo(tif.getTableId()); } else { List partitionDescs = remainingPartitions.stream()