prototyping a debezium-based schema transform for continuous ingestion of change streams from databases
pabloem committed Dec 9, 2022
1 parent 61e2ec1 commit ecd8c76
Showing 9 changed files with 418 additions and 12 deletions.
@@ -852,7 +852,12 @@ public Row build() {
throw new IllegalArgumentException(
"Row expected "
+ schema.getFieldCount()
+ " fields. initialized with "
+ String.format(
" fields (%s).",
schema.getFields().stream()
.map(Object::toString)
.collect(Collectors.joining(", ")))
+ " initialized with "
+ values.size()
+ " fields.");
}
1 change: 1 addition & 0 deletions sdks/java/io/debezium/build.gradle
@@ -43,6 +43,7 @@ dependencies {

// Test dependencies
testImplementation library.java.junit
testImplementation project(path: ":sdks:java:io:jdbc")
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
testImplementation project(":runners:google-cloud-dataflow-java")
@@ -18,9 +18,7 @@
package org.apache.beam.io.debezium;

import org.apache.kafka.connect.source.SourceConnector;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.checkerframework.checker.nullness.qual.NonNull;

/** Enumeration of debezium connectors. */
public enum Connectors {
@@ -30,7 +28,6 @@ public enum Connectors {
ORACLE("Oracle", "io.debezium.connector.oracle.OracleConnector"),
DB2("DB2", "io.debezium.connector.db2.Db2Connector"),
;
private static final Logger LOG = LoggerFactory.getLogger(Connectors.class);
private final String name;
private final String connector;

@@ -45,12 +42,14 @@ public String getName() {
}

/** Class connector to debezium. */
public @Nullable Class<? extends SourceConnector> getConnector() {
public @NonNull Class<? extends SourceConnector> getConnector() {
Class<? extends SourceConnector> connectorClass = null;
try {
connectorClass = (Class<? extends SourceConnector>) Class.forName(this.connector);
} catch (ClassCastException | ClassNotFoundException e) {
LOG.error("Connector class is not found", e);
throw new IllegalArgumentException(
String.format(
"Unable to resolve class %s to use as Debezium connector.", this.connector),
e);
}
return connectorClass;
}
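For illustration only (not part of this commit's diff): with the change above, resolving a connector class that is missing from the classpath now fails fast instead of logging an error and returning null. A minimal sketch, assuming the Oracle connector jar may or may not be on the classpath:

import org.apache.beam.io.debezium.Connectors;
import org.apache.kafka.connect.source.SourceConnector;

public class ConnectorResolutionExample {
  public static void main(String[] args) {
    // Resolves io.debezium.connector.oracle.OracleConnector; with this commit the call
    // throws IllegalArgumentException if the class cannot be found, rather than returning null.
    Class<? extends SourceConnector> oracleConnector = Connectors.ORACLE.getConnector();
    System.out.println("Resolved connector: " + oracleConnector.getName());
  }
}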
@@ -29,6 +29,7 @@
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -37,6 +38,7 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -202,6 +204,25 @@ public Read<T> withMaxNumberOfRecords(Integer maxNumberOfRecords) {
return toBuilder().setMaxNumberOfRecords(maxNumberOfRecords).build();
}

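/**
* Connects using the configured connector, samples a single {@link SourceRecord}, and derives a
* Beam {@link Schema} with a nullable "key" row field and a "value" row field.
*/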
protected Schema getRecordSchema() {
SourceRecord sampledRecord =
new KafkaSourceConsumerFn<>(
getConnectorConfiguration().getConnectorClass().get(),
getFormatFunction(),
getMaxNumberOfRecords())
.getOneRecord(getConnectorConfiguration().getConfigurationMap());
Schema keySchema =
sampledRecord.keySchema() != null
? KafkaConnectUtils.beamSchemaFromKafkaConnectSchema(sampledRecord.keySchema())
: Schema.builder().build();
Schema valueSchema =
KafkaConnectUtils.beamSchemaFromKafkaConnectSchema(sampledRecord.valueSchema());
return Schema.builder()
.addNullableField("key", Schema.FieldType.row(keySchema))
.addRowField("value", valueSchema)
.build();
}

@Override
public PCollection<T> expand(PBegin input) {
return input
@@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.debezium;

import com.google.auto.value.AutoValue;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DebeziumReadSchemaTransformProvider
extends TypedSchemaTransformProvider<
DebeziumReadSchemaTransformProvider.DebeziumReadSchemaTransformConfiguration> {

private static final Logger LOG =
LoggerFactory.getLogger(DebeziumReadSchemaTransformProvider.class);
private final Boolean isTest;
private final Integer testLimitRecords;

DebeziumReadSchemaTransformProvider() {
this(false, 0);
}

@VisibleForTesting
DebeziumReadSchemaTransformProvider(Boolean isTest, Integer recordLimit) {
this.isTest = isTest;
this.testLimitRecords = recordLimit;
}

@Override
protected @NonNull @Initialized Class<DebeziumReadSchemaTransformConfiguration>
configurationClass() {
return DebeziumReadSchemaTransformConfiguration.class;
}

@Override
protected @NonNull @Initialized SchemaTransform from(
DebeziumReadSchemaTransformConfiguration configuration) {
return new SchemaTransform() {
@Override
public @UnknownKeyFor @NonNull @Initialized PTransform<
@UnknownKeyFor @NonNull @Initialized PCollectionRowTuple,
@UnknownKeyFor @NonNull @Initialized PCollectionRowTuple>
buildTransform() {
return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
// TODO(pabloem): Test this behavior
if (!Arrays.stream(Connectors.values())
.map(Objects::toString)
.collect(Collectors.toSet())
.contains(configuration.getDatabase())) {
throw new IllegalArgumentException(
"Unsupported dabase "
+ configuration.getDatabase()
+ ". Unable to select a JDBC driver for it. Supported Databases are: "
+ String.join(
", ",
Arrays.stream(Connectors.values())
.map(Object::toString)
.collect(Collectors.toList())));
}
Class<?> connectorClass =
Objects.requireNonNull(Connectors.valueOf(configuration.getDatabase()))
.getConnector();
DebeziumIO.ConnectorConfiguration connectorConfiguration =
DebeziumIO.ConnectorConfiguration.create()
.withUsername(configuration.getUsername())
.withPassword(configuration.getPassword())
.withHostName(configuration.getHost())
.withPort(Integer.toString(configuration.getPort()))
.withConnectorClass(connectorClass);
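// NOTE(prototype): the server name and the database name / include list below are
// hardcoded to Debezium's "inventory" example database.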
connectorConfiguration =
connectorConfiguration
.withConnectionProperty("table.include.list", configuration.getTable())
.withConnectionProperty("include.schema.changes", "false")
.withConnectionProperty("database.server.name", "beam-pipeline-server")
.withConnectionProperty("database.dbname", "inventory")
.withConnectionProperty("database.include.list", "inventory");

final List<String> debeziumConnectionProperties =
configuration.getDebeziumConnectionProperties();
if (debeziumConnectionProperties != null) {
for (String connectionProperty : debeziumConnectionProperties) {
String[] parts = connectionProperty.split("=", 2); // split on the first '=' only, so values may contain '='
String key = parts[0];
String value = parts[1];
connectorConfiguration = connectorConfiguration.withConnectionProperty(key, value);
}
}

DebeziumIO.Read<Row> readTransform =
DebeziumIO.<Row>read().withConnectorConfiguration(connectorConfiguration);

if (isTest) {
readTransform = readTransform.withMaxNumberOfRecords(testLimitRecords);
}

// TODO(pabloem): Database connection issues can be debugged here.
Schema recordSchema = readTransform.getRecordSchema();
LOG.info(
"Computed schema for table {} from {}: {}",
configuration.getTable(),
configuration.getDatabase(),
recordSchema);
SourceRecordMapper<Row> formatFn =
KafkaConnectUtils.beamRowFromSourceRecordFn(recordSchema);
readTransform =
readTransform.withFormatFunction(formatFn).withCoder(RowCoder.of(recordSchema));

return PCollectionRowTuple.of("output", input.getPipeline().apply(readTransform));
}
};
}
};
}

@Override
public @NonNull @Initialized String identifier() {
return "beam:schematransform:org.apache.beam:debezium_read:v1";
}

@Override
public @NonNull @Initialized List<@NonNull @Initialized String> inputCollectionNames() {
return Collections.emptyList();
}

@Override
public @NonNull @Initialized List<@NonNull @Initialized String> outputCollectionNames() {
return Collections.singletonList("output");
}

@AutoValue
public abstract static class DebeziumReadSchemaTransformConfiguration {
public abstract String getUsername();

public abstract String getPassword();

public abstract String getHost();

public abstract Integer getPort();

public abstract String getTable();

public abstract @NonNull String getDatabase();

public abstract @Nullable List<String> getDebeziumConnectionProperties();

public static Builder builder() {
return new AutoValue_DebeziumReadSchemaTransformProvider_DebeziumReadSchemaTransformConfiguration
.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setUsername(String username);

public abstract Builder setPassword(String password);

public abstract Builder setHost(String host);

public abstract Builder setPort(Integer port);

public abstract Builder setDatabase(String database);

public abstract Builder setTable(String table);

public abstract Builder setDebeziumConnectionProperties(
List<String> debeziumConnectionProperties);

public abstract DebeziumReadSchemaTransformConfiguration build();
}
}
}
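For illustration only (not part of the diff): a minimal sketch of how the new provider could be driven, written as if it lived in the same package (the provider constructors and the typed from() method are package-accessible). The MYSQL connector entry, connection settings, and table name are assumptions made for the example.

package org.apache.beam.io.debezium;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;

public class DebeziumReadSchemaTransformExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // Use the test-only constructor so the otherwise unbounded read stops after a few records.
    DebeziumReadSchemaTransformProvider provider =
        new DebeziumReadSchemaTransformProvider(true, 10);

    DebeziumReadSchemaTransformProvider.DebeziumReadSchemaTransformConfiguration config =
        DebeziumReadSchemaTransformProvider.DebeziumReadSchemaTransformConfiguration.builder()
            .setDatabase("MYSQL") // must match a Connectors enum name; assumed for this sketch
            .setUsername("debezium")
            .setPassword("dbz")
            .setHost("localhost")
            .setPort(3306)
            .setTable("inventory.customers")
            .build();

    // The transform takes no input collections and produces a single "output" collection of Rows
    // whose schema has the sampled "key" and "value" row fields.
    PCollectionRowTuple result =
        PCollectionRowTuple.empty(pipeline).apply(provider.from(config).buildTransform());
    PCollection<Row> changeRecords = result.get("output");

    pipeline.run().waitUntilFinish();
  }
}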
@@ -18,6 +18,9 @@
package org.apache.beam.io.debezium;

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class KafkaConnectUtils {
public static Schema beamSchemaFromKafkaConnectSchema(
@@ -74,4 +77,54 @@ public static Schema.FieldType beamSchemaTypeFromKafkaType(
"Unable to convert Kafka field schema %s to Beam Schema", kafkaFieldSchema));
}
}

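/**
* Returns a mapper that converts a Kafka Connect {@link SourceRecord} into a Beam {@link Row}
* with "key" and "value" row fields matching the given recordSchema.
*/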
public static SourceRecordMapper<Row> beamRowFromSourceRecordFn(Schema recordSchema) {
final Schema keySchema = recordSchema.getField("key").getType().getRowSchema();
final Schema valueSchema = recordSchema.getField("value").getType().getRowSchema();
if (keySchema == null || valueSchema == null) {
throw new IllegalArgumentException("Improper schema for Beam record " + recordSchema);
}
return new SourceRecordMapper<Row>() {
@Override
public Row mapSourceRecord(SourceRecord sourceRecord) throws Exception {
return Row.withSchema(recordSchema)
.withFieldValue("key", beamRowFromKafkaStruct((Struct) sourceRecord.key(), keySchema))
.withFieldValue(
"value", beamRowFromKafkaStruct((Struct) sourceRecord.value(), valueSchema))
.build();
}

private Row beamRowFromKafkaStruct(Struct kafkaStruct, Schema beamSchema) {
Row.Builder rowBuilder = Row.withSchema(beamSchema);
for (Schema.Field f : beamSchema.getFields()) {
Object structField = kafkaStruct.getWithoutDefault(f.getName());
switch (kafkaStruct.schema().field(f.getName()).schema().type()) {
case ARRAY:
case MAP:
// TODO(pabloem): Handle ARRAY and MAP fields
throw new IllegalArgumentException("Unable to convert ARRAY or MAP field " + f);
case STRUCT:
Schema fieldSchema = f.getType().getRowSchema();
if (fieldSchema == null) {
throw new IllegalArgumentException(
"Improper schema for Beam record " + fieldSchema);
}
if (structField == null) {
// If the field is null, then we must add a null field to ensure we encode things
// properly.
rowBuilder = rowBuilder.addValue(null);
break;
}
rowBuilder =
rowBuilder.addValue(beamRowFromKafkaStruct((Struct) structField, fieldSchema));
break;
default:
rowBuilder = rowBuilder.addValue(structField);
break;
}
}
return rowBuilder.build();
}
};
}
}