
Sink bug when using AvroConverter -> Error: Argument 'recordValue' is not valid map format. #493

Closed
flopetegui opened this issue Oct 25, 2022 · 17 comments
Labels
bug Something isn't working

Comments

@flopetegui

flopetegui commented Oct 25, 2022

Description

WorkerSinkTask fails to write a message to Cosmos DB, citing that the record value is not in a valid 'map' format.
The sink connector is pulling from a Kafka topic that uses the Avro serializer and Schema Registry.

Expected Behavior

The sink connector should publish messages from the topic to the Cosmos DB container.

Reproduce

Here is the Avro schema from the .avsc file:

{
  "type": "record",
  "name": "CompanyResponse",
  "namespace": "com.company.ns",
  "fields": [
    {
      "name": "companyId",
      "type": {
        "type": "string",
        "avro.java.string": "String"
      }
    },
    {
      "name": "companyName",
      "type": [
        "null",
        {
          "type": "string",
          "avro.java.string": "String"
        }
      ],
      "default": null
    }
  ],
  "connect.name": "com.company.ns.CompanyResponse"
}

The Java code snippet that produces the message to the 'company-response-topic' topic:

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(CompanyResponse.SCHEMA$.toString());
GenericRecord avroRecord = new GenericData.Record(schema);

avroRecord.put("companyId", "JS9DJ354GSN8");
avroRecord.put("companyName", "My Company");

ProducerRecord<Object, Object> record = new ProducerRecord<>("company-response-topic", "JS9DJ354GSN8", avroRecord);
kafkaProducer.send(record);

The Cosmos sink configuration:

{
  "name": "cosmosdb-sink-connector",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
    "tasks.max": "1",
    "topics": [
      "company-response-topic"
    ],
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://sregistry:8081",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "connect.cosmos.connection.endpoint": "https://my-cosmos-instance.azure.com:443/",
    "connect.cosmos.master.key": "<my key>",
    "connect.cosmos.databasename": "database",
    "connect.cosmos.containers.topicmap": "company-response-topic#company-response-topic",
    "connect.cosmos.connection.gateway.enabled": "true",
    "id.strategy": "com.azure.cosmos.kafka.connect.sink.id.strategy.FullKeyStrategy"
  }
}

The Cosmos DB container configuration includes the partition key '/companyId'.

Here are the logs indicating the error from WorkerSinkTask:

[2022-10-25 01:35:32,128] INFO Sending 1 records to be written (com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask)
[2022-10-25 01:35:32,226] ERROR WorkerSinkTask{id=cosmosdb-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Argument 'recordValue' is not valid map format. (org.apache.kafka.connect.runtime.WorkerSinkTask)
java.lang.IllegalArgumentException: Argument 'recordValue' is not valid map format.
        at com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument(Preconditions.java:136)
        at com.azure.cosmos.kafka.connect.sink.BulkWriter.getPartitionKeyValue(BulkWriter.java:103)
        at com.azure.cosmos.kafka.connect.sink.BulkWriter.writeCore(BulkWriter.java:62)
        at com.azure.cosmos.kafka.connect.sink.SinkWriterBase.write(SinkWriterBase.java:26)
        at com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask.put(CosmosDBSinkTask.java:123)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:584)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

The logs acknowledge that a record needs to be written, but the task then fails at BulkWriter.getPartitionKeyValue(BulkWriter.java:103), where it fails the precondition:
checkArgument(recordValue instanceof Map, "Argument 'recordValue' is not valid map format.");

Additional Context

If I use JSON instead of Avro, the sink works. This seems like an Avro-specific issue.

@kushagraThapar
Collaborator

@flopetegui thanks for reporting this and adding the required details. Could you provide more information about the version of the sink connector being used?
Also, we recently added support for bulk in the sink connector. Can you please try to repro this on a version prior to bulk support? That way we can isolate whether the issue is related to the recent addition of bulk support or has always been there.

@xinlian12
Contributor

@flopetegui please disable bulk mode and see whether the issue still repros -> connect.cosmos.sink.bulk.enabled=false
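
For clarity, that translates to one extra entry inside the "config" block of the sink configuration posted above (a sketch; all other settings stay unchanged):

    "connect.cosmos.sink.bulk.enabled": "false"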

@flopetegui
Author

I am using v1.6.0 of the plugin downloaded from Confluent Hub.

@xinlian12 I added the property as you instructed. It still failed, but with a different message.

    [2022-10-26 00:32:51,697] INFO Sending 1 records to be written (com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask)
    [2022-10-26 00:32:51,707] ERROR locationEndpoint is null because ClientRetryPolicy::onBeforeRequest(.) is not invoked, probably request creation failed due to invalid options, serialization setting, etc. (com.azure.cosmos.implementation.ClientRetryPolicy)
    [2022-10-26 00:32:51,707] ERROR onBeforeSendRequest is not invoked, encountered failure due to request being null (com.azure.cosmos.implementation.RenameCollectionAwareClientRetryPolicy)
    java.lang.IllegalArgumentException: Failed to serialize the object into json
            at com.azure.cosmos.implementation.Utils.serializeJsonToByteBuffer(Utils.java:699)
            at com.azure.cosmos.implementation.InternalObjectNode.serializeJsonToByteBuffer(InternalObjectNode.java:116)
            at com.azure.cosmos.BridgeInternal.serializeJsonToByteBuffer(BridgeInternal.java:110)
            at com.azure.cosmos.implementation.RxDocumentClientImpl.getCreateDocumentRequest(RxDocumentClientImpl.java:1470)
            at com.azure.cosmos.implementation.RxDocumentClientImpl.upsertDocumentInternal(RxDocumentClientImpl.java:1837)
            at com.azure.cosmos.implementation.RxDocumentClientImpl.lambda$upsertDocument$39(RxDocumentClientImpl.java:1829)
            at com.azure.cosmos.implementation.ObservableHelper.lambda$inlineIfPossibleAsObs$1(ObservableHelper.java:44)
            at com.azure.cosmos.implementation.BackoffRetryUtility.lambda$executeRetry$0(BackoffRetryUtility.java:36)
            at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)
            at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:77)
            at reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:46)
            at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
            at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
            at reactor.core.publisher.Mono.subscribe(Mono.java:4397)
            at reactor.core.publisher.Mono.block(Mono.java:1706)
            at com.azure.cosmos.CosmosContainer.blockItemResponse(CosmosContainer.java:242)
            at com.azure.cosmos.CosmosContainer.upsertItem(CosmosContainer.java:206)
            at com.azure.cosmos.kafka.connect.sink.PointWriter.writeCore(PointWriter.java:31)
            at com.azure.cosmos.kafka.connect.sink.SinkWriterBase.write(SinkWriterBase.java:26)
            at com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask.put(CosmosDBSinkTask.java:123)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:584)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
            at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
            at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
            at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
            at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
            at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
            at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class org.apache.kafka.connect.data.Struct and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS)
            at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
            at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1300)
            at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:400)
            at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.failForEmpty(UnknownSerializer.java:46)
            at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.serialize(UnknownSerializer.java:29)
            at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
            at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
            at com.fasterxml.jackson.databind.ObjectMapper._writeValueAndClose(ObjectMapper.java:4568)
            at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:3780)
            at com.azure.cosmos.implementation.Utils.serializeJsonToByteBuffer(Utils.java:695)
            ... 30 more

@flopetegui
Author

flopetegui commented Oct 27, 2022

Does the error indicate a potential issue with the ObjectMapper in the connector?
Specifically, the conversion of an Avro message to the JSON structure used to store the item in NoSQL.
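
As a minimal illustration of that hypothesis (my own sketch, not connector code), serializing a Connect Struct with a plain Jackson ObjectMapper fails exactly as in the 'Caused by' above:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class StructSerializationRepro {
    public static void main(String[] args) throws Exception {
        Schema schema = SchemaBuilder.struct()
                .field("companyId", Schema.STRING_SCHEMA)
                .build();
        Struct value = new Struct(schema).put("companyId", "JS9DJ354GSN8");

        // Throws InvalidDefinitionException: No serializer found for class
        // org.apache.kafka.connect.data.Struct and no properties discovered ...
        new ObjectMapper().writeValueAsString(value);
    }
}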

The Avro message is in JSON format, but it includes the union type in the output for fields that are nullable.

For example, this schema:

    {
      "type" : "record",
      "name" : "User",
      "namespace" : "io.confluent.developer.avro",
      "fields" : [ {
        "name" : "name",
        "type" : "string"
      }, {
        "name" : "favorite_number",
        "type" : [ "null", "int" ],
        "default" : null
      }, {
        "name" : "favorite_color",
        "type" : [ "null", "string" ],
        "default" : null
      } ]
    }

will result in a JSON message like:

    {
      "name": "cricket007",
      "favorite_number": {
        "int": 42
      },
      "favorite_color": {
        "string": "green"
      }
    }

@xinlian12
Contributor

Thanks @flopetegui, I agree with you; this seems related to the ObjectMapper used in the SDK. Will pick up this issue in the next 2-3 weeks.

@ilyasdresden

Hi, we are facing the same issue. Is there any update on this? Thanks.

@xinlian12
Contributor

@flopetegui / @ilyasdresden due to some other higher-priority work items, this issue will likely be picked up sometime in March.
Is it possible to use JsonConverter instead?
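
For reference, a sketch of what that would look like in the sink config (only viable if the topic values were produced as plain JSON rather than Avro):

    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"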

@ilyasdresden

@xinlian12, thank you for the information and the option. Unfortunately, it would not work for us because we do not own the topic.

@renebrandnewday

renebrandnewday commented Mar 3, 2023

Same here with 1.6.0. Is there any update on this? Is it being worked on this month? Thanks.

We are using Avro.

ERROR WorkerSinkTask{id=cosmos-rene-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Failed to serialize the object into json (org.apache.kafka.connect.runtime.WorkerSinkTask:609)
java.lang.IllegalArgumentException: Failed to serialize the object into json
        at com.azure.cosmos.implementation.Utils.serializeJsonToByteBuffer(Utils.java:699)
        at com.azure.cosmos.implementation.InternalObjectNode.serializeJsonToByteBuffer(InternalObjectNode.java:116)
        at com.azure.cosmos.BridgeInternal.serializeJsonToByteBuffer(BridgeInternal.java:110)
        at com.azure.cosmos.implementation.RxDocumentClientImpl.getCreateDocumentRequest(RxDocumentClientImpl.java:1470)
        at com.azure.cosmos.implementation.RxDocumentClientImpl.upsertDocumentInternal(RxDocumentClientImpl.java:1837)
        at com.azure.cosmos.implementation.RxDocumentClientImpl.lambda$upsertDocument$39(RxDocumentClientImpl.java:1829)
        at com.azure.cosmos.implementation.ObservableHelper.lambda$inlineIfPossibleAsObs$1(ObservableHelper.java:44)
        at com.azure.cosmos.implementation.BackoffRetryUtility.lambda$executeRetry$0(BackoffRetryUtility.java:36)
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)
        at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:77)
        at reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:46)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
        at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4397)
        at reactor.core.publisher.Mono.block(Mono.java:1706)
        at com.azure.cosmos.CosmosContainer.blockItemResponse(CosmosContainer.java:242)
        at com.azure.cosmos.CosmosContainer.upsertItem(CosmosContainer.java:206)
        at com.azure.cosmos.kafka.connect.sink.PointWriter.writeCore(PointWriter.java:31)
        at com.azure.cosmos.kafka.connect.sink.SinkWriterBase.write(SinkWriterBase.java:26)
        at com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask.put(CosmosDBSinkTask.java:123)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
        Suppressed: java.lang.Exception: #block terminated with an error
                at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
                at reactor.core.publisher.Mono.block(Mono.java:1707)
                ... 16 more
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class org.apache.kafka.connect.data.Struct and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS)
        at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
        at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1300)
        at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:400)
        at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.failForEmpty(UnknownSerializer.java:46)
        at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.serialize(UnknownSerializer.java:29)
        at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
        at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
        at com.fasterxml.jackson.databind.ObjectMapper._writeValueAndClose(ObjectMapper.java:4568)
        at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:3780)
        at com.azure.cosmos.implementation.Utils.serializeJsonToByteBuffer(Utils.java:695)
        ... 30 more

@Nardu92

Nardu92 commented Mar 8, 2023

I had the same issue with the Cosmos DB sink 1.6.0 with base image confluentinc/cp-kafka-connect-base:7.3.2.
As a workaround I'm using kafka-connect-cosmos 1.3.1 and it works.

@ghost

ghost commented Mar 23, 2023

I think I found the issue. I compared version 1.5.0 with version 1.7.0.
The issue is probably that the StructToJsonMap conversion translates record.value() into recordValue, but recordValue is not being used anymore (it works in 1.5.0 since it is used there).

This can be seen in version 1.7.0 at lines 110-119 of CosmosDBSinkTask:

Object recordValue;
if (record.value() instanceof Struct) {
    recordValue = StructToJsonMap.toJsonMap((Struct) record.value());
} else {
    recordValue = record.value();
}

maybeInsertId(recordValue, record);
toBeWrittenRecordList.add(record);

To fix this, I suggest that record.value() be replaced with recordValue, or that the loop from version 1.5.0 be reintroduced. If help is required, you can reach us at marko.oljaca@sva.de, since this version is critical for one of our customers.
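
A rough sketch of the first suggestion, so the writers (BulkWriter/PointWriter) receive the converted Map instead of the original Struct; re-wrapping the SinkRecord is only my assumption of how it could be wired up, not necessarily what the maintainers will do:

Object recordValue;
if (record.value() instanceof Struct) {
    recordValue = StructToJsonMap.toJsonMap((Struct) record.value());
} else {
    recordValue = record.value();
}
maybeInsertId(recordValue, record);

// Hand the converted value (a Map) to the writers instead of the Struct-valued record.
SinkRecord convertedRecord = new SinkRecord(
        record.topic(), record.kafkaPartition(),
        record.keySchema(), record.key(),
        null, recordValue,              // schemaless value: the Map built above
        record.kafkaOffset());
toBeWrittenRecordList.add(convertedRecord);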

@kushagraThapar
Collaborator

@maoljaca thanks for the suggestion, I have tried fixing this bug in the below PR - would appreciate it if you could take a look (just in case I missed anything), thanks - #503

@ghost

ghost commented Mar 27, 2023

@kushagraThapar I will test the version ASAP. Thanks for your PR!
I checked it and it resolved the issue on my side.

I'm still having some bugs related to deeply nested Avro, but that is not related to this issue. I suggest closing this issue.

Thanks and Best
Marko

@ghost

ghost commented Mar 27, 2023

About my current issue with the nested Avro, I have the following idea (I tried to open a discussion but it didn't work, due to permissions I think):

Currently https://github.com/microsoft/kafka-connect-cosmosdb/blob/v1.7.1/src/main/java/com/azure/cosmos/kafka/connect/sink/StructToJsonMap.java handles the transformation of a Struct into a nested structure of Lists and Maps.

The recursion seems fine, but more complex objects still crash (we currently have an issue with a large nested object). Some nested structs are not properly resolved.

We currently use a workaround where we use a Converter to convert Avro to plain JSON and write that straight to Cosmos DB via the sink connector. The JsonConverter is available and can transform a Struct to plain JSON, which could then be written to Cosmos DB.

Benefits:

  • Less code
  • Converter is well tested for many use cases
  • Would resolve the current bug for heavily nested objects

Drawbacks:

  • Certain data types are not well supported in plain JSON (for example, dates)
  • Feels a bit hacky, though, since a Converter's job is to convert data read from or written to Kafka topics.

Here is a code snippet:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.util.Collections;

public class TestStruct {

    @Test
    public void testStruct() {

        Schema schema = SchemaBuilder.struct()
                .field("make", Schema.STRING_SCHEMA)
                .field("model", Schema.STRING_SCHEMA)
                .field("year", Schema.INT32_SCHEMA)
                .field("color", Schema.STRING_SCHEMA)
                .field("userBefore", SchemaBuilder.struct()
                        .field("id", Schema.INT32_SCHEMA)
                        .field("name", Schema.STRING_SCHEMA)
                        .field("email", Schema.STRING_SCHEMA)
                        .build())
                .build();

        // Create a Struct object representing car data
        Struct carData = new Struct(schema)
                .put("make", "Toyota")
                .put("model", "Camry")
                .put("year", 2022)
                .put("userBefore", new Struct(schema.field("userBefore").schema())
                        .put("id", 1)
                        .put("name", "John Doe")
                        .put("email", "john.doe@gmail.com"))
                .put("color", "red");

        // Create a JsonConverter object
        JsonConverter jsonConverter = new JsonConverter();
        jsonConverter.configure(Collections.singletonMap("schemas.enable", false), false);

        // Convert the car data Struct object to JSON
        byte[] payload = jsonConverter.fromConnectData("car-data-topic", schema, carData);
        String json = new String(payload, StandardCharsets.UTF_8);

        System.out.println(json);
    }
}

Output is:

{ "make": "Toyota", "model": "Camry", "year": 2022, "color": "red", "userBefore": { "id": 1, "name": "John Doe", "email": "john.doe@gmail.com" } }

What do you think about it?

Best Marko

@kushagraThapar
Collaborator

@maoljaca thanks for verifying the solution. Regarding the converter, I agree: your solution works, but it is kind of hacky. Can you share the complex object on which the current StructToJsonMap conversion fails? Maybe I can try to fix the issue in the Cosmos connector. I will close this issue; can you please create a new GitHub issue for this problem?

@ghost

ghost commented Mar 28, 2023

I'll create a new one. I'll try to propose a solution for it as well.

@kushagraThapar
Collaborator

@maoljaca appreciate it, thank you!
