How to have connector output JSON String? #398

Closed
aliadnani opened this issue Aug 27, 2020 · 5 comments

@aliadnani

Hi, I'm using the Camel Websocket Connector to send data from a topic to a websocket. The data in the input topic is in JSON/JSON Schema format.

The connector works for sending data to the websocket, but the data is in the format of Struct{key1=val1,key2=val2}

Is it possible to have the connector emit a JSON string like {"key1":"val1","key2": "val2"} instead?

The connector config is in the curl request below:

curl -X POST -H "Content-Type: application/json" \
    --data '{
        "name": "camel-sink3",
        "config": {
          "topics": "xxx",
          "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
          "value.converter.schema.registry.url": "http://xxx-cp-schema-registry:8081",
          "key.converter": "org.apache.kafka.connect.storage.StringConverter",
          "connector.class": "org.apache.camel.kafkaconnector.websocket.CamelWebsocketSinkConnector",
          "camel.sink.path.host": "0.0.0.0",
          "camel.sink.path.port": "8084",
          "camel.component.websocket.host": "0.0.0.0",
          "camel.component.websocket.port": "8084",
          "camel.sink.path.resourceUri": "kstreams-windows-average",
          "camel.component.websocket.minThreads": "3",
          "camel.sink.endpoint.sendToAll": "true"
        }
      }' http://xxx:8083/connectors

Thanks!

@orpiske
Contributor

orpiske commented Aug 27, 2020

@aliadnani Out of curiosity: what happens if you use org.apache.kafka.connect.storage.StringConverter as the value converter?

@aliadnani
Author

aliadnani commented Aug 27, 2020

If I use org.apache.kafka.connect.storage.StringConverter I get the following for my topic data serialized with JSON Schema:

"\u0000\u0000\u0000\u0000\u0001{\"name\":\"temperatureCelsius\",\"value\":23.0,\"unit\":\"C\",\"utc\":1592805755142}"

The double quotes are escaped, and I'm guessing the \u0000\u0000\u0000\u0000\u0001 at the beginning is the magic byte and schema ID that are encoded with the message.
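
For readers following along: that guess matches the Confluent Schema Registry wire format, which is one magic byte (0), then a 4-byte big-endian schema ID, then the serialized payload. Below is a minimal sketch of splitting such a value using only the JDK. The names `WireFormatPeek`, `Decoded`, and `decode` are made up for illustration and are not part of any Kafka or Camel API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class WireFormatPeek {
    /** Hypothetical holder for the header fields and the remaining payload. */
    public static final class Decoded {
        public final int magicByte;
        public final int schemaId;
        public final String json;

        Decoded(int magicByte, int schemaId, String json) {
            this.magicByte = magicByte;
            this.schemaId = schemaId;
            this.json = json;
        }
    }

    /** Splits a raw record value into magic byte, schema ID, and UTF-8 payload. */
    public static Decoded decode(byte[] raw) {
        if (raw.length < 5) {
            throw new IllegalArgumentException("expected at least 5 header bytes, got " + raw.length);
        }
        ByteBuffer buf = ByteBuffer.wrap(raw); // ByteBuffer is big-endian by default
        int magic = buf.get();                 // byte 0: magic byte
        int schemaId = buf.getInt();           // bytes 1-4: schema ID
        String json = new String(raw, 5, raw.length - 5, StandardCharsets.UTF_8);
        return new Decoded(magic, schemaId, json);
    }

    public static void main(String[] args) {
        // Rebuild a value shaped like the one above: five header bytes, then JSON text.
        byte[] header = {0, 0, 0, 0, 1};
        byte[] body = "{\"name\":\"temperatureCelsius\",\"value\":23.0}"
                .getBytes(StandardCharsets.UTF_8);
        byte[] raw = new byte[header.length + body.length];
        System.arraycopy(header, 0, raw, 0, header.length);
        System.arraycopy(body, 0, raw, header.length, body.length);

        Decoded d = decode(raw);
        System.out.println("magic=" + d.magicByte + " schemaId=" + d.schemaId);
        System.out.println(d.json);
    }
}
```

Read this way, the five leading bytes in the sample decode to magic byte 0 and schema ID 1, which is why StringConverter passes them through as odd-looking characters in front of the JSON text.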

I think what's happening when the value converter is set to JsonSchemaConverter is that the message is deserialized to a Java object (based on the JsonConverter source, I'm guessing a Jackson JsonNode?) and that object gets passed directly to the websocket output, resulting in something like Struct{key1=val1,key2=val2}. With StringConverter, by contrast, the whole message (including the schema ID bytes) is deserialized directly to a String, resulting in the funny-looking JSON above.

I was poking around the camel connector codebase and saw transforms, would it work to write a transform class to take the JsonNode object and format it to a JSON string with the method jsonNode.toPrettyString() before sending it to the websocket output? - or do I misunderstand what transforms are to camel 😅?

@orpiske
Contributor

orpiske commented Aug 27, 2020

...

I was poking around the camel connector codebase and saw transforms, would it work to write a transform class to take the JsonNode object and format it to a JSON string with the method jsonNode.toPrettyString() before sending it to the websocket output? - or do I misunderstand what transforms are to camel 😅?

I think this would do the trick. We do have a few cases where we do this, in the test code base, for example, so I assume it would be a pretty similar scenario.

@aliadnani
Author

aliadnani commented Aug 28, 2020

Thank you! It works!

I was wrong that the JsonSchemaConverter deserializes the record into a JsonNode object. I found out it's actually a Kafka Connect Struct object.

Anyways, I installed a transform library and used the ToJSON SMT to form a JSON string from the Struct before outputting to websocket.

My curl request looks like this now:

curl -X POST -H "Content-Type: application/json" \
    --data '{
        "name": "camel-sink3",
        "config": {
          "topics": "xxx",
          "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
          "value.converter.schema.registry.url": "http://xxx-cp-schema-registry:8081",
          "key.converter": "org.apache.kafka.connect.storage.StringConverter",
          "connector.class": "org.apache.camel.kafkaconnector.websocket.CamelWebsocketSinkConnector",
          "camel.sink.path.host": "0.0.0.0",
          "camel.sink.path.port": "8084",
          "camel.component.websocket.host": "0.0.0.0",
          "camel.component.websocket.port": "8084",
          "camel.sink.path.resourceUri": "kstreams-windows-average",
          "camel.component.websocket.minThreads": "3",
          "camel.sink.endpoint.sendToAll": "true",
          "transforms": "toJson",
          "transforms.toJson.type": "com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value",
          "transforms.toJson.schemas.enable": "false"
        }
      }' http://xxx:8083/connectors

And now the output is proper JSON: {"unit":"C","utc":1588264680439,"name":"temperature","value":422.0}

I was thinking maybe this should be the default behavior for sink connectors that consume non-string data (JSON in my case) and don't have to send data in a specific format.

To me it makes more sense for non-string data (JSON, Avro) that converters parse into a Struct object to be sent to a websocket (or any other sink) as JSON rather than as a stringified Struct: Struct{key1=val1,key2=val2}
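
To illustrate the difference, here is a rough sketch that renders the same fields both ways. It uses a plain `Map` as a stand-in for a Kafka Connect `Struct` (an assumption, so the example runs with only the JDK). `StructVsJson`, `structStyle`, and `jsonStyle` are hypothetical names, and the JSON writer handles flat string, number, and boolean values only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StructVsJson {
    /** Mimics the Struct{field=value,...} text form seen on the websocket. */
    public static String structStyle(Map<String, Object> fields) {
        StringBuilder sb = new StringBuilder("Struct{");
        boolean first = true;
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            if (!first) sb.append(',');
            first = false;
            sb.append(e.getKey()).append('=').append(e.getValue());
        }
        return sb.append('}').toString();
    }

    /** Renders the same flat fields as a JSON object (no nesting, no arrays). */
    public static String jsonStyle(Map<String, Object> fields) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            if (!first) sb.append(',');
            first = false;
            sb.append(quote(e.getKey())).append(':');
            Object v = e.getValue();
            if (v == null) {
                sb.append("null");
            } else if (v instanceof Number || v instanceof Boolean) {
                sb.append(v); // numbers and booleans are written unquoted
            } else {
                sb.append(quote(v.toString()));
            }
        }
        return sb.append('}').toString();
    }

    /** Wraps a string in double quotes, escaping quotes, backslashes, and control characters. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    public static void main(String[] args) {
        Map<String, Object> reading = new LinkedHashMap<>();
        reading.put("name", "temperature");
        reading.put("value", 422.0);
        reading.put("unit", "C");
        System.out.println(structStyle(reading));
        System.out.println(jsonStyle(reading));
    }
}
```

The first form drops quoting and type information, so a websocket client cannot parse it reliably, while the second can be fed straight into any JSON parser.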

Anyways, thanks again for the help!

@oscerd
Contributor

oscerd commented Oct 2, 2020

This can be closed.

oscerd closed this as completed Oct 2, 2020