Skip to content

Commit

Permalink
Debeziumio PoC (apache#7)
Browse files Browse the repository at this point in the history
* New DebeziumIO class.

* Merge connector code

* DebeziumIO and MySqlConnector integrated.

* Added FormatFuntion param to Read builder on DebeziumIO.

* Added arguments checker to DebeziumIO.

* Add simple JSON mapper object (#1)

* Add simple JSON mapper object

* Fixed Mapper.

* Add SqlServer connector test

* Added PostgreSql Connector Test

PostgreSql now works with Json mapper

* Added PostgreSql Connector Test

PostgreSql now works with Json mapper

* Fixing MySQL schema DataException

Using file instead of schema should fix it

* MySQL Connector updated from 1.3.0 to 1.3.1

Co-authored-by: osvaldo-salinas <osvaldo.salinas@wizeline.com>
Co-authored-by: Carlos Dominguez <carlos.dominguez@carlos.dominguez>
Co-authored-by: Carlos Domínguez <carlos.dominguez@wizeline.com>

* Add debeziumio tests

* Debeziumio testing json mapper (#3)

* Some code refactors. Use a default DBHistory if not provided

* Add basic tests for Json mapper

* Debeziumio time restriction (apache#5)

* Add simple JSON mapper object

* Fixed Mapper.

* Add SqlServer connector test

* Added PostgreSql Connector Test

PostgreSql now works with Json mapper

* Added PostgreSql Connector Test

PostgreSql now works with Json mapper

* Fixing MySQL schema DataException

Using file instead of schema should fix it

* MySQL Connector updated from 1.3.0 to 1.3.1

* Some code refactors. Use a default DBHistory if not provided

* Adding based-time restriction

Stop polling after specified amount of time

* Add basic tests for Json mapper

* Adding new restriction

Uses a time-based restriction

* Adding optional restrcition

Uses an optional time-based restriction

Co-authored-by: juanitodread <juanitodread@gmail.com>
Co-authored-by: osvaldo-salinas <osvaldo.salinas@wizeline.com>

* Upgrade DebeziumIO connector (apache#4)

* Address comments (Change dependencies to testCompile, Set JsonMapper/Coder as default, refactors) (apache#8)

* Revert file

* Change dependencies to testCompile
* Move Counter sample to unit test

* Set JsonMapper as default mapper function
* Set String Coder as default coder when using JsonMapper
* Change logs from info to debug

* Debeziumio javadoc (apache#9)

* Adding javadoc

* Added some titles and examples

* Added SourceRecordJson doc

* Added Basic Connector doc

* Added KafkaSourceConsumer doc

* Javadoc cleanup

* Removing BasicConnector

No usages of this class were found overall

* Editing documentation

* Debeziumio fetched records restriction (apache#10)

* Adding javadoc

* Adding restriction by number of fetched records

Also adding a quick-fix for null value within SourceRecords
Minor fix on both MySQL and PostgreSQL Connectors Tests

* Run either by time or by number of records

* Added DebeziumOffsetTrackerTest

Tests both restrictions: By amount of time and by Number of records

* Removing comment

* DebeziumIO test for DB2. (apache#11)

* DebeziumIO test for DB2.

* DebeziumIO javadoc.

* Clean code:removed commented code lines on DebeziumIOConnectorTest.java

* Clean code:removing unused imports and using readAsJson().

Co-authored-by: Carlos Domínguez <74681048+carlosdominguezwl@users.noreply.github.com>

* Debezium limit records (now configurable) (apache#12)

* Adding javadoc

* Records Limit is now configurable

(It was fixed before)

* Debeziumio dockerize (apache#13)

* Add mysql docker container to tests

* Move debezium mysql integration test to its own file

* Add assertion to verify that the results contains a record.

* Debeziumio readme (apache#15)

* Adding javadoc

* Adding README file

* Add number of records configuration to the DebeziumIO component (apache#16)

* Code refactors (apache#17)

* Remove/ignore null warnings

* Remove DB2 code

* Remove docker dependency in DebeziumIO unit test and max number of recods to MySql integration test

* Change access modifiers accordingly

* Remove incomplete integration tests (Postgres and SqlServer)

* Add experimenal tag

* Debezium testing stoppable consumer (apache#18)

* Add try-catch-finally, stop SourceTask at finally.

* Fix warnings

* stopConsumer and processedRecords local variables removed. UT for task stop use case added

* Fix minor code style issue

Co-authored-by: juanitodread <juanitodread@gmail.com>

* Fix style issues (check, spotlessApply) (apache#19)

Co-authored-by: Osvaldo Salinas <osvaldo.salinas@osvaldo.salinas>
Co-authored-by: alejandro.maguey <alejandro.maguey@wizeline.com>
Co-authored-by: osvaldo-salinas <osvaldo.salinas@wizeline.com>
Co-authored-by: Carlos Dominguez <carlos.dominguez@carlos.dominguez>
Co-authored-by: Carlos Domínguez <carlos.dominguez@wizeline.com>
Co-authored-by: Carlos Domínguez <74681048+carlosdominguezwl@users.noreply.github.com>
Co-authored-by: Alejandro Maguey <alexmaguey1@gmail.com>
Co-authored-by: Hassan Reyes <hassanreyes@users.noreply.github.com>
  • Loading branch information
9 people authored Feb 12, 2021
1 parent e33afae commit cfbbbfd
Show file tree
Hide file tree
Showing 14 changed files with 1,990 additions and 425 deletions.
8 changes: 5 additions & 3 deletions sdks/java/io/cdc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,16 @@ dependencies {
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
testCompile project(":runners:google-cloud-dataflow-java")
testCompile "org.testcontainers:testcontainers:1.15.1"
testCompile "org.testcontainers:mysql:1.15.1"

// Kafka connect dependencies
compile "org.apache.kafka:connect-api:2.5.0"
compile "org.apache.kafka:connect-json:2.5.0"

// Debezium depenedencies
compile group: 'io.debezium', name: 'debezium-core', version: '1.3.0.Final'
compile group: 'io.debezium', name: 'debezium-connector-mysql', version: '1.3.0.Final'
// Debezium dependencies
compile group: 'io.debezium', name: 'debezium-core', version: '1.3.1.Final'
testCompile group: 'io.debezium', name: 'debezium-connector-mysql', version: '1.3.1.Final'
}

test {
Expand Down
159 changes: 159 additions & 0 deletions sdks/java/io/cdc/src/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# DebeziumIO
## Connect your Debezium Databases to Apache Beam easily.

### What is DebeziumIO?
DebeziumIO is an Apache Beam connector that lets users connect their Events-Driven Databases on [Debezium](https://debezium.io) to [Apache Beam](https://beam.apache.org/) without the need to set up a [Kafka](https://kafka.apache.org/) instance.

### Getting Started

DebeziumIO uses [Debezium Connectors v1.3](https://debezium.io/documentation/reference/1.3/connectors/) to connect to Apache Beam. All you need to do is choose the Debezium Connector that suits your Debezium setup and pick a [Serializable Function](https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/transforms/SerializableFunction.html), then you will be able to connect to Apache Beam and start building your own Pipelines.

These connectors have been successfully tested and are known to work fine:
* MySQL Connector
* PostgreSQL Connector
* SQLServer Connector
* DB2 Connector

Other connectors might also work.


Setting up a connector and running a Pipeline should be as simple as:
```
Pipeline p = Pipeline.create(); // Create a Pipeline
p.apply(DebeziumIO.<String>read()
.withConnectorConfiguration(...) // Debezium Connector setup
.withFormatFunction(...) // Serializable Function to use
).setCoder(StringUtf8Coder.of());
p.run().waitUntilFinish(); // Run your pipeline!
```

### Setting up a Debezium Connector

DebeziumIO comes with a handy ConnectorConfiguration builder, which lets you provide all the configuration needed to access your Debezium Database.

A basic configuration such as **username**, **password**, **port number**, and **host name** must be specified along with the **Debezium Connector class** you will use by using these methods:

|Method|Param|Description|
|-|-|-|
|`.withConnectorClass(connectorClass)`|_Class_|Debezium Connector|
|`.withUsername(username)`|_String_|Database Username|
|`.withPassword(password)`|_String_|Database Password|
|`.withHostName(hostname)`|_String_|Database Hostname|
|`.withPort(portNumber)`|_String_|Database Port number|

You can also add more configuration, such as Connector-specific Properties with the `_withConnectionProperty_` method:

|Method|Params|Description|
|-|-|-|
|`.withConnectionProperty(propName, propValue)`|_String_, _String_|Adds a custom property to the connector.|
> **Note:** For more information on custom properties, see your [Debezium Connector](https://debezium.io/documentation/reference/1.3/connectors/) specific documentation.
Example of a MySQL Debezium Connector setup:
```
DebeziumIO.ConnectorConfiguration.create()
.withUsername("dbUsername")
.withPassword("dbPassword")
.withConnectorClass(MySqlConnector.class)
.withHostName("127.0.0.1")
.withPort("3306")
.withConnectionProperty("database.server.id", "serverId")
.withConnectionProperty("database.server.name", "serverName")
.withConnectionProperty("database.include.list", "dbName")
.withConnectionProperty("include.schema.changes", "false")
```

### Setting a Serializable Function

A serializable function is required to depict each `SourceRecord` fetched from the Database.

DebeziumIO comes with a built-in JSON Mapper that you can optionally use to map every `SourceRecord` fetched from the Database to a JSON object. This helps users visualize and access their data in a simple way.

If you want to use this built-in JSON Mapper, you can do it by setting an instance of **SourceRecordJsonMapper** as a Serializable Function to the DebeziumIO:
```
.withFormatFunction(new SourceRecordJson.SourceRecordJsonMapper())
```
> **Note:** `SourceRecordJsonMapper`comes out of the box, but you may use any Format Function you prefer.
## Quick Example

The following example is how an actual setup would look like using a **MySQL Debezium Connector** and **SourceRecordJsonMapper** as the Serializable Function.
```
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
p.apply(DebeziumIO.<String>read().
withConnectorConfiguration( // Debezium Connector setup
DebeziumIO.ConnectorConfiguration.create()
.withUsername("debezium")
.withPassword("dbz")
.withConnectorClass(MySqlConnector.class)
.withHostName("127.0.0.1")
.withPort("3306")
.withConnectionProperty("database.server.id", "184054")
.withConnectionProperty("database.server.name", "dbserver1")
.withConnectionProperty("database.include.list", "inventory")
.withConnectionProperty("include.schema.changes", "false")
).withFormatFunction(
new SourceRecordJson.SourceRecordJsonMapper() // Serializable Function
)
).setCoder(StringUtf8Coder.of());
p.run().waitUntilFinish();
```

## Shortcut!

If you will be using the built-in **SourceRecordJsonMapper** as your Serializable Function for all your pipelines, you should use **readAsJson()**.

DebeziumIO comes with a method called `readAsJson`, which automatically sets the `SourceRecordJsonMapper` as the Serializable Function for your pipeline. This way, you would need to setup your connector before running your pipeline, without explicitly setting a Serializable Function.

Example of using **readAsJson**:
```
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
p.apply(DebeziumIO.<String>read().
withConnectorConfiguration( // Debezium Connector setup
DebeziumIO.ConnectorConfiguration.create()
.withUsername("debezium")
.withPassword("dbz")
.withConnectorClass(MySqlConnector.class)
.withHostName("127.0.0.1")
.withPort("3306")
.withConnectionProperty("database.server.id", "184054")
.withConnectionProperty("database.server.name", "dbserver1")
.withConnectionProperty("database.include.list", "inventory")
.withConnectionProperty("include.schema.changes", "false"));
p.run().waitUntilFinish();
```

## Under the hood

### KafkaSourceConsumerFn and Restrictions

KafkaSourceConsumerFn (KSC onwards) is a [DoFn](https://beam.apache.org/releases/javadoc/2.3.0/org/apache/beam/sdk/transforms/DoFn.html) in charge of the Database replication and CDC.

There are two ways of initializing KSC:
* Restricted by number of records
* Restricted by amount of time (minutes)

By default, DebeziumIO initializes it with the former, though user may choose the latter by setting the amount of minutes as a parameter:

|Function|Param|Description|
|-|-|-|
|`KafkaSourceConsumerFn(connectorClass, recordMapper, maxRecords)`|_Class, SourceRecordMapper, Int_|Restrict run by number of records (Default).|
|`KafkaSourceConsumerFn(connectorClass, recordMapper, timeToRun)`|_Class, SourceRecordMapper, Long_|Restrict run by amount of time (in minutes).|

### Requirements and Supported versions

- JDK v8
- Debezium Connectors v1.3
- Apache Beam 2.25

## Running Unit Tests

You can run Unit Tests using **gradlew**.

Example of running the MySQL Connector Test:
```
./gradlew :sdks:java:io:cdc:test --tests="**testDebeziumIOMySql"
```

This file was deleted.

Loading

0 comments on commit cfbbbfd

Please sign in to comment.