
Commit

fix property type. fix macos action failure
Signed-off-by: chenxu <chenxu@dmetasoul.com>
dmetasoul01 committed Aug 28, 2023
1 parent d44671f commit fec81fa
Showing 3 changed files with 40 additions and 10 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/native-build.yml
@@ -149,6 +149,7 @@ jobs:
uses: arduino/setup-protoc@v2
with:
version: "23.x"
+ repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rs/toolchain@v1
with:
profile: minimal
@@ -182,6 +183,7 @@ jobs:
uses: arduino/setup-protoc@v2
with:
version: "23.x"
+ repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rs/toolchain@v1
with:
profile: minimal
@@ -238,6 +240,7 @@ jobs:
uses: arduino/setup-protoc@v2
with:
version: "23.x"
+ repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Build with Maven
run: |
MAVEN_OPTS="-Xmx4000m" mvn -q -B package --file pom.xml -Pcross-build -DskipTests -Dmaven.test.skip=true
@@ -126,7 +126,7 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
identity.tableLocation, msgSchema, identity.useCDC, identity.cdcColumn, partition);
JSONObject properties = new JSONObject();
if (isCdc) {
- properties.put(USE_CDC.key(), true);
+ properties.put(USE_CDC.key(), "true");
properties.put(CDC_CHANGE_COLUMN, CDC_CHANGE_COLUMN_DEFAULT);
}
dbManager.createNewTable(tableId, tableNamespace, tableName, identity.tableLocation, msgSchema.json(),
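
The functional change in this file is the value type stored for the CDC flag: the table property is now written as the string "true" rather than the boolean true, so the persisted property JSON keeps a string-typed value. A minimal sketch of the difference, not part of the commit, assuming the properties object is fastjson's JSONObject and that USE_CDC.key() resolves to something like "use_cdc":

import com.alibaba.fastjson.JSONObject;

public class UseCdcPropertySketch {
    public static void main(String[] args) {
        JSONObject asBoolean = new JSONObject();
        asBoolean.put("use_cdc", true);        // previous behaviour: JSON boolean
        JSONObject asString = new JSONObject();
        asString.put("use_cdc", "true");       // behaviour after this commit: JSON string

        // The serialized property documents differ in the value's JSON type,
        // which matters to readers that expect a string-valued property.
        System.out.println(asBoolean.toJSONString()); // {"use_cdc":true}
        System.out.println(asString.toJSONString());  // {"use_cdc":"true"}
    }
}
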
@@ -75,7 +75,9 @@ public static StructType toSparkSchema(RowType rowType, Optional<String> cdcColumn
String name = field.getName();
if (name.equals(SORT_FIELD)) continue;
LogicalType logicalType = field.getType();
- org.apache.spark.sql.types.DataType dataType = org.apache.spark.sql.arrow.ArrowUtils.fromArrowField(ArrowUtils.toArrowField(name, logicalType));
+ org.apache.spark.sql.types.DataType
+ dataType =
+ org.apache.spark.sql.arrow.ArrowUtils.fromArrowField(ArrowUtils.toArrowField(name, logicalType));
stNew = stNew.add(name, dataType, logicalType.isNullable());
}

@@ -89,7 +91,15 @@ public static StructType toSparkSchema(RowType rowType, Optional<String> cdcColumn
} else {
StructField field = stNew.fields()[(Integer) cdcFieldIndex.get()];
if (!field.toString().equals(cdcField.toString()))
- throw new CatalogException(CDC_CHANGE_COLUMN + "=" + cdcColName + "has an invalid field of" + field + "," + CDC_CHANGE_COLUMN + " require field of " + cdcField);
+ throw new CatalogException(CDC_CHANGE_COLUMN +
+ "=" +
+ cdcColName +
+ "has an invalid field of" +
+ field +
+ "," +
+ CDC_CHANGE_COLUMN +
+ " require field of " +
+ cdcField);
}
}
return stNew;
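
Both toSparkSchema overloads validate the optional CDC column by comparing StructField string renderings (field.toString().equals(cdcField.toString())), so the expected and actual fields must match exactly. A small illustrative sketch, not from the commit; the concrete values ("rowKinds", non-nullable StringType) are assumptions:

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;

public class CdcFieldCheckSketch {
    public static void main(String[] args) {
        // Hypothetical expected CDC field: a non-nullable string column.
        StructField expected = new StructField("rowKinds", DataTypes.StringType, false, Metadata.empty());
        // Hypothetical field found in the incoming schema, declared nullable by mistake.
        StructField actual = new StructField("rowKinds", DataTypes.StringType, true, Metadata.empty());
        // Any difference in the rendered field (name, type, nullability) fails the check
        // and would trigger the CatalogException above.
        System.out.println(expected.toString().equals(actual.toString())); // false
    }
}
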
@@ -101,7 +111,10 @@ public static StructType toSparkSchema(TableSchema tsc, Optional<String> cdcColumn
for (int i = 0; i < tsc.getFieldCount(); i++) {
String name = tsc.getFieldName(i).get();
DataType dt = tsc.getFieldDataType(i).get();
- org.apache.spark.sql.types.DataType dataType = org.apache.spark.sql.arrow.ArrowUtils.fromArrowField(ArrowUtils.toArrowField(name, dt.getLogicalType()));
+ org.apache.spark.sql.types.DataType
+ dataType =
+ org.apache.spark.sql.arrow.ArrowUtils.fromArrowField(ArrowUtils.toArrowField(name,
+ dt.getLogicalType()));
stNew = stNew.add(name, dataType, dt.getLogicalType().isNullable());
}
if (cdcColumn.isPresent()) {
@@ -114,7 +127,15 @@ public static StructType toSparkSchema(TableSchema tsc, Optional<String> cdcColumn
} else {
StructField field = stNew.fields()[(Integer) cdcFieldIndex.get()];
if (!field.toString().equals(cdcField.toString()))
- throw new CatalogException(CDC_CHANGE_COLUMN + "=" + cdcColName + " has an invalid field of " + field + "," + CDC_CHANGE_COLUMN + " require field of " + cdcField);
+ throw new CatalogException(CDC_CHANGE_COLUMN +
+ "=" +
+ cdcColName +
+ " has an invalid field of " +
+ field +
+ "," +
+ CDC_CHANGE_COLUMN +
+ " require field of " +
+ cdcField);
}
}
return stNew;
@@ -178,12 +199,14 @@ public static CatalogTable toFlinkCatalog(TableInfo tableInfo) {
JSONObject properties = JSON.parseObject(tableInfo.getProperties());

StructType struct = (StructType) org.apache.spark.sql.types.DataType.fromJson(tableSchema);
- org.apache.arrow.vector.types.pojo.Schema arrowSchema = org.apache.spark.sql.arrow.ArrowUtils.toArrowSchema(struct, ZoneId.of("UTC").toString());
+ org.apache.arrow.vector.types.pojo.Schema
+ arrowSchema =
+ org.apache.spark.sql.arrow.ArrowUtils.toArrowSchema(struct, ZoneId.of("UTC").toString());
RowType rowType = ArrowUtils.fromArrowSchema(arrowSchema);
Builder bd = Schema.newBuilder();

String lakesoulCdcColumnName = properties.getString(CDC_CHANGE_COLUMN);
- boolean contains = (lakesoulCdcColumnName != null && !"".equals(lakesoulCdcColumnName));
+ boolean contains = (lakesoulCdcColumnName != null && !lakesoulCdcColumnName.isEmpty());

for (RowType.RowField field : rowType.getFields()) {
if (contains && field.getName().equals(lakesoulCdcColumnName)) {
@@ -282,7 +305,9 @@ public static String getDatabaseName(String fullDatabaseName) {

public static void setFSConfigs(Configuration conf, NativeIOBase io) {
conf.addAll(GlobalConfiguration.loadConfiguration());
- org.apache.hadoop.conf.Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
+ org.apache.hadoop.conf.Configuration
+ hadoopConf =
+ HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
String defaultFS = hadoopConf.get("fs.defaultFS");
io.setObjectStoreOption("fs.defaultFS", defaultFS);

@@ -357,7 +382,8 @@ public static DataFileInfo[] getTargetDataFileInfo(TableInfo tif, List<Map<Strin
}
}

- public static Map<String, Map<Integer, List<Path>>> splitDataInfosToRangeAndHashPartition(String tid, DataFileInfo[] dfinfos) {
+ public static Map<String, Map<Integer, List<Path>>> splitDataInfosToRangeAndHashPartition(String tid,
+ DataFileInfo[] dfinfos) {
Map<String, Map<Integer, List<Path>>> splitByRangeAndHashPartition = new LinkedHashMap<>();
TableInfo tif = DataOperation.dbManager().getTableInfoByTableId(tid);
for (DataFileInfo pif : dfinfos) {
@@ -393,7 +419,8 @@ public static <R> R getType(Schema.UnresolvedColumn unresolvedColumn) {

public static boolean isExistHashPartition(TableInfo tif) {
JSONObject tableProperties = JSON.parseObject(tif.getProperties());
- if (tableProperties.containsKey(LakeSoulOptions.HASH_BUCKET_NUM()) && tableProperties.getString(LakeSoulOptions.HASH_BUCKET_NUM()).equals("-1")) {
+ if (tableProperties.containsKey(LakeSoulOptions.HASH_BUCKET_NUM()) &&
+ tableProperties.getString(LakeSoulOptions.HASH_BUCKET_NUM()).equals("-1")) {
return false;
} else {
return tableProperties.containsKey(LakeSoulOptions.HASH_BUCKET_NUM());
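
Read together, the condition above means isExistHashPartition returns true only when the hash-bucket property is present and is not the sentinel value "-1". An equivalent minimal sketch, not part of the commit, assuming fastjson and that LakeSoulOptions.HASH_BUCKET_NUM() resolves to something like "hashBucketNum":

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class HashPartitionCheckSketch {
    // Collapses the containsKey / "-1" branches above into a single expression.
    static boolean hasHashPartition(String propertiesJson) {
        JSONObject props = JSON.parseObject(propertiesJson);
        return props.containsKey("hashBucketNum")
                && !"-1".equals(props.getString("hashBucketNum"));
    }

    public static void main(String[] args) {
        System.out.println(hasHashPartition("{\"hashBucketNum\":\"4\"}"));  // true
        System.out.println(hasHashPartition("{\"hashBucketNum\":\"-1\"}")); // false: sentinel for no hash partitioning
        System.out.println(hasHashPartition("{}"));                         // false: key absent
    }
}
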
