diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java index 18c4307f29612..448bc4c31e57b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java @@ -852,7 +852,12 @@ public Row build() { throw new IllegalArgumentException( "Row expected " + schema.getFieldCount() - + " fields. initialized with " + + String.format( + " fields (%s).", + schema.getFields().stream() + .map(Object::toString) + .collect(Collectors.joining(", "))) + + " initialized with " + values.size() + " fields."); } diff --git a/sdks/java/io/debezium/build.gradle b/sdks/java/io/debezium/build.gradle index bf5d09382873c..146d40cc70dce 100644 --- a/sdks/java/io/debezium/build.gradle +++ b/sdks/java/io/debezium/build.gradle @@ -43,6 +43,7 @@ dependencies { // Test dependencies testImplementation library.java.junit + testImplementation project(path: ":sdks:java:io:jdbc") testRuntimeOnly library.java.slf4j_jdk14 testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") testImplementation project(":runners:google-cloud-dataflow-java") diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/Connectors.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/Connectors.java index 7aa8b6d0dc7f5..4fefe917e8c43 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/Connectors.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/Connectors.java @@ -18,9 +18,7 @@ package org.apache.beam.io.debezium; import org.apache.kafka.connect.source.SourceConnector; -import org.checkerframework.checker.nullness.qual.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.checkerframework.checker.nullness.qual.NonNull; /** Enumeration of debezium connectors. */ public enum Connectors { @@ -30,7 +28,6 @@ public enum Connectors { ORACLE("Oracle", "io.debezium.connector.oracle.OracleConnector"), DB2("DB2", "io.debezium.connector.db2.Db2Connector"), ; - private static final Logger LOG = LoggerFactory.getLogger(Connectors.class); private final String name; private final String connector; @@ -45,12 +42,14 @@ public String getName() { } /** Class connector to debezium. 
*/ - public @Nullable Class getConnector() { + public @NonNull Class getConnector() { Class connectorClass = null; try { connectorClass = (Class) Class.forName(this.connector); } catch (ClassCastException | ClassNotFoundException e) { - LOG.error("Connector class is not found", e); + throw new IllegalArgumentException( + String.format( + "Unable to resolve class %s to use as Debezium connector.", this.connector)); } return connectorClass; } diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java index b7c084fcba78d..492dea4f4be0e 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java @@ -23,12 +23,14 @@ import java.io.Serializable; import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.MapCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -36,7 +38,9 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.source.SourceRecord; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,6 +144,8 @@ public abstract static class Read extends PTransform> abstract @Nullable Integer getMaxNumberOfRecords(); + abstract @Nullable Long getMaxTimeToRun(); + abstract @Nullable Coder getCoder(); abstract Builder toBuilder(); @@ -154,6 +160,8 @@ abstract static class Builder { abstract Builder setMaxNumberOfRecords(Integer maxNumberOfRecords); + abstract Builder setMaxTimeToRun(Long miliseconds); + abstract Read build(); } @@ -202,6 +210,53 @@ public Read withMaxNumberOfRecords(Integer maxNumberOfRecords) { return toBuilder().setMaxNumberOfRecords(maxNumberOfRecords).build(); } + /** + * Once the connector has run for the determined amount of time, it will stop. The value can be + * null (default) which means it will not stop. This parameter is mainly intended for testing. + * + * @param miliseconds The maximum number of miliseconds to run before stopping the connector. 
+ * @return PTransform {@link #read} + */ + public Read withMaxTimeToRun(Long miliseconds) { + return toBuilder().setMaxTimeToRun(miliseconds).build(); + } + + protected Schema getRecordSchema() { + KafkaSourceConsumerFn fn = + new KafkaSourceConsumerFn<>( + getConnectorConfiguration().getConnectorClass().get(), + getFormatFunction(), + getMaxNumberOfRecords()); + fn.register( + new KafkaSourceConsumerFn.OffsetTracker( + new KafkaSourceConsumerFn.OffsetHolder(null, null, 0))); + + Map connectorConfig = + Maps.newHashMap(getConnectorConfiguration().getConfigurationMap()); + connectorConfig.put("snapshot.mode", "schema_only"); + SourceRecord sampledRecord = + fn.getOneRecord(getConnectorConfiguration().getConfigurationMap()); + fn.reset(); + Schema keySchema = + sampledRecord.keySchema() != null + ? KafkaConnectUtils.beamSchemaFromKafkaConnectSchema(sampledRecord.keySchema()) + : Schema.builder().build(); + Schema valueSchema = + KafkaConnectUtils.beamSchemaFromKafkaConnectSchema(sampledRecord.valueSchema()); + + return Schema.builder() + .addFields(valueSchema.getFields()) + .setOptions( + Schema.Options.builder() + .setOption( + "primaryKeyColumns", + Schema.FieldType.array(Schema.FieldType.STRING), + keySchema.getFields().stream() + .map(Schema.Field::getName) + .collect(Collectors.toList()))) + .build(); + } + @Override public PCollection expand(PBegin input) { return input @@ -213,7 +268,8 @@ public PCollection expand(PBegin input) { new KafkaSourceConsumerFn<>( getConnectorConfiguration().getConnectorClass().get(), getFormatFunction(), - getMaxNumberOfRecords()))) + getMaxNumberOfRecords(), + getMaxTimeToRun()))) .setCoder(getCoder()); } } diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java new file mode 100644 index 0000000000000..1d3cb8a97ddf5 --- /dev/null +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.io.debezium; + +import com.google.auto.value.AutoValue; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A schema-aware transform provider for {@link DebeziumIO}. This class provides a {@link + * PTransform} that returns a change stream for a relational database. + * + *

The transform needs to access the source database on expansion and at pipeline + * runtime. At expansion, the output {@link org.apache.beam.sdk.values.PCollection} schema is + * retrieved, while at runtime, the change stream is consumed. + * + *

This transform is tested against MySQL and Postgres, but it should work well for any + * data source supported by Debezium. + */ +public class DebeziumReadSchemaTransformProvider + extends TypedSchemaTransformProvider< + DebeziumReadSchemaTransformProvider.DebeziumReadSchemaTransformConfiguration> { + + private static final Logger LOG = + LoggerFactory.getLogger(DebeziumReadSchemaTransformProvider.class); + private final Boolean isTest; + private final Integer testLimitRecords; + private final Long testLimitMilliseconds; + + DebeziumReadSchemaTransformProvider() { + this(false, -1, Long.MAX_VALUE); + } + + @VisibleForTesting + protected DebeziumReadSchemaTransformProvider( + Boolean isTest, Integer recordLimit, Long timeLimitMs) { + this.isTest = isTest; + this.testLimitRecords = recordLimit; + this.testLimitMilliseconds = timeLimitMs; + } + + @Override + protected @NonNull @Initialized Class + configurationClass() { + return DebeziumReadSchemaTransformConfiguration.class; + } + + @Override + protected @NonNull @Initialized SchemaTransform from( + DebeziumReadSchemaTransformConfiguration configuration) { + // TODO(pabloem): Validate configuration parameters to ensure formatting is correct. + return new SchemaTransform() { + @Override + public @UnknownKeyFor @NonNull @Initialized PTransform< + @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple, + @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple> + buildTransform() { + return new PTransform() { + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + // TODO(pabloem): Test this behavior + Collection connectors = + Arrays.stream(Connectors.values()) + .map(Object::toString) + .collect(Collectors.toSet()); + if (!connectors.contains(configuration.getDatabase())) { + throw new IllegalArgumentException( + "Unsupported database " + + configuration.getDatabase() + + ". Unable to select a JDBC driver for it. 
Supported Databases are: " + + String.join(", ", connectors)); + } + Class connectorClass = + Objects.requireNonNull(Connectors.valueOf(configuration.getDatabase())) + .getConnector(); + DebeziumIO.ConnectorConfiguration connectorConfiguration = + DebeziumIO.ConnectorConfiguration.create() + .withUsername(configuration.getUsername()) + .withPassword(configuration.getPassword()) + .withHostName(configuration.getHost()) + .withPort(Integer.toString(configuration.getPort())) + .withConnectorClass(connectorClass); + connectorConfiguration = + connectorConfiguration + .withConnectionProperty("table.include.list", configuration.getTable()) + .withConnectionProperty("include.schema.changes", "false") + .withConnectionProperty("database.server.name", "beam-pipeline-server"); + if (configuration.getDatabase().equals("POSTGRES")) { + LOG.info( + "As Database is POSTGRES, we set the `database.dbname` property to {}.", + configuration.getTable().substring(0, configuration.getTable().indexOf("."))); + connectorConfiguration = + connectorConfiguration.withConnectionProperty( + "database.dbname", + configuration.getTable().substring(0, configuration.getTable().indexOf("."))); + } + + final List debeziumConnectionProperties = + configuration.getDebeziumConnectionProperties(); + if (debeziumConnectionProperties != null) { + for (String connectionProperty : debeziumConnectionProperties) { + String[] parts = connectionProperty.split("=", -1); + String key = parts[0]; + String value = parts[1]; + connectorConfiguration.withConnectionProperty(key, value); + } + } + + DebeziumIO.Read readTransform = + DebeziumIO.read().withConnectorConfiguration(connectorConfiguration); + + if (isTest) { + readTransform = + readTransform + .withMaxNumberOfRecords(testLimitRecords) + .withMaxTimeToRun(testLimitMilliseconds); + } + + // TODO(pabloem): Database connection issues can be debugged here. 
+ Schema recordSchema = readTransform.getRecordSchema(); + LOG.info( + "Computed schema for table {} from {}: {}", + configuration.getTable(), + configuration.getDatabase(), + recordSchema); + SourceRecordMapper formatFn = + KafkaConnectUtils.beamRowFromSourceRecordFn(recordSchema); + readTransform = + readTransform.withFormatFunction(formatFn).withCoder(RowCoder.of(recordSchema)); + + return PCollectionRowTuple.of("output", input.getPipeline().apply(readTransform)); + } + }; + } + }; + } + + @Override + public @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:debezium_read:v1"; + } + + @Override + public @NonNull @Initialized List<@NonNull @Initialized String> inputCollectionNames() { + return Collections.emptyList(); + } + + @Override + public @NonNull @Initialized List<@NonNull @Initialized String> outputCollectionNames() { + return Collections.singletonList("output"); + } + + @AutoValue + public abstract static class DebeziumReadSchemaTransformConfiguration { + public abstract String getUsername(); + + public abstract String getPassword(); + + public abstract String getHost(); + + public abstract Integer getPort(); + + public abstract String getTable(); + + public abstract @NonNull String getDatabase(); + + public abstract @Nullable List getDebeziumConnectionProperties(); + + public static Builder builder() { + return new AutoValue_DebeziumReadSchemaTransformProvider_DebeziumReadSchemaTransformConfiguration + .Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setUsername(String username); + + public abstract Builder setPassword(String password); + + public abstract Builder setHost(String host); + + public abstract Builder setPort(Integer port); + + public abstract Builder setDatabase(String database); + + public abstract Builder setTable(String table); + + public abstract Builder setDebeziumConnectionProperties( + List debeziumConnectionProperties); + + public abstract DebeziumReadSchemaTransformConfiguration build(); + } + } +} diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java index abf5703bf2092..56bba975b189d 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java @@ -18,6 +18,9 @@ package org.apache.beam.io.debezium; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; public class KafkaConnectUtils { public static Schema beamSchemaFromKafkaConnectSchema( @@ -74,4 +77,47 @@ public static Schema.FieldType beamSchemaTypeFromKafkaType( "Unable to convert Kafka field schema %s to Beam Schema", kafkaFieldSchema)); } } + + public static SourceRecordMapper beamRowFromSourceRecordFn(final Schema recordSchema) { + return new SourceRecordMapper() { + @Override + public Row mapSourceRecord(SourceRecord sourceRecord) throws Exception { + return beamRowFromKafkaStruct((Struct) sourceRecord.value(), recordSchema); + } + + private Row beamRowFromKafkaStruct(Struct kafkaStruct, Schema beamSchema) { + Row.Builder rowBuilder = Row.withSchema(beamSchema); + for (Schema.Field f : beamSchema.getFields()) { + Object structField = kafkaStruct.getWithoutDefault(f.getName()); + switch 
(kafkaStruct.schema().field(f.getName()).schema().type()) { + case ARRAY: + case MAP: + // TODO(pabloem): Handle nested structs + throw new IllegalArgumentException("UNABLE TO CONVERT FIELD " + f); + case STRUCT: + Schema fieldSchema = f.getType().getRowSchema(); + if (fieldSchema == null) { + throw new IllegalArgumentException( + String.format( + "Improper schema for Beam record: %s has no row schema to build a Row from.", + f.getName())); + } + if (structField == null) { + // If the field is null, then we must add a null field to ensure we encode things + // properly. + rowBuilder = rowBuilder.addValue(null); + break; + } + rowBuilder = + rowBuilder.addValue(beamRowFromKafkaStruct((Struct) structField, fieldSchema)); + break; + default: + rowBuilder = rowBuilder.addValue(structField); + break; + } + } + return rowBuilder.build(); + } + }; + } } diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java index 6d5658f72b06e..3ee59eab124a7 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java @@ -25,6 +25,7 @@ import io.debezium.relational.history.HistoryRecord; import java.io.IOException; import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -39,6 +40,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; @@ -55,7 +57,8 @@ * *

<h3>Quick Overview</h3>

* - * SDF used to process records fetched from supported Debezium Connectors. + * This is a Splittable {@link DoFn} used to process records fetched from supported Debezium + * Connectors. * *

Currently it has a time limiter (see {@link OffsetTracker}) which, if set, it will stop * automatically after the specified elapsed minutes. Otherwise, it will keep running until the user @@ -63,7 +66,8 @@ * *

<p>It might be initialized either as: * - *

<pre>KafkaSourceConsumerFn(connectorClass, SourceRecordMapper)</pre> + * <pre>KafkaSourceConsumerFn(connectorClass, SourceRecordMapper, maxRecords, milisecondsToRun) + * </pre>
* * Or with a time limiter: * @@ -77,8 +81,8 @@ public class KafkaSourceConsumerFn extends DoFn, T> { private final Class connectorClass; private final SourceRecordMapper fn; - private long minutesToRun = -1; - private Integer maxRecords; + private final Long milisecondsToRun; + private final Integer maxRecords; private static DateTime startTime; private static final Map>> @@ -89,12 +93,18 @@ public class KafkaSourceConsumerFn extends DoFn, T> { * * @param connectorClass Supported Debezium connector class * @param fn a SourceRecordMapper - * @param minutesToRun Maximum time to run (in minutes) + * @param maxRecords Maximum number of records to fetch before finishing. + * @param milisecondsToRun Maximum time to run (in milliseconds) */ - KafkaSourceConsumerFn(Class connectorClass, SourceRecordMapper fn, long minutesToRun) { + KafkaSourceConsumerFn( + Class connectorClass, + SourceRecordMapper fn, + Integer maxRecords, + Long milisecondsToRun) { this.connectorClass = (Class) connectorClass; this.fn = fn; - this.minutesToRun = minutesToRun; + this.maxRecords = maxRecords; + this.milisecondsToRun = milisecondsToRun; } /** @@ -102,18 +112,17 @@ public class KafkaSourceConsumerFn extends DoFn, T> { * * @param connectorClass Supported Debezium connector class * @param fn a SourceRecordMapper + * @param maxRecords Maximum number of records to fetch before finishing. */ KafkaSourceConsumerFn(Class connectorClass, SourceRecordMapper fn, Integer maxRecords) { - this.connectorClass = (Class) connectorClass; - this.fn = fn; - this.maxRecords = maxRecords; + this(connectorClass, fn, maxRecords, null); } @GetInitialRestriction public OffsetHolder getInitialRestriction(@Element Map unused) throws IOException { KafkaSourceConsumerFn.startTime = new DateTime(); - return new OffsetHolder(null, null, null, this.maxRecords, this.minutesToRun); + return new OffsetHolder(null, null, null, this.maxRecords, this.milisecondsToRun); } @NewTracker @@ -127,6 +136,45 @@ public Coder getRestrictionCoder() { return SerializableCoder.of(OffsetHolder.class); } + protected SourceRecord getOneRecord(Map configuration) { + try { + SourceConnector connector = connectorClass.getDeclaredConstructor().newInstance(); + connector.start(configuration); + + SourceTask task = (SourceTask) connector.taskClass().getDeclaredConstructor().newInstance(); + task.initialize(new BeamSourceTaskContext(null)); + task.start(connector.taskConfigs(1).get(0)); + List records = Lists.newArrayList(); + int loops = 0; + while (records.size() == 0) { + if (loops > 3) { + throw new RuntimeException("could not fetch database schema"); + } + records = task.poll(); + // Waiting for the Database snapshot to finish. + Thread.sleep(2000); + loops += 1; + } + task.stop(); + connector.stop(); + return records.get(0); + } catch (NoSuchMethodException + | InterruptedException + | InvocationTargetException + | IllegalAccessException + | InstantiationException e) { + throw new RuntimeException("Unexpected exception fetching database schema.", e); + } + } + + void register(RestrictionTracker> tracker) { + restrictionTrackers.put(this.getHashCode(), tracker); + } + + void reset() { + restrictionTrackers.remove(this.getHashCode()); + } + /** * Process the retrieved element. Currently it just logs the retrieved record as JSON. 
* @@ -145,7 +193,7 @@ public ProcessContinuation process( Map configuration = new HashMap<>(element); // Adding the current restriction to the class object to be found by the database history - restrictionTrackers.put(this.getHashCode(), tracker); + register(tracker); configuration.put(BEAM_INSTANCE_PROPERTY, this.getHashCode()); SourceConnector connector = connectorClass.getDeclaredConstructor().newInstance(); @@ -168,7 +216,7 @@ public ProcessContinuation process( } LOG.debug("-------- {} records found", records.size()); - if (!records.isEmpty()) { + while (records != null && !records.isEmpty()) { for (SourceRecord record : records) { LOG.debug("-------- Record found: {}", record); @@ -184,22 +232,24 @@ public ProcessContinuation process( receiver.output(json); } - task.commit(); + records = task.poll(); } } catch (Exception ex) { - LOG.error( - "-------- Error on consumer: {}. with stacktrace: {}", - ex.getMessage(), - ex.getStackTrace()); + throw new RuntimeException("Error occurred when consuming changes from Database. ", ex); } finally { - restrictionTrackers.remove(this.getHashCode()); + reset(); LOG.debug("------- Stopping SourceTask"); task.stop(); } - return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1)); + long elapsedTime = System.currentTimeMillis() - KafkaSourceConsumerFn.startTime.getMillis(); + if (milisecondsToRun != null && milisecondsToRun > 0 && elapsedTime >= milisecondsToRun) { + return ProcessContinuation.stop(); + } else { + return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1)); + } } public String getHashCode() { @@ -255,37 +305,36 @@ public Map, Map> offsets( } static class OffsetHolder implements Serializable { - public final @Nullable Map offset; - public final @Nullable List history; - public final @Nullable Integer fetchedRecords; - public final @Nullable Integer maxRecords; - public final long minutesToRun; + public @Nullable Map offset; + public @Nullable List history; + public @Nullable Integer fetchedRecords; + public @Nullable Integer maxRecords; + public final @Nullable Long milisToRun; OffsetHolder( @Nullable Map offset, @Nullable List history, @Nullable Integer fetchedRecords, @Nullable Integer maxRecords, - long minutesToRun) { + @Nullable Long milisToRun) { this.offset = offset; this.history = history == null ? new ArrayList<>() : history; this.fetchedRecords = fetchedRecords; this.maxRecords = maxRecords; - this.minutesToRun = minutesToRun; + this.milisToRun = milisToRun; } OffsetHolder( @Nullable Map offset, @Nullable List history, @Nullable Integer fetchedRecords) { - this(offset, history, fetchedRecords, null, -1); + this(offset, history, fetchedRecords, null, -1L); } } /** {@link RestrictionTracker} for Debezium connectors. */ static class OffsetTracker extends RestrictionTracker> { private OffsetHolder restriction; - private static final long MILLIS = 60 * 1000; OffsetTracker(OffsetHolder holder) { this.restriction = holder; @@ -316,28 +365,20 @@ public boolean tryClaim(Map position) { int fetchedRecords = this.restriction.fetchedRecords == null ? 
0 : this.restriction.fetchedRecords + 1; LOG.debug("------------Fetched records {} / {}", fetchedRecords, this.restriction.maxRecords); - LOG.debug( - "-------------- Time running: {} / {}", - elapsedTime, - (this.restriction.minutesToRun * MILLIS)); - this.restriction = - new OffsetHolder( - position, - this.restriction.history, - fetchedRecords, - this.restriction.maxRecords, - this.restriction.minutesToRun); + LOG.debug("-------------- Time running: {} / {}", elapsedTime, (this.restriction.milisToRun)); + this.restriction.offset = position; + this.restriction.fetchedRecords = fetchedRecords; LOG.debug("-------------- History: {}", this.restriction.history); - if (this.restriction.maxRecords == null && this.restriction.minutesToRun == -1) { + if (this.restriction.maxRecords == null && this.restriction.milisToRun == -1) { return true; } - if (this.restriction.maxRecords != null) { - return fetchedRecords < this.restriction.maxRecords; - } - - return elapsedTime < this.restriction.minutesToRun * MILLIS; + // If we've reached the maximum number of records OR the maximum time, we reject + // the attempt to claim. + // If we've reached neither, then we continue approve the claim. + return (this.restriction.maxRecords == null || fetchedRecords < this.restriction.maxRecords) + && (this.restriction.milisToRun == null || elapsedTime < this.restriction.milisToRun); } @Override @@ -356,7 +397,8 @@ public void checkDone() throws IllegalStateException {} @Override public IsBounded isBounded() { - return IsBounded.BOUNDED; + // TODO(pabloem): Implement truncate call that allows us to restart the state + return IsBounded.UNBOUNDED; } } diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java index 6056ca0ebf6c9..5a5dedfe2e6ee 100644 --- a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java @@ -17,27 +17,47 @@ */ package org.apache.beam.io.debezium; +import static org.apache.beam.io.debezium.DebeziumIOPostgresSqlConnectorIT.TABLE_SCHEMA; import static org.apache.beam.sdk.testing.SerializableMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; import io.debezium.connector.mysql.MySqlConnector; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; import java.time.Duration; +import javax.sql.DataSource; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.io.jdbc.JdbcIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.junit.ClassRule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; import org.testcontainers.containers.MySQLContainer; import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; import org.testcontainers.utility.DockerImageName; @RunWith(JUnit4.class) public class DebeziumIOMySqlConnectorIT { + + private static final Logger LOG = LoggerFactory.getLogger(DebeziumIOMySqlConnectorIT.class); /** * Debezium - MySqlContainer * @@ -57,6 +77,108 @@ public class DebeziumIOMySqlConnectorIT { .forStatusCodeMatching(response -> response == 200) .withStartupTimeout(Duration.ofMinutes(2))); + public static DataSource getMysqlDatasource(Void unused) { + HikariConfig hikariConfig = new HikariConfig(); + hikariConfig.setJdbcUrl(MY_SQL_CONTAINER.getJdbcUrl()); + hikariConfig.setUsername(MY_SQL_CONTAINER.getUsername()); + hikariConfig.setPassword(MY_SQL_CONTAINER.getPassword()); + hikariConfig.setDriverClassName(MY_SQL_CONTAINER.getDriverClassName()); + return new HikariDataSource(hikariConfig); + } + + private void monitorEssentialMetrics() { + DataSource ds = getMysqlDatasource(null); + try { + Connection conn = ds.getConnection(); + Statement st = conn.createStatement(); + while (true) { + ResultSet rs = st.executeQuery("SHOW STATUS WHERE `variable_name` = 'Threads_connected'"); + if (rs.next()) { + LOG.info("Open connections: {}", rs.getLong(2)); + rs.close(); + Thread.sleep(4000); + } else { + throw new IllegalArgumentException("OIOI"); + } + } + } catch (InterruptedException | SQLException ex) { + throw new IllegalArgumentException("Oi", ex); + } + } + + @Test + public void testDebeziumSchemaTransformMysqlRead() throws InterruptedException { + long writeSize = 500L; + long testTime = writeSize * 200L; + MY_SQL_CONTAINER.start(); + + PipelineOptions options = PipelineOptionsFactory.create(); + Pipeline writePipeline = Pipeline.create(options); + writePipeline + .apply( + GenerateSequence.from(0) + .to(writeSize) + .withRate(10, org.joda.time.Duration.standardSeconds(1))) + .apply( + MapElements.into(TypeDescriptors.rows()) + .via( + num -> + Row.withSchema(TABLE_SCHEMA) + .withFieldValue( + "id", + // We need this tricky conversion because the original "customers" + // table already + // contains rows 1001, 1002, 1003, 1004. + num <= 1000 + ? 
Long.valueOf(num).intValue() + : Long.valueOf(num).intValue() + 4) + .withFieldValue("first_name", Long.toString(num)) + .withFieldValue("last_name", Long.toString(writeSize - num)) + .withFieldValue("email", Long.toString(num) + "@beamail.com") + // TODO(pabloem): Add other data types + .build())) + .setRowSchema(TABLE_SCHEMA) + .apply( + JdbcIO.write() + .withTable("inventory.customers") + .withDataSourceProviderFn(DebeziumIOMySqlConnectorIT::getMysqlDatasource)); + + Pipeline readPipeline = Pipeline.create(options); + PCollection result = + PCollectionRowTuple.empty(readPipeline) + .apply( + new DebeziumReadSchemaTransformProvider( + true, Long.valueOf(writeSize).intValue() + 4, testTime) + .from( + DebeziumReadSchemaTransformProvider.DebeziumReadSchemaTransformConfiguration + .builder() + .setDatabase("MYSQL") + .setPassword("dbz") + .setUsername("debezium") + .setHost("localhost") + .setTable("inventory.customers") + .setPort(MY_SQL_CONTAINER.getMappedPort(3306)) + .build()) + .buildTransform()) + .get("output"); + + PAssert.that(result) + .satisfies( + rows -> { + assertThat( + Lists.newArrayList(rows).size(), equalTo(Long.valueOf(writeSize + 4).intValue())); + return null; + }); + Thread writeThread = new Thread(() -> writePipeline.run().waitUntilFinish()); + Thread monitorThread = new Thread(this::monitorEssentialMetrics); + monitorThread.start(); + writeThread.start(); + readPipeline.run().waitUntilFinish(); + writeThread.join(); + monitorThread.interrupt(); + monitorThread.join(); + } + /** * Debezium - MySQL connector Test. * diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java index 16efff9e43be7..cc6aee97a4a42 100644 --- a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java @@ -17,20 +17,32 @@ */ package org.apache.beam.io.debezium; -import static org.apache.beam.sdk.testing.SerializableMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; import io.debezium.connector.postgresql.PostgresConnector; +import javax.sql.DataSource; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.io.jdbc.JdbcIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import org.joda.time.Duration; import org.junit.ClassRule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.postgresql.ds.PGSimpleDataSource; import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.utility.DockerImageName; @@ -51,6 +63,91 @@ public class DebeziumIOPostgresSqlConnectorIT { .withExposedPorts(5432) .withDatabaseName("inventory"); + static 
final Schema TABLE_SCHEMA = + Schema.builder() + .addInt32Field("id") + .addStringField("first_name") + .addStringField("last_name") + .addStringField("email") + .build(); + + static DataSource getPostgresDatasource() { + PGSimpleDataSource dataSource = new PGSimpleDataSource(); + dataSource.setDatabaseName("inventory"); + dataSource.setServerName(POSTGRES_SQL_CONTAINER.getContainerIpAddress()); + dataSource.setPortNumber(POSTGRES_SQL_CONTAINER.getMappedPort(5432)); + dataSource.setUser("debezium"); + dataSource.setPassword("dbz"); + return dataSource; + } + + @Test + public void testDebeziumSchemaTransformPostgresRead() throws InterruptedException { + long writeSize = 500L; + long testTime = writeSize * 200L; + POSTGRES_SQL_CONTAINER.start(); + + PipelineOptions options = PipelineOptionsFactory.create(); + Pipeline writePipeline = Pipeline.create(options); + writePipeline + .apply(GenerateSequence.from(0).to(writeSize).withRate(10, Duration.standardSeconds(1))) + .apply( + MapElements.into(TypeDescriptors.rows()) + .via( + num -> + Row.withSchema(TABLE_SCHEMA) + .withFieldValue( + "id", + // We need this tricky conversion because the original "customers" + // table already + // contains rows 1001, 1002, 1003, 1004. + num <= 1000 + ? Long.valueOf(num).intValue() + : Long.valueOf(num).intValue() + 4) + .withFieldValue("first_name", Long.toString(num)) + .withFieldValue("last_name", Long.toString(writeSize - num)) + .withFieldValue("email", Long.toString(num) + "@beamail.com") + // TODO(pabloem): Add other data types + .build())) + .setRowSchema(TABLE_SCHEMA) + .apply( + JdbcIO.write() + .withTable("inventory.inventory.customers") + .withDataSourceConfiguration( + JdbcIO.DataSourceConfiguration.create(getPostgresDatasource()))); + + Pipeline readPipeline = Pipeline.create(options); + PCollection result = + PCollectionRowTuple.empty(readPipeline) + .apply( + new DebeziumReadSchemaTransformProvider( + true, Long.valueOf(writeSize).intValue() + 4, testTime) + .from( + DebeziumReadSchemaTransformProvider.DebeziumReadSchemaTransformConfiguration + .builder() + .setDatabase("POSTGRES") + .setPassword("dbz") + .setUsername("debezium") + .setHost("localhost") + .setTable("inventory.customers") + .setPort(POSTGRES_SQL_CONTAINER.getMappedPort(5432)) + .build()) + .buildTransform()) + .get("output"); + + PAssert.that(result) + .satisfies( + rows -> { + assertThat( + Lists.newArrayList(rows).size(), equalTo(Long.valueOf(writeSize + 4).intValue())); + return null; + }); + Thread writeThread = new Thread(() -> writePipeline.run().waitUntilFinish()); + writeThread.start(); + readPipeline.run().waitUntilFinish(); + writeThread.join(); + } + /** * Debezium - PostgresSql connector Test. * diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java new file mode 100644 index 0000000000000..7d206f7da8989 --- /dev/null +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.debezium; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; + +import java.time.Duration; +import java.util.Arrays; +import java.util.stream.Collectors; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.kafka.connect.errors.ConnectException; +import org.hamcrest.Matchers; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.utility.DockerImageName; + +@RunWith(Parameterized.class) +public class DebeziumReadSchemaTransformTest { + + @ClassRule + public static final PostgreSQLContainer POSTGRES_SQL_CONTAINER = + new PostgreSQLContainer<>( + DockerImageName.parse("debezium/example-postgres:latest") + .asCompatibleSubstituteFor("postgres")) + .withPassword("dbz") + .withUsername("debezium") + .withExposedPorts(5432) + .withDatabaseName("inventory"); + + @ClassRule + public static final MySQLContainer MY_SQL_CONTAINER = + new MySQLContainer<>( + DockerImageName.parse("debezium/example-mysql:1.4") + .asCompatibleSubstituteFor("mysql")) + .withPassword("debezium") + .withUsername("mysqluser") + .withExposedPorts(3306) + .waitingFor( + new HttpWaitStrategy() + .forPort(3306) + .forStatusCodeMatching(response -> response == 200) + .withStartupTimeout(Duration.ofMinutes(2))); + + @Parameterized.Parameters + public static Iterable data() { + return Arrays.asList( + new Object[][] { + {POSTGRES_SQL_CONTAINER, "debezium", "dbz", "POSTGRES", 5432}, + {MY_SQL_CONTAINER, "debezium", "dbz", "MYSQL", 3306} + }); + } + + @Parameterized.Parameter(0) + public Container databaseContainer; + + @Parameterized.Parameter(1) + public String userName; + + @Parameterized.Parameter(2) + public String password; + + @Parameterized.Parameter(3) + public String database; + + @Parameterized.Parameter(4) + public Integer port; + + private PTransform makePtransform( + String user, String password, String database, Integer port, String host) { + return new DebeziumReadSchemaTransformProvider(true, 10, 100L) + .from( + DebeziumReadSchemaTransformProvider.DebeziumReadSchemaTransformConfiguration.builder() + .setDatabase(database) + .setPassword(password) + .setUsername(user) + .setHost(host) + // In postgres, this field is "schema.table", while in MySQL it + // is "database.table". 
+ .setTable("inventory.customers") + .setPort(port) + .build()) + .buildTransform(); + } + + @Test + public void testNoProblem() { + Pipeline readPipeline = Pipeline.create(); + PCollection result = + PCollectionRowTuple.empty(readPipeline) + .apply( + makePtransform( + userName, + password, + database, + databaseContainer.getMappedPort(port), + "localhost")) + .get("output"); + assertThat( + result.getSchema().getFields().stream() + .map(field -> field.getName()) + .collect(Collectors.toList()), + Matchers.containsInAnyOrder("before", "after", "source", "op", "ts_ms", "transaction")); + } + + @Test + public void testWrongUser() { + Pipeline readPipeline = Pipeline.create(); + ConnectException ex = + assertThrows( + ConnectException.class, + () -> { + PCollectionRowTuple.empty(readPipeline) + .apply( + makePtransform( + "wrongUser", + password, + database, + databaseContainer.getMappedPort(port), + "localhost")) + .get("output"); + }); + assertThat(ex.getCause().getMessage(), Matchers.containsString("password")); + assertThat(ex.getCause().getMessage(), Matchers.containsString("wrongUser")); + } + + @Test + public void testWrongPassword() { + Pipeline readPipeline = Pipeline.create(); + ConnectException ex = + assertThrows( + ConnectException.class, + () -> { + PCollectionRowTuple.empty(readPipeline) + .apply( + makePtransform( + userName, + "wrongPassword", + database, + databaseContainer.getMappedPort(port), + "localhost")) + .get("output"); + }); + assertThat(ex.getCause().getMessage(), Matchers.containsString("password")); + assertThat(ex.getCause().getMessage(), Matchers.containsString(userName)); + } + + @Test + public void testWrongPort() { + Pipeline readPipeline = Pipeline.create(); + ConnectException ex = + assertThrows( + ConnectException.class, + () -> { + PCollectionRowTuple.empty(readPipeline) + .apply(makePtransform(userName, password, database, 12345, "localhost")) + .get("output"); + }); + Throwable lowestCause = ex.getCause(); + while (lowestCause.getCause() != null) { + lowestCause = lowestCause.getCause(); + } + assertThat(lowestCause.getMessage(), Matchers.containsString("Connection refused")); + } + + @Test + public void testWrongHost() throws Exception { + Pipeline readPipeline = Pipeline.create(); + Exception ex = + assertThrows( + Exception.class, + () -> { + PCollectionRowTuple.empty(readPipeline) + .apply( + makePtransform( + userName, + password, + database, + databaseContainer.getMappedPort(port), + "23.128.129.130")) + .get("output"); + }); + Throwable lowestCause = ex.getCause(); + while (lowestCause.getCause() != null) { + lowestCause = lowestCause.getCause(); + } + assertThat(lowestCause.getMessage(), Matchers.containsString("Connection refused")); + } +} diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaSourceConsumerFnTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaSourceConsumerFnTest.java index 85ca71c7cf432..ca9f69f552191 100644 --- a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaSourceConsumerFnTest.java +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaSourceConsumerFnTest.java @@ -100,7 +100,7 @@ public void testStoppableKafkaSourceConsumerFn() { .setCoder(VarIntCoder.of()); pipeline.run().waitUntilFinish(); - Assert.assertEquals(3, CounterTask.getCountTasks()); + Assert.assertEquals(1, CounterTask.getCountTasks()); } } diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/OffsetTrackerTest.java 
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/OffsetTrackerTest.java index b8a6c9b031e3a..dc4338ac04858 100644 --- a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/OffsetTrackerTest.java +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/OffsetTrackerTest.java @@ -51,12 +51,13 @@ public void testRestrictByNumberOfRecords() throws IOException { @Test public void testRestrictByAmountOfTime() throws IOException, InterruptedException { - long millis = 60 * 1000; - long minutesToRun = 1; Map position = new HashMap<>(); KafkaSourceConsumerFn kafkaSourceConsumerFn = new KafkaSourceConsumerFn( - MySqlConnector.class, new SourceRecordJson.SourceRecordJsonMapper(), minutesToRun); + MySqlConnector.class, + new SourceRecordJson.SourceRecordJsonMapper(), + 100000, + 500L); // Run for 500 ms KafkaSourceConsumerFn.OffsetHolder restriction = kafkaSourceConsumerFn.getInitialRestriction(new HashMap<>()); KafkaSourceConsumerFn.OffsetTracker tracker = @@ -64,7 +65,7 @@ public void testRestrictByAmountOfTime() throws IOException, InterruptedExceptio assertTrue(tracker.tryClaim(position)); - Thread.sleep(minutesToRun * millis + 100); + Thread.sleep(1000); // Sleep for a whole 2 seconds assertFalse(tracker.tryClaim(position)); } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java index 4f0270358e8b9..9cd9e5561a450 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java @@ -2096,7 +2096,17 @@ private List getFilteredFields(Schema schema) { checkState( tableSchema.getFieldCount() >= schema.getFieldCount(), - "Input schema has more fields than actual table."); + String.format( + "Input schema has more fields (%s) than actual table (%s).%n\t" + + "Input schema fields: %s | Table fields: %s", + tableSchema.getFieldCount(), + schema.getFieldCount(), + schema.getFields().stream() + .map(Schema.Field::getName) + .collect(Collectors.joining(", ")), + tableSchema.getFields().stream() + .map(Schema.Field::getName) + .collect(Collectors.joining(", ")))); // filter out missing fields from output table List missingFields =