Following #2049, Spring Kafka may also support Spring managed interceptors for standard consumers and streams.
In pure Kafka, interceptors are specified through a configuration entry. This entry contains the list of interceptor class names, which Kafka then converts into a list of interceptor instances (see AbstractConfig#getConfiguredInstances). As the interceptors are instantiated by Kafka itself, there is no way to inject dependencies into them except by using the trick with the configure method (see https://docs.spring.io/spring-kafka/docs/current/reference/html/#interceptors).
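For readers unfamiliar with the configure trick, here is a minimal sketch of the mechanism in plain Java. It is a simplified model and does not use Kafka's real classes: `Configurable`, `AuditService`, `AuditInterceptor`, the `audit.service` key and the `instantiate` helper are all hypothetical names invented for illustration. The idea is that the dependency is placed in the config map under a custom key, and the reflectively created interceptor picks it up when `configure` is called.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConfigureTrickSketch {

    /** Hypothetical stand-in for a Spring-managed bean the interceptor needs. */
    public static class AuditService {
        private final List<String> seen = new ArrayList<>();

        public void record(String value) {
            seen.add(value);
        }

        public List<String> seen() {
            return seen;
        }
    }

    /** Simplified model of a configure contract; an assumption, not Kafka's interface. */
    public interface Configurable {
        void configure(Map<String, ?> configs);
    }

    /** Interceptor with a no-arg constructor so it can be created reflectively. */
    public static class AuditInterceptor implements Configurable {
        public static final String AUDIT_SERVICE_KEY = "audit.service"; // hypothetical key

        private AuditService auditService;

        @Override
        public void configure(Map<String, ?> configs) {
            // The dependency travels inside the config map instead of being injected.
            this.auditService = (AuditService) configs.get(AUDIT_SERVICE_KEY);
        }

        public String onSend(String value) {
            auditService.record(value);
            return value;
        }
    }

    /** Mimics the framework side: create from a class name, then call configure. */
    public static Configurable instantiate(String className, Map<String, ?> configs)
            throws ReflectiveOperationException {
        Configurable instance =
                (Configurable) Class.forName(className).getDeclaredConstructor().newInstance();
        instance.configure(configs);
        return instance;
    }

    public static void main(String[] args) throws Exception {
        AuditService service = new AuditService();
        Map<String, Object> configs = new HashMap<>();
        configs.put("interceptor.classes", AuditInterceptor.class.getName());
        configs.put(AuditInterceptor.AUDIT_SERVICE_KEY, service);

        AuditInterceptor interceptor = (AuditInterceptor) instantiate(
                (String) configs.get("interceptor.classes"), configs);
        interceptor.onSend("hello");
        System.out.println(service.seen()); // prints [hello]
    }
}
```

This works, but the interceptor has to know the custom key and cast the value itself, which is the awkwardness the proposal tries to avoid.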
An update in Kafka to support instances instead of classes (technical limitation?) could help a lot.
So, the proposal would be to extend ProducerConfig, ConsumerConfig and StreamsConfig as follows, so that the Spring managed interceptors are appended to the list of interceptors created by Kafka.
// example for streams
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.streams.StreamsConfig;

public class SpringAwareStreamConfig extends StreamsConfig {

    // Interceptor instances that are already managed (and injected) by Spring
    private final List<ProducerInterceptor<?, ?>> producerInterceptors;
    private final List<ConsumerInterceptor<?, ?>> consumerInterceptors;

    public SpringAwareStreamConfig(Map<?, ?> props,
            boolean doLog,
            List<ProducerInterceptor<?, ?>> producerInterceptors,
            List<ConsumerInterceptor<?, ?>> consumerInterceptors) {
        super(props, doLog);
        this.producerInterceptors = producerInterceptors;
        this.consumerInterceptors = consumerInterceptors;
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, Map<String, Object> configOverrides) {
        // Let Kafka instantiate the interceptors declared by class name first
        final var configuredInstances = super.getConfiguredInstances(classNames, t, configOverrides);
        // Then append the Spring managed instances of the requested type
        if (ConsumerInterceptor.class.equals(t)) {
            configuredInstances.addAll((Collection<? extends T>) consumerInterceptors);
        }
        if (ProducerInterceptor.class.equals(t)) {
            configuredInstances.addAll((Collection<? extends T>) producerInterceptors);
        }
        return configuredInstances;
    }
}
These new Kafka config classes would then be instantiated, respectively, in:
DefaultKafkaProducerFactory#createRawProducer
DefaultKafkaConsumerFactory#createRawConsumer
StreamsBuilderFactoryBean#start
This approach would let Kafka handle the calls to the interceptor methods instead of having to call these methods in Spring Kafka itself.
Any comments, feedback, or other proposals are more than welcome.
Unfortunately, the proposal I made is not applicable to producers and consumers, because the KafkaProducer/KafkaConsumer constructor that accepts a ProducerConfig/ConsumerConfig is not public (it is package-private, i.e. it has the default modifier).
An issue has been created on the Kafka project to increase the visibility of this constructor.
See https://issues.apache.org/jira/browse/KAFKA-13864
I could still open a PR for streams, but it would be incomplete.