DebeziumJsonDeserializer seems to not work #42

Open
smyrgeorge opened this issue Aug 19, 2023 · 4 comments

Comments

@smyrgeorge

Hola!

I'm playing around with Debezium + PostgreSQL and I'm trying to use the DebeziumJsonDeserializer.

The plugin loads correctly: I can see it in the logs when Kafka Connect starts up.

I also managed to register the connector using the following config.

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "@@@@@",
    "database.password": "@@@@@",
    "database.dbname": "@@@@@",
    "topic.prefix": "dbserver1",
    "schema.include.list": "inventory",
    "transforms": "DebeziumJsonDeserializer",
    "transforms.DebeziumJsonDeserializer.type": "com.birdie.kafka.connect.smt.DebeziumJsonDeserializer",
    "transforms.DebeziumJsonDeserializer.optional-struct-fields": true,
    "transforms.DebeziumJsonDeserializer.union-previous-messages-schema": true,
    "transforms.DebeziumJsonDeserializer.probabilistic-fast-path": true
  }
}

From the logs:

2023-08-19 08:06:46,151 INFO   ||  EnrichedConnectorConfig values:
	config.action.reload = restart
	connector.class = io.debezium.connector.postgresql.PostgresConnector
	errors.log.enable = false
	errors.log.include.messages = false
	errors.retry.delay.max.ms = 60000
	errors.retry.timeout = 0
	errors.tolerance = none
	exactly.once.support = requested
	header.converter = null
	key.converter = null
	name = inventory-connector
	offsets.storage.topic = null
	predicates = []
	tasks.max = 1
	topic.creation.groups = []
	transaction.boundary = poll
	transaction.boundary.interval.ms = null
	transforms = [DebeziumJsonDeserializer]
	transforms.DebeziumJsonDeserializer.convert-numbers-to-double = false
	transforms.DebeziumJsonDeserializer.ignored-fields =
	transforms.DebeziumJsonDeserializer.negate = false
	transforms.DebeziumJsonDeserializer.optional-struct-fields = true
	transforms.DebeziumJsonDeserializer.predicate = null
	transforms.DebeziumJsonDeserializer.probabilistic-fast-path = true
	transforms.DebeziumJsonDeserializer.sanitize.field.names = false
	transforms.DebeziumJsonDeserializer.type = class com.birdie.kafka.connect.smt.DebeziumJsonDeserializer
	transforms.DebeziumJsonDeserializer.union-previous-messages-schema = true
	transforms.DebeziumJsonDeserializer.union-previous-messages-schema.log-union-errors = false
	value.converter = null

My table contains rows like the following (one example row):

1002,George,Bailey,gbailey@foobar.com,"{""a"": 5, ""b"": ""test""}"

The message produced for this row looks like this:

{
	"schema": {
		"type": "struct",
		"fields": [
			{
				"type": "struct",
				"fields": [
					{
						"type": "int32",
						"optional": false,
						"default": 0,
						"field": "id"
					},
					{
						"type": "string",
						"optional": false,
						"field": "first_name"
					},
					{
						"type": "string",
						"optional": false,
						"field": "last_name"
					},
					{
						"type": "string",
						"optional": false,
						"field": "email"
					},
					{
						"type": "string",
						"optional": false,
						"name": "io.debezium.data.Json",
						"version": 1,
						"field": "test"
					}
				],
				"optional": true,
				"name": "dbserver1.inventory.customers.Value",
				"field": "before"
			},
			{
				"type": "struct",
				"fields": [
					{
						"type": "int32",
						"optional": false,
						"default": 0,
						"field": "id"
					},
					{
						"type": "string",
						"optional": false,
						"field": "first_name"
					},
					{
						"type": "string",
						"optional": false,
						"field": "last_name"
					},
					{
						"type": "string",
						"optional": false,
						"field": "email"
					},
					{
						"type": "string",
						"optional": false,
						"name": "io.debezium.data.Json",
						"version": 1,
						"field": "test"
					}
				],
				"optional": true,
				"name": "dbserver1.inventory.customers.Value",
				"field": "after"
			},
			{
				"type": "struct",
				"fields": [
					{
						"type": "string",
						"optional": false,
						"field": "version"
					},
					{
						"type": "string",
						"optional": false,
						"field": "connector"
					},
					{
						"type": "string",
						"optional": false,
						"field": "name"
					},
					{
						"type": "int64",
						"optional": false,
						"field": "ts_ms"
					},
					{
						"type": "string",
						"optional": true,
						"name": "io.debezium.data.Enum",
						"version": 1,
						"parameters": {
							"allowed": "true,last,false,incremental"
						},
						"default": "false",
						"field": "snapshot"
					},
					{
						"type": "string",
						"optional": false,
						"field": "db"
					},
					{
						"type": "string",
						"optional": true,
						"field": "sequence"
					},
					{
						"type": "string",
						"optional": false,
						"field": "schema"
					},
					{
						"type": "string",
						"optional": false,
						"field": "table"
					},
					{
						"type": "int64",
						"optional": true,
						"field": "txId"
					},
					{
						"type": "int64",
						"optional": true,
						"field": "lsn"
					},
					{
						"type": "int64",
						"optional": true,
						"field": "xmin"
					}
				],
				"optional": false,
				"name": "io.debezium.connector.postgresql.Source",
				"field": "source"
			},
			{
				"type": "string",
				"optional": false,
				"field": "op"
			},
			{
				"type": "int64",
				"optional": true,
				"field": "ts_ms"
			},
			{
				"type": "struct",
				"fields": [
					{
						"type": "string",
						"optional": false,
						"field": "id"
					},
					{
						"type": "int64",
						"optional": false,
						"field": "total_order"
					},
					{
						"type": "int64",
						"optional": false,
						"field": "data_collection_order"
					}
				],
				"optional": true,
				"name": "event.block",
				"version": 1,
				"field": "transaction"
			}
		],
		"optional": false,
		"name": "dbserver1.inventory.customers.Envelope"
	},
	"payload": {
		"before": null,
		"after": {
			"id": 1002,
			"first_name": "George",
			"last_name": "Bailey",
			"email": "gbailey@foobar.com",
			"test": "{\"a\": 5, \"b\": \"test\"}"
		},
		"source": {
			"version": "2.4.0.Alpha2",
			"connector": "postgresql",
			"name": "dbserver1",
			"ts_ms": 1692432406406,
			"snapshot": "first",
			"db": "postgres",
			"sequence": "[null,\"35544840\"]",
			"schema": "inventory",
			"table": "customers",
			"txId": 926,
			"lsn": 35544840,
			"xmin": null
		},
		"op": "r",
		"ts_ms": 1692432406496,
		"transaction": null
	}
}

As we can see, the test field is still encoded as a string:

"{\"a\": 5, \"b\": \"test\"}"
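For comparison, here is a minimal consumer-side sketch of the decoding one would expect, assuming the JSON-converter envelope shown above (with `schema` and `payload`). It does not reproduce the SMT's logic; it only parses string fields whose schema carries the `io.debezium.data.Json` name. The function names `json_field_names` and `decode_json_fields` are hypothetical.

```python
import json

# Logical type name that marks JSON-encoded strings in the schema above.
JSON_LOGICAL_TYPE = "io.debezium.data.Json"


def json_field_names(struct_schema):
    """Return the names of fields tagged with the JSON logical type."""
    return [
        f["field"]
        for f in struct_schema.get("fields", [])
        if f.get("name") == JSON_LOGICAL_TYPE
    ]


def decode_json_fields(envelope):
    """Return a copy of the payload with JSON-typed strings in before/after parsed."""
    schema_by_field = {f["field"]: f for f in envelope["schema"]["fields"]}
    payload = dict(envelope["payload"])
    for side in ("before", "after"):
        row = payload.get(side)
        if row is None:
            continue  # e.g. "before" is null for snapshot reads
        row = dict(row)
        for name in json_field_names(schema_by_field[side]):
            if isinstance(row.get(name), str):
                row[name] = json.loads(row[name])
        payload[side] = row
    return payload


# Trimmed-down version of the message above (only the relevant fields).
envelope = {
    "schema": {"type": "struct", "fields": [
        {"type": "struct", "field": "before", "fields": [
            {"type": "string", "name": JSON_LOGICAL_TYPE, "field": "test"}]},
        {"type": "struct", "field": "after", "fields": [
            {"type": "int32", "field": "id"},
            {"type": "string", "name": JSON_LOGICAL_TYPE, "field": "test"}]},
    ]},
    "payload": {
        "before": None,
        "after": {"id": 1002, "test": "{\"a\": 5, \"b\": \"test\"}"},
    },
}

decoded = decode_json_fields(envelope)
print(decoded["after"]["test"])  # prints {'a': 5, 'b': 'test'}
```

This only helps as a stopgap or as a way to check what the decoded value should look like; it leaves the original envelope untouched.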

Am I missing something?
I'm using the latest images (2.4) provided by the Debezium team.
https://quay.io/organization/debezium

Thanks in advance!

@RaghadAlkhudhair

How did you manage to use the transformer without getting the following error?

java.lang.ClassCastException: class org.apache.kafka.connect.sink.SinkRecord cannot be cast to class org.apache.kafka.connect.source.SourceRecord (org.apache.kafka.connect.sink.SinkRecord and org.apache.kafka.connect.source.SourceRecord are in unnamed module of loader 'app')
	at com.birdie.kafka.connect.smt.DebeziumJsonDeserializer.apply(DebeziumJsonDeserializer.java:26)
	at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:57)
	at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:54)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
	... 15 more

I am currently getting this error.

@smyrgeorge
Author

smyrgeorge commented Feb 13, 2024

I don't actually remember. I did manage to create my own converters, though.

If you want, take a look here:

https://github.com/smyrgeorge/debezium-test/blob/main/kafka-connect-json-to-proto/src/main/kotlin/io/smyrgeorge/connect/converter/JsonNodeConverter.kt

@gtsopour

@smyrgeorge One quick question, please. I have managed to build this project, but how can I include the plugin afterwards? Is it enough to just include the jar as shown below? Also, you mentioned that you've built your own plugin. Is it available somewhere so I can give it a shot on my end? Thanks in advance.

- name: connect-smts
  artifacts:
    - type: jar
      url: {PUBLIC_DOMAIN}/connect-smts-1.0-SNAPSHOT.jar

@smyrgeorge
Author

@gtsopour according to the Kafka Connect documentation:

By default, the directory /kafka/connect is used as plugin directory by the Debezium Docker image for Kafka Connect. So any additional connectors you may wish to use should be added to that directory.
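Once the jar is in the plugin directory and the worker has restarted, one way to check that the transform was picked up is to ask the Kafka Connect REST API for its plugin list. The sketch below assumes the worker listens on `http://localhost:8083` and is recent enough to accept the `connectorsOnly=false` query parameter on `GET /connector-plugins` (older workers list connectors only). The helper names `fetch_plugins` and `is_plugin_loaded` are hypothetical, and the sample response is illustrative rather than captured output.

```python
import json
import urllib.request


def fetch_plugins(base_url):
    """Fetch the plugin list from a Kafka Connect worker (assumed reachable)."""
    url = f"{base_url}/connector-plugins?connectorsOnly=false"
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)


def is_plugin_loaded(plugins, class_name):
    """Return True if any listed plugin has the given fully qualified class name."""
    return any(p.get("class") == class_name for p in plugins)


SMT_CLASS = "com.birdie.kafka.connect.smt.DebeziumJsonDeserializer"

# Illustrative response shape (an assumption, not output from a real worker).
sample_plugins = [
    {"class": "io.debezium.connector.postgresql.PostgresConnector", "type": "source"},
    {"class": SMT_CLASS, "type": "transformation"},
]

loaded = is_plugin_loaded(sample_plugins, SMT_CLASS)
missing = is_plugin_loaded(sample_plugins, "com.example.NotInstalled")

# Against a live worker (address is an assumption):
# loaded = is_plugin_loaded(fetch_plugins("http://localhost:8083"), SMT_CLASS)
```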

Check my repository for further details:

https://github.com/smyrgeorge/debezium-test

I'm not using this implementation in production. In the project I'm working on, we have a working version that is based on the examples in this repository.
