Address comments
pabloem committed Dec 20, 2022
1 parent d5c9751 commit 1383126
Showing 2 changed files with 31 additions and 18 deletions.
DebeziumReadSchemaTransformProvider.java

@@ -19,6 +19,7 @@
 import com.google.auto.value.AutoValue;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -38,6 +39,17 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * A schema-aware transform provider for {@link DebeziumIO}. This class provides a {@link
+ * PTransform} that returns a change stream for a relational database.
+ *
+ * <p>The transform needs to access the source database <b>on expansion</b> and at <b>pipeline
+ * runtime</b>. At expansion, the output {@link org.apache.beam.sdk.values.PCollection} schema is
+ * retrieved, while at runtime, the change stream is consumed.
+ *
+ * <p>This transform is tested against <b>MySQL and Postgres</b>, but it should work well for any
+ * data source supported by Debezium.
+ */
 public class DebeziumReadSchemaTransformProvider
     extends TypedSchemaTransformProvider<
         DebeziumReadSchemaTransformProvider.DebeziumReadSchemaTransformConfiguration> {
@@ -53,7 +65,8 @@ public class DebeziumReadSchemaTransformProvider
   }
 
   @VisibleForTesting
-  DebeziumReadSchemaTransformProvider(Boolean isTest, Integer recordLimit, Long timeLimitMs) {
+  protected DebeziumReadSchemaTransformProvider(
+      Boolean isTest, Integer recordLimit, Long timeLimitMs) {
     this.isTest = isTest;
     this.testLimitRecords = recordLimit;
     this.testLimitMilliseconds = timeLimitMs;
@@ -68,6 +81,7 @@ public class DebeziumReadSchemaTransformProvider
   @Override
   protected @NonNull @Initialized SchemaTransform from(
       DebeziumReadSchemaTransformConfiguration configuration) {
+    // TODO(pabloem): Validate configuration parameters to ensure formatting is correct.
     return new SchemaTransform() {
       @Override
       public @UnknownKeyFor @NonNull @Initialized PTransform<
@@ -78,19 +92,16 @@
         @Override
         public PCollectionRowTuple expand(PCollectionRowTuple input) {
           // TODO(pabloem): Test this behavior
-          if (!Arrays.stream(Connectors.values())
-              .map(Objects::toString)
-              .collect(Collectors.toSet())
-              .contains(configuration.getDatabase())) {
+          Collection<String> connectors =
+              Arrays.stream(Connectors.values())
+                  .map(Object::toString)
+                  .collect(Collectors.toSet());
+          if (!connectors.contains(configuration.getDatabase())) {
             throw new IllegalArgumentException(
-                "Unsupported dabase "
+                "Unsupported database "
                     + configuration.getDatabase()
                     + ". Unable to select a JDBC driver for it. Supported Databases are: "
-                    + String.join(
-                        ", ",
-                        Arrays.stream(Connectors.values())
-                            .map(Object::toString)
-                            .collect(Collectors.toList())));
+                    + String.join(", ", connectors));
           }
           Class<?> connectorClass =
               Objects.requireNonNull(Connectors.valueOf(configuration.getDatabase()))
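The refactor in the last hunk is worth calling out: instead of building the stream of supported connector names twice (once for the membership check and once for the error message), the new code materializes it into a single Collection<String> and reuses it. Below is a minimal standalone sketch of the same pattern; the two-value Connectors enum and the validateDatabase helper are hypothetical stand-ins for illustration, not the actual Beam types.

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class ConnectorValidation {

  // Hypothetical stand-in for the Connectors enum in org.apache.beam.io.debezium;
  // the real enum's values may differ.
  enum Connectors {
    MYSQL,
    POSTGRES
  }

  // Mirrors the pattern adopted by the commit: build the set of supported names
  // once, then reuse it for both the membership check and the error message.
  static void validateDatabase(String database) {
    Set<String> connectors =
        Arrays.stream(Connectors.values()).map(Object::toString).collect(Collectors.toSet());
    if (!connectors.contains(database)) {
      throw new IllegalArgumentException(
          "Unsupported database "
              + database
              + ". Unable to select a JDBC driver for it. Supported Databases are: "
              + String.join(", ", connectors));
    }
  }

  public static void main(String[] args) {
    validateDatabase("MYSQL"); // passes silently
    validateDatabase("ORACLE"); // throws IllegalArgumentException listing the supported names
  }
}

Note the method reference also changes from Objects::toString to Object::toString; both compile, but the unbound instance-method reference avoids the null-tolerant static overload and reads more directly.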
KafkaSourceConsumerFn.java

@@ -57,15 +57,17 @@
  *
  * <h3>Quick Overview</h3>
  *
- * SDF used to process records fetched from supported Debezium Connectors.
+ * This is a Splittable {@link DoFn} used to process records fetched from supported Debezium
+ * Connectors.
  *
  * <p>Currently it has a time limiter (see {@link OffsetTracker}) which, if set, it will stop
  * automatically after the specified elapsed minutes. Otherwise, it will keep running until the user
  * explicitly interrupts it.
  *
  * <p>It might be initialized either as:
  *
- * <pre>KafkaSourceConsumerFn(connectorClass, SourceRecordMapper)</pre>
+ * <pre>KafkaSourceConsumerFn(connectorClass, SourceRecordMapper, maxRecords, milisecondsToRun)
+ * </pre>
  *
  * Or with a time limiter:
  *
@@ -79,8 +81,8 @@ public class KafkaSourceConsumerFn<T> extends DoFn<Map<String, String>, T> {
   private final Class<? extends SourceConnector> connectorClass;
   private final SourceRecordMapper<T> fn;
 
-  private Long milisecondsToRun = null;
-  private Integer maxRecords;
+  private final Long milisecondsToRun;
+  private final Integer maxRecords;
 
   private static DateTime startTime;
   private static final Map<String, RestrictionTracker<OffsetHolder, Map<String, Object>>>
@@ -91,6 +93,7 @@ public class KafkaSourceConsumerFn<T> extends DoFn<Map<String, String>, T> {
    *
    * @param connectorClass Supported Debezium connector class
    * @param fn a SourceRecordMapper
+   * @param maxRecords Maximum number of records to fetch before finishing.
    * @param milisecondsToRun Maximum time to run (in milliseconds)
    */
   KafkaSourceConsumerFn(
@@ -109,11 +112,10 @@ public class KafkaSourceConsumerFn<T> extends DoFn<Map<String, String>, T> {
    *
    * @param connectorClass Supported Debezium connector class
    * @param fn a SourceRecordMapper
+   * @param maxRecords Maximum number of records to fetch before finishing.
    */
   KafkaSourceConsumerFn(Class<?> connectorClass, SourceRecordMapper<T> fn, Integer maxRecords) {
-    this.connectorClass = (Class<? extends SourceConnector>) connectorClass;
-    this.fn = fn;
-    this.maxRecords = maxRecords;
+    this(connectorClass, fn, maxRecords, null);
   }
 
   @GetInitialRestriction
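The second file's change replaces duplicated field assignments with constructor delegation, which is what allows milisecondsToRun and maxRecords to become final: every construction path now flows through the four-argument constructor, with null standing in for "no time limit". Here is a minimal sketch of the pattern, with simplified stand-in types rather than the real SourceConnector and SourceRecordMapper wiring:

import java.util.function.Function;

// Simplified stand-in for KafkaSourceConsumerFn: the generic mapper replaces
// SourceRecordMapper<T>, and the Debezium connector wiring is omitted.
public class LimitedConsumer<T> {

  private final Function<String, T> mapper;
  private final Integer maxRecords;
  private final Long milisecondsToRun; // spelling kept to match the field in the diff

  // Primary constructor: the only place fields are assigned, so both limits can be final.
  LimitedConsumer(Function<String, T> mapper, Integer maxRecords, Long milisecondsToRun) {
    this.mapper = mapper;
    this.maxRecords = maxRecords;
    this.milisecondsToRun = milisecondsToRun;
  }

  // Convenience constructor: delegates with a null time limit, mirroring
  // this(connectorClass, fn, maxRecords, null) in the commit.
  LimitedConsumer(Function<String, T> mapper, Integer maxRecords) {
    this(mapper, maxRecords, null);
  }
}

Keeping the assignment logic in one constructor also means any future validation (for example, rejecting a negative record limit) only has to be written once.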
