
KafkaListener with property overrides: overridden properties are not applied to key- and value-deserializers #3526

Closed
sjann0 opened this issue Oct 4, 2024 · 5 comments · Fixed by #3540

Comments

@sjann0

sjann0 commented Oct 4, 2024

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.2.1

Describe the bug

When @KafkaListener is used with properties (consumer property overrides) set as part of the annotation, those properties are not applied when configuring the key and value deserializers.

This is relevant when the deserializers use a schema registry and we want to use different clusters and schema registries for multiple @KafkaListeners.

To Reproduce

Override the schema.registry.url property using the properties field of the listener annotation, then check which values are applied in the deserializer.

The configs map passed to public void configure(Map<String, ?> configs, boolean isKey) on the deserializers does not currently contain the overridden properties.
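One quick way to observe this (a minimal sketch; ProbeStringDeserializer is a hypothetical helper, not part of the project) is to log the property inside configure() and register the class as the key deserializer:

    import java.util.Map;

    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ProbeStringDeserializer extends StringDeserializer {

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // With the listener-level override in place, this still logs the
            // factory-level value (or null) rather than the overridden one.
            System.out.println("schema.registry.url = " + configs.get("schema.registry.url"));
            super.configure(configs, isKey);
        }
    }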

Expected behavior

The overridden properties from the @KafkaListener annotation should be part of the configs map used to configure the deserializers.

@sobychacko
Contributor

@sjann0 Could you create a small sample application so we can quickly reproduce the issue? That way, the triaging will be faster. Thanks!

@sjann0
Author

sjann0 commented Oct 4, 2024

Sample:

There is a method with two @KafkaListeners, each pointing to a different Kafka cluster:

  @KafkaListener(id = "listener1", groupId = "${kafka.consumer-group-id}", topicPattern = "${kafka.topics.messagev1}", batch = "true", autoStartup = "true")
  @KafkaListener(id = "listener2", groupId = "${kafka.consumer-group-id}", topicPattern = "${kafka.topics.messagev1}", batch = "true", autoStartup = "true", properties = "${kafka.cluster2-props}")
  public void batchProcessMessages(@Payload final List<ConsumerRecord<String, T>> records, final Acknowledgment acknowledgment) {
      // ...
  }

Then we have a consumerFactory like so:

  @Bean
  protected ConsumerFactory<String, Object> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), 
        () -> new StringDeserializer(),
        () -> new ErrorHandlingDeserializer<>(new KafkaAvroDeserializer())
    );
  }

And here are the override properties for listener2 (YAML):

  kafka.cluster2-props: |-
    bootstrap.servers=kafka.xxx.ch:9092
    schema.registry.url=https://schemaregistry.xxx.ch:443

I was hoping that the lazily initialized deserializers would use the override properties from listener2 as well, but they do not.

@artembilan
Member

If you do

    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), 
        () -> new StringDeserializer(),
        () -> new ErrorHandlingDeserializer<>(new KafkaAvroDeserializer()))

yourself, then the behavior is expected.
By the time we reach Deserializers in the Kafka client, the mentioned configure() is not called, because a Deserializer instance has already been provided:

    public Deserializers(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);

        if (keyDeserializer == null) {
            this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
            this.keyDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), true);
        } else {
            config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
            this.keyDeserializer = keyDeserializer;
        }

        if (valueDeserializer == null) {
            this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
            this.valueDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), false);
        } else {
            config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
            this.valueDeserializer = valueDeserializer;
        }
    }

In my opinion the logic is correct, and it is better not to mutate an externally provided object.
Alternatively, you can fully configure that KafkaAvroDeserializer instance upfront, before injecting it into the DefaultKafkaConsumerFactory.
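For the upfront route, a minimal sketch (the URL is a placeholder; the four-argument constructor used below takes a final configureDeserializers flag, so the factory does not configure the provided instances again with its base configs):

    return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
        StringDeserializer::new,
        () -> {
            KafkaAvroDeserializer delegate = new KafkaAvroDeserializer();
            // The Kafka client will not call configure() on a provided instance,
            // so configure the delegate manually with the cluster-2 settings.
            delegate.configure(Map.of("schema.registry.url", "https://schemaregistry.xxx.ch:443"), false);
            return new ErrorHandlingDeserializer<>(delegate);
        },
        false); // configureDeserializers = false: keep the manual configuration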
Or you can go the full config-props way and let the Kafka client configure it for you:

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer

Push these props to your consumerConfigs() and just use this constructor instead:

  @Bean
  protected ConsumerFactory<String, Object> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }
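A sketch of what consumerConfigs() might then contain (host names are placeholders; ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS is the constant behind spring.deserializer.value.delegate.class):

    @Bean
    protected Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.cluster1.example:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        // The delegate that ErrorHandlingDeserializer instantiates and configures
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
        props.put("schema.registry.url", "https://schemaregistry.cluster1.example:443");
        return props;
    }

Because the Kafka client now instantiates the deserializers itself, the per-listener properties overrides (such as kafka.cluster2-props above) are merged into the consumer config before configure() is called, so listener2 should end up with its own bootstrap servers and schema registry.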

Closed as Works as Designed

artembilan closed this as not planned on Oct 4, 2024
@sjann0
Author

sjann0 commented Oct 5, 2024

Thanks for your advice.

The "configure" method on the Deserializer is actually called, but without the overridden properties from the listener. This makes the setup for two clusters (and two schema-registries) not impossible, but more complicated than necessary.

@artembilan
Member

Right. That is called from the DefaultKafkaConsumerFactory. See its keyDeserializerSupplier() and the configureDeserializers flag, which this constructor sets to true:

	public DefaultKafkaConsumerFactory(Map<String, Object> configs,
			@Nullable Supplier<Deserializer<K>> keyDeserializerSupplier,
			@Nullable Supplier<Deserializer<V>> valueDeserializerSupplier) {

		this(configs, keyDeserializerSupplier, valueDeserializerSupplier, true);
	}

However, I see your point and believe we can fix it by calling keyDeserializerSupplier() from the ExtendedKafkaConsumer with the modified configs.
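Roughly, the idea is the following (an illustration only, not necessarily how the eventual fix in #3540 is implemented; factoryConfigs, listenerOverrides, and keyDeserializerSupplier are stand-in names):

    // Merge the factory-level configs with the per-listener overrides, then
    // configure the supplier-provided deserializer with the merged result.
    Map<String, Object> merged = new HashMap<>(factoryConfigs);
    merged.putAll(listenerOverrides);
    Deserializer<K> keyDeserializer = keyDeserializerSupplier.get();
    keyDeserializer.configure(merged, true);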

Reopening, since after further investigation it really does make sense to do this.
However, the fix will go only into the current version, since it comes with a slight breaking change in behavior.
