Skip to content

Commit

Permalink
[HUDI-5936] Fix serialization problem when FileStatus is not serializ…
Browse files Browse the repository at this point in the history
…able (apache#10065)

Co-authored-by: Shawn Chang <yxchang@amazon.com>
  • Loading branch information
CTTY and CTTY authored Nov 16, 2023
1 parent dcd5a81 commit bada5d9
Show file tree
Hide file tree
Showing 4 changed files with 361 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.fs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
* A non-serializable file system for testing only. See {@link TestHoodieSerializableFileStatus}
* Can't make this an inner class as the outer class would also be non-serializable and invalidate
* the purpose of testing
*/
public class NonSerializableFileSystem extends FileSystem {
@Override
public URI getUri() {
try {
return new URI("");
} catch (URISyntaxException e) {
return null;
}
}

@Override
public FSDataInputStream open(Path path, int i) throws IOException {
return null;
}

@Override
public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean b, int i,
short i1, long l, Progressable progressable) throws IOException {
return null;
}

@Override
public FSDataOutputStream append(Path path, int i, Progressable progressable)
throws IOException {
return null;
}

@Override
public boolean rename(Path path, Path path1) throws IOException {
return false;
}

@Override
public boolean delete(Path path, boolean b) throws IOException {
return false;
}

@Override
public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOException {
FileStatus[] ret = new FileStatus[5];
for (int i = 0; i < 5; i++) {
ret[i] = new FileStatus(100L, false, 1, 10000L,
0L, 0, null, "owner", "group", path) {
Configuration conf = getConf();

@Override
public long getLen() {
return -1;
}
};
}
return ret;
}

@Override
public void setWorkingDirectory(Path path) {}

@Override
public Path getWorkingDirectory() {
return null;
}

@Override
public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
return false;
}

@Override
public FileStatus getFileStatus(Path path) throws IOException {
return null;
}

public Configuration getConf() {
return new Configuration();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.fs;

import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkException;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* Test the if {@link HoodieSerializableFileStatus} is serializable
*/
@TestInstance(Lifecycle.PER_CLASS)
public class TestHoodieSerializableFileStatus extends HoodieSparkClientTestHarness {

HoodieEngineContext engineContext;
List<Path> testPaths;

@BeforeAll
public void setUp() throws IOException {
initSparkContexts();
testPaths = new ArrayList<>(5);
for (int i = 0; i < 5; i++) {
testPaths.add(new Path("s3://table-bucket/"));
}
engineContext = new HoodieSparkEngineContext(jsc);
}

@AfterAll
public void tearDown() {
cleanupSparkContexts();
}

@Test
public void testNonSerializableFileStatus() {
Exception e = Assertions.assertThrows(SparkException.class,
() -> {
List<FileStatus> statuses = engineContext.flatMap(testPaths, path -> {
FileSystem fileSystem = new NonSerializableFileSystem();
return Arrays.stream(fileSystem.listStatus(path));
}, 5);
},
"Serialization is supposed to fail!");
Assertions.assertTrue(e.getMessage().contains("com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException"));
}

@Test
public void testHoodieFileStatusSerialization() {
List<HoodieSerializableFileStatus> statuses = engineContext.flatMap(testPaths, path -> {
FileSystem fileSystem = new NonSerializableFileSystem();
return Arrays.stream(HoodieSerializableFileStatus.fromFileStatuses(fileSystem.listStatus(path)));
}, 5);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.fs;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

import java.io.Serializable;
import java.io.IOException;
import java.util.Arrays;
import java.util.stream.Collectors;

/**
* A serializable file status implementation
* <p>
* Use `HoodieFileStatus` generated by Avro instead this class if possible
* This class is needed because `hudi-hadoop-mr-bundle` relies on Avro 1.8.2,
* and won't work well with `HoodieFileStatus`
*/
public class HoodieSerializableFileStatus implements Serializable {

private Path path;
private long length;
private Boolean isDir;
private short blockReplication;
private long blockSize;
private long modificationTime;
private long accessTime;
private FsPermission permission;
private String owner;
private String group;
private Path symlink;

HoodieSerializableFileStatus(Path path, long length, boolean isDir, short blockReplication,
long blockSize, long modificationTime, long accessTime,
FsPermission permission, String owner, String group, Path symlink) {
this.path = path;
this.length = length;
this.isDir = isDir;
this.blockReplication = blockReplication;
this.blockSize = blockSize;
this.modificationTime = modificationTime;
this.accessTime = accessTime;
this.permission = permission;
this.owner = owner;
this.group = group;
this.symlink = symlink;
}

public Path getPath() {
return path;
}

public long getLen() {
return length;
}

public Boolean isDirectory() {
return isDir;
}

public short getReplication() {
return blockReplication;
}

public long getBlockSize() {
return blockSize;
}

public long getModificationTime() {
return modificationTime;
}

public long getAccessTime() {
return accessTime;
}

public FsPermission getPermission() {
return permission;
}

public String getOwner() {
return owner;
}

public String getGroup() {
return group;
}

public Path getSymlink() {
return symlink;
}

public static HoodieSerializableFileStatus fromFileStatus(FileStatus status) {
Path symlink;
try {
symlink = status.getSymlink();
} catch (IOException ioe) {
// status is not symlink
symlink = null;
}

return new HoodieSerializableFileStatus(status.getPath(), status.getLen(), status.isDir(),
status.getReplication(), status.getBlockSize(), status.getModificationTime(),
status.getAccessTime(), status.getPermission(), status.getOwner(), status.getGroup(), symlink);
}

public static HoodieSerializableFileStatus[] fromFileStatuses(FileStatus[] statuses) {
return Arrays.stream(statuses)
.map(status -> HoodieSerializableFileStatus.fromFileStatus(status))
.collect(Collectors.toList())
.toArray(new HoodieSerializableFileStatus[statuses.length]);
}

public static FileStatus toFileStatus(HoodieSerializableFileStatus status) {
return new FileStatus(status.getLen(), status.isDirectory(), status.getReplication(),
status.getBlockSize(), status.getModificationTime(), status.getAccessTime(), status.getPermission(),
status.getOwner(), status.getGroup(), status.getSymlink(), status.getPath());
}

public static FileStatus[] toFileStatuses(HoodieSerializableFileStatus[] statuses) {
return Arrays.stream(statuses)
.map(status -> HoodieSerializableFileStatus.toFileStatus(status))
.collect(Collectors.toList())
.toArray(new FileStatus[statuses.length]);
}
}
Loading

0 comments on commit bada5d9

Please sign in to comment.