Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide a sample with webflux to develop http rest api #3657

Open
gianluca-sabena opened this issue Dec 3, 2024 · 3 comments
Open

Provide a sample with webflux to develop http rest api #3657

gianluca-sabena opened this issue Dec 3, 2024 · 3 comments

Comments

@gianluca-sabena
Copy link

Expected Behavior

Create a sample with spring-kafka and spring webflux

Context

I need to develop a microservices with spring webflux to expose a set of http rest api to allow a client to consume messages from kafka

The api are available based on this issue but the complexity is high. The application must connect a "stream" domain (kafka) with a req-resp (the webflux http rest api) domain.

Can you provide a sample on how to correctly use spring-kafka in a spring webflux application?

Do you know some project with this functionality to look at?

@artembilan
Copy link
Member

You can get an idea from here: https://github.com/artembilan/sandbox/tree/master/amqp-to-webflux.
Unfortunately the request neither belongs to Spring for Apache Kafka, nor Spring WebFlux.
This is something even outside of Spring Boot scope.
Sounds more like a real application with specific task.

We might accept a contribution on the matter, but not a fact that we will implement it ourselves since it is much broader, then this project goal.
Plus it is still not clear what is your ask.

The request-reply for @KafkaListener has nothing to do with HTTP: that request is initiated from Kafka broker, and then KafkaTemplate is used to send reply back to that broker.
HTTP is totally different initiator and maximum what we can suggest is a streaming bridge from @KafkaListener to WebFlux request.

Another option to consume from Kafka topic is to use KafkaTemplate.receive() API.

I'd suggest to start with something and come back to StackOverflow with more specific question: our knowledge in this project might not be enough to help you well.

@gianluca-sabena
Copy link
Author

Hi @artembilan,
thank you for your reply and suggestions.

To help clarify the case I need to create a few http rest api to receive messages from kafka.
The api could be identical to this ones:

A high level flow is:

  • the http client call the first http api create_consumer_group and receive a consumer_id in the response
  • next the http client call a second http api subscribe_to_topic and pass consumer_id

Main points:

  • What is the best approach to create a kafka client and and re-activate when a http api subscribe_to_topic is received
  • How to connect messages received from kafka client to webflux http response as a stream

Thanks again

@artembilan
Copy link
Member

OK. Sounds like very complex application, which is indeed out of this project scope.

Saying that here are some pointers on what I see so far:

I don't see a correlation between create_consumer_group and consumer_id. The consumer group is really about several competing consumers: https://docs.confluent.io/platform/current/clients/consumer.html#consumer-groups.
The join_group might sound more reasonable. But again: when you join the group, that does not mean that your consumer may receive any records. Just because of those competition rules on the consumer group.
So, I don't understand what is really a task for this API.

If you say there is already REST API from Confluent on the matter, so why pursue your own?

What is the best approach to create a kafka client and and re-activate when a http api subscribe_to_topic is received

That sounds like a Map<String, KafkaConsumer> based on that consumer_id as a key.
However else you would implement such a logic?

How to connect messages received from kafka client to webflux http response as a stream

That is exactly what I've showed with my sample for AMQP above.

It is probably going to be a bit complicated trying to create a @KafkaListener on the fly, so better to look into a ConcurrentMessageListenerContainer: https://docs.spring.io/spring-kafka/reference/kafka/dynamic-containers.html

At the same time, most of those components are supposed to be Spring beans and that complicates things more for such a dynamic API.
You might would need look into Spring Integration and its dynamic flows functionality: https://docs.spring.io/spring-integration/reference/dsl/java-runtime-flows.html.

I'm not sure how to help you else with much more dedication and requirements investigation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants