Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow avro block size to be configurable in AvroIO.write #24454

Merged
merged 2 commits into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
Expand Down Expand Up @@ -571,7 +572,8 @@ private static <UserT, OutputT> TypedWrite.Builder<UserT, Void, OutputT> default
.setCodec(TypedWrite.DEFAULT_SERIALIZABLE_CODEC)
.setMetadata(ImmutableMap.of())
.setWindowedWrites(false)
.setNoSpilling(false);
.setNoSpilling(false)
.setSyncInterval(DataFileConstants.DEFAULT_SYNC_INTERVAL);
}

@Experimental(Kind.SCHEMAS)
Expand Down Expand Up @@ -1318,6 +1320,8 @@ public abstract static class TypedWrite<UserT, DestinationT, OutputT>

abstract boolean getGenericRecords();

abstract int getSyncInterval();

abstract @Nullable Schema getSchema();

abstract boolean getWindowedWrites();
Expand Down Expand Up @@ -1362,6 +1366,8 @@ abstract Builder<UserT, DestinationT, OutputT> setShardTemplate(

abstract Builder<UserT, DestinationT, OutputT> setGenericRecords(boolean genericRecords);

abstract Builder<UserT, DestinationT, OutputT> setSyncInterval(int syncInterval);

abstract Builder<UserT, DestinationT, OutputT> setSchema(Schema schema);

abstract Builder<UserT, DestinationT, OutputT> setWindowedWrites(boolean windowedWrites);
Expand Down Expand Up @@ -1473,6 +1479,14 @@ public <NewDestinationT> TypedWrite<UserT, NewDestinationT, OutputT> to(
.build();
}

/**
* Sets the approximate number of uncompressed bytes to write in each block for the AVRO
* container format.
*/
public TypedWrite<UserT, DestinationT, OutputT> withSyncInterval(int syncInterval) {
return toBuilder().setSyncInterval(syncInterval).build();
}

/**
* Sets the output schema. Can only be used when the output type is {@link GenericRecord} and
* when not using {@link #to(DynamicAvroDestinations)}.
Expand Down Expand Up @@ -1659,7 +1673,11 @@ public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
}
WriteFiles<UserT, DestinationT, OutputT> write =
WriteFiles.to(
new AvroSink<>(tempDirectory, resolveDynamicDestinations(), getGenericRecords()));
new AvroSink<>(
tempDirectory,
resolveDynamicDestinations(),
getGenericRecords(),
getSyncInterval()));
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
}
Expand Down Expand Up @@ -1742,10 +1760,16 @@ public Write<T> to(DynamicAvroDestinations<T, ?, T> dynamicDestinations) {
return new Write<>(inner.to(dynamicDestinations).withFormatFunction(null));
}

/** See {@link TypedWrite#withSyncInterval}. */
public Write<T> withSyncInterval(int syncInterval) {
return new Write<>(inner.withSyncInterval(syncInterval));
}

/** See {@link TypedWrite#withSchema}. */
public Write<T> withSchema(Schema schema) {
return new Write<>(inner.withSchema(schema));
}

/** See {@link TypedWrite#withTempDirectory(ValueProvider)}. */
@Experimental(Kind.FILESYSTEM)
public Write<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
Expand Down
20 changes: 15 additions & 5 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
public class AvroSink<UserT, DestinationT, OutputT>
extends FileBasedSink<UserT, DestinationT, OutputT> {
private final boolean genericRecords;
private final int syncInterval;

@FunctionalInterface
public interface DatumWriterFactory<T> extends Serializable {
Expand All @@ -48,10 +49,12 @@ public interface DatumWriterFactory<T> extends Serializable {
AvroSink(
ValueProvider<ResourceId> outputPrefix,
DynamicAvroDestinations<UserT, DestinationT, OutputT> dynamicDestinations,
boolean genericRecords) {
boolean genericRecords,
int syncInterval) {
// Avro handles compression internally using the codec.
super(outputPrefix, dynamicDestinations, Compression.UNCOMPRESSED);
this.genericRecords = genericRecords;
this.syncInterval = syncInterval;
}

@Override
Expand All @@ -61,24 +64,27 @@ public DynamicAvroDestinations<UserT, DestinationT, OutputT> getDynamicDestinati

@Override
public WriteOperation<DestinationT, OutputT> createWriteOperation() {
return new AvroWriteOperation<>(this, genericRecords);
return new AvroWriteOperation<>(this, genericRecords, syncInterval);
}

/** A {@link WriteOperation WriteOperation} for Avro files. */
private static class AvroWriteOperation<DestinationT, OutputT>
extends WriteOperation<DestinationT, OutputT> {
private final DynamicAvroDestinations<?, DestinationT, OutputT> dynamicDestinations;
private final boolean genericRecords;
private final int syncInterval;

private AvroWriteOperation(AvroSink<?, DestinationT, OutputT> sink, boolean genericRecords) {
private AvroWriteOperation(
AvroSink<?, DestinationT, OutputT> sink, boolean genericRecords, int syncInterval) {
super(sink);
this.dynamicDestinations = sink.getDynamicDestinations();
this.genericRecords = genericRecords;
this.syncInterval = syncInterval;
}

@Override
public Writer<DestinationT, OutputT> createWriter() throws Exception {
return new AvroWriter<>(this, dynamicDestinations, genericRecords);
return new AvroWriter<>(this, dynamicDestinations, genericRecords, syncInterval);
}
}

Expand All @@ -90,14 +96,17 @@ private static class AvroWriter<DestinationT, OutputT> extends Writer<Destinatio

private final DynamicAvroDestinations<?, DestinationT, OutputT> dynamicDestinations;
private final boolean genericRecords;
private final int syncInterval;

public AvroWriter(
WriteOperation<DestinationT, OutputT> writeOperation,
DynamicAvroDestinations<?, DestinationT, OutputT> dynamicDestinations,
boolean genericRecords) {
boolean genericRecords,
int syncInterval) {
super(writeOperation, MimeTypes.BINARY);
this.dynamicDestinations = dynamicDestinations;
this.genericRecords = genericRecords;
this.syncInterval = syncInterval;
}

@SuppressWarnings("deprecation") // uses internal test functionality.
Expand Down Expand Up @@ -133,6 +142,7 @@ protected void prepareWrite(WritableByteChannel channel) throws Exception {
+ v.getClass().getSimpleName());
}
}
dataFileWriter.setSyncInterval(syncInterval);
dataFileWriter.create(schema, Channels.newOutputStream(channel));
}

Expand Down