Debezium CDC Connector to send Events to Kafka-Enabled Event Hub #53

Closed · 8 of 14 tasks

JagdeepP opened this issue Jun 5, 2019 · 9 comments

@JagdeepP commented Jun 5, 2019

Description

Data replication using Kafka Connect and Kafka-enabled Event Hub. We have it working with Confluent Kafka Broker and Confluent Kafka Connect. We are having issues when we replace the Kafka Broker with Event Hub. We are not getting any events from the CDC connector to Event Hub. We see the control topics being created but we do not see the CDC topic for table get created.

It does work with the file sink connector.

How to reproduce

  • Setup Developer Instance of SQL 2017 (CDC Feature required) with SQL auth enabled
  • Download latest AdventureWorks2017.bak file for testing
  • Restore AdventureWorks2017.bak to AdventureWorks2017Source database
  • Create a blank database AdventureWorks2017Dest
  • Enable CDC on the database (see the T-SQL sketch after this list)
  • Setup latest Confluent 5.2.1
  • Setup Debezium CDC Connector to read Country.Region table changes from AdventureWorks2017Source database
  • Setup JDBC Sink Connector to sink changes to AdventureWorks2017Dest
  • Verify it works with Confluent Kafka Broker
  • Now change the configurations to use Event Hub
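
For reference, "enable CDC" above means enabling it at both the database and the table level. A minimal T-SQL sketch (the schema/table names are illustrative; in AdventureWorks the country/region table is Person.CountryRegion):

USE AdventureWorks2017Source;
EXEC sys.sp_cdc_enable_db;

-- requires SQL Server Agent to be running for the capture job
EXEC sys.sp_cdc_enable_table
    @source_schema = N'Person',
    @source_name   = N'CountryRegion',
    @role_name     = NULL;  -- NULL = no gating role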

Has it worked previously?

No

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • Verified that port 9093 is not blocked by firewall
  • Verified the namespace is either Standard or Dedicated tier (i.e. it is not Basic tier, which isn't supported)
  • Sample you're having trouble with: <REPLACE with e.g., Java quickstart>
  • Apache Kafka version: Confluent 5.2.1
  • Kafka client configuration: <REPLACE with e.g., auto.reset.offset=earliest, ..> (do not include your connection string or SAS Key)
  • Namespace and EventHub/topic name
  • Consumer or producer failure: Producer
  • If consumer, partition ID, group ID <REPLACE with e.g., partitionID=3, groupID=consumer-group>
  • Timestamp in UTC <REPLACE with e.g., Nov 7 2018 - 17:15:01 UTC>
  • Client ID <REPLACE with e.g., clientID=kafka-client>
  • Provide all logs (with debugging enabled if possible) or exception call stack
  • Standalone repro <REPLACE with e.g., Willing/able to send scenario to repro issue>
  • Operating system: Red Hat Enterprise Linux Server release 7.6 (Maipo)
  • Critical issue

Logs with Debugging enabled

No errors in the logs. It's connected to the Event Hub and has created the default topics (connect-cluster-configs / connect-cluster-offsets / connect-cluster-status). The connector is not even trying to connect to the SQL Server. According to Debezium community support, there is no proven solution for connecting the CDC connector to Event Hub.

Kafka Client Configuration

group.id=connect-cluster-group_db2
bootstrap.servers=.servicebus.windows.net:9093
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
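# NOTE: the Avro converter settings below override the JSON converter settings above
# (when the same key appears twice in a .properties file, the last value wins)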
key.converter = io.confluent.connect.avro.AvroConverter
value.converter = io.confluent.connect.avro.AvroConverter
#internal.key.converter=org.apache.kafka.connect.json.JsonConverter
#internal.value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/eventhub.offsets
offset.flush.interval.ms=10000

#key.serializer=org.apache.kafka.common.serialization.StringSerializer
#value.serializer=org.apache.kafka.common.serialization.StringSerializer

transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope
transforms.unwrap.drop.tombstones=false

config.storage.topic=connect-cluster-configs_db2
offset.storage.topic=connect-cluster-offsets_db2
status.storage.topic=connect-cluster-status_db2

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
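
# --- everything below is connector configuration, which in a standard Connect setup
# --- is registered separately from the worker configuration above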

name=kafka-poc
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
tasks.max=1
database.hostname=
database.port=1433
database.user=
database.password=
database.dbname=INTCMSCINT
database.history.kafka.topic=history_kafka_poc
database.server.name=
database.history.kafka.bootstrap.servers=.servicebus.windows.net:9093
table.whitelist=Kafka_test.EMPLOYER_testCDC
#snapshot.mode=initial_schema_only
plugin.path=/opt/kafka/confluent-5.2.1/share/java
rest.port=8090

#required EH Kafka security settings
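# the SASL username is the literal string "$ConnectionString"; the password is the full Event Hubs connection string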
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=**********";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=**********";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=**********";
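
For reference, with a distributed Connect worker the connector portion above would normally not live in the worker properties file but be registered through the Connect REST API, e.g. (the JSON file name is illustrative; rest.port=8090 comes from the config above):

curl -X POST -H "Content-Type: application/json" \
     --data @sqlserver-connector.json \
     http://localhost:8090/connectors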

========

@arerlend (Contributor) commented

This is interesting. What are these fields?

database.history.kafka.topic=history_kafka_poc
database.server.name=
database.history.kafka.bootstrap.servers=.servicebus.windows.net:9093

I don't have time at the moment to dig into the connector docs unfortunately, but we can work through these together since you've used the connector previously.

@yorek commented Jul 16, 2019

I've been able to get it almost working, but I now have some authentication problems:

[AdminClient clientId=adminclient-1] Connection to node -1 (dmkafka.servicebus.windows.net/40.112.242.0:9093) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.   [org.apache.kafka.clients.NetworkClient]

@yorek commented Jul 17, 2019

I've been able to have Debezium connect correctly to the Event Hubs Kafka endpoint and create the topics. I'm testing Debezium using the Docker image provided in the Debezium tutorial. This is the .yaml I used to make it work:

version: '2'
services:
  connect:
    image: debezium/connect:${DEBEZIUM_VERSION}
    ports:
     - 8083:8083
    environment:
     - BOOTSTRAP_SERVERS=xxxxx.servicebus.windows.net:9093
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses
     - CONNECT_REQUEST_TIMEOUT_MS=60000
     - CONNECT_SECURITY_PROTOCOL=SASL_SSL
     - CONNECT_SASL_MECHANISM=PLAIN
     - CONNECT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="$$ConnectionString" password="Endpoint=sb://xxxxx.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=yyyyy";
     - CONNECT_PRODUCER_SECURITY_PROTOCOL=SASL_SSL
     - CONNECT_PRODUCER_SASL_MECHANISM=PLAIN
     - CONNECT_PRODUCER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="$$ConnectionString" password="Endpoint=sb://xxxxx.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=yyyyy";
     - CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL
     - CONNECT_CONSUMER_SASL_MECHANISM=PLAIN
     - CONNECT_CONSUMER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="$$ConnectionString" password="Endpoint=sb://xxxxx.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=yyyyy";

Note that I had to use a double $ character to make sure it was escaped properly (this is needed only when running Debezium via the provided Docker image, I guess, since Docker Compose treats $$ as an escaped $).

I then registered the connector for SQL Server (more precisely Azure SQL MI) using the following JSON:

{
    "name": "sampledb-connector",
    "config": {
        "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
        "database.hostname" : "aaaaa.public.bbbbb.database.windows.net",
        "database.port" : "3342",
        "database.user" : "zzzzz",
        "database.password" : "ccccc",
        "database.dbname" : "sampledb",
        "database.server.name" : "aaaaa",
        "tasks.max" : "1",                
        "database.history.kafka.bootstrap.servers" : "xxxxx.servicebus.windows.net:9093",
        "database.history.kafka.topic": "dbhistory.aaaaa",        
        "database.history.security.protocol": "SASL_SSL",
        "database.history.sasl.mechanism": "PLAIN",
        "database.history.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://dmkafka.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=yyyyy\";",
        "database.history.producer.bootstrap.servers": "xxxxx.servicebus.windows.net:9093",
        "database.history.producer.security.protocol": "SASL_SSL",
        "database.history.producer.sasl.mechanism": "PLAIN",
        "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://dmkafka.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=yyyyy\";",
        "database.history.consumer.bootstrap.servers": "xxxxx.servicebus.windows.net:9093",
        "database.history.consumer.security.protocol": "SASL_SSL",
        "database.history.consumer.sasl.mechanism": "PLAIN",
        "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://xxxxx.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=yyyyy\";"
    }
}

The issue I'm having right now is this error message:

Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support DESCRIBE_CONFIGS

This seems to be related to the fact that the Event Hubs Kafka endpoint doesn't support 100% of the Kafka protocol. Any known workaround for this?

@yorek commented Jul 17, 2019

Found a workaround by looking at the source code: it is possible to use something other than Kafka to store schema changes. For example, they can be stored in memory using the MemoryDatabaseHistory class. Not sure how useful this is, but at least it makes the whole thing work!

Just add this to the JSON configuration:

	"database.history":"io.debezium.relational.history.MemoryDatabaseHistory",

@yehudamakarov commented Apr 28, 2021

@yorek are you saying that you got Kafka Connect / Debezium to create the topics for the watched tables before adding the database.history entry to the config?

I've been able to have Debezium correctly connect to Eventhub Kafka, and create the topics

You mean the database table topics, or the system topics that are created on Connect's startup?

@yorek commented Apr 28, 2021

I was referring to the Debezium "system" topics that are created when Debezium starts. It's been a while now, so I'm not sure if this still works with the current version of Debezium.

@arunthakur1 commented Feb 24, 2022

@yorek This is really helpful.

@netchex-tony commented

This no longer seems to work with the current version of debezium/connect. The producers and consumers repeatedly disconnect.
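
One possible mitigation (an assumption, not verified in this thread): Event Hubs drops idle connections, so it is often suggested to keep the Kafka clients from idling out, e.g. in the worker/producer/consumer properties:

# keep connections and metadata refreshes below the broker's idle timeout (values illustrative)
connections.max.idle.ms=180000
metadata.max.age.ms=180000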

@yannoushka74 commented

"database.history":"io.debezium.relational.history.MemoryDatabaseHistory",

Did you find a solution to this issue, @netchex-tony?
