diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 9d4b75d9c98b1..9abce9ecf5f6e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -30,6 +30,7 @@ import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; +import org.apache.avro.file.DataFileConstants; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; @@ -571,7 +572,8 @@ private static TypedWrite.Builder default .setCodec(TypedWrite.DEFAULT_SERIALIZABLE_CODEC) .setMetadata(ImmutableMap.of()) .setWindowedWrites(false) - .setNoSpilling(false); + .setNoSpilling(false) + .setSyncInterval(DataFileConstants.DEFAULT_SYNC_INTERVAL); } @Experimental(Kind.SCHEMAS) @@ -1318,6 +1320,8 @@ public abstract static class TypedWrite abstract boolean getGenericRecords(); + abstract int getSyncInterval(); + abstract @Nullable Schema getSchema(); abstract boolean getWindowedWrites(); @@ -1362,6 +1366,8 @@ abstract Builder setShardTemplate( abstract Builder setGenericRecords(boolean genericRecords); + abstract Builder setSyncInterval(int syncInterval); + abstract Builder setSchema(Schema schema); abstract Builder setWindowedWrites(boolean windowedWrites); @@ -1473,6 +1479,14 @@ public TypedWrite to( .build(); } + /** + * Sets the approximate number of uncompressed bytes to write in each block for the Avro + * container format. + */ + public TypedWrite withSyncInterval(int syncInterval) { + return toBuilder().setSyncInterval(syncInterval).build(); + } + /** * Sets the output schema. Can only be used when the output type is {@link GenericRecord} and * when not using {@link #to(DynamicAvroDestinations)}. 
@@ -1659,7 +1673,11 @@ public WriteFilesResult expand(PCollection input) { } WriteFiles write = WriteFiles.to( - new AvroSink<>(tempDirectory, resolveDynamicDestinations(), getGenericRecords())); + new AvroSink<>( + tempDirectory, + resolveDynamicDestinations(), + getGenericRecords(), + getSyncInterval())); if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); } @@ -1742,10 +1760,16 @@ public Write to(DynamicAvroDestinations dynamicDestinations) { return new Write<>(inner.to(dynamicDestinations).withFormatFunction(null)); } + /** See {@link TypedWrite#withSyncInterval}. */ + public Write withSyncInterval(int syncInterval) { + return new Write<>(inner.withSyncInterval(syncInterval)); + } + /** See {@link TypedWrite#withSchema}. */ public Write withSchema(Schema schema) { return new Write<>(inner.withSchema(schema)); } + /** See {@link TypedWrite#withTempDirectory(ValueProvider)}. */ @Experimental(Kind.FILESYSTEM) public Write withTempDirectory(ValueProvider tempDirectory) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java index acdedf9c0a61a..fe463d704a581 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java @@ -39,6 +39,7 @@ public class AvroSink extends FileBasedSink { private final boolean genericRecords; + private final int syncInterval; @FunctionalInterface public interface DatumWriterFactory extends Serializable { @@ -48,10 +49,12 @@ public interface DatumWriterFactory extends Serializable { AvroSink( ValueProvider outputPrefix, DynamicAvroDestinations dynamicDestinations, - boolean genericRecords) { + boolean genericRecords, + int syncInterval) { // Avro handles compression internally using the codec. 
super(outputPrefix, dynamicDestinations, Compression.UNCOMPRESSED); this.genericRecords = genericRecords; + this.syncInterval = syncInterval; } @Override @@ -61,7 +64,7 @@ public DynamicAvroDestinations getDynamicDestinati @Override public WriteOperation createWriteOperation() { - return new AvroWriteOperation<>(this, genericRecords); + return new AvroWriteOperation<>(this, genericRecords, syncInterval); } /** A {@link WriteOperation WriteOperation} for Avro files. */ @@ -69,16 +72,19 @@ private static class AvroWriteOperation extends WriteOperation { private final DynamicAvroDestinations dynamicDestinations; private final boolean genericRecords; + private final int syncInterval; - private AvroWriteOperation(AvroSink sink, boolean genericRecords) { + private AvroWriteOperation( + AvroSink sink, boolean genericRecords, int syncInterval) { super(sink); this.dynamicDestinations = sink.getDynamicDestinations(); this.genericRecords = genericRecords; + this.syncInterval = syncInterval; } @Override public Writer createWriter() throws Exception { - return new AvroWriter<>(this, dynamicDestinations, genericRecords); + return new AvroWriter<>(this, dynamicDestinations, genericRecords, syncInterval); } } @@ -90,14 +96,17 @@ private static class AvroWriter extends Writer dynamicDestinations; private final boolean genericRecords; + private final int syncInterval; public AvroWriter( WriteOperation writeOperation, DynamicAvroDestinations dynamicDestinations, - boolean genericRecords) { + boolean genericRecords, + int syncInterval) { super(writeOperation, MimeTypes.BINARY); this.dynamicDestinations = dynamicDestinations; this.genericRecords = genericRecords; + this.syncInterval = syncInterval; } @SuppressWarnings("deprecation") // uses internal test functionality. 
@@ -133,6 +142,7 @@ protected void prepareWrite(WritableByteChannel channel) throws Exception { + v.getClass().getSimpleName()); } } + dataFileWriter.setSyncInterval(syncInterval); dataFileWriter.create(schema, Channels.newOutputStream(channel)); }