
KafkaTrigger (.NET) does not work with an Avro schema that has an array of records #455

Open
michny opened this issue Aug 18, 2023 · 5 comments

Comments

@michny

michny commented Aug 18, 2023

I have an Avro schema that looks like this:

{
    "type": "array",
    "items": {
        "type": "record",
        "name": "myrecord",
        "fields": [
            ...
        ]
    }
}

If I try to use this Avro schema in my KafkaTrigger, via either the "SchemaRegistryUrl" or "AvroSchema" property, with an events parameter of type KafkaEventData<GenericRecord>[], I get the following error:
Confluent.Kafka: Local: Value deserialization error. System.Private.CoreLib: Unable to cast object of type 'System.Object[]' to type 'Avro.Generic.GenericRecord'.

Changing the events type to KafkaEventData<GenericRecord[]>[] results in the same error.
Is it not possible to deserialize messages with Avro when the schema is an array type? I have to use SchemaRegistryUrl because the Avro schema is managed by another team (the providers), so I can't hardcode the schema as a C# class.
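For reference, a minimal sketch of the failing binding (broker, topic, and consumer group names below are placeholders, not the real configuration):

```csharp
// Hypothetical minimal repro: an in-process (.NET) KafkaTrigger that tries to
// deserialize an Avro payload whose root schema is an array of records.
using Avro.Generic;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Extensions.Logging;

public static class ArrayOfRecordsTrigger
{
    [FunctionName("ArrayOfRecordsTrigger")]
    public static void Run(
        [KafkaTrigger("BrokerList",
                      "mytopic",
                      ConsumerGroup = "mygroup",
                      SchemaRegistryUrl = "SchemaRegistryUrl")]
        KafkaEventData<GenericRecord>[] events, // also fails as KafkaEventData<GenericRecord[]>[]
        ILogger log)
    {
        foreach (var kafkaEvent in events)
        {
            // Never reached: the value deserializer throws
            // "Unable to cast object of type 'System.Object[]' to type 'Avro.Generic.GenericRecord'".
            log.LogInformation(kafkaEvent.Value?.ToString());
        }
    }
}
```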

@jainharsh98
Collaborator

@michny Are you using the dotnet or the dotnet-isolated model for your function? Also, is the behavior similar to what is described in #452?

@michny
Author

michny commented Aug 18, 2023

I am using "dotnet" for the function.
As for #452, it appears to be different. My deserialization fails completely by throwing an exception, whereas the other issue seems to just get "null" as the value of the array. They also differ in that my Avro schema is an array of records, whereas the schema in the other issue is a record with an array field.
To me it looks like the problem is that the deserializer receives an array in the message ("Object[]"), which matches the schema, but the code expects the message to be a single record when it deserializes to "GenericRecord".

@michny
Author

michny commented Aug 18, 2023

I have just tried recreating the setup in a console app using Confluent.Kafka 2.2.0, Confluent.SchemaRegistry 2.2.0 and Confluent.SchemaRegistry.Serdes.Avro, and I get the exact same error, so it is probably a problem in the underlying libraries rather than in the KafkaTrigger itself.
But I am still looking for some way around it, so I can deserialize the messages.
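The console repro can be sketched roughly like this (a minimal sketch; broker, registry URL, topic, and group names are placeholders):

```csharp
// Hypothetical console repro on Confluent.Kafka 2.2.0: consuming an
// array-of-records Avro topic with AvroDeserializer<GenericRecord>
// reproduces the same cast error, since the root schema is an array,
// not a record.
using Avro.Generic;
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

class Program
{
    static void Main()
    {
        var registry = new CachedSchemaRegistryClient(
            new SchemaRegistryConfig { Url = "http://localhost:8081" });

        using var consumer = new ConsumerBuilder<Ignore, GenericRecord>(
                new ConsumerConfig
                {
                    BootstrapServers = "localhost:9092",
                    GroupId = "repro-group"
                })
            .SetValueDeserializer(new AvroDeserializer<GenericRecord>(registry).AsSyncOverAsync())
            .Build();

        consumer.Subscribe("mytopic");

        // Throws a ConsumeException wrapping: "Unable to cast object of type
        // 'System.Object[]' to type 'Avro.Generic.GenericRecord'".
        var result = consumer.Consume();
    }
}
```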

@michny
Author

michny commented Aug 23, 2023

@jainharsh98 are there any plans to upgrade the library to use Confluent.Kafka v2.2.0?

I have found a workaround for my issue, which is to use the Chr.Avro.Confluent library (https://github.com/ch-robinson/dotnet-avro) for the deserialization. However, that library depends on Confluent.Kafka 2.2.0, which clashes with azure-functions-kafka-extension's dependency on Confluent.Kafka 1.9.0. Using both libraries at the same time forces azure-functions-kafka-extension onto 2.2.0, which results in the following error on startup:

A host error has occurred during startup operation '7317244d-cf39-4871-a99b-2588681b02b6'.
System.Private.CoreLib: Method 'RegisterSchemaAsync' in type 'Microsoft.Azure.WebJobs.Extensions.Kafka.LocalSchemaRegistry' from assembly 'Microsoft.Azure.WebJobs.Extensions.Kafka, Version=3.9.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35' does not have an implementation.

Of course I would ideally like to use azure-functions-kafka-extension for both triggering the function and the deserialization, but an acceptable workaround for me would be to use azure-functions-kafka-extension only to trigger the function when new messages arrive in Kafka, and then use Chr.Avro.Confluent to deserialize them from a byte[] to the .NET classes I have.
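A variant of that workaround that avoids the version clash entirely is to decode the raw bytes with only the Apache.Avro package (which both dependency trees already carry): the Confluent wire format is one magic byte (0), a 4-byte big-endian schema id, then the Avro-encoded payload. A sketch, assuming the writer schema JSON is obtained out of band (e.g. fetched once from the schema registry) -- the helper name and parameters here are hypothetical:

```csharp
// Hypothetical sketch: manual decoding of a Confluent-framed Avro message
// whose root schema is an array of records, using only Apache.Avro.
using System.IO;
using Avro;
using Avro.Generic;
using Avro.IO;

public static class ManualAvroDecoding
{
    public static object[] DecodeArrayOfRecords(byte[] message, string writerSchemaJson)
    {
        // Skip the 5-byte Confluent header (magic byte + 4-byte schema id).
        using var stream = new MemoryStream(message, 5, message.Length - 5);
        var schema = Schema.Parse(writerSchemaJson);
        var reader = new GenericDatumReader<object>(schema, schema);
        // With an array at the schema root, Avro's generic reader yields
        // object[] (of GenericRecord) rather than a single GenericRecord --
        // which is exactly the cast the extension trips over.
        return (object[])reader.Read(null, new BinaryDecoder(stream));
    }
}
```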

@jainharsh98
Collaborator

@michny We have the library upgrade on our roadmap. Since it is a major version change, any breaking changes need to be taken care of before the release. We currently do not have an ETA for this.
