diff --git a/.github/workflows/flink-cdc-hdfs-test.yml b/.github/workflows/flink-cdc-hdfs-test.yml index 4d2a7e51f..5d882d899 100644 --- a/.github/workflows/flink-cdc-hdfs-test.yml +++ b/.github/workflows/flink-cdc-hdfs-test.yml @@ -88,7 +88,7 @@ jobs: cp ./lakesoul-spark/target/$SPARK_TEST_JAR_NAME ./script/benchmark/work-dir - uses: beyondstorage/setup-hdfs@master with: - hdfs-version: '3.3.2' + hdfs-version: '3.3.6' - name: Modify HDFS Host run: | sed -i 's/localhost/172.17.0.1/g' $HADOOP_HOME/etc/hadoop/core-site.xml diff --git a/.github/workflows/maven-test.yml b/.github/workflows/maven-test.yml index 22d29636b..87cc577f3 100644 --- a/.github/workflows/maven-test.yml +++ b/.github/workflows/maven-test.yml @@ -242,7 +242,7 @@ jobs: repo-token: ${{ secrets.GITHUB_TOKEN }} - uses: beyondstorage/setup-hdfs@master with: - hdfs-version: '3.3.2' + hdfs-version: '3.3.6' - name: Modify HDFS User Group Mapping run: | sed -i '/^<\/configuration>/i hadoop.user.group.static.mapping.overridesadmin1=domain1;user1=domain1;user2=domain1;admin2=domain2' $HADOOP_HOME/etc/hadoop/core-site.xml @@ -398,7 +398,7 @@ jobs: repo-token: ${{ secrets.GITHUB_TOKEN }} - uses: beyondstorage/setup-hdfs@master with: - hdfs-version: '3.3.2' + hdfs-version: '3.3.6' - name: Modify HDFS User Group Mapping run: | sed -i '/^<\/configuration>/i hadoop.user.group.static.mapping.overridesadmin1=domain1;user1=domain1;user2=domain1;admin2=domain2' $HADOOP_HOME/etc/hadoop/core-site.xml diff --git a/lakesoul-flink/pom.xml b/lakesoul-flink/pom.xml index f9dbc9e0d..e9a42ba9c 100644 --- a/lakesoul-flink/pom.xml +++ b/lakesoul-flink/pom.xml @@ -250,7 +250,7 @@ SPDX-License-Identifier: Apache-2.0 org.furyio fury-core - 0.1.0 + 0.4.0 diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecordSerializer.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecordSerializer.java index 7352c60af..235782cdf 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecordSerializer.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecordSerializer.java @@ -9,8 +9,8 @@ import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import io.fury.Fury; -import io.fury.Language; import io.fury.ThreadLocalFury; +import io.fury.config.Language; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.binary.BinaryFormat; @@ -78,10 +78,13 @@ public BinarySourceRecordSerializer() { } @Override public void write(Kryo kryo, Output output, BinarySourceRecord object) { - fury.getCurrentFury().serializeJavaObject(output, object); + fury.execute(f -> { + f.serializeJavaObject(output, object); + return 0; + }); } @Override public BinarySourceRecord read(Kryo kryo, Input input, Class type) { - return fury.getCurrentFury().deserializeJavaObject(input, BinarySourceRecord.class); + return fury.execute(f -> f.deserializeJavaObject(input, BinarySourceRecord.class)); } } diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java index a7febb6f8..2babb5484 100644 --- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java +++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java @@ -138,9 +138,8 @@ public void testDeletePkSQL() throws ExecutionException, InterruptedException { } catch (Throwable e) { System.out.println("Unsupported DELETE SQL"); } - StreamTableEnvironment streamEnv = TestUtils.createStreamTableEnv(BATCH_TYPE); String testSelect = "select * from user_info"; - TableImpl flinkTable = (TableImpl) streamEnv.sqlQuery(testSelect); + TableImpl flinkTable = (TableImpl) tEnv.sqlQuery(testSelect); List results = CollectionUtil.iteratorToList(flinkTable.execute().collect()); TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[2, Alice, 80]", "+I[3, Amy, 95]"}); }