diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 575ba6dedef7..f9844f3a73a5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -700,6 +700,15 @@ public abstract static class TypedWrite /** A function that converts UserT to a String, for writing to the file. */ abstract @Nullable SerializableFunction getFormatFunction(); + /** Batch size for input records. */ + abstract @Nullable Integer getBatchSize(); + + /** Batch size in bytes for input records. */ + abstract @Nullable Integer getBatchSizeBytes(); + + /** Batch max buffering duration for input records. */ + abstract @Nullable Duration getBatchMaxBufferingDuration(); + /** Whether to write windowed output files. */ abstract boolean getWindowedWrites(); @@ -757,6 +766,13 @@ abstract Builder setFormatFunction( abstract Builder setNumShards( @Nullable ValueProvider numShards); + abstract Builder setBatchSize(@Nullable Integer batchSize); + + abstract Builder setBatchSizeBytes(@Nullable Integer batchSizeBytes); + + abstract Builder setBatchMaxBufferingDuration( + @Nullable Duration batchMaxBufferingDuration); + abstract Builder setWindowedWrites(boolean windowedWrites); abstract Builder setAutoSharding(boolean windowedWrites); @@ -868,6 +884,38 @@ public TypedWrite withFormatFunction( return toBuilder().setFormatFunction(formatFunction).build(); } + /** + * Returns a new {@link TypedWrite} that will batch the input records using specified batch + * size. The default value is {@link WriteFiles#FILE_TRIGGERING_RECORD_COUNT}. + * + *

This option is used only for writing unbounded data with auto-sharding. + */ + public TypedWrite withBatchSize(@Nullable Integer batchSize) { + return toBuilder().setBatchSize(batchSize).build(); + } + + /** + * Returns a new {@link TypedWrite} that will batch the input records using specified batch size + * in bytes. The default value is {@link WriteFiles#FILE_TRIGGERING_BYTE_COUNT}. + * + *

This option is used only for writing unbounded data with auto-sharding. + */ + public TypedWrite withBatchSizeBytes(@Nullable Integer batchSizeBytes) { + return toBuilder().setBatchSizeBytes(batchSizeBytes).build(); + } + + /** + * Returns a new {@link TypedWrite} that will batch the input records using specified max + * buffering duration. The default value is {@link + * WriteFiles#FILE_TRIGGERING_RECORD_BUFFERING_DURATION}. + * + *

This option is used only for writing unbounded data with auto-sharding. + */ + public TypedWrite withBatchMaxBufferingDuration( + @Nullable Duration batchMaxBufferingDuration) { + return toBuilder().setBatchMaxBufferingDuration(batchMaxBufferingDuration).build(); + } + /** Set the base directory used to generate temporary files. */ public TypedWrite withTempDirectory( ValueProvider tempDirectory) { @@ -1119,6 +1167,15 @@ public WriteFilesResult expand(PCollection input) { if (getSkipIfEmpty()) { write = write.withSkipIfEmpty(); } + if (getBatchSize() != null) { + write = write.withBatchSize(getBatchSize()); + } + if (getBatchSizeBytes() != null) { + write = write.withBatchSizeBytes(getBatchSizeBytes()); + } + if (getBatchMaxBufferingDuration() != null) { + write = write.withBatchMaxBufferingDuration(getBatchMaxBufferingDuration()); + } return input.apply("WriteFiles", write); } @@ -1291,6 +1348,21 @@ public Write withNoSpilling() { return new Write(inner.withNoSpilling()); } + /** See {@link TypedWrite#withBatchSize(Integer)}. */ + public Write withBatchSize(@Nullable Integer batchSize) { + return new Write(inner.withBatchSize(batchSize)); + } + + /** See {@link TypedWrite#withBatchSizeBytes(Integer)}. */ + public Write withBatchSizeBytes(@Nullable Integer batchSizeBytes) { + return new Write(inner.withBatchSizeBytes(batchSizeBytes)); + } + + /** See {@link TypedWrite#withBatchMaxBufferingDuration(Duration)}. */ + public Write withBatchMaxBufferingDuration(@Nullable Duration batchMaxBufferingDuration) { + return new Write(inner.withBatchMaxBufferingDuration(batchMaxBufferingDuration)); + } + /** * Specify that output filenames are wanted. * diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index ff2b7e013fd5..cb48931958ce 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -148,9 +148,9 @@ public abstract class WriteFiles // The record count and buffering duration to trigger flushing records to a tmp file. Mainly used // for writing unbounded data to avoid generating too many small files. - private static final int FILE_TRIGGERING_RECORD_COUNT = 100000; - private static final int FILE_TRIGGERING_BYTE_COUNT = 64 * 1024 * 1024; // 64MiB as of now - private static final Duration FILE_TRIGGERING_RECORD_BUFFERING_DURATION = + public static final int FILE_TRIGGERING_RECORD_COUNT = 100000; + public static final int FILE_TRIGGERING_BYTE_COUNT = 64 * 1024 * 1024; // 64MiB as of now + public static final Duration FILE_TRIGGERING_RECORD_BUFFERING_DURATION = Duration.standardSeconds(5); static final int UNKNOWN_SHARDNUM = -1; @@ -196,6 +196,12 @@ public static WriteFiles> getSideInputs(); public abstract @Nullable ShardingFunction getShardingFunction(); @@ -226,6 +232,14 @@ abstract Builder setMaxNumWritersPerBundle( abstract Builder setSkipIfEmpty(boolean skipIfEmpty); + abstract Builder setBatchSize(@Nullable Integer batchSize); + + abstract Builder setBatchSizeBytes( + @Nullable Integer batchSizeBytes); + + abstract Builder setBatchMaxBufferingDuration( + @Nullable Duration batchMaxBufferingDuration); + abstract Builder setSideInputs( List> sideInputs); @@ -286,6 +300,38 @@ public WriteFiles withSkipIfEmpty(boolean skipIfEm return toBuilder().setSkipIfEmpty(skipIfEmpty).build(); } + /** + * Returns a new {@link WriteFiles} that will batch the input records using specified batch size. + * The default value is {@link #FILE_TRIGGERING_RECORD_COUNT}. + * + *

This option is used only for writing unbounded data with auto-sharding. + */ + public WriteFiles withBatchSize(@Nullable Integer batchSize) { + return toBuilder().setBatchSize(batchSize).build(); + } + + /** + * Returns a new {@link WriteFiles} that will batch the input records using specified batch size + * in bytes. The default value is {@link #FILE_TRIGGERING_BYTE_COUNT}. + * + *

This option is used only for writing unbounded data with auto-sharding. + */ + public WriteFiles withBatchSizeBytes( + @Nullable Integer batchSizeBytes) { + return toBuilder().setBatchSizeBytes(batchSizeBytes).build(); + } + + /** + * Returns a new {@link WriteFiles} that will batch the input records using specified max + * buffering duration. The default value is {@link #FILE_TRIGGERING_RECORD_BUFFERING_DURATION}. + * + *

This option is used only for writing unbounded data with auto-sharding. + */ + public WriteFiles withBatchMaxBufferingDuration( + @Nullable Duration batchMaxBufferingDuration) { + return toBuilder().setBatchMaxBufferingDuration(batchMaxBufferingDuration).build(); + } + public WriteFiles withSideInputs( List> sideInputs) { return toBuilder().setSideInputs(sideInputs).build(); @@ -445,7 +491,12 @@ public WriteFilesResult expand(PCollection input) { tempFileResults = input.apply( "WriteAutoShardedBundlesToTempFiles", - new WriteAutoShardedBundlesToTempFiles(destinationCoder, fileResultCoder)); + new WriteAutoShardedBundlesToTempFiles( + destinationCoder, + fileResultCoder, + getBatchSize(), + getBatchSizeBytes(), + getBatchMaxBufferingDuration())); } } @@ -887,11 +938,24 @@ private class WriteAutoShardedBundlesToTempFiles extends PTransform, PCollection>>> { private final Coder destinationCoder; private final Coder> fileResultCoder; + private final int batchSize; + private final int batchSizeBytes; + private final Duration maxBufferingDuration; private WriteAutoShardedBundlesToTempFiles( - Coder destinationCoder, Coder> fileResultCoder) { + Coder destinationCoder, + Coder> fileResultCoder, + Integer batchSize, + Integer batchSizeBytes, + Duration maxBufferingDuration) { this.destinationCoder = destinationCoder; this.fileResultCoder = fileResultCoder; + this.batchSize = batchSize != null ? batchSize : FILE_TRIGGERING_RECORD_COUNT; + this.batchSizeBytes = batchSizeBytes != null ? batchSizeBytes : FILE_TRIGGERING_BYTE_COUNT; + this.maxBufferingDuration = + maxBufferingDuration != null + ? maxBufferingDuration + : FILE_TRIGGERING_RECORD_BUFFERING_DURATION; } @Override @@ -919,9 +983,9 @@ public PCollection>> expand(PCollection inp .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder())) .apply( "ShardAndBatch", - GroupIntoBatches.ofSize(FILE_TRIGGERING_RECORD_COUNT) - .withByteSize(FILE_TRIGGERING_BYTE_COUNT) - .withMaxBufferingDuration(FILE_TRIGGERING_RECORD_BUFFERING_DURATION) + GroupIntoBatches.ofSize(batchSize) + .withByteSize(batchSizeBytes) + .withMaxBufferingDuration(maxBufferingDuration) .withShardedKey()) .setCoder( KvCoder.of( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java index 7551db057063..312605f3fcc5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java @@ -63,6 +63,7 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.UsesUnboundedPCollections; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Values; @@ -97,6 +98,10 @@ public class TextIOWriteTest { private static final String MY_HEADER = "myHeader"; private static final String MY_FOOTER = "myFooter"; + private static final int CUSTOM_FILE_TRIGGERING_RECORD_COUNT = 50000; + private static final int CUSTOM_FILE_TRIGGERING_BYTE_COUNT = 32 * 1024 * 1024; // 32MiB + private static final Duration CUSTOM_FILE_TRIGGERING_RECORD_BUFFERING_DURATION = + Duration.standardSeconds(4); @Rule public transient TemporaryFolder tempFolder = new TemporaryFolder(); @@ -698,6 +703,42 @@ public void testRuntimeOptionsNotCalledInApply() throws Exception { p.apply(Create.of("")).apply(TextIO.write().to(options.getOutput())); } + @Test + @Category({NeedsRunner.class, UsesUnboundedPCollections.class}) + public void testWriteUnboundedWithCustomBatchParameters() throws Exception { + Coder coder = StringUtf8Coder.of(); + String outputName = "file.txt"; + Path baseDir = Files.createTempDirectory(tempFolder.getRoot().toPath(), "testwrite"); + ResourceId baseFilename = + FileBasedSink.convertToFileResourceIfPossible(baseDir.resolve(outputName).toString()); + + PCollection input = + p.apply(Create.of(Arrays.asList(LINES2_ARRAY)).withCoder(coder)) + .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED) + .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10)))); + + TextIO.Write write = + TextIO.write() + .to(baseFilename) + .withWindowedWrites() + .withShardNameTemplate(DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE) + .withBatchSize(CUSTOM_FILE_TRIGGERING_RECORD_COUNT) + .withBatchSizeBytes(CUSTOM_FILE_TRIGGERING_BYTE_COUNT) + .withBatchMaxBufferingDuration(CUSTOM_FILE_TRIGGERING_RECORD_BUFFERING_DURATION); + + input.apply(write); + p.run(); + + assertOutputFiles( + LINES2_ARRAY, + null, + null, + 3, + baseFilename, + DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE, + false); + } + @Test @Category(NeedsRunner.class) public void testWindowedWritesWithOnceTrigger() throws Throwable { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java index 0eb1036f758f..0ab8efac7eb1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java @@ -118,6 +118,11 @@ public class WriteFilesTest { @Rule public final TestPipeline p = TestPipeline.create(); @Rule public ExpectedException thrown = ExpectedException.none(); + private static final int CUSTOM_FILE_TRIGGERING_RECORD_COUNT = 50000; + private static final int CUSTOM_FILE_TRIGGERING_BYTE_COUNT = 32 * 1024 * 1024; // 32MiB + private static final Duration CUSTOM_FILE_TRIGGERING_RECORD_BUFFERING_DURATION = + Duration.standardSeconds(4); + @SuppressWarnings("unchecked") // covariant cast private static final PTransform, PCollection> IDENTITY_MAP = (PTransform) @@ -344,6 +349,23 @@ public void testWithShardingUnbounded() throws IOException { true); } + @Test + @Category({NeedsRunner.class, UsesUnboundedPCollections.class}) + public void testWriteUnboundedWithCustomBatchParameters() throws IOException { + runShardedWrite( + Arrays.asList("one", "two", "three", "four", "five", "six"), + Window.into(FixedWindows.of(Duration.standardSeconds(10))), + getBaseOutputFilename(), + WriteFiles.to(makeSimpleSink()) + .withWindowedWrites() + .withAutoSharding() + .withBatchSize(CUSTOM_FILE_TRIGGERING_RECORD_COUNT) + .withBatchSizeBytes(CUSTOM_FILE_TRIGGERING_BYTE_COUNT) + .withBatchMaxBufferingDuration(CUSTOM_FILE_TRIGGERING_RECORD_BUFFERING_DURATION), + null, + true); + } + @Test @Category({ NeedsRunner.class,