The MongoDB CDC connector allows for reading snapshot data and incremental data from MongoDB. This document describes how to setup the MongoDB CDC connector to run SQL queries against MongoDB.
In order to setup the MongoDB CDC connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
<version>2.3-SNAPSHOT</version>
</dependency>
Download link is available only for stable releases.
Download flink-sql-connector-mongodb-cdc-2.3-SNAPSHOT.jar and put it under <FLINK_HOME>/lib/
.
Note: flink-sql-connector-mongodb-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use the released version, such as flink-sql-connector-mongodb-cdc-XXX.jar, the released version will be available in the Maven central warehouse.
-
MongoDB version
MongoDB version >= 3.6
We use change streams feature (new in version 3.6) to capture change data. -
Cluster Deployment
replica sets or sharded clusters is required.
-
Storage Engine
WiredTiger storage engine is required.
-
Replica set protocol version 1 (pv1) is required.
Starting in version 4.0, MongoDB only supports pv1. pv1 is the default for all new replica sets created with MongoDB 3.2 or later. -
Privileges
changeStream
andread
privileges are required by MongoDB Kafka Connector.You can use the following example for simple authorization.
For more detailed authorization, please refer to MongoDB Database User Roles.use admin; db.createUser({ user: "flinkuser", pwd: "flinkpw", roles: [ { role: "read", db: "admin" }, //read role includes changeStream privilege { role: "readAnyDatabase", db: "admin" } //for snapshot reading ] });
The MongoDB CDC table can be defined as following:
-- register a MongoDB table 'products' in Flink SQL
CREATE TABLE products (
_id STRING, // must be declared
name STRING,
weight DECIMAL(10,3),
tags ARRAY<STRING>, -- array
price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database' = 'inventory',
'collection' = 'products'
);
-- read snapshot and change events from products collection
SELECT * FROM products;
Note that
MongoDB's change event record doesn't have updated before message. So, we can only convert it to Flink's UPSERT changelog stream.
An upsert stream requires a unique key, so we must declare _id
as primary key.
We can't declare other column as primary key, because delete operation does not contain the key and value besides _id
and sharding key
.
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | (none) | String | Specify what connector to use, here should be mongodb-cdc . |
hosts | required | (none) | String | The comma-separated list of hostname and port pairs of the MongoDB servers. eg. localhost:27017,localhost:27018
|
username | optional | (none) | String | Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. |
password | optional | (none) | String | Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. |
database | optional | (none) | String | Name of the database to watch for changes. If not set then all databases will be captured. The database also supports regular expressions to monitor multiple databases matching the regular expression. |
collection | optional | (none) | String | Name of the collection in the database to watch for changes. If not set then all collections will be captured. The collection also supports regular expressions to monitor multiple collections matching fully-qualified collection identifiers. |
connection.options | optional | (none) | String | The ampersand-separated connection options of MongoDB. eg. replicaSet=test&connectTimeoutMS=300000
|
errors.tolerance | optional | none | String | Whether to continue processing messages if an error is encountered.
Accept none or all .
When set to none , the connector reports an error and blocks further processing of the rest of the records
when it encounters an error. When set to all , the connector silently ignores any bad messages.
|
errors.log.enable | optional | true | Boolean | Whether details of failed operations should be written to the log file. |
copy.existing | optional | true | Boolean | Whether copy existing data from source collections. |
copy.existing.pipeline | optional | (none) | String | An array of JSON objects describing the pipeline operations to run when copying existing data. This can improve the use of indexes by the copying manager and make copying more efficient. eg. [{"$match": {"closed": "false"}}] ensures that
only documents in which the closed field is set to false are copied.
|
copy.existing.max.threads | optional | Processors Count | Integer | The number of threads to use when performing the data copy. |
copy.existing.queue.size | optional | 16000 | Integer | The max size of the queue to use when copying data. |
poll.max.batch.size | optional | 1000 | Integer | Maximum number of change stream documents to include in a single batch when polling for new data. |
poll.await.time.ms | optional | 1500 | Integer | The amount of time to wait before checking for new results on the change stream. |
heartbeat.interval.ms | optional | 0 | Integer | The length of time in milliseconds between sending heartbeat messages. Use 0 to disable. |
Note: heartbeat.interval.ms
is highly recommended setting a proper value larger than 0 if the collection changes slowly.
The heartbeat event can push the resumeToken
forward to avoid resumeToken
being expired when we recover the Flink job from a checkpoint or savepoint.
The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
Key | DataType | Description |
---|---|---|
database_name | STRING NOT NULL | Name of the database that contain the row. |
collection_name | STRING NOT NULL | Name of the collection that contain the row. |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | It indicates the time that the change was made in the database. If the record is read from snapshot of the table instead of the change stream, the value is always 0. |
The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:
CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
collection_name STRING METADATA FROM 'collection_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
_id STRING, // must be declared
name STRING,
weight DECIMAL(10,3),
tags ARRAY<STRING>, -- array
price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database' = 'inventory',
'collection' = 'products'
);
The MongoDB CDC connector is a Flink Source connector which will read database snapshot first and then continues to read change stream events with exactly-once processing even failures happen.
The config option copy.existing
specifies whether do snapshot when MongoDB CDC consumer startup.
Defaults to true
.
The config option copy.existing.pipeline
describing the filters when copying existing data.
This can filter only required data and improve the use of indexes by the copying manager.
In the following example, the $match
aggregation operator ensures that only documents in which the closed field is set to false are copied.
'copy.existing.pipeline' = '[ { "$match": { "closed": "false" } } ]'
We integrate the MongoDB's official Kafka Connector to read snapshot or change events from MongoDB and drive it by Debezium's EmbeddedEngine
.
Debezium's EmbeddedEngine
provides a mechanism for running a single Kafka Connect SourceConnector
within an application's process, and it can drive any standard Kafka Connect SourceConnector
properly even which is not provided by Debezium.
We choose MongoDB's official Kafka Connector instead of the Debezium's MongoDB Connector because they use a different change data capture mechanism.
- For Debezium's MongoDB Connector, it reads the
oplog.rs
collection of each replica-set's master node. - For MongoDB's Kafka Connector, it subscribes
Change Stream
of MongoDB.
MongoDB's oplog.rs
collection doesn't keep the changed record's update before state, so it's hard to extract the full document state by a single oplog.rs
record and convert it to change log stream accepted by Flink (Insert Only, Upsert, All).
Additionally, MongoDB 5 (released in July 2021) has changed the oplog format, so the current Debezium connector cannot be used with it.
Change Stream is a new feature provided by MongoDB 3.6 for replica sets and sharded clusters that allows applications to access real-time data changes without the complexity and risk of tailing the oplog.
Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them.
Lookup Full Document for Update Operations is a feature provided by Change Stream which can configure the change stream to return the most current majority-committed version of the updated document. Because of this feature, we can easily collect the latest full document and convert the change log to Flink's Upsert Changelog Stream.
By the way, Debezium's MongoDB change streams exploration mentioned by DBZ-435 is on roadmap.
If it's done, we can consider integrating two kinds of source connector for users to choose.
The MongoDB CDC connector can also be a DataStream source. You can create a SourceFunction as the following shows:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mongodb.MongoDBSource;
public class MongoDBSourceExample {
public static void main(String[] args) throws Exception {
SourceFunction<String> sourceFunction = MongoDBSource.<String>builder()
.hosts("localhost:27017")
.username("flink")
.password("flinkpw")
.databaseList("inventory") // set captured database, support regex
.collectionList("inventory.products", "inventory.orders") //set captured collections, support regex
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(sourceFunction)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute();
}
}
Note: If database regex is used, readAnyDatabase
role is required.
BSON short for Binary JSON is a binary-encoded serialization of JSON-like format used to store documents and make remote procedure calls in MongoDB.
Flink SQL Data Type is similar to the SQL standard’s data type terminology which describes the logical type of a value in the table ecosystem. It can be used to declare input and/or output types of operations.
In order to enable Flink SQL to process data from heterogeneous data sources, the data types of heterogeneous data sources need to be uniformly converted to Flink SQL data types.
The following is the mapping of BSON type and Flink SQL type.
BSON type | Flink SQL type |
---|---|
TINYINT | |
SMALLINT | |
Int | INT |
Long | BIGINT |
FLOAT | |
Double | DOUBLE |
Decimal128 | DECIMAL(p, s) |
Boolean | BOOLEAN |
Date Timestamp |
DATE |
Date Timestamp |
TIME |
Date | TIMESTAMP(3) TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP(0) TIMESTAMP_LTZ(0) |
String ObjectId UUID Symbol MD5 JavaScript Regex |
STRING |
BinData | BYTES |
Object | ROW |
Array | ARRAY |
DBPointer | ROW<$ref STRING, $id STRING> |
GeoJSON |
Point : ROW<type STRING, coordinates ARRAY<DOUBLE>> Line : ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>> ... |