js: Ephemeral consumers with js.QueueSubscribe act like Subscribe #748
Comments
@wallyqs this is the same behaviour as in the JavaScript clients. The change would require either passing the (possibly generated) name of the consumer, or some mechanism to filter all consumers by subject. Perhaps I am missing something here.
@aricart Yes, that's the way it currently works. A few users have run into this expecting load balancing a la core NATS QueueSubscribe, so maybe we should improve it somehow.
We should be able to bind to any consumer. All are named (ConsumerInfo shows it); it's just that their lifetime differs depending on whether the consumer is durable or ephemeral.
I tried to use QueueSubscribe, but even with a durable it is not working.
When subscribing, if we can bind to an existing consumer we should, meaning js.QueueSubscribe should work the way it's intended. Does it not behave this way for the JS clients?
@rwrz Below you can find a couple of examples of using js.QueueSubscribe:
QueueSubscribe with Durable
package main

import (
	"fmt"
	"log"

	"github.com/nats-io/nats.go"
)

func init() {
	log.SetFlags(log.LstdFlags | log.Lshortfile)
}

func main() {
	nc, err := nats.Connect("localhost")
	if err != nil {
		log.Fatal(err)
	}
	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}
	js.AddStream(&nats.StreamConfig{
		Name:     "foo",
		Subjects: []string{"foo"},
	})

	// All 20 queue members share the same durable consumer, so the
	// messages are load balanced across them.
	for i := 0; i < 20; i++ {
		n := i
		js.QueueSubscribe("foo", "bar", func(msg *nats.Msg) {
			log.Printf("[Received n=%-2d]:\t%+v", n, string(msg.Data))
		}, nats.Durable("consumer"))
	}

	for i := 0; i < 1000; i++ {
		js.Publish("foo", []byte(fmt.Sprintf("i:%v", i)))
	}
	nc.Drain()
}
QueueSubscribe with ephemeral
package main
import (
	"fmt"
	"log"

	"github.com/nats-io/nats.go"
)

func init() {
	log.SetFlags(log.LstdFlags | log.Lshortfile)
}

func main() {
	nc, err := nats.Connect("localhost")
	if err != nil {
		log.Fatal(err)
	}
	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}
	js.AddStream(&nats.StreamConfig{
		Name:     "foo",
		Subjects: []string{"foo"},
	})

	inbox := nats.NewInbox()

	// Subscribe to the inbox first so the deliver subject has interest.
	// Avoid: 'consumer requires interest for delivery subject when ephemeral'
	qsub, err := nc.QueueSubscribeSync(inbox, "bar")
	if err != nil {
		log.Fatal(err)
	}

	// Create the ephemeral consumer explicitly (no durable name set).
	consumer, err := js.AddConsumer("foo", &nats.ConsumerConfig{
		DeliverSubject: inbox,
		AckPolicy:      nats.AckExplicitPolicy,
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("%+v", consumer)

	// Bind every queue member to the generated consumer name.
	for i := 0; i < 20; i++ {
		n := i
		js.QueueSubscribe("foo", "bar", func(msg *nats.Msg) {
			log.Printf("[Received n=%-2d]:\t%+v", n, string(msg.Data))
		}, nats.Bind("foo", consumer.Name))
	}

	// Remove original subscription.
	qsub.Unsubscribe()

	for i := 0; i < 1000; i++ {
		js.Publish("foo", []byte(fmt.Sprintf("i:%v", i)))
	}
	nc.Drain()
}
@wallyqs your example with ephemeral consumer is only valid when running in a single process. You can't do this reliably from multiple processes or multiple machines, because you wouldn't know the name of the ephemeral consumer and even if you could query it you'll end up with race conditions.
There are ways around that but you do raise a good point. Several folks have suggested being able to name ephemerals as a potential workaround.
They will be described in the release notes, but the gist:
Added:
- `DeliverSubject()` option to configure the deliver subject of a JetStream consumer created by the `js.Subscribe()` call (and variants)
- `BindDeliverSubject()` option to subscribe directly to a JetStream consumer deliver subject (bypassing any lookup or JetStream consumer creation)
- Fields `DeliverGroup` in `ConsumerConfig`, `PushBound` in `ConsumerInfo`. They help prevent incorrect subscriptions to JetStream consumers
- Field `Last` in `SequencePair`
Changed:
- With a `PullSubscription`, calling `NextMsg()` or `NextMsgWithContext()` will now return `ErrTypeSubscription`. You must use the `Fetch()` API
- If the library internally created a JetStream consumer, the consumer will be deleted on `Unsubscribe()` or when the `Drain()` completes
- Fail multiple instances of a subscription on the same durable push consumer (only one active at a time). Also, consumers now have the concept of `DeliverGroup`, which is the queue group name they are created for. Only queue members from the same group can attach to this consumer, and a non-queue subscription cannot attach to it. Note that this requires server v2.3.5
- Attempting to create a queue subscription with a consumer configuration that has idle heartbeats and/or flow control will now result in an error
Fixed:
- Possible lock inversion
- JetStream consumers could be incorrectly deleted on subscription's `Unsubscribe()`
Resolves #785
Resolves #776
Resolves #775
Resolves #748
Resolves #747
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
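The attach rules listed under "Changed" can be read as a small decision function. The sketch below is only a model of those rules as worded in the commit message, not the library's code: `consumerModel`, `checkAttach` and both error values are hypothetical names. It also assumes that a queue subscription may not attach to a consumer created without a deliver group, which the message does not state explicitly.

```go
package main

import (
	"errors"
	"fmt"
)

// consumerModel holds only the settings the rules above mention.
// Hypothetical stand-in for illustration; it is not nats.ConsumerConfig.
type consumerModel struct {
	DeliverGroup  string // queue group the consumer was created for, "" if none
	FlowControl   bool
	IdleHeartbeat bool
}

// Hypothetical errors; the real library uses its own error values.
var (
	errGroupMismatch = errors.New("queue group does not match consumer deliver group")
	errQueueWithHBFC = errors.New("queue subscription with idle heartbeats or flow control")
)

// checkAttach reports whether a subscription using the given queue name
// ("" means a non-queue subscription) may attach to the consumer.
func checkAttach(c consumerModel, queue string) error {
	// Queue subscriptions are rejected when the consumer configuration
	// has idle heartbeats and/or flow control.
	if queue != "" && (c.FlowControl || c.IdleHeartbeat) {
		return errQueueWithHBFC
	}
	// Only members of the consumer's own group may attach, and a
	// non-queue subscription cannot attach to a grouped consumer.
	// Assumption: the reverse case (queue subscription, consumer
	// without a group) is rejected by the same comparison.
	if c.DeliverGroup != queue {
		return errGroupMismatch
	}
	return nil
}

func main() {
	c := consumerModel{DeliverGroup: "bar"}
	fmt.Println("same group:     ", checkAttach(c, "bar"))
	fmt.Println("other group:    ", checkAttach(c, "baz"))
	fmt.Println("non-queue sub:  ", checkAttach(c, ""))
	fmt.Println("with flow ctrl: ", checkAttach(consumerModel{DeliverGroup: "bar", FlowControl: true}, "bar"))
}
```

Under this model, only the first call returns a nil error.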
Currently, unless js.QueueSubscribe uses a durable or binds to a consumer with a durable name, no load balancing occurs when other members call
js.QueueSubscribe("foo", "bar")
since each call gets its own ephemeral consumer.
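To make the difference concrete, here is a minimal simulation that needs no NATS server. It only models the behaviour described in this issue: every consumer receives each stream message and hands it to exactly one attached subscriber. The `consumer` type, `simulate`, `perCallEphemeral` and `sharedConsumer` are hypothetical names, and the round-robin choice is an assumption made for simplicity; the server's actual member selection may differ.

```go
package main

import "fmt"

// consumer models a push consumer: it sees every stream message and
// delivers each one to exactly one of its attached subscribers.
type consumer struct {
	subscribers []int // IDs of the attached queue members
	next        int   // round-robin cursor (simplifying assumption)
}

// deliver returns the ID of the subscriber chosen for one message.
func (c *consumer) deliver() int {
	id := c.subscribers[c.next%len(c.subscribers)]
	c.next++
	return id
}

// simulate publishes msgs messages and returns how many messages each
// subscriber ID received across all consumers.
func simulate(consumers []*consumer, msgs int) map[int]int {
	received := make(map[int]int)
	for i := 0; i < msgs; i++ {
		for _, c := range consumers {
			received[c.deliver()]++
		}
	}
	return received
}

// perCallEphemeral models the reported behaviour: every QueueSubscribe
// call ends up with its own private ephemeral consumer.
func perCallEphemeral(members int) []*consumer {
	cs := make([]*consumer, 0, members)
	for id := 0; id < members; id++ {
		cs = append(cs, &consumer{subscribers: []int{id}})
	}
	return cs
}

// sharedConsumer models the durable (or bound) case: all members attach
// to one consumer.
func sharedConsumer(members int) []*consumer {
	c := &consumer{}
	for id := 0; id < members; id++ {
		c.subscribers = append(c.subscribers, id)
	}
	return []*consumer{c}
}

func main() {
	const members, msgs = 4, 1000
	fmt.Println("per-call ephemeral:", simulate(perCallEphemeral(members), msgs))
	fmt.Println("shared consumer:   ", simulate(sharedConsumer(members), msgs))
}
```

In this model, each of the four members receives all 1000 messages in the per-call ephemeral case, while the shared consumer splits them 250 each, which is the load balancing users expect from a queue group.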