Each JSON response is packed into "payload.value" as an invalid JSON string #19
First of all, thank you @rumline for using the plugin, and I'm sorry it didn't work out of the box for you. You seem to be using the plugin just fine. However, Kafka Connect source plugins produce data with a schema; this schema is usually an Avro schema that will be registered on the Schema Registry. You can find some additional information about how to manage this here: https://docs.confluent.io/current/schema-registry/connect.html

The schema produced by default by this plugin wraps the key and the value each in an envelope with a single string property, named "key" for the key and "value" for the value, although the names of these properties are configurable.

To simplify your use case, I've just released v0.7.3, which introduces two new mappers that produce a simplified schema without the envelopes. You'll find them here:
or
However, using the
And the reason I don't think this alone will solve it is that, by representing the JSON document as a String with the default converter, it's being encoded with illegal characters escaped (illegal because the value has to be wrapped in an Avro envelope that is itself represented as JSON). I haven't tried this, but my guess is you'll have to play with the configuration properties documented here: connect-configuring-converters. I'm not 100% sure what you should be using there, but my guess would be something like this in combination with one of the new mappers described above:
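For reference, the converter settings in play would be something along these lines (a sketch only; which converter fits depends on the mapper you pick, and both converter classes below are standard Kafka Connect classes):

```json
{
  "value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
```

or, to keep JsonConverter but stop it from expecting an embedded schema envelope:

```json
{
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}
```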
Please let me know if there is anything else I can do to support you with this use case, and please come back if you manage to make it work so future users benefit from your findings. Thanks.
Works perfectly now. Switched to the new StringKvSourceRecordMapper and then I only had to add "value.converter": "StringConverter". The key and schema are completely ignored by the Elasticsearch sink to allow Elasticsearch to do dynamic type mapping. New connector config:
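The attached config isn't reproduced here, but based on the description it presumably contained something along these lines. Only `StringKvSourceRecordMapper` and the `StringConverter` setting are confirmed above; the mapper property key and the class package names are illustrative guesses, not verified:

```json
{
  "http.record.mapper": "com.github.castorm.kafka.connect.http.record.StringKvSourceRecordMapper",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
```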
Important settings in the Elasticsearch sink config to ignore key and schema:
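Those settings are presumably the standard Elasticsearch sink connector flags for ignoring the record key and schema, i.e. something like:

```json
{
  "key.ignore": "true",
  "schema.ignore": "true"
}
```

With the schema ignored, Elasticsearch falls back to its own dynamic type mapping, as described above.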
Thank you for the quick solution.

Since I was testing it out, here are examples of the different topic entries with the different settings. Perhaps useful for the examples section of the project.
That's great news :) Thanks for the examples. I understand your use case will be common, so I'll have to find a place to document this.

By the way, I saw you noticed #20. Although you are using the connector just fine, you might be interested to know that once that issue is resolved, it would be possible for you to use the NYTimes API the same way you are, periodically polling the latest articles, but avoiding articles being published to Kafka if they've already been published. Not sure if this would be useful for you; I guess it depends on whether you value having a deduplicated Kafka topic and saving some unnecessary Elasticsearch writes. All it'd require from you is:
Castor, I am now using the StringKvSourceRecordMapper with the StringConverter value converter, and it works; I get just the data back. Maybe a future enhancement could be the ability to add a custom schema, like the https://docs.confluent.io/kafka-connect-sftp/current/source-connector/json_source_connector.html connector enables you to do.
Describe the bug
The JSON documents returned by the REST API call are recorded to the Kafka topic in a single "payload.value" field with extra backslashes and double-quote characters. This renders the string invalid JSON when passed to Elasticsearch, so proper document field/value pairs are not created.
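To illustrate the behavior (a standalone sketch, not the connector's actual code, with made-up field names): when the raw response is carried as a plain string inside an envelope's "value" property and that envelope is serialized to JSON again, every quote inside the embedded document gets backslash-escaped, which is exactly the long escaped string seen in the topic:

```python
import json

# A document as the REST API might return it (illustrative fields only).
article = {"headline": "Example", "section": "tech"}

# The default mapper carries the raw response as a plain string inside
# an envelope. Serializing the envelope back to JSON escapes every quote
# of the embedded document, so the value is no longer parseable as an
# object by a downstream consumer expecting structured JSON.
envelope = {"value": json.dumps(article)}
print(json.dumps(envelope))
# {"value": "{\"headline\": \"Example\", \"section\": \"tech\"}"}
```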
To Reproduce
I didn't track the exact same record through the whole process; representative examples are attached.
Use this source config:
nytimes_source_connector_config.txt
Getting this JSON response:
nytimes_response.txt
An example entry in the Kafka topic is created:
kafka_topic_entry.txt
Use this sink config:
nytime_sink_connector_config.txt
An example Elasticsearch document is created that is unparsed:
elasticsearch_document.txt
Expected behavior
The JSON responses are stored in the Kafka topic as valid JSON without the extra double quotes and backslashes that turn the response JSON into a very long string.
Kafka Connect:
Using confluent-platform-2.12; all components are at version 5.5.0.
Plugin:
Self-compiled 0.7.1 using Java JDK 1.8.0_251
Additional context
Perhaps I am simply misunderstanding how to properly configure the connector. I've tried many variations of configuration options and SMTs to try to work around this behavior.