diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
index 18c4307f29612..448bc4c31e57b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
@@ -852,7 +852,12 @@ public Row build() {
         throw new IllegalArgumentException(
             "Row expected "
                 + schema.getFieldCount()
-                + " fields. initialized with "
+                + String.format(
+                    " fields (%s), ",
+                    schema.getFields().stream()
+                        .map(Object::toString)
+                        .collect(Collectors.joining(", ")))
+                + "initialized with "
                 + values.size()
                 + " fields.");
       }
diff --git a/sdks/java/io/debezium/build.gradle b/sdks/java/io/debezium/build.gradle
index bf5d09382873c..146d40cc70dce 100644
--- a/sdks/java/io/debezium/build.gradle
+++ b/sdks/java/io/debezium/build.gradle
@@ -43,6 +43,7 @@ dependencies {
 
   // Test dependencies
   testImplementation library.java.junit
+  testImplementation project(path: ":sdks:java:io:jdbc")
   testRuntimeOnly library.java.slf4j_jdk14
   testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
   testImplementation project(":runners:google-cloud-dataflow-java")
diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/Connectors.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/Connectors.java
index 7aa8b6d0dc7f5..4fefe917e8c43 100644
--- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/Connectors.java
+++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/Connectors.java
@@ -18,9 +18,7 @@
 package org.apache.beam.io.debezium;
 
 import org.apache.kafka.connect.source.SourceConnector;
-import org.checkerframework.checker.nullness.qual.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.checkerframework.checker.nullness.qual.NonNull;
 
 /** Enumeration of debezium connectors. */
 public enum Connectors {
@@ -30,7 +28,6 @@ public enum Connectors {
   ORACLE("Oracle", "io.debezium.connector.oracle.OracleConnector"),
   DB2("DB2", "io.debezium.connector.db2.Db2Connector"),
   ;
-  private static final Logger LOG = LoggerFactory.getLogger(Connectors.class);
   private final String name;
   private final String connector;
 
@@ -45,12 +42,14 @@ public String getName() {
   }
 
   /** Class connector to debezium. */
-  public @Nullable Class<? extends SourceConnector> getConnector() {
+  public @NonNull Class<? extends SourceConnector> getConnector() {
     Class<? extends SourceConnector> connectorClass = null;
     try {
       connectorClass = (Class<? extends SourceConnector>) Class.forName(this.connector);
     } catch (ClassCastException | ClassNotFoundException e) {
-      LOG.error("Connector class is not found", e);
+      throw new IllegalArgumentException(
+          String.format(
+              "Unable to resolve class %s to use as Debezium connector.", this.connector),
+          e);
     }
     return connectorClass;
   }
diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
index b7c084fcba78d..9697a3da65bf5 100644
--- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
+++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
@@ -29,6 +29,7 @@
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -37,6 +38,7 @@
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.source.SourceRecord;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -202,6 +204,25 @@ public Read<T> withMaxNumberOfRecords(Integer maxNumberOfRecords) {
       return toBuilder().setMaxNumberOfRecords(maxNumberOfRecords).build();
     }
 
+    /** Connects to the database, samples one change record, and infers its Beam {@link Schema}. */
+    protected Schema getRecordSchema() {
+      SourceRecord sampledRecord =
+          new KafkaSourceConsumerFn<>(
+                  getConnectorConfiguration().getConnectorClass().get(),
+                  getFormatFunction(),
+                  getMaxNumberOfRecords())
+              .getOneRecord(getConnectorConfiguration().getConfigurationMap());
+      Schema keySchema =
+          sampledRecord.keySchema() != null
+              ? KafkaConnectUtils.beamSchemaFromKafkaConnectSchema(sampledRecord.keySchema())
+              : Schema.builder().build();
+      Schema valueSchema =
+          KafkaConnectUtils.beamSchemaFromKafkaConnectSchema(sampledRecord.valueSchema());
+      return Schema.builder()
+          .addNullableField("key", Schema.FieldType.row(keySchema))
+          .addRowField("value", valueSchema)
+          .build();
+    }
+
     @Override
     public PCollection<T> expand(PBegin input) {
       return input
diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java
new file mode 100644
index 0000000000000..74ba8bc1aae91
--- /dev/null
+++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import com.google.auto.value.AutoValue;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link TypedSchemaTransformProvider} for reading change streams from databases supported by
+ * Debezium.
+ */
+public class DebeziumReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        DebeziumReadSchemaTransformProvider.DebeziumReadSchemaTransformConfiguration> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DebeziumReadSchemaTransformProvider.class);
+  private final Boolean isTest;
+  private final Integer testLimitRecords;
+
+  DebeziumReadSchemaTransformProvider() {
+    this(false, 0);
+  }
+
+  @VisibleForTesting
+  DebeziumReadSchemaTransformProvider(Boolean isTest, Integer recordLimit) {
+    this.isTest = isTest;
+    this.testLimitRecords = recordLimit;
+  }
+
+  @Override
+  protected @NonNull @Initialized Class<DebeziumReadSchemaTransformConfiguration>
+      configurationClass() {
+    return DebeziumReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected @NonNull @Initialized SchemaTransform from(
+      DebeziumReadSchemaTransformConfiguration configuration) {
+    return new SchemaTransform() {
+      @Override
+      public @UnknownKeyFor @NonNull @Initialized PTransform<
+              @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple,
+              @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple>
+          buildTransform() {
+        return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
+          @Override
+          public PCollectionRowTuple expand(PCollectionRowTuple input) {
+            // TODO(pabloem): Test this behavior
+            if (!Arrays.stream(Connectors.values())
+                .map(Objects::toString)
+                .collect(Collectors.toSet())
+                .contains(configuration.getDatabase())) {
+              throw new IllegalArgumentException(
+                  "Unsupported database "
+                      + configuration.getDatabase()
+                      + ". Unable to select a JDBC driver for it. Supported databases are: "
+                      + String.join(
+                          ", ",
+                          Arrays.stream(Connectors.values())
+                              .map(Object::toString)
+                              .collect(Collectors.toList())));
+            }
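+            // Note: Connectors.valueOf below cannot throw, since membership of
+            // configuration.getDatabase() in Connectors was verified above.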
+            Class<? extends SourceConnector> connectorClass =
+                Objects.requireNonNull(Connectors.valueOf(configuration.getDatabase()))
+                    .getConnector();
+            DebeziumIO.ConnectorConfiguration connectorConfiguration =
+                DebeziumIO.ConnectorConfiguration.create()
+                    .withUsername(configuration.getUsername())
+                    .withPassword(configuration.getPassword())
+                    .withHostName(configuration.getHost())
+                    .withPort(Integer.toString(configuration.getPort()))
+                    .withConnectorClass(connectorClass);
+            connectorConfiguration =
+                connectorConfiguration
+                    .withConnectionProperty("table.include.list", configuration.getTable())
+                    .withConnectionProperty("include.schema.changes", "false")
+                    .withConnectionProperty("database.server.name", "beam-pipeline-server")
+                    .withConnectionProperty("database.dbname", "inventory")
+                    .withConnectionProperty("database.include.list", "inventory");
+
+            final List<String> debeziumConnectionProperties =
+                configuration.getDebeziumConnectionProperties();
+            if (debeziumConnectionProperties != null) {
+              for (String connectionProperty : debeziumConnectionProperties) {
+                String[] parts = connectionProperty.split("=", -1);
+                String key = parts[0];
+                String value = parts[1];
+                // ConnectorConfiguration is immutable, so the result of withConnectionProperty
+                // must be reassigned rather than discarded.
+                connectorConfiguration = connectorConfiguration.withConnectionProperty(key, value);
+              }
+            }
+
+            DebeziumIO.Read<Row> readTransform =
+                DebeziumIO.<Row>read().withConnectorConfiguration(connectorConfiguration);
+
+            if (isTest) {
+              readTransform = readTransform.withMaxNumberOfRecords(testLimitRecords);
+            }
+
+            // TODO(pabloem): Database connection issues can be debugged here.
+            Schema recordSchema = readTransform.getRecordSchema();
+            LOG.info(
+                "Computed schema for table {} from {}: {}",
+                configuration.getTable(),
+                configuration.getDatabase(),
+                recordSchema);
+            SourceRecordMapper<Row> formatFn =
+                KafkaConnectUtils.beamRowFromSourceRecordFn(recordSchema);
+            readTransform =
+                readTransform.withFormatFunction(formatFn).withCoder(RowCoder.of(recordSchema));
+
+            return PCollectionRowTuple.of("output", input.getPipeline().apply(readTransform));
+          }
+        };
+      }
+    };
+  }
+
+  @Override
+  public @NonNull @Initialized String identifier() {
+    return "beam:schematransform:org.apache.beam:debezium_read:v1";
+  }
+
+  @Override
+  public @NonNull @Initialized List<@NonNull @Initialized String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public @NonNull @Initialized List<@NonNull @Initialized String> outputCollectionNames() {
+    return Collections.singletonList("output");
+  }
+
+  @AutoValue
+  public abstract static class DebeziumReadSchemaTransformConfiguration {
+    public abstract String getUsername();
+
+    public abstract String getPassword();
+
+    public abstract String getHost();
+
+    public abstract Integer getPort();
+
+    public abstract String getTable();
+
+    public abstract @NonNull String getDatabase();
+
+    public abstract @Nullable List<String> getDebeziumConnectionProperties();
+
+    public static Builder builder() {
+      return new AutoValue_DebeziumReadSchemaTransformProvider_DebeziumReadSchemaTransformConfiguration
+          .Builder();
+    }
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setUsername(String username);
+
+      public abstract Builder setPassword(String password);
+
+      public abstract Builder setHost(String host);
+
+      public abstract Builder setPort(Integer port);
+
+      public abstract Builder setDatabase(String database);
+
+      public abstract Builder setTable(String table);
+
+      public abstract Builder setDebeziumConnectionProperties(
+          List<String> debeziumConnectionProperties);
+
+      public abstract DebeziumReadSchemaTransformConfiguration build();
+    }
+  }
+}
diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java
index abf5703bf2092..b302ef9a751a7 100644
--- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java
+++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java
@@ -18,6 +18,9 @@
 package org.apache.beam.io.debezium;
 
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
 
 public class KafkaConnectUtils {
   public static Schema beamSchemaFromKafkaConnectSchema(
@@ -74,4 +77,54 @@ public static Schema.FieldType beamSchemaTypeFromKafkaType(
               "Unable to convert Kafka field schema %s to Beam Schema", kafkaFieldSchema));
     }
   }
+
+  /**
+   * Returns a {@link SourceRecordMapper} that converts Kafka Connect {@link SourceRecord}s into
+   * Beam {@link Row}s matching {@code recordSchema}.
+   */
+  public static SourceRecordMapper<Row> beamRowFromSourceRecordFn(Schema recordSchema) {
+    final Schema keySchema = recordSchema.getField("key").getType().getRowSchema();
+    final Schema valueSchema = recordSchema.getField("value").getType().getRowSchema();
+    if (keySchema == null || valueSchema == null) {
+      throw new IllegalArgumentException("Improper schema for Beam record " + recordSchema);
+    }
+    return new SourceRecordMapper<Row>() {
+      @Override
+      public Row mapSourceRecord(SourceRecord sourceRecord) throws Exception {
+        return Row.withSchema(recordSchema)
+            .withFieldValue("key", beamRowFromKafkaStruct((Struct) sourceRecord.key(), keySchema))
+            .withFieldValue(
+                "value", beamRowFromKafkaStruct((Struct) sourceRecord.value(), valueSchema))
+            .build();
+      }
+
+      private Row beamRowFromKafkaStruct(Struct kafkaStruct, Schema beamSchema) {
+        Row.Builder rowBuilder = Row.withSchema(beamSchema);
+        for (Schema.Field f : beamSchema.getFields()) {
+          Object structField = kafkaStruct.getWithoutDefault(f.getName());
+          switch (kafkaStruct.schema().field(f.getName()).schema().type()) {
+            case ARRAY:
+            case MAP:
+              // TODO(pabloem): Handle nested structs
+              throw new IllegalArgumentException("Unable to convert field " + f);
+            case STRUCT:
+              Schema fieldSchema = f.getType().getRowSchema();
+              if (fieldSchema == null) {
+                throw new IllegalArgumentException("Improper schema for field " + f);
+              }
+              if (structField == null) {
+                // If the field is null, then we must add a null field to ensure we encode things
+                // properly.
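+                // Assumption: the corresponding Beam field is nullable (optional Kafka Connect
+                // fields map to nullable Row fields), so addValue(null) is valid here.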
+                rowBuilder = rowBuilder.addValue(null);
+                break;
+              }
+              rowBuilder =
+                  rowBuilder.addValue(beamRowFromKafkaStruct((Struct) structField, fieldSchema));
+              break;
+            default:
+              rowBuilder = rowBuilder.addValue(structField);
+              break;
+          }
+        }
+        return rowBuilder.build();
+      }
+    };
+  }
 }
diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
index 6d5658f72b06e..fbacc13a341f0 100644
--- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
+++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
@@ -25,6 +25,7 @@
 import io.debezium.relational.history.HistoryRecord;
 import java.io.IOException;
 import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -127,6 +128,27 @@ public Coder<OffsetHolder> getRestrictionCoder() {
     return SerializableCoder.of(OffsetHolder.class);
   }
 
+  /**
+   * Starts the connector, polls a single {@link SourceRecord} to sample its schema, and stops the
+   * connector.
+   */
+  protected SourceRecord getOneRecord(Map<String, String> configuration) {
+    try {
+      SourceConnector connector = connectorClass.getDeclaredConstructor().newInstance();
+      connector.start(configuration);
+
+      SourceTask task = (SourceTask) connector.taskClass().getDeclaredConstructor().newInstance();
+      task.initialize(new BeamSourceTaskContext(null));
+      task.start(connector.taskConfigs(1).get(0));
+      List<SourceRecord> records = task.poll();
+      task.stop();
+      connector.stop();
+      return records.get(0);
+    } catch (NoSuchMethodException
+        | InterruptedException
+        | InvocationTargetException
+        | IllegalAccessException
+        | InstantiationException e) {
+      throw new RuntimeException("Unable to sample a record from the Debezium connector.", e);
+    }
+  }
+
   /**
    * Process the retrieved element. Currently it just logs the retrieved record as JSON.
    *
@@ -188,10 +210,7 @@ public ProcessContinuation process(
         task.commit();
       }
     } catch (Exception ex) {
-      LOG.error(
-          "-------- Error on consumer: {}. with stacktrace: {}",
-          ex.getMessage(),
-          ex.getStackTrace());
+      throw new RuntimeException("Error occurred when consuming changes from the database.", ex);
     } finally {
       restrictionTrackers.remove(this.getHashCode());
diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java
index 16efff9e43be7..bc8cf19badbe9 100644
--- a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java
+++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java
@@ -21,16 +21,29 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 
 import io.debezium.connector.postgresql.PostgresConnector;
+import javax.sql.DataSource;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.joda.time.Duration;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.postgresql.ds.PGSimpleDataSource;
 import org.testcontainers.containers.PostgreSQLContainer;
 import org.testcontainers.utility.DockerImageName;
@@ -51,6 +64,87 @@ public class DebeziumIOPostgresSqlConnectorIT {
           .withExposedPorts(5432)
           .withDatabaseName("inventory");
 
+  static final Schema TABLE_SCHEMA =
+      Schema.builder()
+          .addInt32Field("id")
+          .addStringField("first_name")
+          .addStringField("last_name")
+          .addStringField("email")
+          .build();
+
+  static DataSource getPostgresDatasource() {
+    PGSimpleDataSource dataSource = new PGSimpleDataSource();
+    dataSource.setDatabaseName("inventory");
+    dataSource.setServerName(POSTGRES_SQL_CONTAINER.getContainerIpAddress());
+    dataSource.setPortNumber(POSTGRES_SQL_CONTAINER.getMappedPort(5432));
+    dataSource.setUser("debezium");
+    dataSource.setPassword("dbz");
+    return dataSource;
+  }
+
+  @Test
+  public void testDebeziumSchemaTransformPostgresRead() {
+    long WRITE_SIZE = 1000L;
+    POSTGRES_SQL_CONTAINER.start();
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    Pipeline writePipeline = Pipeline.create(options);
+    writePipeline
+        .apply(GenerateSequence.from(0).to(WRITE_SIZE).withRate(10, Duration.standardSeconds(1)))
+        .apply(
+            MapElements.into(TypeDescriptors.rows())
+                .via(
+                    num ->
+                        Row.withSchema(TABLE_SCHEMA)
+                            .withFieldValue("id", Long.valueOf(num).intValue())
+                            .withFieldValue("first_name", Long.toString(num))
+                            .withFieldValue("last_name", Long.toString(WRITE_SIZE - num))
+                            .withFieldValue("email", Long.toString(num) + "@beamail.com")
+                            // .withFieldValue("name", Long.toString(num))
+                            // .withFieldValue("age", num % 100)
+                            // .withFieldValue("temperature", num / 100.0)
+                            // .withFieldValue("distance", num * 1000.0)
+                            // .withFieldValue("birthYear", num)
+                            // TODO(pabloem): Add other data types
+                            .build()))
+        .setRowSchema(TABLE_SCHEMA)
+        .apply(
+            JdbcIO.<Row>write()
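+                // Schema-qualified name: Postgres database "inventory", schema "inventory",
+                // table "customers" (both names come from the container setup above).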
.withTable("inventory.inventory.customers") + .withDataSourceConfiguration( + JdbcIO.DataSourceConfiguration.create(getPostgresDatasource()))); + + Pipeline readPipeline = Pipeline.create(options); + PCollection result = + PCollectionRowTuple.empty(readPipeline) + .apply( + new DebeziumReadSchemaTransformProvider(true, 1004) + .from( + DebeziumReadSchemaTransformProvider.DebeziumReadSchemaTransformConfiguration + .builder() + .setDatabase("POSTGRES") + .setPassword("dbz") + .setUsername("debezium") + .setHost("localhost") + .setTable("inventory.customers") + .setPort(POSTGRES_SQL_CONTAINER.getMappedPort(5432)) + .build()) + .buildTransform()) + .get("output"); + + PAssert.that(result) + .satisfies(rows -> { + assert Lists.newArrayList(rows).size() == 1004; + return null; + }); + + PipelineResult writeResult = writePipeline.run(); + readPipeline.run().waitUntilFinish(); + writeResult.waitUntilFinish(); + } + /** * Debezium - PostgresSql connector Test. * diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java index 4f0270358e8b9..d29bcf7004071 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java @@ -2096,7 +2096,17 @@ private List getFilteredFields(Schema schema) { checkState( tableSchema.getFieldCount() >= schema.getFieldCount(), - "Input schema has more fields than actual table."); + String.format( + "Input schema has more fields (%s) than actual table (%s).\n\t" + + "Input schema fields: %s | Table fields: %s", + tableSchema.getFieldCount(), + schema.getFieldCount(), + schema.getFields().stream() + .map(Schema.Field::getName) + .collect(Collectors.joining(", ")), + tableSchema.getFields().stream() + .map(Schema.Field::getName) + .collect(Collectors.joining(", ")))); // filter out missing fields from output table List missingFields =