Skip to content

Commit

Permalink
[FLINK-35772][filesystem] Drop DuplicatingFileSystem in favour of the…
Browse files Browse the repository at this point in the history
… newer PathsCopyingFileSystem
  • Loading branch information
pnowojski committed Aug 21, 2024
1 parent 624bc50 commit 56c8199
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,33 +22,18 @@
import java.util.List;

/**
* An extension interface for {@link FileSystem FileSystems} that can perform cheap DFS side
* duplicate operation. Such an operation can improve the time required for creating cheaply
* independent snapshots from incremental snapshots.
* This interface is no longer used, and implementing it has no effect. Please migrate to
* {@link PathsCopyingFileSystem}, which provides the same functionality.
*/
@Deprecated
public interface DuplicatingFileSystem {
/**
* Tells if we can perform duplicate/copy between given paths.
*
* <p>This should be a rather cheap operation, preferably not involving any remote accesses. You
* can check e.g. if both paths are on the same host.
*
* @param source The path of the source file to duplicate
* @param destination The path where to duplicate the source file
* @return true, if we can perform the duplication
*/
/** Please use {@link PathsCopyingFileSystem#canCopyPaths(Path, Path)}. */
boolean canFastDuplicate(Path source, Path destination) throws IOException;

/**
* Duplicates the source path into the destination path.
*
* <p>You should first check if you can duplicate with {@link #canFastDuplicate(Path, Path)}.
*
* @param requests Pairs of src/dst to copy.
*/
/** Please use {@link PathsCopyingFileSystem#copyFiles(List, ICloseableRegistry)}. */
void duplicate(List<CopyRequest> requests) throws IOException;

/** A pair of source and destination to duplicate a file. */
/** Please use {@link PathsCopyingFileSystem.CopyRequest}. */
interface CopyRequest {
/** The path of the source file to duplicate. */
Path getSource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@

package org.apache.flink.runtime.state.filesystem;

import org.apache.flink.core.fs.DuplicatingFileSystem;
import org.apache.flink.core.fs.DuplicatingFileSystem.CopyRequest;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.EntropyInjector;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.PathsCopyingFileSystem;
import org.apache.flink.core.fs.PathsCopyingFileSystem.CopyRequest;
import org.apache.flink.runtime.state.CheckpointStateToolset;
import org.apache.flink.runtime.state.StreamStateHandle;

Expand All @@ -33,14 +34,14 @@

/**
* An implementation of {@link CheckpointStateToolset} that does file-based duplicating with a
* {@link DuplicatingFileSystem}.
* {@link PathsCopyingFileSystem}.
*/
public class FsCheckpointStateToolset implements CheckpointStateToolset {

private final Path basePath;
private final DuplicatingFileSystem fs;
private final PathsCopyingFileSystem fs;

public FsCheckpointStateToolset(Path basePath, DuplicatingFileSystem fs) {
public FsCheckpointStateToolset(Path basePath, PathsCopyingFileSystem fs) {
this.basePath = basePath;
this.fs = fs;
}
Expand All @@ -52,7 +53,7 @@ public boolean canFastDuplicate(StreamStateHandle stateHandle) throws IOExceptio
}
final Path srcPath = ((FileStateHandle) stateHandle).getFilePath();
final Path dst = getNewDstPath(srcPath.getName());
return fs.canFastDuplicate(srcPath, dst);
return fs.canCopyPaths(srcPath, dst);
}

@Override
Expand All @@ -67,7 +68,7 @@ public List<StreamStateHandle> duplicate(List<StreamStateHandle> stateHandles)
final Path srcPath = ((FileStateHandle) handle).getFilePath();
requests.add(CopyRequest.of(srcPath, getNewDstPath(srcPath.getName())));
}
fs.duplicate(requests);
fs.copyFiles(requests, new CloseableRegistry());

return IntStream.range(0, stateHandles.size())
.mapToObj(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.DuplicatingFileSystem;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.PathsCopyingFileSystem;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
Expand Down Expand Up @@ -213,9 +213,9 @@ public CheckpointStateOutputStream createTaskOwnedStateStream() {

@Override
public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
if (fileSystem instanceof DuplicatingFileSystem) {
if (fileSystem instanceof PathsCopyingFileSystem) {
return new FsCheckpointStateToolset(
taskOwnedStateDirectory, (DuplicatingFileSystem) fileSystem);
taskOwnedStateDirectory, (PathsCopyingFileSystem) fileSystem);
} else {
return new NotDuplicatingCheckpointStateToolset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

package org.apache.flink.runtime.state.filesystem;

import org.apache.flink.core.fs.DuplicatingFileSystem;
import org.apache.flink.core.fs.EntropyInjector;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.OutputStreamAndPath;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.PathsCopyingFileSystem;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
Expand Down Expand Up @@ -132,12 +132,13 @@ public FsCheckpointStreamFactory(
this.sharedStateDirectory = checkNotNull(sharedStateDirectory);
this.fileStateThreshold = fileStateSizeThreshold;
this.writeBufferSize = writeBufferSize;
if (fileSystem instanceof DuplicatingFileSystem) {
final DuplicatingFileSystem duplicatingFileSystem = (DuplicatingFileSystem) fileSystem;
if (fileSystem instanceof PathsCopyingFileSystem) {
final PathsCopyingFileSystem pathsCopyingFileSystem =
(PathsCopyingFileSystem) fileSystem;
this.privateStateToolset =
new FsCheckpointStateToolset(checkpointDirectory, duplicatingFileSystem);
new FsCheckpointStateToolset(checkpointDirectory, pathsCopyingFileSystem);
this.sharedStateToolset =
new FsCheckpointStateToolset(sharedStateDirectory, duplicatingFileSystem);
new FsCheckpointStateToolset(sharedStateDirectory, pathsCopyingFileSystem);
} else {
this.privateStateToolset = null;
this.sharedStateToolset = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

package org.apache.flink.runtime.state.filesystem;

import org.apache.flink.core.fs.DuplicatingFileSystem;
import org.apache.flink.core.fs.ICloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.PathsCopyingFileSystem;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.testutils.TestFileSystem;

import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -90,14 +92,16 @@ void testDuplicating() throws IOException {
new Path("test-path", "test-file3"), "test-file3", 0));
}

private static final class TestDuplicatingFileSystem implements DuplicatingFileSystem {
private static final class TestDuplicatingFileSystem extends TestFileSystem
implements PathsCopyingFileSystem {

@Override
public boolean canFastDuplicate(Path source, Path destination) throws IOException {
public boolean canCopyPaths(Path source, Path destination) throws IOException {
return !source.equals(destination);
}

@Override
public void duplicate(List<CopyRequest> requests) throws IOException {}
public void copyFiles(List<CopyRequest> requests, ICloseableRegistry closeableRegistry)
throws IOException {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
package org.apache.flink.runtime.state.filesystem;

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.DuplicatingFileSystem;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.ICloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.PathsCopyingFileSystem;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
Expand Down Expand Up @@ -324,15 +325,16 @@ void testDuplicationCheckpointStateToolset() throws Exception {
}

private static final class TestDuplicatingFileSystem extends TestFileSystem
implements DuplicatingFileSystem {
implements PathsCopyingFileSystem {

@Override
public boolean canFastDuplicate(Path source, Path destination) throws IOException {
public boolean canCopyPaths(Path source, Path destination) throws IOException {
return !source.equals(destination);
}

@Override
public void duplicate(List<CopyRequest> requests) throws IOException {}
public void copyFiles(List<CopyRequest> requests, ICloseableRegistry closeableRegistry)
throws IOException {}
}

// ------------------------------------------------------------------------
Expand Down

0 comments on commit 56c8199

Please sign in to comment.