Skip to content

Commit

Permalink
[yaml] Preserve windowing for unbounded input when using WriteToJson …
Browse files Browse the repository at this point in the history
…Java provider

Signed-off-by: Jeffrey Kinard <jeff@thekinards.com>
  • Loading branch information
Polber committed Sep 27, 2024
1 parent f2d0558 commit f3a5b25
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
Expand Down Expand Up @@ -134,10 +135,15 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
if (configuration.getDelimiter() != null) {
format = format.withDelimiter(configuration.getDelimiter().charAt(0));
}
WriteFilesResult<?> result =
input
.get(INPUT_ROWS_TAG)
.apply(CsvIO.writeRows(configuration.getPath(), format).withSuffix(""));

// Preserve input windowing
CsvIO.Write<Row> writeTransform =
CsvIO.writeRows(configuration.getPath(), format).withSuffix("");
if (input.get(INPUT_ROWS_TAG).isBounded() == PCollection.IsBounded.UNBOUNDED) {
writeTransform = writeTransform.withWindowedWrites();
}

WriteFilesResult<?> result = input.get(INPUT_ROWS_TAG).apply(writeTransform);
Schema outputSchema = Schema.of(Field.of("filename", FieldType.STRING));
return PCollectionRowTuple.of(
WRITE_RESULTS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
Expand Down Expand Up @@ -121,8 +122,13 @@ protected static class JsonWriteTransform extends SchemaTransform {

@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
WriteFilesResult<?> result =
input.get(INPUT_ROWS_TAG).apply(JsonIO.writeRows(configuration.getPath()).withSuffix(""));
// Preserve input windowing
JsonIO.Write<Row> writeTransform = JsonIO.writeRows(configuration.getPath()).withSuffix("");
if (input.get(INPUT_ROWS_TAG).isBounded() == PCollection.IsBounded.UNBOUNDED) {
writeTransform = writeTransform.withWindowedWrites();
}

WriteFilesResult<?> result = input.get(INPUT_ROWS_TAG).apply(writeTransform);
Schema outputSchema = Schema.of(Field.of("filename", FieldType.STRING));
return PCollectionRowTuple.of(
WRITE_RESULTS,
Expand Down

0 comments on commit f3a5b25

Please sign in to comment.