Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nats Jetstream Support in EventSource #3160

Open
justinfx opened this issue Jun 12, 2024 · 13 comments
Open

Nats Jetstream Support in EventSource #3160

justinfx opened this issue Jun 12, 2024 · 13 comments
Labels
enhancement New feature or request

Comments

@justinfx
Copy link

Is your feature request related to a problem? Please describe.
I have a Nats Jetstream cluster running, which contains many different streams of events. I want to use these streams as an EventSource to trigger in a Sensor. There does not seem to be any current support for Nats Jetstream in the NATSEventSource, as it only supports Nats Core subscriptions. The problem is that one cannot do reliable messaging from Nats using only the Nats Core subscriptions. If the EventSource is stopped for a period of time, it would miss messages during that period of time, and only get the next new message when it reconnects. The only way to mitigate this problem is to use a "queue" name, and multiple replicaset of EventSource with rolling update, and make sure that at least 1 instance is always running. Even then, it is likely to have out-of-order messages when using a queue on Nats core.
There is no current way to define a "stream" and various Nats Consumer details, to establish a reliable consumer of Nats Jetstream messages. A Jetstream Consumer would keep track of the sequence position via auto-ack, and be able to pick up where it left off, on the next connection. This is similar to how Kafka works.

Describe the solution you'd like
I would love to see one of the following possible solutions:

  1. NATSEventSource gets updated with extra parameters around specifying the stream name, and extra consumer options like the durable name, filter subjects, and inactivity threshold. This would mean some branching logic would have to be added to handle the client using a Jetstream consumer instead of nats core subscriptions.
  2. NATSJetStreamEventSource is created as a separate implementation

In either case, it would use options exposed from here: https://docs.nats.io/nats-concepts/jetstream/consumers

When the EventSource is deployed, it should start out similar to the existing NATSEventSource to create a client, but then it should create-or-update a Jetstream Consumer for the named Stream, which can either start from the next new message if it is a new consumer, or it would pick up from the last message position for the existing consumer. The consumer could use InactivityThreshold so that it would auto-delete after a period of time with no Consumer activity (configurable).

It should be possible to achieve durable messaging with just a single EventSource pod.

Describe alternatives you've considered
If this were not something Argo Events wanted to officially implement, we would have to consider either writing our own EventSource, or just not using Argo Events, and choosing a solution that can support Nats Jetstream sources.

Additional context
Original question on slack:
https://cloud-native.slack.com/archives/C01TNKD6KL6/p1715833804917629

Older request for the same feature, which was closed:
#2409


Message from the maintainers:

If you wish to see this enhancement implemented please add a 👍 reaction to this issue! We often sort issues this way to know what to prioritize.

@justinfx justinfx added the enhancement New feature or request label Jun 12, 2024
@justinfx
Copy link
Author

justinfx commented Jul 9, 2024

Just bumping this, as we are getting close to being able to introduce Argo Events into our production environment. However it is kind of a blocker to not have Jetstream support as a listener, and I am not clear yet whether I need to invest time in writing some custom EventSource, or if this is something that can be officially supported?

@justinfx
Copy link
Author

As it currently stands, I have gone the route of implementing this internally as a generic NATSJSEventSource. The reason I did this was it felt like the path of least resistance, instead of trying to fork argo-events, implement it as a first class EventSource, and then try to contribute it and use the forked build while waiting to get it merged upstream.
I did however write it as close as possible to the existing NatsEventSource implementation, and I would be happy to share my source code if someone wants to port it over as 1st class.
I do believe this is a pretty important feature for anyone wanting to use Nats as an EventSource and want to actually take advantage of the durability of Jetstream.

@rossnelson
Copy link

@justinfx I am encountering the same problem and I'm curious how you implemented your solution. I was thinking it might be cleaner to create a custom jetstream consumer without argo using the golang sdk and simply having it act as a proxy between the listening subject and the subject the sensor is listening to.

Heck, whats to stop the application from sending directly to the subject the sensor is listening to?

Is this essentially what you have done or did you implement a custom event source using this: https://argoproj.github.io/argo-events/eventsources/generic/

@justinfx
Copy link
Author

justinfx commented Aug 8, 2024

@rossnelson yes that is exactly what I implemented! I used the generic argo eventsource, where you implement just one required service function on the proto type. That function receives each new stream request to start a new nats Jetstream consumer on behalf of the proxy that argo will run for each newly deployed instance of this eventsource.
So it becomes a stateless service where you only have to serve the consumer for the lifetime of that single streaming context. And you accept whatever config details you want with the blob config field they deliver. We have it working now.
The downside of the generic eventsource approach is that you have to deploy your own service to host it, as opposed to how nicely argo will deploy everything for its 1st class eventsources.

@rossnelson
Copy link

@justinfx I'm interested in seeing what you have if you are still willing to share.

@justinfx
Copy link
Author

justinfx commented Aug 8, 2024

@rossnelson I did write it using only the argo and Nats packages with the intent of contributing it if and when an argo maintainer responded to this issue with interest.
But I can actually just look into open sourcing it today. Ideally I wouldn't have to officially maintain it for too long, until something gets picked up as a 1st class option. I'll share the reference project on this ticket. Stay tuned!

@justinfx
Copy link
Author

justinfx commented Aug 9, 2024

@rossnelson I've put together an open source release of my work:
https://github.com/justinfx/argo-natsjs-eventsource

@praveenperera
Copy link

Can we get this merged?

@rossnelson
Copy link

@rossnelson I've put together an open source release of my work:

https://github.com/justinfx/argo-natsjs-eventsource

I'll take a look over the next few days. Thank you for this. Saves me a ton of time. 😂

@justinfx
Copy link
Author

justinfx commented Aug 10, 2024

Can we get this merged?

Someone needs to take the time to refactor it into a 1st class Eventsource, which uses a slightly different interface, and also needs to have the cases added for where it selects the Eventsource type from all the first class string type names. I just didn't want to deal with all that and a fork to work with.

@praveenperera
Copy link

Makes sense thanks, sorry just excited to have it.

@justinfx
Copy link
Author

If you can start with using this generic external solution for now, we can identify bugs and get them merged and released quickly in my repo!

@static-moonlight
Copy link

Oh yes, I definitely need that feature!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

4 participants