diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.java new file mode 100644 index 0000000000000..0d70a063b7503 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.java @@ -0,0 +1,582 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.it; + +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.Key; +import com.google.cloud.spanner.KeySet; +import com.google.cloud.spanner.Mutation; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.BooleanCoder; +import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * End-to-end test of Cloud Spanner Change Streams with strict commit timestamp and transaction + * ordering. 
+ */
+@RunWith(JUnit4.class)
+public class SpannerChangeStreamOrderedByTimestampAndTransactionIdIT {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.class);
+
+  @ClassRule public static final IntegrationTestEnv ENV = new IntegrationTestEnv();
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  private static String projectId;
+  private static String instanceId;
+  private static String databaseId;
+  private static String tableName;
+  private static String changeStreamName;
+  private static DatabaseClient databaseClient;
+
+  @BeforeClass
+  public static void setup() throws InterruptedException, ExecutionException, TimeoutException {
+    projectId = ENV.getProjectId();
+    instanceId = ENV.getInstanceId();
+    databaseId = ENV.getDatabaseId();
+    tableName = ENV.createSingersTable();
+    changeStreamName = ENV.createChangeStreamFor(tableName);
+    databaseClient = ENV.getDatabaseClient();
+  }
+
+  @Test
+  public void testTransactionBoundaries() {
+    final SpannerConfig spannerConfig =
+        SpannerConfig.create()
+            .withProjectId(projectId)
+            .withInstanceId(instanceId)
+            .withDatabaseId(databaseId);
+    // Commit an initial transaction to get the timestamp to start reading from.
+    List<Mutation> mutations = new ArrayList<>();
+    mutations.add(insertRecordMutation(0, "FirstName0", "LastName0"));
+    final long timeIncrementInSeconds = 2;
+    final Timestamp startTimestamp = databaseClient.write(mutations);
+    writeTransactionsToDatabase();
+
+    // Sleep the time increment interval.
+    try {
+      Thread.sleep(timeIncrementInSeconds * 1000);
+    } catch (InterruptedException e) {
+      LOG.error(e.toString(), e);
+    }
+
+    // This will be the second batch of transactions that will have strict timestamp ordering
+    // per key.
+    writeTransactionsToDatabase();
+
+    // Sleep the time increment interval.
+    try {
+      Thread.sleep(timeIncrementInSeconds * 1000);
+    } catch (InterruptedException e) {
+      LOG.error(e.toString(), e);
+    }
+
+    // This will be the final batch of transactions that will have strict timestamp ordering
+    // per key.
+    com.google.cloud.Timestamp endTimestamp = writeTransactionsToDatabase();
+
+    final PCollection<String> tokens =
+        pipeline
+            .apply(
+                SpannerIO.readChangeStream()
+                    .withSpannerConfig(spannerConfig)
+                    .withChangeStreamName(changeStreamName)
+                    .withMetadataDatabase(databaseId)
+                    .withInclusiveStartAt(startTimestamp)
+                    .withInclusiveEndAt(endTimestamp))
+            .apply(
+                ParDo.of(
+                    new SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.KeyBySortKeyFn()))
+            .apply(
+                ParDo.of(
+                    new SpannerChangeStreamOrderedByTimestampAndTransactionIdIT
+                        .CreateArtificialKeyFn()))
+            .apply(
+                ParDo.of(
+                    new BufferRecordsUntilOutputTimestamp(endTimestamp, timeIncrementInSeconds)))
+            .apply(
+                ParDo.of(new SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.ToStringFn()));
+
+    // Assert that the returned PCollection contains the data change records of all committed
+    // transactions (in string representation), grouped into flush intervals, with records
+    // ordered by commit timestamp and transaction id within each group.
+    PAssert.that(tokens)
+        .containsInAnyOrder(
+            // Insert Singer 0 into the table.
+            "{\"SingerId\":\"0\"},INSERT\n"
+                // Insert Singer 1 and 2 into the table.
+                + "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n"
+                // Delete Singer 1 and Insert Singer 3 into the table.
+                + "{\"SingerId\":\"1\"},DELETE\n"
+                + "{\"SingerId\":\"3\"},INSERT\n"
+                // Delete Singers 2 and 3.
+                + "{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE\n"
+                // Delete Singer 0.
+ + "{\"SingerId\":\"0\"},DELETE\n", + + // Second batch of transactions. + // Insert Singer 1 and 2 into the table, + "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n" + + // Delete Singer 1 and Insert Singer 3 into the table. + + "{\"SingerId\":\"1\"},DELETE\n" + + "{\"SingerId\":\"3\"},INSERT\n" + + // Delete Singers 2 and 3. + + "{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE\n", + + // Third batch of transactions. + // Insert Singer 1 and 2 into the table, + "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n" + + // Delete Singer 1 and Insert Singer 3 into the table. + + "{\"SingerId\":\"1\"},DELETE\n" + + "{\"SingerId\":\"3\"},INSERT\n" + + // Delete Singers 2 and 3. + + "{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE\n"); + + pipeline + .runWithAdditionalOptionArgs(Collections.singletonList("--streaming")) + .waitUntilFinish(); + } + + // KeyByTransactionIdFn takes in a DataChangeRecord and outputs a key-value pair of + // {SortKey, DataChangeRecord} + private static class KeyBySortKeyFn + extends DoFn< + DataChangeRecord, + KV> { + + private static final long serialVersionUID = 1270485392415293532L; + + @ProcessElement + public void processElement( + @Element DataChangeRecord record, + OutputReceiver< + KV< + SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, + DataChangeRecord>> + outputReceiver) { + outputReceiver.output( + KV.of( + new SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey( + record.getCommitTimestamp(), record.getServerTransactionId()), + record)); + } + } + + // CreateArtificialKeyFn keys each input element by an artifical byte key. This is because buffers + // and timers are per key and window, and we want to buffer all data change records in a time + // interval, rather than buffer per key. + private static class CreateArtificialKeyFn + extends DoFn< + KV, + KV< + byte[], + KV< + SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, + DataChangeRecord>>> { + private static final long serialVersionUID = -3363057370822294686L; + + @ProcessElement + public void processElement( + @Element + KV + element, + OutputReceiver< + KV< + byte[], + KV< + SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, + DataChangeRecord>>> + outputReceiver) { + outputReceiver.output(KV.of(new byte[0], element)); + } + } + + // Timers and buffers are per-key. + // Buffer each data change record until the watermark passes the timestamp at which we want + // to output the buffered data change records. + // We utilize a looping timer to determine when to flush the buffer: + // + // 1. When we see a data change record for the first time (i.e. no data change records in + // the buffer), we will set the timer to fire at an interval after the data change record's + // timestamp. + // 2. Then, when the timer fires, if the current timer's expiration time is before the pipeline + // end time, if set, we still have data left to process. We will set the next timer to the + // current timer's expiration time plus incrementIntervalInSeconds. + // 3. Otherwise, we will not set a timer. 
+  //
+  private static class BufferRecordsUntilOutputTimestamp
+      extends DoFn<
+          KV<
+              byte[],
+              KV<
+                  SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey,
+                  DataChangeRecord>>,
+          Iterable<
+              KV<
+                  SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey,
+                  DataChangeRecord>>> {
+    private static final long serialVersionUID = 5050535558953049259L;
+
+    private final long incrementIntervalInSeconds;
+    private final @Nullable Instant pipelineEndTime;
+
+    private BufferRecordsUntilOutputTimestamp(
+        @Nullable com.google.cloud.Timestamp endTimestamp, long incrementIntervalInSeconds) {
+      this.incrementIntervalInSeconds = incrementIntervalInSeconds;
+      if (endTimestamp != null) {
+        this.pipelineEndTime = new Instant(endTimestamp.toSqlTimestamp());
+      } else {
+        pipelineEndTime = null;
+      }
+    }
+
+    @SuppressWarnings("unused")
+    @TimerId("timer")
+    private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    @SuppressWarnings("unused")
+    @StateId("buffer")
+    private final StateSpec<
+            BagState<
+                KV<
+                    SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey,
+                    DataChangeRecord>>>
+        buffer = StateSpecs.bag();
+
+    @SuppressWarnings("unused")
+    @StateId("keySeen")
+    private final StateSpec<ValueState<Boolean>> keySeen = StateSpecs.value(BooleanCoder.of());
+
+    @ProcessElement
+    public void process(
+        @Element
+            KV<
+                    byte[],
+                    KV<
+                        SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey,
+                        DataChangeRecord>>
+                element,
+        @StateId("buffer")
+            BagState<
+                    KV<
+                        SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey,
+                        DataChangeRecord>>
+                buffer,
+        @TimerId("timer") Timer timer,
+        @StateId("keySeen") ValueState<Boolean> keySeen) {
+      buffer.add(element.getValue());
+
+      // Only set the timer if this is the first time we are receiving a data change record
+      // with this key.
+      Boolean hasKeyBeenSeen = keySeen.read();
+      if (hasKeyBeenSeen == null) {
+        Instant commitTimestamp =
+            new Instant(element.getValue().getKey().getCommitTimestamp().toSqlTimestamp());
+        Instant outputTimestamp =
+            commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
+        LOG.debug("Setting timer at {} for key {}", outputTimestamp.toString(), element.getKey());
+        timer.set(outputTimestamp);
+        keySeen.write(true);
+      }
+    }
+
+    @OnTimer("timer")
+    public void onExpiry(
+        OnTimerContext context,
+        @StateId("buffer")
+            BagState<
+                    KV<
+                        SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey,
+                        DataChangeRecord>>
+                buffer,
+        @TimerId("timer") Timer timer) {
+      if (!buffer.isEmpty().read()) {
+        final List<
+                KV<
+                    SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey,
+                    DataChangeRecord>>
+            records =
+                StreamSupport.stream(buffer.read().spliterator(), false)
+                    .collect(Collectors.toList());
+        buffer.clear();
+
+        List<KV<SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, DataChangeRecord>>
+            recordsToOutput = new ArrayList<>();
+        for (KV<SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, DataChangeRecord>
+            record : records) {
+          Instant recordCommitTimestamp =
+              new Instant(record.getKey().getCommitTimestamp().toSqlTimestamp());
+          // When the watermark passes time T, this means that all records with event time < T
+          // have been processed and successfully committed. Since the timer fires when the
+          // watermark passes the expiration time, we should only output records with event time
+          // < expiration time.
+          final String recordString = getRecordString(record.getValue());
+          if (recordCommitTimestamp.isBefore(context.timestamp())) {
+            LOG.debug(
+                "Outputting transactions {} with id {} at expiration timestamp {}",
+                recordString,
+                record.getKey().toString(),
+                context.timestamp().toString());
+            recordsToOutput.add(record);
+          } else {
+            LOG.debug(
+                "Expired at {} but adding transaction {} back to buffer "
+                    + "due to commit timestamp {}",
+                context.timestamp().toString(),
+                recordString,
+                recordCommitTimestamp.toString());
+            buffer.add(record);
+          }
+        }
+
+        // Output records, if there are any to output.
+        if (!recordsToOutput.isEmpty()) {
+          context.outputWithTimestamp(recordsToOutput, context.timestamp());
+          LOG.debug(
+              "Expired at {}, outputting records for key {}",
+              context.timestamp().toString(),
+              recordsToOutput.get(0).getKey().toString());
+        } else {
+          LOG.debug("Expired at {} with no records", context.timestamp().toString());
+        }
+      }
+
+      Instant nextTimer =
+          context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
+      if (pipelineEndTime == null || context.timestamp().isBefore(pipelineEndTime)) {
+        // If the current timer's timestamp is before the pipeline end time, or there is no
+        // pipeline end time, we still have data left to process.
+        LOG.debug("Setting next timer to {}", nextTimer.toString());
+        timer.set(nextTimer);
+      } else {
+        LOG.debug(
+            "Timer not being set as exceeded pipeline end time: " + pipelineEndTime.toString());
+      }
+    }
+  }
+
+  // ToStringFn takes in an Iterable of key-value pairs of {SortKey, DataChangeRecord} and
+  // outputs a string representation.
+  private static class ToStringFn
+      extends DoFn<
+          Iterable<
+              KV<
+                  SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey,
+                  DataChangeRecord>>,
+          String> {
+
+    private static final long serialVersionUID = 2307936669684679038L;
+
+    @ProcessElement
+    public void processElement(
+        @Element
+            Iterable<
+                    KV<
+                        SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey,
+                        DataChangeRecord>>
+                element,
+        OutputReceiver<String> outputReceiver) {
+      final StringBuilder builder = new StringBuilder();
+
+      List<KV<SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, DataChangeRecord>>
+          sortedTransactions =
+              StreamSupport.stream(element.spliterator(), false)
+                  .sorted((kv1, kv2) -> kv1.getKey().compareTo(kv2.getKey()))
+                  .collect(Collectors.toList());
+
+      sortedTransactions.forEach(
+          record -> {
+            builder.append(getRecordString(record.getValue()));
+          });
+      outputReceiver.output(builder.toString());
+    }
+  }
+
+  // Get a string representation of the mods and the mod type in the data change record.
+  private static String getRecordString(DataChangeRecord record) {
+    final StringBuilder builder = new StringBuilder();
+    String modString = "";
+    for (Mod mod : record.getMods()) {
+      modString += mod.getKeysJson();
+    }
+    builder.append(String.join(",", modString, record.getModType().toString()));
+    builder.append("\n");
+    return builder.toString();
+  }
+
+  private Timestamp writeTransactionsToDatabase() {
+    List<Mutation> mutations = new ArrayList<>();
+
+    // 1. Commit a transaction to insert Singer 1 and Singer 2 into the table.
+    mutations.add(insertRecordMutation(1, "FirstName1", "LastName2"));
+    mutations.add(insertRecordMutation(2, "FirstName2", "LastName2"));
+    Timestamp t1 = databaseClient.write(mutations);
+    LOG.debug("The first transaction committed with timestamp: " + t1.toString());
+    mutations.clear();
+
+    // 2. Commit a transaction to insert Singer 3 and remove Singer 1 from the table.
+    mutations.add(insertRecordMutation(3, "FirstName3", "LastName3"));
+    mutations.add(deleteRecordMutation(1));
+    Timestamp t2 = databaseClient.write(mutations);
+    LOG.debug("The second transaction committed with timestamp: " + t2.toString());
+    mutations.clear();
+
+    // 3. Commit a transaction to delete Singer 2 and Singer 3 from the table.
+    mutations.add(deleteRecordMutation(2));
+    mutations.add(deleteRecordMutation(3));
+    Timestamp t3 = databaseClient.write(mutations);
+    LOG.debug("The third transaction committed with timestamp: " + t3.toString());
+    mutations.clear();
+
+    // 4. Commit a transaction to delete Singer 0.
+    mutations.add(deleteRecordMutation(0));
+    Timestamp t4 = databaseClient.write(mutations);
+    LOG.debug("The fourth transaction committed with timestamp: " + t4.toString());
+    return t4;
+  }
+
+  // Create an insert mutation.
+  private static Mutation insertRecordMutation(long singerId, String firstName, String lastName) {
+    return Mutation.newInsertBuilder(tableName)
+        .set("SingerId")
+        .to(singerId)
+        .set("FirstName")
+        .to(firstName)
+        .set("LastName")
+        .to(lastName)
+        .build();
+  }
+
+  // Create a delete mutation.
+  private static Mutation deleteRecordMutation(long singerId) {
+    return Mutation.delete(tableName, KeySet.newBuilder().addKey(Key.of(singerId)).build());
+  }
+
+  private static class SortKey
+      implements Serializable,
+          Comparable<SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey> {
+
+    private static final long serialVersionUID = 2105939115467195036L;
+
+    private Timestamp commitTimestamp;
+    private String transactionId;
+
+    public SortKey() {}
+
+    public SortKey(Timestamp commitTimestamp, String transactionId) {
+      this.commitTimestamp = commitTimestamp;
+      this.transactionId = transactionId;
+    }
+
+    public static long getSerialVersionUID() {
+      return serialVersionUID;
+    }
+
+    public Timestamp getCommitTimestamp() {
+      return commitTimestamp;
+    }
+
+    public void setCommitTimestamp(Timestamp commitTimestamp) {
+      this.commitTimestamp = commitTimestamp;
+    }
+
+    public String getTransactionId() {
+      return transactionId;
+    }
+
+    public void setTransactionId(String transactionId) {
+      this.transactionId = transactionId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey sortKey =
+          (SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey) o;
+      return Objects.equals(commitTimestamp, sortKey.commitTimestamp)
+          && Objects.equals(transactionId, sortKey.transactionId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(commitTimestamp, transactionId);
+    }
+
+    @Override
+    public int compareTo(SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey other) {
+      return Comparator.<SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey>
+              comparingDouble(
+                  sortKey ->
+                      sortKey.getCommitTimestamp().getSeconds()
+                          + sortKey.getCommitTimestamp().getNanos() / 1000000000.0)
+          .thenComparing(sortKey -> sortKey.getTransactionId())
+          .compare(this, other);
+    }
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT.java
index fc7ff7118a466..78f742f3f3806 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT.java @@ -31,11 +31,9 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.BooleanCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecordMetadata; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.state.BagState; import org.apache.beam.sdk.state.StateSpec; @@ -98,7 +96,7 @@ public void testOrderedWithinKey() { .withDatabaseId(databaseId); // Get the time increment interval at which to flush data changes ordered by key. - final long timeIncrementInSeconds = 70; + final long timeIncrementInSeconds = 2; // Commit a initial transaction to get the timestamp to start reading from. List mutations = new ArrayList<>(); @@ -113,7 +111,7 @@ public void testOrderedWithinKey() { try { Thread.sleep(timeIncrementInSeconds * 1000); } catch (InterruptedException e) { - System.out.println(e); + LOG.error(e.toString(), e); } // This will be the second batch of transactions that will have strict timestamp ordering @@ -124,14 +122,14 @@ public void testOrderedWithinKey() { try { Thread.sleep(timeIncrementInSeconds * 1000); } catch (InterruptedException e) { - System.out.println(e); + LOG.error(e.toString(), e); } // This will be the final batch of transactions that will have strict timestamp ordering // per key. com.google.cloud.Timestamp endTimestamp = writeTransactionsToDatabase(); - LOG.debug( + LOG.info( "Reading change streams from {} to {}", startTimestamp.toString(), endTimestamp.toString()); final PCollection tokens = @@ -146,8 +144,7 @@ public void testOrderedWithinKey() { .apply(ParDo.of(new BreakRecordByModFn())) .apply(ParDo.of(new KeyByIdFn())) .apply(ParDo.of(new KeyValueByCommitTimestampAndTransactionIdFn<>())) - .apply( - ParDo.of(new BufferKeyUntilOutputTimestamp(endTimestamp, timeIncrementInSeconds))) + .apply(ParDo.of(new BufferKeyUntilOutputTimestamp(timeIncrementInSeconds))) .apply(ParDo.of(new ToStringFn())); // Assert that the returned PCollection contains one entry per key for the committed @@ -163,74 +160,40 @@ public void testOrderedWithinKey() { + "{\"FirstName\":\"Inserting mutation 0\",\"LastName\":null,\"SingerInfo\":null};" + "Deleted record;", "{\"SingerId\":\"1\"}\n" - + "{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating mutation 1\"};" - + "Deleted record;" + "{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};" + "Deleted record;", "{\"SingerId\":\"2\"}\n" + "{\"FirstName\":\"Inserting mutation 2\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating mutation 2\"};" + "Deleted record;", "{\"SingerId\":\"3\"}\n" + "{\"FirstName\":\"Inserting mutation 3\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating mutation 3\"};" - + "Deleted record;", - "{\"SingerId\":\"4\"}\n" - + "{\"FirstName\":\"Inserting mutation 4\",\"LastName\":null,\"SingerInfo\":null};" - + "Deleted record;", - "{\"SingerId\":\"5\"}\n" - + "{\"FirstName\":\"Updating mutation 5\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating 
mutation 5\"};" + "Deleted record;", // Second batch of records ordered within key. "{\"SingerId\":\"1\"}\n" - + "{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating mutation 1\"};" - + "Deleted record;" + "{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};" + "Deleted record;", "{\"SingerId\":\"2\"}\n" + "{\"FirstName\":\"Inserting mutation 2\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating mutation 2\"};" + "Deleted record;", "{\"SingerId\":\"3\"}\n" + "{\"FirstName\":\"Inserting mutation 3\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating mutation 3\"};" - + "Deleted record;", - "{\"SingerId\":\"4\"}\n" - + "{\"FirstName\":\"Inserting mutation 4\",\"LastName\":null,\"SingerInfo\":null};" - + "Deleted record;", - "{\"SingerId\":\"5\"}\n" - + "{\"FirstName\":\"Updating mutation 5\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating mutation 5\"};" + "Deleted record;", // Third batch of records ordered within key. "{\"SingerId\":\"1\"}\n" - + "{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating mutation 1\"};" - + "Deleted record;" + "{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};" + "Deleted record;", "{\"SingerId\":\"2\"}\n" + "{\"FirstName\":\"Inserting mutation 2\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating mutation 2\"};" + "Deleted record;", "{\"SingerId\":\"3\"}\n" + "{\"FirstName\":\"Inserting mutation 3\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating mutation 3\"};" - + "Deleted record;", - "{\"SingerId\":\"4\"}\n" - + "{\"FirstName\":\"Inserting mutation 4\",\"LastName\":null,\"SingerInfo\":null};" - + "Deleted record;", - "{\"SingerId\":\"5\"}\n" - + "{\"FirstName\":\"Updating mutation 5\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating mutation 5\"};" + "Deleted record;"); - pipeline.run().waitUntilFinish(); + pipeline + .runWithAdditionalOptionArgs(Collections.singletonList("--streaming")) + .waitUntilFinish(); } // Data change records may contain multiple mods if there are multiple primary keys. 
@@ -241,22 +204,6 @@ private static class BreakRecordByModFn extends DoFn outputReceiver) { - final ChangeStreamRecordMetadata fakeChangeStreamMetadata = - ChangeStreamRecordMetadata.newBuilder() - .withPartitionToken("1") - .withRecordTimestamp(com.google.cloud.Timestamp.ofTimeMicroseconds(2L)) - .withPartitionStartTimestamp(com.google.cloud.Timestamp.ofTimeMicroseconds(3L)) - .withPartitionEndTimestamp(com.google.cloud.Timestamp.ofTimeMicroseconds(4L)) - .withPartitionCreatedAt(com.google.cloud.Timestamp.ofTimeMicroseconds(5L)) - .withPartitionScheduledAt(com.google.cloud.Timestamp.ofTimeMicroseconds(6L)) - .withPartitionRunningAt(com.google.cloud.Timestamp.ofTimeMicroseconds(7L)) - .withQueryStartedAt(com.google.cloud.Timestamp.ofTimeMicroseconds(8L)) - .withRecordStreamStartedAt(com.google.cloud.Timestamp.ofTimeMicroseconds(9L)) - .withRecordStreamEndedAt(com.google.cloud.Timestamp.ofTimeMicroseconds(10L)) - .withRecordReadAt(com.google.cloud.Timestamp.ofTimeMicroseconds(11L)) - .withTotalStreamTimeMillis(12L) - .withNumberOfRecordsRead(13L) - .build(); record.getMods().stream() .map( mod -> @@ -273,7 +220,7 @@ public void processElement( record.getValueCaptureType(), record.getNumberOfRecordsInTransaction(), record.getNumberOfPartitionsInTransaction(), - fakeChangeStreamMetadata)) + record.getMetadata())) .forEach(outputReceiver::output); } } @@ -332,16 +279,9 @@ private static class BufferKeyUntilOutputTimestamp private static final long serialVersionUID = 5050535558953049259L; private final long incrementIntervalInSeconds; - private final @Nullable Instant pipelineEndTime; - private BufferKeyUntilOutputTimestamp( - @Nullable com.google.cloud.Timestamp endTimestamp, long incrementIntervalInSeconds) { + private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) { this.incrementIntervalInSeconds = incrementIntervalInSeconds; - if (endTimestamp != null) { - this.pipelineEndTime = new Instant(endTimestamp.toSqlTimestamp()); - } else { - pipelineEndTime = null; - } } @SuppressWarnings("unused") @@ -355,8 +295,8 @@ private BufferKeyUntilOutputTimestamp( buffer = StateSpecs.bag(); @SuppressWarnings("unused") - @StateId("keySeen") - private final StateSpec> keySeen = StateSpecs.value(BooleanCoder.of()); + @StateId("seenKey") + private final StateSpec> seenKey = StateSpecs.value(StringUtf8Coder.of()); @ProcessElement public void process( @@ -367,20 +307,20 @@ public void process( BagState> buffer, @TimerId("timer") Timer timer, - @StateId("keySeen") ValueState keySeen) { + @StateId("seenKey") ValueState seenKey) { buffer.add(element.getValue()); // Only set the timer if this is the first time we are receiving a data change record // with this key. 
- Boolean hasKeyBeenSeen = keySeen.read(); + String hasKeyBeenSeen = seenKey.read(); if (hasKeyBeenSeen == null) { Instant commitTimestamp = new Instant(element.getValue().getValue().getCommitTimestamp().toSqlTimestamp()); Instant outputTimestamp = commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds)); - LOG.debug("Setting timer at {} for key {}", outputTimestamp.toString(), element.getKey()); + LOG.info("Setting timer at {} for key {}", outputTimestamp.toString(), element.getKey()); timer.set(outputTimestamp); - keySeen.write(true); + seenKey.write(element.getKey()); } } @@ -390,7 +330,14 @@ public void onExpiry( @StateId("buffer") BagState> buffer, - @TimerId("timer") Timer timer) { + @TimerId("timer") Timer timer, + @StateId("seenKey") ValueState seenKey) { + String keyForTimer = seenKey.read(); + Instant timerContextTimestamp = context.timestamp(); + LOG.info( + "Timer reached expiration time for key {} and for timestamp {}", + keyForTimer, + timerContextTimestamp); if (!buffer.isEmpty().read()) { final List> records = @@ -412,18 +359,18 @@ public void onExpiry( // have been processed and successfully committed. Since the timer fires when the // watermark passes the expiration time, we should only output records with event time // < expiration time. - if (recordCommitTimestamp.isBefore(context.timestamp())) { - LOG.debug( + if (recordCommitTimestamp.isBefore(timerContextTimestamp)) { + LOG.info( "Outputting record with key {} and value \"{}\" at expiration timestamp {}", record.getValue().getMods().get(0).getKeysJson(), recordString, - context.timestamp().toString()); + timerContextTimestamp.toString()); recordsToOutput.add(record); } else { - LOG.debug( + LOG.info( "Expired at {} but adding record with key {} and value {} back to buffer " + "due to commit timestamp {}", - context.timestamp().toString(), + timerContextTimestamp.toString(), record.getValue().getMods().get(0).getKeysJson(), recordString, recordCommitTimestamp.toString()); @@ -437,26 +384,24 @@ public void onExpiry( KV.of( recordsToOutput.get(0).getValue().getMods().get(0).getKeysJson(), recordsToOutput), - context.timestamp()); - LOG.debug( - "Expired at {}, outputting records for key {}", - context.timestamp().toString(), + timerContextTimestamp); + LOG.info( + "Expired at {}, outputting records for key and context timestamp {}", + timerContextTimestamp.toString(), recordsToOutput.get(0).getValue().getMods().get(0).getKeysJson()); } else { - LOG.debug("Expired at {} with no records", context.timestamp().toString()); + LOG.info("Expired at {} with no records", timerContextTimestamp.toString()); } } Instant nextTimer = - context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds)); - if (pipelineEndTime == null || context.timestamp().isBefore(pipelineEndTime)) { - // If the current timer's timestamp is before the pipeline end time, or there is no - // pipeline end time, we still have data left to process. 
- LOG.debug("Setting next timer to {}", nextTimer.toString()); + timerContextTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds)); + if (buffer.isEmpty() != null && !buffer.isEmpty().read()) { + LOG.info("Setting next timer to {} for key {}", nextTimer.toString(), keyForTimer); timer.set(nextTimer); } else { - LOG.debug( - "Timer not being set as exceeded pipeline end time: " + pipelineEndTime.toString()); + LOG.info("Timer not being set since the buffer is empty for key {} ", keyForTimer); + seenKey.clear(); } } } @@ -546,75 +491,35 @@ public int compareTo(SortKey other) { } } - private static com.google.cloud.Timestamp writeTransactionsToDatabase() { + private com.google.cloud.Timestamp writeTransactionsToDatabase() { List mutations = new ArrayList<>(); // 1. Commit a transaction to insert Singer 1 and Singer 2 into the table. mutations.add(insertRecordMutation(1)); mutations.add(insertRecordMutation(2)); com.google.cloud.Timestamp t1 = databaseClient.write(mutations); - LOG.debug("The first transaction committed with timestamp: " + t1.toString()); - mutations.clear(); - - // 2. Commmit a transaction to insert Singer 4 and remove Singer 1 from the table. - mutations.add(updateRecordMutation(1)); - mutations.add(insertRecordMutation(4)); - com.google.cloud.Timestamp t2 = databaseClient.write(mutations); - LOG.debug("The second transaction committed with timestamp: " + t2.toString()); + LOG.info("The first transaction committed with timestamp: " + t1.toString()); mutations.clear(); - // 3. Commit a transaction to insert Singer 3 and Singer 5. - mutations.add(deleteRecordMutation(1)); + // 2. Commmit a transaction to insert Singer 3 and remove Singer 1 from the table. mutations.add(insertRecordMutation(3)); - mutations.add(insertRecordMutation(5)); - mutations.add(updateRecordMutation(5)); - com.google.cloud.Timestamp t3 = databaseClient.write(mutations); - LOG.debug("The third transaction committed with timestamp: " + t3.toString()); - mutations.clear(); - - // 4. Commit a transaction to update Singer 3 and Singer 2 in the table. - mutations.add(updateRecordMutation(3)); - mutations.add(updateRecordMutation(2)); - com.google.cloud.Timestamp t4 = databaseClient.write(mutations); - LOG.debug("The fourth transaction committed with timestamp: " + t4.toString()); + mutations.add(deleteRecordMutation(1)); + com.google.cloud.Timestamp t2 = databaseClient.write(mutations); + LOG.info("The second transaction committed with timestamp: " + t2.toString()); mutations.clear(); - // 5. Commit a transaction to delete 4, insert 1, delete 3, update 5. - mutations.add(deleteRecordMutation(4)); - mutations.add(insertRecordMutation(1)); + // 3. Commit a transaction to delete Singer 2 and Singer 3 from the table. + mutations.add(deleteRecordMutation(2)); mutations.add(deleteRecordMutation(3)); - mutations.add(updateRecordMutation(5)); - com.google.cloud.Timestamp t5 = databaseClient.write(mutations); - - LOG.debug("The fifth transaction committed with timestamp: " + t5.toString()); - mutations.clear(); - - // 6. Commit a transaction to delete Singers 5, insert singers 6. 
- mutations.add(deleteRecordMutation(5)); - mutations.add(insertRecordMutation(6)); - mutations.add(deleteRecordMutation(6)); - com.google.cloud.Timestamp t6 = databaseClient.write(mutations); - LOG.debug("The sixth transaction committed with timestamp: " + t6.toString()); + com.google.cloud.Timestamp t3 = databaseClient.write(mutations); + LOG.info("The third transaction committed with timestamp: " + t3.toString()); mutations.clear(); - // 7. Delete remaining rows from database. - mutations.add(deleteRecordMutation(1)); - mutations.add(deleteRecordMutation(2)); + // 4. Commit a transaction to delete Singer 0. mutations.add(deleteRecordMutation(0)); - com.google.cloud.Timestamp t7 = databaseClient.write(mutations); - LOG.debug("The seventh transaction committed with timestamp: " + t7.toString()); - - return t7; - } - - // Create an update mutation. - private static Mutation updateRecordMutation(long singerId) { - return Mutation.newUpdateBuilder(tableName) - .set("SingerId") - .to(singerId) - .set("FirstName") - .to("Updating mutation " + singerId) - .build(); + com.google.cloud.Timestamp t4 = databaseClient.write(mutations); + LOG.info("The fourth transaction committed with timestamp: " + t4.toString()); + return t4; } // Create an insert mutation. diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyIT.java index 53566f0d6489c..9758cad812d95 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyIT.java @@ -83,6 +83,7 @@ public static void setup() throws InterruptedException, ExecutionException, Time @Test public void testOrderedWithinKey() { + LOG.info("Test pipeline: " + pipeline.toString()); final SpannerConfig spannerConfig = SpannerConfig.create() .withProjectId(projectId) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java new file mode 100644 index 0000000000000..eb5b9e3ba1514 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.it;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.Key;
+import com.google.cloud.spanner.KeySet;
+import com.google.cloud.spanner.Mutation;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** End-to-end test of Cloud Spanner Change Streams Transaction Boundaries. */
+@RunWith(JUnit4.class)
+public class SpannerChangeStreamTransactionBoundariesIT {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SpannerChangeStreamTransactionBoundariesIT.class);
+
+  @ClassRule public static final IntegrationTestEnv ENV = new IntegrationTestEnv();
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  private static String projectId;
+  private static String instanceId;
+  private static String databaseId;
+  private static String tableName;
+  private static String changeStreamName;
+  private static DatabaseClient databaseClient;
+
+  @BeforeClass
+  public static void setup() throws InterruptedException, ExecutionException, TimeoutException {
+    projectId = ENV.getProjectId();
+    instanceId = ENV.getInstanceId();
+    databaseId = ENV.getDatabaseId();
+    tableName = ENV.createSingersTable();
+    changeStreamName = ENV.createChangeStreamFor(tableName);
+    databaseClient = ENV.getDatabaseClient();
+  }
+
+  @Test
+  public void testTransactionBoundaries() {
+    LOG.info("Test pipeline: " + pipeline.toString());
+    final SpannerConfig spannerConfig =
+        SpannerConfig.create()
+            .withProjectId(projectId)
+            .withInstanceId(instanceId)
+            .withDatabaseId(databaseId);
+
+    // Commit an initial transaction to get the timestamp to start reading from.
+    List<Mutation> mutations = new ArrayList<>();
+    mutations.add(insertRecordMutation(0, "FirstName0", "LastName0"));
+    final Timestamp startTimestamp = databaseClient.write(mutations);
+
+    // Get the timestamp of the last committed transaction to get the end timestamp.
+    final Timestamp endTimestamp = writeTransactionsToDatabase();
+
+    final PCollection<String> tokens =
+        pipeline
+            .apply(
+                SpannerIO.readChangeStream()
+                    .withSpannerConfig(spannerConfig)
+                    .withChangeStreamName(changeStreamName)
+                    .withMetadataDatabase(databaseId)
+                    .withInclusiveStartAt(startTimestamp)
+                    .withInclusiveEndAt(endTimestamp))
+            .apply(ParDo.of(new SpannerChangeStreamTransactionBoundariesIT.KeyByTransactionIdFn()))
+            .apply(ParDo.of(new SpannerChangeStreamTransactionBoundariesIT.TransactionBoundaryFn()))
+            .apply(ParDo.of(new SpannerChangeStreamTransactionBoundariesIT.ToStringFn()));
+
+    // Assert that the returned PCollection contains all eight transactions (in string
+    // representation) and that each transaction contains, in order, the list of mutations added.
+    PAssert.that(tokens)
+        .containsInAnyOrder(
+            // Insert Singer 0 into the table.
+            "{\"SingerId\":\"0\"},INSERT\n",
+
+            // Insert Singer 1 and 2 into the table.
+            "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n",
+
+            // Delete Singer 1 and Insert Singer 3 into the table.
+            "{\"SingerId\":\"1\"},DELETE\n" + "{\"SingerId\":\"3\"},INSERT\n",
+
+            // Insert Singers 4, 5, 6 into the table.
+            "{\"SingerId\":\"4\"}{\"SingerId\":\"5\"}{\"SingerId\":\"6\"},INSERT\n",
+
+            // Update Singer 6 and Insert Singer 7.
+            "{\"SingerId\":\"6\"},UPDATE\n" + "{\"SingerId\":\"7\"},INSERT\n",
+
+            // Update Singers 4 and 5 in the table.
+            "{\"SingerId\":\"4\"}{\"SingerId\":\"5\"},UPDATE\n",
+
+            // Delete Singers 3, 4, 5 from the table.
+            "{\"SingerId\":\"3\"}{\"SingerId\":\"4\"}{\"SingerId\":\"5\"},DELETE\n",
+
+            // Delete Singers 0, 2, 6, 7.
+            "{\"SingerId\":\"0\"}{\"SingerId\":\"2\"}{\"SingerId\":\"6\"}"
+                + "{\"SingerId\":\"7\"},DELETE\n");
+
+    final PipelineResult pipelineResult = pipeline.run();
+    pipelineResult.waitUntilFinish();
+  }
+
+  // KeyByTransactionIdFn takes in a DataChangeRecord and outputs a key-value pair of
+  // {TransactionId, DataChangeRecord}.
+  private static class KeyByTransactionIdFn
+      extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>> {
+
+    private static final long serialVersionUID = 1270485392415293532L;
+
+    @ProcessElement
+    public void processElement(
+        @Element DataChangeRecord record,
+        OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
+      outputReceiver.output(KV.of(record.getServerTransactionId(), record));
+    }
+  }
+
+  // TransactionBoundaryFn buffers the key-value pairs of {TransactionId, DataChangeRecord}
+  // received from KeyByTransactionIdFn, grouping them by TransactionId.
+  // When the number of records buffered equals the number of records contained in the entire
+  // transaction, this function sorts the DataChangeRecords in the group by record sequence
+  // and outputs a key-value pair of SortKey(CommitTimestamp, TransactionId),
+  // Iterable<DataChangeRecord>.
+  private static class TransactionBoundaryFn
+      extends DoFn<
+          KV<String, DataChangeRecord>,
+          KV<SpannerChangeStreamTransactionBoundariesIT.SortKey, Iterable<DataChangeRecord>>> {
+
+    private static final long serialVersionUID = 5050535558953049259L;
+
+    @SuppressWarnings("UnusedVariable")
+    @StateId("buffer")
+    private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();
+
+    @SuppressWarnings("UnusedVariable")
+    @StateId("count")
+    private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();
+
+    @ProcessElement
+    public void process(
+        ProcessContext context,
+        @StateId("buffer") BagState<DataChangeRecord> buffer,
+        @StateId("count") ValueState<Integer> countState) {
+      final KV<String, DataChangeRecord> element = context.element();
+      final DataChangeRecord record = element.getValue();
+
+      buffer.add(record);
+      int count = (countState.read() != null ? countState.read() : 0);
+      count = count + 1;
+      countState.write(count);
+
+      if (count == record.getNumberOfRecordsInTransaction()) {
+        final List<DataChangeRecord> sortedRecords =
+            StreamSupport.stream(buffer.read().spliterator(), false)
+                .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
+                .collect(Collectors.toList());
+
+        final Instant commitInstant =
+            new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp().getTime());
+        context.outputWithTimestamp(
+            KV.of(
+                new SpannerChangeStreamTransactionBoundariesIT.SortKey(
+                    sortedRecords.get(0).getCommitTimestamp(),
+                    sortedRecords.get(0).getServerTransactionId()),
+                sortedRecords),
+            commitInstant);
+        buffer.clear();
+        countState.clear();
+      }
+    }
+  }
+
+  // ToStringFn takes in a key-value pair of {SortKey, Iterable<DataChangeRecord>} and outputs
+  // a string representation.
+  private static class ToStringFn
+      extends DoFn<
+          KV<SpannerChangeStreamTransactionBoundariesIT.SortKey, Iterable<DataChangeRecord>>,
+          String> {
+
+    private static final long serialVersionUID = 2307936669684679038L;
+
+    @ProcessElement
+    public void processElement(
+        @Element
+            KV<SpannerChangeStreamTransactionBoundariesIT.SortKey, Iterable<DataChangeRecord>>
+                element,
+        OutputReceiver<String> outputReceiver) {
+      final StringBuilder builder = new StringBuilder();
+      final Iterable<DataChangeRecord> sortedRecords = element.getValue();
+      sortedRecords.forEach(
+          record -> {
+            // Output the string representation of the mods and the mod type for each data change
+            // record.
+            String modString = "";
+            for (Mod mod : record.getMods()) {
+              modString += mod.getKeysJson();
+            }
+            builder.append(String.join(",", modString, record.getModType().toString()));
+            builder.append("\n");
+          });
+      outputReceiver.output(builder.toString());
+    }
+  }
+
+  private static class SortKey
+      implements Serializable, Comparable<SpannerChangeStreamTransactionBoundariesIT.SortKey> {
+
+    private static final long serialVersionUID = 2105939115467195036L;
+
+    private Timestamp commitTimestamp;
+    private String transactionId;
+
+    public SortKey() {}
+
+    public SortKey(Timestamp commitTimestamp, String transactionId) {
+      this.commitTimestamp = commitTimestamp;
+      this.transactionId = transactionId;
+    }
+
+    public static long getSerialVersionUID() {
+      return serialVersionUID;
+    }
+
+    public Timestamp getCommitTimestamp() {
+      return commitTimestamp;
+    }
+
+    public void setCommitTimestamp(Timestamp commitTimestamp) {
+      this.commitTimestamp = commitTimestamp;
+    }
+
+    public String getTransactionId() {
+      return transactionId;
+    }
+
+    public void setTransactionId(String transactionId) {
+      this.transactionId = transactionId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      SpannerChangeStreamTransactionBoundariesIT.SortKey sortKey =
+          (SpannerChangeStreamTransactionBoundariesIT.SortKey) o;
+      return Objects.equals(commitTimestamp, sortKey.commitTimestamp)
+          && Objects.equals(transactionId, sortKey.transactionId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(commitTimestamp, transactionId);
+    }
+
+    @Override
+    public int compareTo(SpannerChangeStreamTransactionBoundariesIT.SortKey other) {
+      return Comparator.<SpannerChangeStreamTransactionBoundariesIT.SortKey>comparingDouble(
+              sortKey ->
+                  sortKey.getCommitTimestamp().getSeconds()
+                      + sortKey.getCommitTimestamp().getNanos() / 1000000000.0)
+          .thenComparing(sortKey -> sortKey.getTransactionId())
+          .compare(this, other);
+    }
+  }
+
+  private Timestamp writeTransactionsToDatabase() {
+    List<Mutation> mutations = new ArrayList<>();
+
+    // 1. Commit a transaction to insert Singer 1 and Singer 2 into the table.
+    mutations.add(insertRecordMutation(1, "FirstName1", "LastName2"));
+    mutations.add(insertRecordMutation(2, "FirstName2", "LastName2"));
+    Timestamp t1 = databaseClient.write(mutations);
+    LOG.debug("The first transaction committed with timestamp: " + t1.toString());
+    mutations.clear();
+
+    // 2. Commit a transaction to insert Singer 3 and remove Singer 1 from the table.
+    mutations.add(insertRecordMutation(3, "FirstName3", "LastName3"));
+    mutations.add(deleteRecordMutation(1));
+    Timestamp t2 = databaseClient.write(mutations);
+    LOG.debug("The second transaction committed with timestamp: " + t2.toString());
+    mutations.clear();
+
+    // 3. Commit a transaction to insert Singer 4 and Singer 5 and Singer 6 into the table.
+    mutations.add(insertRecordMutation(4, "FirstName4", "LastName4"));
+    mutations.add(insertRecordMutation(5, "FirstName5", "LastName5"));
+    mutations.add(insertRecordMutation(6, "FirstName6", "LastName6"));
+    Timestamp t3 = databaseClient.write(mutations);
+    LOG.debug("The third transaction committed with timestamp: " + t3.toString());
+    mutations.clear();
+
+    // 4. Commit a transaction to insert Singer 7 and update Singer 6 in the table.
+    mutations.add(insertRecordMutation(7, "FirstName7", "LastName7"));
+    mutations.add(updateRecordMutation(6, "FirstName5", "LastName5"));
+    Timestamp t4 = databaseClient.write(mutations);
+    LOG.debug("The fourth transaction committed with timestamp: " + t4.toString());
+    mutations.clear();
+
+    // 5. Commit a transaction to update Singer 4 and Singer 5 in the table.
+    mutations.add(updateRecordMutation(4, "FirstName9", "LastName9"));
+    mutations.add(updateRecordMutation(5, "FirstName9", "LastName9"));
+    Timestamp t5 = databaseClient.write(mutations);
+    LOG.debug("The fifth transaction committed with timestamp: " + t5.toString());
+    mutations.clear();
+
+    // 6. Commit a transaction to delete Singers 3, 4, 5.
+    mutations.add(deleteRecordMutation(3));
+    mutations.add(deleteRecordMutation(4));
+    mutations.add(deleteRecordMutation(5));
+    Timestamp t6 = databaseClient.write(mutations);
+    mutations.clear();
+    LOG.debug("The sixth transaction committed with timestamp: " + t6.toString());
+
+    // 7. Commit a transaction to delete Singers 0, 2, 6, 7.
+    mutations.add(deleteRecordMutation(0));
+    mutations.add(deleteRecordMutation(2));
+    mutations.add(deleteRecordMutation(6));
+    mutations.add(deleteRecordMutation(7));
+    Timestamp t7 = databaseClient.write(mutations);
+    LOG.debug("The seventh transaction committed with timestamp: " + t7.toString());
+
+    return t7;
+  }
+
+  // Create an update mutation.
+  private static Mutation updateRecordMutation(long singerId, String firstName, String lastName) {
+    return Mutation.newUpdateBuilder(tableName)
+        .set("SingerId")
+        .to(singerId)
+        .set("FirstName")
+        .to(firstName)
+        .set("LastName")
+        .to(lastName)
+        .build();
+  }
+
+  // Create an insert mutation.
+  private static Mutation insertRecordMutation(long singerId, String firstName, String lastName) {
+    return Mutation.newInsertBuilder(tableName)
+        .set("SingerId")
+        .to(singerId)
+        .set("FirstName")
+        .to(firstName)
+        .set("LastName")
+        .to(lastName)
+        .build();
+  }
+
+  // Create a delete mutation.
+  private static Mutation deleteRecordMutation(long singerId) {
+    return Mutation.delete(tableName, KeySet.newBuilder().addKey(Key.of(singerId)).build());
+  }
+}
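A closing note on the two SortKey classes introduced above: compareTo collapses each commit timestamp into a double (seconds plus nanos / 10^9). At current epoch values (on the order of 1.7e9 seconds), a double's 52-bit mantissa cannot resolve sub-microsecond differences, so two transactions committed within the same microsecond would be ordered by transaction id alone. That is sufficient for these tests, which sleep between batches; if nanosecond resolution were ever needed, com.google.cloud.Timestamp implements Comparable<Timestamp>, so a lossless equivalent is available (a sketch, not part of this change; the constant name is illustrative):

  private static final Comparator<SortKey> BY_COMMIT_TIMESTAMP_AND_TRANSACTION_ID =
      Comparator.comparing(SortKey::getCommitTimestamp)
          .thenComparing(SortKey::getTransactionId);

compareTo could then delegate to BY_COMMIT_TIMESTAMP_AND_TRANSACTION_ID.compare(this, other).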