Skip to content

Commit

Permalink
Use SerializableCoder
Browse files Browse the repository at this point in the history
  • Loading branch information
clairemcginty committed Jan 5, 2024
1 parent ae11f9a commit 8f39bd1
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@
package org.apache.beam.sdk.extensions.smb;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
Expand All @@ -36,8 +32,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
Expand Down Expand Up @@ -255,24 +249,4 @@ private ReadableFile toReadableFile(ResourceId resourceId) {
throw new RuntimeException(String.format("Exception opening bucket file %s", resourceId), e);
}
}

static class FileOperationsCoder<T> extends CustomCoder<FileOperations<T>> {
@Override
public void encode(FileOperations<T> value, OutputStream outStream)
throws CoderException, IOException {
final ObjectOutputStream oos = new ObjectOutputStream(outStream);
oos.writeObject(value);
oos.flush();
}

@Override
public FileOperations<T> decode(InputStream inStream) throws CoderException, IOException {
final ObjectInputStream ois = new ObjectInputStream(inStream);
try {
return (FileOperations<T>) ois.readObject();
} catch (ClassNotFoundException e) {
throw new CoderException(e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ private void writeObject(ObjectOutputStream outStream) throws IOException {
// Map distinct FileOperations/FileSuffixes to indices in a map, for efficient encoding of
// large BucketedInputs
final Map<KV<String, String>, Integer> fileOperationsMetadata = new HashMap<>();
final Map<Integer, KV<String, FileOperations<V>>> fileOperationsEncoding = new HashMap<>();
final Map<Integer, KV<String, FileOperations>> fileOperationsEncoding = new HashMap<>();
final Map<ResourceId, Integer> directoriesEncoding = new HashMap<>();
int i = 0;

Expand All @@ -653,8 +653,8 @@ private void writeObject(ObjectOutputStream outStream) throws IOException {
directoriesEncoding.put(entry.getKey(), fileOperationsMetadata.get(metadataKey));
}

final Coder<FileOperations<V>> fileOperationsCoder =
new FileOperations.FileOperationsCoder<>();
final SerializableCoder<FileOperations> fileOperationsCoder =
SerializableCoder.of(FileOperations.class);

MapCoder.of(VarIntCoder.of(), KvCoder.of(StringUtf8Coder.of(), fileOperationsCoder))
.encode(fileOperationsEncoding, outStream);
Expand All @@ -668,11 +668,10 @@ private void readObject(ObjectInputStream inStream) throws ClassNotFoundExceptio
this.predicate = (Predicate<V>) inStream.readObject();
this.keying = (Keying) inStream.readObject();

final Coder<FileOperations<V>> fileOperationsCoder =
new FileOperations.FileOperationsCoder<>();

final Map<Integer, KV<String, FileOperations<V>>> fileOperationsEncoding =
MapCoder.of(VarIntCoder.of(), KvCoder.of(StringUtf8Coder.of(), fileOperationsCoder))
final Map<Integer, KV<String, FileOperations>> fileOperationsEncoding =
MapCoder.of(
VarIntCoder.of(),
KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(FileOperations.class)))
.decode(inStream);

final Map<ResourceId, Integer> directoriesEncoding =
Expand All @@ -683,7 +682,13 @@ private void readObject(ObjectInputStream inStream) throws ClassNotFoundExceptio
.collect(
Collectors.toMap(
Map.Entry::getKey,
dirAndIndex -> fileOperationsEncoding.get(dirAndIndex.getValue())));
dirAndIndex -> {
final String dir =
fileOperationsEncoding.get(dirAndIndex.getValue()).getKey();
final FileOperations<V> fileOps =
fileOperationsEncoding.get(dirAndIndex.getValue()).getValue();
return KV.of(dir, fileOps);
}));
}
}

Expand Down

0 comments on commit 8f39bd1

Please sign in to comment.