From bada5d91a8dc3f74b35f29070c81fd52bebb9d8f Mon Sep 17 00:00:00 2001
From: Shawn Chang <42792772+CTTY@users.noreply.github.com>
Date: Wed, 15 Nov 2023 16:50:38 -0800
Subject: [PATCH] [HUDI-5936] Fix serialization problem when FileStatus is not
 serializable (#10065)

Co-authored-by: Shawn Chang
---
 .../common/fs/NonSerializableFileSystem.java  | 115 ++++++++++++++
 .../fs/TestHoodieSerializableFileStatus.java  |  86 +++++++++++
 .../fs/HoodieSerializableFileStatus.java      | 144 ++++++++++++++++++
 .../FileSystemBackedTableMetadata.java        |  28 ++--
 4 files changed, 361 insertions(+), 12 deletions(-)
 create mode 100644 hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/fs/NonSerializableFileSystem.java
 create mode 100644 hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/fs/TestHoodieSerializableFileStatus.java
 create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieSerializableFileStatus.java

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/fs/NonSerializableFileSystem.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/fs/NonSerializableFileSystem.java
new file mode 100644
index 0000000000000..b612f088b8065
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/fs/NonSerializableFileSystem.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.fs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * A non-serializable file system for testing only; see {@link TestHoodieSerializableFileStatus}.
+ * This can't be an inner class of the test: the outer class would then also be
+ * non-serializable, defeating the purpose of the test.
+ */
+public class NonSerializableFileSystem extends FileSystem {
+  @Override
+  public URI getUri() {
+    try {
+      return new URI("");
+    } catch (URISyntaxException e) {
+      return null;
+    }
+  }
+
+  @Override
+  public FSDataInputStream open(Path path, int i) throws IOException {
+    return null;
+  }
+
+  @Override
+  public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean b, int i,
+      short i1, long l, Progressable progressable) throws IOException {
+    return null;
+  }
+
+  @Override
+  public FSDataOutputStream append(Path path, int i, Progressable progressable)
+      throws IOException {
+    return null;
+  }
+
+  @Override
+  public boolean rename(Path path, Path path1) throws IOException {
+    return false;
+  }
+
+  @Override
+  public boolean delete(Path path, boolean b) throws IOException {
+    return false;
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOException {
+    FileStatus[] ret = new FileStatus[5];
+    for (int i = 0; i < 5; i++) {
+      ret[i] = new FileStatus(100L, false, 1, 10000L,
+          0L, 0, null, "owner", "group", path) {
+        Configuration conf = getConf();
+
+        @Override
+        public long getLen() {
+          return -1;
+        }
+      };
+    }
+    return ret;
+  }
+
+  @Override
+  public void setWorkingDirectory(Path path) {}
+
+  @Override
+  public Path getWorkingDirectory() {
+    return null;
+  }
+
+  @Override
+  public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
+    return false;
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path path) throws IOException {
+    return null;
+  }
+
+  public Configuration getConf() {
+    return new Configuration();
+  }
+}
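Each FileStatus returned by listStatus() above carries a Configuration field, and Hadoop's Configuration is not java.io.Serializable, so these statuses cannot cross a serialization boundary. A minimal standalone sketch of the failure, assuming Hadoop 3.x (where the base FileStatus class itself implements Serializable) and plain Java serialization; the class name is hypothetical, and the test file below reproduces the same problem through Spark's Kryo serializer instead:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;

    import java.io.ByteArrayOutputStream;
    import java.io.NotSerializableException;
    import java.io.ObjectOutputStream;

    public class FileStatusSerializationRepro {
      public static void main(String[] args) throws Exception {
        // Mirrors NonSerializableFileSystem.listStatus(): an anonymous FileStatus
        // subclass that holds a Configuration field.
        FileStatus status = new FileStatus(100L, false, 1, 10000L, 0L,
            new Path("s3://table-bucket/")) {
          Configuration conf = new Configuration(); // not java.io.Serializable
        };
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
          out.writeObject(status);
        } catch (NotSerializableException e) {
          // Serialization chokes on the captured Configuration field
          System.out.println("Failed as expected: " + e);
        }
      }
    }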
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/fs/TestHoodieSerializableFileStatus.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/fs/TestHoodieSerializableFileStatus.java
new file mode 100644
index 0000000000000..9d5e4e700c6e1
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/fs/TestHoodieSerializableFileStatus.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.fs;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.SparkException;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests whether {@link HoodieSerializableFileStatus} is serializable.
+ */
+@TestInstance(Lifecycle.PER_CLASS)
+public class TestHoodieSerializableFileStatus extends HoodieSparkClientTestHarness {
+
+  HoodieEngineContext engineContext;
+  List<Path> testPaths;
+
+  @BeforeAll
+  public void setUp() throws IOException {
+    initSparkContexts();
+    testPaths = new ArrayList<>(5);
+    for (int i = 0; i < 5; i++) {
+      testPaths.add(new Path("s3://table-bucket/"));
+    }
+    engineContext = new HoodieSparkEngineContext(jsc);
+  }
+
+  @AfterAll
+  public void tearDown() {
+    cleanupSparkContexts();
+  }
+
+  @Test
+  public void testNonSerializableFileStatus() {
+    Exception e = Assertions.assertThrows(SparkException.class,
+        () -> {
+          List<FileStatus> statuses = engineContext.flatMap(testPaths, path -> {
+            FileSystem fileSystem = new NonSerializableFileSystem();
+            return Arrays.stream(fileSystem.listStatus(path));
+          }, 5);
+        },
+        "Serialization is supposed to fail!");
+    Assertions.assertTrue(e.getMessage().contains("com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException"));
+  }
+
+  @Test
+  public void testHoodieFileStatusSerialization() {
+    // Passes as long as wrapping the statuses keeps Kryo from throwing
+    List<HoodieSerializableFileStatus> statuses = engineContext.flatMap(testPaths, path -> {
+      FileSystem fileSystem = new NonSerializableFileSystem();
+      return Arrays.stream(HoodieSerializableFileStatus.fromFileStatuses(fileSystem.listStatus(path)));
+    }, 5);
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieSerializableFileStatus.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieSerializableFileStatus.java
new file mode 100644
index 0000000000000..99c7e35935cd3
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieSerializableFileStatus.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.fs;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+/**
+ * A serializable file status implementation.
+ * <p>
+ * Use `HoodieFileStatus` generated by Avro instead of this class if possible.
+ * This class is needed because `hudi-hadoop-mr-bundle` relies on Avro 1.8.2,
+ * and won't work well with `HoodieFileStatus`.
+ */
+public class HoodieSerializableFileStatus implements Serializable {
+
+  private Path path;
+  private long length;
+  private Boolean isDir;
+  private short blockReplication;
+  private long blockSize;
+  private long modificationTime;
+  private long accessTime;
+  private FsPermission permission;
+  private String owner;
+  private String group;
+  private Path symlink;
+
+  HoodieSerializableFileStatus(Path path, long length, boolean isDir, short blockReplication,
+                               long blockSize, long modificationTime, long accessTime,
+                               FsPermission permission, String owner, String group, Path symlink) {
+    this.path = path;
+    this.length = length;
+    this.isDir = isDir;
+    this.blockReplication = blockReplication;
+    this.blockSize = blockSize;
+    this.modificationTime = modificationTime;
+    this.accessTime = accessTime;
+    this.permission = permission;
+    this.owner = owner;
+    this.group = group;
+    this.symlink = symlink;
+  }
+
+  public Path getPath() {
+    return path;
+  }
+
+  public long getLen() {
+    return length;
+  }
+
+  public Boolean isDirectory() {
+    return isDir;
+  }
+
+  public short getReplication() {
+    return blockReplication;
+  }
+
+  public long getBlockSize() {
+    return blockSize;
+  }
+
+  public long getModificationTime() {
+    return modificationTime;
+  }
+
+  public long getAccessTime() {
+    return accessTime;
+  }
+
+  public FsPermission getPermission() {
+    return permission;
+  }
+
+  public String getOwner() {
+    return owner;
+  }
+
+  public String getGroup() {
+    return group;
+  }
+
+  public Path getSymlink() {
+    return symlink;
+  }
+
+  public static HoodieSerializableFileStatus fromFileStatus(FileStatus status) {
+    Path symlink;
+    try {
+      symlink = status.getSymlink();
+    } catch (IOException ioe) {
+      // status is not a symlink
+      symlink = null;
+    }
+
+    return new HoodieSerializableFileStatus(status.getPath(), status.getLen(), status.isDir(),
+        status.getReplication(), status.getBlockSize(), status.getModificationTime(),
+        status.getAccessTime(), status.getPermission(), status.getOwner(), status.getGroup(), symlink);
+  }
+
+  public static HoodieSerializableFileStatus[] fromFileStatuses(FileStatus[] statuses) {
+    return Arrays.stream(statuses)
+        .map(status -> HoodieSerializableFileStatus.fromFileStatus(status))
+        .collect(Collectors.toList())
+        .toArray(new HoodieSerializableFileStatus[statuses.length]);
+  }
+
+  public static FileStatus toFileStatus(HoodieSerializableFileStatus status) {
+    return new FileStatus(status.getLen(), status.isDirectory(), status.getReplication(),
+        status.getBlockSize(), status.getModificationTime(), status.getAccessTime(), status.getPermission(),
+        status.getOwner(), status.getGroup(), status.getSymlink(), status.getPath());
+  }
+
+  public static FileStatus[] toFileStatuses(HoodieSerializableFileStatus[] statuses) {
+    return Arrays.stream(statuses)
+        .map(status -> HoodieSerializableFileStatus.toFileStatus(status))
+        .collect(Collectors.toList())
+        .toArray(new FileStatus[statuses.length]);
+  }
+}
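A round-trip through the wrapper added above: convert eagerly on the side that lists files, ship the serializable wrappers, and convert back where callers expect plain FileStatus. A usage sketch; the local-filesystem setup and class name are illustrative only, not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import org.apache.hudi.common.fs.HoodieSerializableFileStatus;

    public class WrapperRoundTripExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        FileStatus[] raw = fs.listStatus(new Path(System.getProperty("java.io.tmpdir")));

        // Wrap before the statuses would cross a serialization boundary ...
        HoodieSerializableFileStatus[] safe = HoodieSerializableFileStatus.fromFileStatuses(raw);

        // ... and unwrap where plain FileStatus is expected again.
        FileStatus[] restored = HoodieSerializableFileStatus.toFileStatuses(safe);
        System.out.println(restored.length + " statuses round-tripped");
      }
    }

Note that symlink information survives the round trip only when the original status was a symlink; fromFileStatus() stores null otherwise, and toFileStatus() passes that null straight through to the FileStatus constructor.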
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index 3737793e0c661..574e81e2a77ef 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -24,6 +24,7 @@
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieSerializableFileStatus;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
@@ -158,9 +159,10 @@ private List<String> getPartitionPathWithPathPrefixUsingFilterExpression(String
 
       // List all directories in parallel
       engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing all partitions with prefix " + relativePathPrefix);
-      List<FileStatus> dirToFileListing = engineContext.flatMap(pathsToList, path -> {
+      // Need to use serializable file status here, see HUDI-5936
+      List<HoodieSerializableFileStatus> dirToFileListing = engineContext.flatMap(pathsToList, path -> {
         FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
-        return Arrays.stream(fileSystem.listStatus(path));
+        return Arrays.stream(HoodieSerializableFileStatus.fromFileStatuses(fileSystem.listStatus(path)));
       }, listingParallelism);
       pathsToList.clear();
 
@@ -172,15 +174,16 @@ private List<String> getPartitionPathWithPathPrefixUsingFilterExpression(String
       // and second entry holds optionally a directory path to be processed further.
       engineContext.setJobStatus(this.getClass().getSimpleName(), "Processing listed partitions");
       List<Pair<Option<String>, Option<Path>>> result = engineContext.map(dirToFileListing, fileStatus -> {
-        FileSystem fileSystem = fileStatus.getPath().getFileSystem(hadoopConf.get());
+        Path path = fileStatus.getPath();
+        FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
         if (fileStatus.isDirectory()) {
-          if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, fileStatus.getPath())) {
-            return Pair.of(Option.of(FSUtils.getRelativePartitionPath(dataBasePath.get(), fileStatus.getPath())), Option.empty());
-          } else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
-            return Pair.of(Option.empty(), Option.of(fileStatus.getPath()));
+          if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, path)) {
+            return Pair.of(Option.of(FSUtils.getRelativePartitionPath(dataBasePath.get(), path)), Option.empty());
+          } else if (!path.getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
+            return Pair.of(Option.empty(), Option.of(path));
           }
-        } else if (fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
-          String partitionName = FSUtils.getRelativePartitionPath(dataBasePath.get(), fileStatus.getPath().getParent());
+        } else if (path.getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
+          String partitionName = FSUtils.getRelativePartitionPath(dataBasePath.get(), path.getParent());
           return Pair.of(Option.of(partitionName), Option.empty());
         }
         return Pair.of(Option.empty(), Option.empty());
@@ -230,13 +233,14 @@ public Map<String, FileStatus[]> getAllFilesInPartitions(Collection<String> part
     int parallelism = Math.min(DEFAULT_LISTING_PARALLELISM, partitionPaths.size());
 
     engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing all files in " + partitionPaths.size() + " partitions");
-    List<Pair<String, FileStatus[]>> partitionToFiles = engineContext.map(new ArrayList<>(partitionPaths), partitionPathStr -> {
+    // Need to use serializable file status here, see HUDI-5936
+    List<Pair<String, HoodieSerializableFileStatus[]>> partitionToFiles = engineContext.map(new ArrayList<>(partitionPaths), partitionPathStr -> {
       Path partitionPath = new Path(partitionPathStr);
       FileSystem fs = partitionPath.getFileSystem(hadoopConf.get());
-      return Pair.of(partitionPathStr, FSUtils.getAllDataFilesInPartition(fs, partitionPath));
+      return Pair.of(partitionPathStr, HoodieSerializableFileStatus.fromFileStatuses(FSUtils.getAllDataFilesInPartition(fs, partitionPath)));
     }, parallelism);
 
-    return partitionToFiles.stream().collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
+    return partitionToFiles.stream().collect(Collectors.toMap(Pair::getLeft, pair -> HoodieSerializableFileStatus.toFileStatuses(pair.getRight())));
   }
 
   @Override
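Condensed, the pattern the patch applies in FileSystemBackedTableMetadata is: wrap on the executors before results cross the serialization boundary, unwrap on the driver after collection. A sketch of the two phases, assuming Hudi's Pair utility lives at org.apache.hudi.common.util.collection.Pair and with fs.listStatus() standing in for FSUtils.getAllDataFilesInPartition():

    import org.apache.hudi.common.fs.HoodieSerializableFileStatus;
    import org.apache.hudi.common.util.collection.Pair;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class ListingPatternSketch {

      // Executor-side phase: list a partition and wrap the results so they can be
      // shipped back through Kryo (or Java serialization) safely.
      static Pair<String, HoodieSerializableFileStatus[]> listPartition(String partitionPathStr, Configuration conf)
          throws IOException {
        Path partitionPath = new Path(partitionPathStr);
        FileSystem fs = partitionPath.getFileSystem(conf);
        return Pair.of(partitionPathStr,
            HoodieSerializableFileStatus.fromFileStatuses(fs.listStatus(partitionPath)));
      }

      // Driver-side phase: unwrap after the serialization boundary, restoring plain FileStatus.
      static Map<String, FileStatus[]> collectResults(List<Pair<String, HoodieSerializableFileStatus[]>> listed) {
        return listed.stream()
            .collect(Collectors.toMap(Pair::getLeft,
                pair -> HoodieSerializableFileStatus.toFileStatuses(pair.getRight())));
      }
    }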