Support kafka parallel-consumer #2381
Comments
No plans currently; but contributions are welcome! |
That said, it's easy enough to use it from Spring - just set up a
@SpringBootApplication
public class Kgh2381Application {

    private static final Logger log = LoggerFactory.getLogger(Kgh2381Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Kgh2381Application.class, args);
    }

    // autoStartup = "false": the listener container itself is never started;
    // only its MessageListener is reused by the parallel consumer below.
    @KafkaListener(id = "kgh2381", topics = "kgh2381", autoStartup = "false")
    void listen(String in) {
        log.info(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("kgh2381").partitions(1).replicas(1).build();
    }

    @Bean
    ApplicationRunner runner(KafkaListenerEndpointRegistry registry, ConsumerFactory<String, String> cf,
            KafkaTemplate<String, String> template) {
        return args -> {
            MessageListener messageListener = (MessageListener) registry.getListenerContainer("kgh2381")
                    .getContainerProperties().getMessageListener();
            Consumer<String, String> consumer = cf.createConsumer("group", "");
            var options = ParallelConsumerOptions.<String, String>builder()
                    .ordering(ProcessingOrder.KEY)
                    .consumer(consumer)
                    .maxConcurrency(10)
                    .build();
            ParallelStreamProcessor<String, String> processor = ParallelStreamProcessor
                    .createEosStreamProcessor(options);
            processor.subscribe(List.of("kgh2381"));
            // Hand each polled record to the @KafkaListener's message listener.
            processor.poll(context -> messageListener.onMessage(context.getSingleConsumerRecord(), null, consumer));
            // Send a few test records.
            IntStream.range(0, 10).forEach(i -> template.send("kgh2381", "foo" + i));
        };
    }
}
|
I am trying to implement the parallel consumer in Spring Boot using the approach shown above. I have the main class in one Java file and the rest of the methods (KafkaListener, ApplicationRunner) in another Java file. I have also created a ConsumerProp class with the content below:
@EnableKafka
@Configuration
public class ConsumerProp {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
        return new DefaultKafkaConsumerFactory<>(configs);
    }
}
But Spring Boot just exists without any error at all. Could you please help, or post a working sample that I can build on? |
I assume you mean exits (not exists). You must have done something wrong; the example above is a complete working example using Spring Boot. This is the output when running it...
|
This is its application.properties:
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
That file must be annotated with You don't need your own consumer factory bean, just use Boot's. |
Thanks for correcting the typo (exits). I was able to run the application by specifying I am using my own consumer factory bean because I need to provide a few properties dynamically (in some cases from environment variables), such as the bootstrap servers. If that's okay with you, I would really like you to share the sample project you just ran above. Thanks again for all the help. |
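One way to supply values such as the bootstrap servers dynamically is to build the consumer configuration map from environment variables and pass that map to the factory. The sketch below is only an illustration using the plain JDK. The class name ConsumerConfigSketch and the variable names KAFKA_BOOTSTRAP_SERVERS and KAFKA_GROUP_ID are assumptions, not anything from this thread; the property keys are the standard Kafka consumer setting names, and the fallback values are the ones from the ConsumerProp class above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds Kafka consumer settings from environment variables.
class ConsumerConfigSketch {

    // Assumed environment variable names; pick whatever fits your deployment.
    static final String BOOTSTRAP_ENV = "KAFKA_BOOTSTRAP_SERVERS";
    static final String GROUP_ENV = "KAFKA_GROUP_ID";

    // Takes the environment as a parameter so it can be exercised without real env vars.
    static Map<String, Object> consumerConfigs(Map<String, String> env) {
        Map<String, Object> configs = new HashMap<>();
        // Fall back to the hard-coded values from ConsumerProp when a variable is unset.
        configs.put("bootstrap.servers", env.getOrDefault(BOOTSTRAP_ENV, "kafka:9092"));
        configs.put("group.id", env.getOrDefault(GROUP_ENV, "group_1"));
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Mirrors spring.kafka.consumer.enable-auto-commit=false from application.properties.
        configs.put("enable.auto.commit", false);
        return configs;
    }

    public static void main(String[] args) {
        System.out.println(consumerConfigs(System.getenv()));
    }
}
```

The resulting map could be passed to new DefaultKafkaConsumerFactory<>(configs) in place of the hard-coded one. Auto-commit is disabled explicitly here because a hand-built map should not be expected to pick up the spring.kafka.consumer.* properties on its own.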
There's nothing more to share; that is the entire app (together with the properties above). |
As you stated earlier, 'You don't need your own consumer factory bean, just use Boot's.' I have just noticed that the runner method is using only Spring Boot's consumer factory; the one I created is not being used. Could you please tell me what I should do to use the consumer factory I have created? |
Since you define your own consumer factory bean, Boot will detect it and not declare its own; yours will be injected into the runner instead. |
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public DefaultKafkaConsumerFactory<?, ?> kafkaConsumerFactory( Your |
For now I am trying Spring Boot's consumer factory. My application is stuck at
I am not very good at Spring Boot yet; sorry for asking such basic questions. |
You have to start the container to consume. |
Sorry, I did not understand what you mean by container. I am running the application in IntelliJ IDEA, and Kafka is up and running. |
Do you mean by setting the |
Could you please help me in understanding why Ideally it should call the listen method as soon as there is a message to consume as defined below. |
After adding the line below I was able to consume the messages, but each message is being consumed twice. Could you please help me with that?
Could you please tell me the right way and place to start the container? Output:
If you don't mind, could you please send me your running project example for reference, or upload it to GitHub so that it could help others? |
Sorry, I forgot this was about the parallel consumer. You should not start the container. It looks like you are seeing data. As I said, there is nothing else to share, just copy/paste the code into a class named |
Previous .tgz was missing the |
Thanks a lot for sharing the project.
As for the custom consumer factory (I don't want to use Spring Boot's configuration), what changes do I need to make in the ConsumerProp class? This is the current ConsumerProp class. I just want to use the DefaultKafkaConsumerFactory below instead of Spring Boot's default consumer factory.
|
Why? Just add the bean and it will replace Boot's. If it's in a separate file ( |
Hi, @garyrussell |
Expected Behavior
Enable parallel processing of the messages in single consumer.
Current Behavior
Single-threaded processing of the messages.
Context
Are there any plans to include ParallelStreamProcessor as an option for spring-kafka? https://github.com/confluentinc/parallel-consumer It should automatically handle multiple threads, acking, and other concerns (synchronization on the same key if needed, etc.). Currently I can configure this manually, but I guess it would be easier if I only needed to implement
void poll(Consumer<PollContext<K, V>> usersVoidConsumptionFunction);
and the rest were taken care of by Spring.
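To make the requested behavior concrete (records processed on several threads, with records that share a key still handled one after another), here is a minimal JDK-only sketch. It is not how parallel-consumer is implemented and it does no offset tracking or acking; the class KeyOrderedProcessorSketch and its methods are hypothetical names used purely for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

// Hypothetical illustration of per-key ordering on a shared thread pool.
class KeyOrderedProcessorSketch {

    private final ExecutorService pool;
    // Last scheduled task per key; a new record for that key is chained after it.
    // Entries are never evicted here, which a real implementation would need to handle.
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    KeyOrderedProcessorSketch(int maxConcurrency) {
        this.pool = Executors.newFixedThreadPool(maxConcurrency);
    }

    // Runs the handler after every earlier record submitted with the same key.
    // handleAsync is used so that a failed record does not block later ones.
    CompletableFuture<Void> submit(String key, String value, BiConsumer<String, String> handler) {
        return tails.compute(key, (k, tail) -> {
            CompletableFuture<Void> previous =
                    tail == null ? CompletableFuture.completedFuture(null) : tail;
            return previous.<Void>handleAsync((ignored, error) -> {
                handler.accept(key, value);
                return null;
            }, pool);
        });
    }

    void shutdown() {
        pool.shutdown();
    }

    // Submits recordsPerKey values for each key and returns what each key saw, in order.
    static Map<String, List<String>> runDemo(int keyCount, int recordsPerKey) {
        KeyOrderedProcessorSketch processor = new KeyOrderedProcessorSketch(10);
        Map<String, List<String>> seen = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 0; i < recordsPerKey; i++) {
            for (int k = 0; k < keyCount; k++) {
                pending.add(processor.submit("key" + k, "v" + i, (key, value) ->
                        seen.computeIfAbsent(key, x -> Collections.synchronizedList(new ArrayList<>()))
                                .add(value)));
            }
        }
        pending.forEach(CompletableFuture::join);
        processor.shutdown();
        return seen;
    }

    public static void main(String[] args) {
        runDemo(3, 5).forEach((key, values) -> System.out.println(key + " -> " + values));
    }
}
```

Different keys can run concurrently up to the pool size, while each key's values arrive in submission order. A real integration would also have to commit offsets only for records that have finished, which this sketch leaves out.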