From 75a463790d7912c475939f8c63d62ffba2002b0a Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> Date: Thu, 19 Sep 2024 11:50:00 -0400 Subject: [PATCH] Support Managed Iceberg streaming writes (#32451) * iceberg streaming writes * cleanup * address comments --- .../IO_Iceberg_Integration_Tests.json | 2 +- sdks/java/io/expansion-service/build.gradle | 3 +- sdks/java/io/iceberg/hive/exec/build.gradle | 15 +++- .../sdk/io/iceberg/AppendFilesToTables.java | 19 ++++- .../apache/beam/sdk/io/iceberg/IcebergIO.java | 51 +++++++++++- .../IcebergWriteSchemaTransformProvider.java | 80 ++++++++++++++++--- .../beam/sdk/io/iceberg/RecordWriter.java | 7 +- .../sdk/io/iceberg/RecordWriterManager.java | 29 +++---- .../sdk/io/iceberg/WriteToDestinations.java | 50 +++++++++++- .../beam/sdk/io/iceberg/IcebergIOIT.java | 74 +++++++++++++++++ .../sdk/io/iceberg/IcebergIOWriteTest.java | 62 ++++++++++++++ ...ebergWriteSchemaTransformProviderTest.java | 5 +- 12 files changed, 355 insertions(+), 42 deletions(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 1efc8e9e4405..3f63c0c9975f 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 1 + "modification": 2 } diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle index 498950b3dc47..6097e5f5a5a5 100644 --- a/sdks/java/io/expansion-service/build.gradle +++ b/sdks/java/io/expansion-service/build.gradle @@ -47,8 +47,7 @@ dependencies { // **** IcebergIO runtime dependencies **** runtimeOnly library.java.hadoop_client // Needed when using GCS as the warehouse location. 
- implementation library.java.bigdataoss_gcs_connector - permitUnusedDeclared library.java.bigdataoss_gcs_connector + runtimeOnly library.java.bigdataoss_gcs_connector // Needed for HiveCatalog runtimeOnly ("org.apache.iceberg:iceberg-hive-metastore:1.4.2") runtimeOnly project(path: ":sdks:java:io:iceberg:hive:exec", configuration: "shadow") diff --git a/sdks/java/io/iceberg/hive/exec/build.gradle b/sdks/java/io/iceberg/hive/exec/build.gradle index bb0b147c5a85..f266ab2ef4db 100644 --- a/sdks/java/io/iceberg/hive/exec/build.gradle +++ b/sdks/java/io/iceberg/hive/exec/build.gradle @@ -39,10 +39,17 @@ artifacts { shadowJar { zip64 true - relocate 'com.google.common', getJavaRelocatedPath('iceberg.hive.com.google.common') - relocate 'com.google.protobuf', getJavaRelocatedPath('iceberg.hive.com.google.protobuf') - relocate 'shaded.parquet', getJavaRelocatedPath('iceberg.hive.shaded.parquet') - relocate 'org.apache.parquet', getJavaRelocatedPath('iceberg.hive.org.apache.parquet') + def problematicPackages = [ + 'com.google.protobuf', + 'com.google.common', + 'shaded.parquet', + 'org.apache.parquet', + 'org.joda' + ] + + problematicPackages.forEach { + relocate it, getJavaRelocatedPath("iceberg.hive.${it}") + } version "3.1.3" mergeServiceFiles() diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java index bb42df5a9330..b26ae83f0866 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java @@ -19,6 +19,8 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; @@ -29,14 +31,17 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class AppendFilesToTables extends PTransform, PCollection>> { - + private static final Logger LOG = LoggerFactory.getLogger(AppendFilesToTables.class); private final IcebergCatalogConfig catalogConfig; AppendFilesToTables(IcebergCatalogConfig catalogConfig) { @@ -66,6 +71,8 @@ public String apply(FileWriteResult input) { private static class AppendFilesToTablesDoFn extends DoFn>, KV> { + private final Counter snapshotsCreated = + Metrics.counter(AppendFilesToTables.class, "snapshotsCreated"); private final IcebergCatalogConfig catalogConfig; @@ -87,15 +94,21 @@ public void processElement( @Element KV> element, OutputReceiver> out, BoundedWindow window) { + if (!element.getValue().iterator().hasNext()) { + return; + } + Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey())); AppendFiles update = table.newAppend(); for (FileWriteResult writtenFile : element.getValue()) { update.appendManifest(writtenFile.getManifestFile()); } update.commit(); + Snapshot snapshot = table.currentSnapshot(); + LOG.info("Created new snapshot for table '{}': {}.", 
element.getKey(), snapshot); + snapshotsCreated.inc(); out.outputWithTimestamp( - KV.of(element.getKey(), SnapshotInfo.fromSnapshot(table.currentSnapshot())), - window.maxTimestamp()); + KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp()); } } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index c3c1da7c7885..0f9612339f48 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import com.google.auto.value.AutoValue; @@ -25,6 +26,12 @@ import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.AfterFirst; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; @@ -33,6 +40,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; /** * The underlying Iceberg connector used by {@link org.apache.beam.sdk.managed.Managed#ICEBERG}. Not @@ -49,6 +57,7 @@ public static WriteRows writeRows(IcebergCatalogConfig catalog) { @AutoValue public abstract static class WriteRows extends PTransform, IcebergWriteResult> { + private static final int TRIGGERING_RECORD_COUNT = 50_000; abstract IcebergCatalogConfig getCatalogConfig(); @@ -56,6 +65,8 @@ public abstract static class WriteRows extends PTransform, Iceb abstract @Nullable DynamicDestinations getDynamicDestinations(); + abstract @Nullable Duration getTriggeringFrequency(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -66,6 +77,8 @@ abstract static class Builder { abstract Builder setDynamicDestinations(DynamicDestinations destinations); + abstract Builder setTriggeringFrequency(Duration triggeringFrequency); + abstract WriteRows build(); } @@ -77,6 +90,21 @@ public WriteRows to(DynamicDestinations destinations) { return toBuilder().setDynamicDestinations(destinations).build(); } + /** + * Sets the frequency at which data is committed and a new {@link org.apache.iceberg.Snapshot} + * is produced. + * + *

Roughly once every triggeringFrequency, this connector will attempt to accumulate all + * {@link org.apache.iceberg.ManifestFile}s written so far and commit them to the table as appended files. Each + * commit results in a new table {@link org.apache.iceberg.Snapshot}. + * + *

This is only applicable when writing an unbounded {@link PCollection} (i.e. a streaming + * pipeline). + */ + public WriteRows withTriggeringFrequency(Duration triggeringFrequency) { + return toBuilder().setTriggeringFrequency(triggeringFrequency).build(); + } + @Override public IcebergWriteResult expand(PCollection input) { List allToArgs = Arrays.asList(getTableIdentifier(), getDynamicDestinations()); @@ -89,11 +117,32 @@ public IcebergWriteResult expand(PCollection input) { destinations = DynamicDestinations.singleTable(Preconditions.checkNotNull(getTableIdentifier())); } + + if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) { + Duration triggeringFrequency = getTriggeringFrequency(); + checkArgumentNotNull( + triggeringFrequency, "Streaming pipelines must set a triggering frequency."); + input = + input.apply( + "WindowIntoGlobal", + Window.into(new GlobalWindows()) + .triggering( + Repeatedly.forever( + AfterFirst.of( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(triggeringFrequency), + AfterPane.elementCountAtLeast(TRIGGERING_RECORD_COUNT)))) + .discardingFiredPanes()); + } else { + Preconditions.checkArgument( + getTriggeringFrequency() == null, + "Triggering frequency is only applicable for streaming pipelines."); + } return input .apply("Set Destination Metadata", new AssignDestinations(destinations)) .apply( "Write Rows to Destinations", - new WriteToDestinations(getCatalogConfig(), destinations)); + new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency())); } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java index 3f0f88946d9c..9f1b51cf2300 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java @@ -17,13 +17,20 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Configuration; + import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; import java.util.Collections; import java.util.List; +import java.util.Map; import org.apache.beam.sdk.managed.ManagedTransformConstants; +import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; @@ -35,6 +42,8 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; /** * SchemaTransform implementation for {@link IcebergIO#writeRows}. 
Writes Beam Rows to Iceberg and @@ -42,7 +51,7 @@ */ @AutoService(SchemaTransformProvider.class) public class IcebergWriteSchemaTransformProvider - extends TypedSchemaTransformProvider { + extends TypedSchemaTransformProvider { static final String INPUT_TAG = "input"; static final String OUTPUT_TAG = "output"; @@ -57,8 +66,55 @@ public String description() { + "{\"table\" (str), \"operation\" (str), \"summary\" (map[str, str]), \"manifestListLocation\" (str)}"; } + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class Configuration { + public static Builder builder() { + return new AutoValue_IcebergWriteSchemaTransformProvider_Configuration.Builder(); + } + + @SchemaFieldDescription("Identifier of the Iceberg table.") + public abstract String getTable(); + + @SchemaFieldDescription("Name of the catalog containing the table.") + public abstract @Nullable String getCatalogName(); + + @SchemaFieldDescription("Properties used to set up the Iceberg catalog.") + public abstract @Nullable Map getCatalogProperties(); + + @SchemaFieldDescription("Properties passed to the Hadoop Configuration.") + public abstract @Nullable Map getConfigProperties(); + + @SchemaFieldDescription( + "For a streaming pipeline, sets the frequency at which snapshots are produced.") + public abstract @Nullable Integer getTriggeringFrequencySeconds(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setTable(String table); + + public abstract Builder setCatalogName(String catalogName); + + public abstract Builder setCatalogProperties(Map catalogProperties); + + public abstract Builder setConfigProperties(Map confProperties); + + public abstract Builder setTriggeringFrequencySeconds(Integer triggeringFrequencySeconds); + + public abstract Configuration build(); + } + + public IcebergCatalogConfig getIcebergCatalog() { + return IcebergCatalogConfig.builder() + .setCatalogName(getCatalogName()) + .setCatalogProperties(getCatalogProperties()) + .setConfigProperties(getConfigProperties()) + .build(); + } + } + @Override - protected SchemaTransform from(SchemaTransformConfiguration configuration) { + protected SchemaTransform from(Configuration configuration) { return new IcebergWriteSchemaTransform(configuration); } @@ -78,9 +134,9 @@ public String identifier() { } static class IcebergWriteSchemaTransform extends SchemaTransform { - private final SchemaTransformConfiguration configuration; + private final Configuration configuration; - IcebergWriteSchemaTransform(SchemaTransformConfiguration configuration) { + IcebergWriteSchemaTransform(Configuration configuration) { this.configuration = configuration; } @@ -89,7 +145,7 @@ Row getConfigurationRow() { // To stay consistent with our SchemaTransform configuration naming conventions, // we sort lexicographically and convert field names to snake_case return SchemaRegistry.createDefault() - .getToRowFunction(SchemaTransformConfiguration.class) + .getToRowFunction(Configuration.class) .apply(configuration) .sorted() .toSnakeCase(); @@ -102,11 +158,17 @@ Row getConfigurationRow() { public PCollectionRowTuple expand(PCollectionRowTuple input) { PCollection rows = input.get(INPUT_TAG); + IcebergIO.WriteRows writeTransform = + IcebergIO.writeRows(configuration.getIcebergCatalog()) + .to(TableIdentifier.parse(configuration.getTable())); + + Integer trigFreq = configuration.getTriggeringFrequencySeconds(); + if (trigFreq != null) { + writeTransform = 
writeTransform.withTriggeringFrequency(Duration.standardSeconds(trigFreq)); + } + // TODO: support dynamic destinations - IcebergWriteResult result = - rows.apply( - IcebergIO.writeRows(configuration.getIcebergCatalog()) - .to(TableIdentifier.parse(configuration.getTable()))); + IcebergWriteResult result = rows.apply(writeTransform); PCollection snapshots = result diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java index 6bbb103e17cf..1434400563bb 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java @@ -36,7 +36,8 @@ class RecordWriter { private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class); - private final Counter activeWriters = Metrics.counter(RecordWriterManager.class, "activeWriters"); + private final Counter activeIcebergWriters = + Metrics.counter(RecordWriterManager.class, "activeIcebergWriters"); private final DataWriter icebergDataWriter; private final Table table; private final String absoluteFilename; @@ -92,7 +93,7 @@ class RecordWriter { default: throw new RuntimeException("Unknown File Format: " + fileFormat); } - activeWriters.inc(); + activeIcebergWriters.inc(); LOG.info( "Opened {} writer for table {}, partition {}. Writing to path: {}", fileFormat, @@ -115,7 +116,7 @@ public void close() throws IOException { fileFormat, table.name(), absoluteFilename), e); } - activeWriters.dec(); + activeIcebergWriters.dec(); LOG.info("Closed {} writer for table {}, path: {}", fileFormat, table.name(), absoluteFilename); } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index b16f0caeb81b..5979e2a60131 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -25,6 +25,8 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.util.WindowedValue; @@ -47,8 +49,6 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A writer that manages multiple {@link RecordWriter}s to write to multiple tables and partitions. @@ -60,8 +60,9 @@ * *

A {@link DestinationState} maintains its writers in a {@link Cache}. If a {@link RecordWriter} * is inactive for 1 minute, the {@link DestinationState} will automatically close it to free up - * resources. Calling {@link #close()} on this {@link RecordWriterManager} will do the following for - * each {@link DestinationState}: + * resources. When a data writer is closed, its resulting {@link DataFile} gets written. Calling + * {@link #close()} on this {@link RecordWriterManager} will do the following for each {@link + * DestinationState}: * *

    *
  1. Close all underlying {@link RecordWriter}s @@ -73,7 +74,10 @@ * #getManifestFiles()}. */ class RecordWriterManager implements AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(RecordWriterManager.class); + private final Counter dataFilesWritten = + Metrics.counter(RecordWriterManager.class, "dataFilesWritten"); + private final Counter manifestFilesWritten = + Metrics.counter(RecordWriterManager.class, "manifestFilesWritten"); /** * Represents the state of one Iceberg table destination. Creates one {@link RecordWriter} per @@ -88,6 +92,7 @@ class DestinationState { private final PartitionKey partitionKey; private final String tableLocation; private final FileIO fileIO; + private final Table table; private final String stateToken = UUID.randomUUID().toString(); private final List dataFiles = Lists.newArrayList(); @VisibleForTesting final Cache writers; @@ -100,6 +105,7 @@ class DestinationState { this.partitionKey = new PartitionKey(spec, schema); this.tableLocation = table.location(); this.fileIO = table.io(); + this.table = table; // build a cache of RecordWriters. // writers will expire after 1 min of idle time. @@ -123,6 +129,7 @@ class DestinationState { } openWriters--; dataFiles.add(recordWriter.getDataFile()); + dataFilesWritten.inc(); }) .build(); } @@ -170,8 +177,8 @@ private RecordWriter createWriter(PartitionKey partitionKey) { try { RecordWriter writer = new RecordWriter( - catalog, - icebergDestination, + table, + icebergDestination.getFileFormat(), filePrefix + "_" + stateToken + "_" + recordIndex, partitionKey); openWriters++; @@ -261,13 +268,7 @@ public void close() throws IOException { manifestWriter = openWriter; } ManifestFile manifestFile = manifestWriter.toManifestFile(); - - LOG.info( - "Successfully wrote manifest file, adding {} data files ({} rows) to table '{}': {}.", - manifestFile.addedFilesCount(), - manifestFile.addedRowsCount(), - windowedDestination.getValue().getTableIdentifier(), - outputFile.location()); + manifestFilesWritten.inc(); totalManifestFiles .computeIfAbsent(windowedDestination, dest -> Lists.newArrayList()) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java index 65fd551c782a..f71ff24a1a37 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java @@ -28,24 +28,35 @@ import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.ShardedKey; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; class WriteToDestinations extends PTransform, IcebergWriteResult> { static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 40); // 1TB static final int DEFAULT_NUM_FILE_SHARDS = 0; - static final int 
FILE_TRIGGERING_RECORD_COUNT = 50_000; private final IcebergCatalogConfig catalogConfig; private final DynamicDestinations dynamicDestinations; + private final @Nullable Duration triggeringFrequency; - WriteToDestinations(IcebergCatalogConfig catalogConfig, DynamicDestinations dynamicDestinations) { + WriteToDestinations( + IcebergCatalogConfig catalogConfig, + DynamicDestinations dynamicDestinations, + @Nullable Duration triggeringFrequency) { this.dynamicDestinations = dynamicDestinations; this.catalogConfig = catalogConfig; + this.triggeringFrequency = triggeringFrequency; } @Override @@ -108,11 +119,44 @@ public KV, Row> apply(Row elem) { "Write remaining rows to files", new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations)); + PCollection writeUngroupedResultPColl = writeUngroupedResult.getWrittenFiles(); + + if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) { + // for streaming pipelines, re-window both outputs to keep Flatten happy + writeGroupedResult = + writeGroupedResult.apply( + "RewindowGroupedRecords", + Window.into(new GlobalWindows()) + .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) + .discardingFiredPanes()); + writeUngroupedResultPColl = + writeUngroupedResultPColl.apply( + "RewindowUnGroupedRecords", + Window.into(new GlobalWindows()) + .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) + .discardingFiredPanes()); + } + PCollection allWrittenFiles = - PCollectionList.of(writeUngroupedResult.getWrittenFiles()) + PCollectionList.of(writeUngroupedResultPColl) .and(writeGroupedResult) .apply("Flatten Written Files", Flatten.pCollections()); + if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) { + checkArgumentNotNull( + triggeringFrequency, "Streaming pipelines must set a triggering frequency."); + // apply the user's trigger before we start committing and creating snapshots + allWrittenFiles = + allWrittenFiles.apply( + "ApplyUserTrigger", + Window.into(new GlobalWindows()) + .triggering( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(checkArgumentNotNull(triggeringFrequency)))) + .discardingFiredPanes()); + } + // Apply any sharded writes and flatten everything for catalog updates PCollection> snapshots = allWrittenFiles.apply(new AppendFilesToTables(catalogConfig)); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java index 2e748e9644e8..8c6d3d99e35e 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java @@ -19,10 +19,12 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -33,9 +35,14 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PeriodicImpulse; import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; import 
org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AppendFiles; @@ -57,6 +64,8 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; +import org.joda.time.Duration; +import org.joda.time.Instant; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; @@ -307,4 +316,69 @@ public void testWritePartitionedData() { assertThat( returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray())); } + + @Test + public void testStreamingWrite() { + PartitionSpec partitionSpec = + PartitionSpec.builderFor(ICEBERG_SCHEMA).identity("bool").identity("modulo_5").build(); + Table table = catalog.createTable(tableId, ICEBERG_SCHEMA, partitionSpec); + + Map config = new HashMap<>(managedIcebergConfig()); + config.put("triggering_frequency_seconds", 4); + + // over a span of 10 seconds, create elements from longs in range [0, 1000) + PCollection input = + pipeline + .apply( + PeriodicImpulse.create() + .stopAfter(Duration.millis(9_990)) + .withInterval(Duration.millis(10))) + .apply( + MapElements.into(TypeDescriptors.rows()) + .via(instant -> ROW_FUNC.apply((instant.getMillis() / 10) % 1000))) + .setRowSchema(BEAM_SCHEMA); + + assertThat(input.isBounded(), equalTo(PCollection.IsBounded.UNBOUNDED)); + + input.apply(Managed.write(Managed.ICEBERG).withConfig(config)); + pipeline.run().waitUntilFinish(); + + List returnedRecords = readRecords(table); + assertThat( + returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray())); + } + + @Test + public void testStreamingWriteWithPriorWindowing() { + PartitionSpec partitionSpec = + PartitionSpec.builderFor(ICEBERG_SCHEMA).identity("bool").identity("modulo_5").build(); + Table table = catalog.createTable(tableId, ICEBERG_SCHEMA, partitionSpec); + + Map config = new HashMap<>(managedIcebergConfig()); + config.put("triggering_frequency_seconds", 4); + + // over a span of 10 seconds, create elements from longs in range [0, 1000) + PCollection input = + pipeline + .apply( + PeriodicImpulse.create() + .stopAfter(Duration.millis(9_990)) + .withInterval(Duration.millis(10))) + .apply( + Window.into(FixedWindows.of(Duration.standardSeconds(1))) + .accumulatingFiredPanes()) + .apply( + MapElements.into(TypeDescriptors.rows()) + .via(instant -> ROW_FUNC.apply((instant.getMillis() / 10) % 1000))) + .setRowSchema(BEAM_SCHEMA); + + assertThat(input.isBounded(), equalTo(PCollection.IsBounded.UNBOUNDED)); + + input.apply(Managed.write(Managed.ICEBERG).withConfig(config)); + pipeline.run().waitUntilFinish(); + + List returnedRecords = readRecords(table); + assertThat( + returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray())); + } } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java index 2abe6b093481..d3bf13a16787 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java @@ -25,8 +25,13 @@ import java.util.Map; import java.util.UUID; import org.apache.beam.sdk.schemas.Schema; 
+import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -47,6 +52,8 @@ import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; import org.hamcrest.Matchers; +import org.joda.time.Duration; +import org.joda.time.Instant; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -303,4 +310,59 @@ public void testIdempotentCommit() throws Exception { secondUpdate.appendFile(dataFile); secondUpdate.commit(); } + + @Test + public void testStreamingWrite() { + TableIdentifier tableId = + TableIdentifier.of( + "default", "streaming_" + Long.toString(UUID.randomUUID().hashCode(), 16)); + + // Create a table and add records to it. + Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA); + + Map catalogProps = + ImmutableMap.builder() + .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .put("warehouse", warehouse.location) + .build(); + + IcebergCatalogConfig catalog = + IcebergCatalogConfig.builder() + .setCatalogName("name") + .setCatalogProperties(catalogProps) + .build(); + + List inputRows = TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1); + TestStream stream = + TestStream.create(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) + .advanceWatermarkTo(new Instant(0)) + // the first two rows are written within the same triggering interval, + // so they should both be in the first snapshot + .addElements(inputRows.get(0)) + .advanceProcessingTime(Duration.standardSeconds(1)) + .addElements(inputRows.get(1)) + .advanceProcessingTime(Duration.standardSeconds(5)) + // the third row is written in a new triggering interval, + // so we create a new snapshot for it. 
+ .addElements(inputRows.get(2)) + .advanceProcessingTime(Duration.standardSeconds(5)) + .advanceWatermarkToInfinity(); + + PCollection> output = + testPipeline + .apply("Stream Records", stream) + .apply( + "Append To Table", + IcebergIO.writeRows(catalog) + .to(tableId) + .withTriggeringFrequency(Duration.standardSeconds(3))) + .getSnapshots(); + // verify that 2 snapshots are created (one per triggering interval) + PCollection snapshots = output.apply(Count.globally()); + PAssert.that(snapshots).containsInAnyOrder(1L, 1L); + testPipeline.run().waitUntilFinish(); + + List writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build()); + assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray())); + } } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java index 6b555e7e14d0..779687c97768 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Configuration; import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.INPUT_TAG; import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.OUTPUT_TAG; import static org.hamcrest.MatcherAssert.assertThat; @@ -88,8 +89,8 @@ public void testSimpleAppend() { properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); properties.put("warehouse", warehouse.location); - SchemaTransformConfiguration config = - SchemaTransformConfiguration.builder() + Configuration config = + Configuration.builder() .setTable(identifier) .setCatalogName("name") .setCatalogProperties(properties)
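
Usage sketch for the streaming write path added in this patch (a minimal example, not part of the diff itself). The catalog name, warehouse location, and table identifier below are placeholders; withTriggeringFrequency and the triggering_frequency_seconds option are the new pieces exercised by the tests above. The Managed config keys follow the snake_case form of the Configuration fields defined in IcebergWriteSchemaTransformProvider.

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
import org.apache.beam.sdk.io.iceberg.IcebergIO;
import org.apache.beam.sdk.io.iceberg.IcebergWriteResult;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.TableIdentifier;
import org.joda.time.Duration;

class IcebergStreamingWriteExample {

  // Native API: writes an unbounded PCollection<Row> to Iceberg, committing a new
  // snapshot roughly every 30 seconds (or earlier once enough records accumulate).
  static IcebergWriteResult writeWithIcebergIO(PCollection<Row> unboundedRows) {
    IcebergCatalogConfig catalogConfig =
        IcebergCatalogConfig.builder()
            .setCatalogName("example_catalog") // placeholder
            .setCatalogProperties(
                ImmutableMap.of(
                    "type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
                    "warehouse", "gs://example-bucket/warehouse")) // placeholder
            .build();

    return unboundedRows.apply(
        IcebergIO.writeRows(catalogConfig)
            .to(TableIdentifier.parse("default.example_table")) // placeholder
            .withTriggeringFrequency(Duration.standardSeconds(30)));
  }

  // Managed API: the same write expressed through Managed.ICEBERG, mirroring the
  // configuration used in the new IcebergIOIT streaming tests.
  static void writeWithManagedApi(PCollection<Row> unboundedRows) {
    Map<String, Object> config = new HashMap<>();
    config.put("table", "default.example_table"); // placeholder
    config.put("catalog_name", "example_catalog");
    config.put(
        "catalog_properties",
        ImmutableMap.of(
            "type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
            "warehouse", "gs://example-bucket/warehouse"));
    config.put("triggering_frequency_seconds", 30);

    unboundedRows.apply(Managed.write(Managed.ICEBERG).withConfig(config));
  }
}

In either form, an unbounded input without a triggering frequency fails the precondition added in WriteRows#expand ("Streaming pipelines must set a triggering frequency."), and a bounded input with one set is rejected ("Triggering frequency is only applicable for streaming pipelines.").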