-
Notifications
You must be signed in to change notification settings - Fork 688
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
NextMsgWithContext
does not return messages
#785
Comments
Hi, right now NextMsg or NextMsgWithContext are not aliases to Fetch and only used with Push subscribers. Related PR: https://github.com/nats-io/nats.go/pull/754/files |
Is the nats.WorkQueuePolicy only available for Pull? I changed the PullSubscribe to SubscribeSync, and now I see this error:
Ignore the smells, I cut a bunch of stuff to make a minimal example 😝 |
@vdwees work queue policy is available for both, but once the consumer is created, you need to use the proper API depending on the delivery mode (push/pull). PullSubscribe + Fetch is only for pull consumers, SubscribeSync is for push consumers. |
Thanks. One more question: Is there a way to explicitly set a consumer as push or pull? Or maybe all durable consumers are pull? Asking because even with a fresh image and the following script, I still see the "must use pull subscribe to bind to pull based consumer" error. func main() {
nc, err := nats.Connect("localhost:4222")
if err != nil {
log.Fatalf("Encountered error: %s\n", err)
}
js, err := nc.JetStream()
if err != nil {
log.Fatalf("Encountered error: %s\n", err)
}
streamConfig := nats.StreamConfig{
Name: "ORDERS",
Subjects: []string{"ORDERS.>"},
}
log.Printf("Configuring stream: \n%+v\n", streamConfig)
_, err = js.AddStream(&streamConfig)
if err != nil {
log.Fatalf("Encountered error: %s\n", err)
}
consumerConfig := nats.ConsumerConfig{
Durable: "DURABLECONSUMER",
AckPolicy: nats.AckExplicitPolicy,
}
log.Printf("Configuring consumer: \n%+v\n", consumerConfig)
_, err = js.AddConsumer("ORDERS", &consumerConfig)
if err != nil {
log.Fatalf("Encountered error: %s\n", err)
}
_, err = js.SubscribeSync(
"ORDERS.*",
nats.Bind("ORDERS", "DURABLECONSUMER"),
)
if err != nil {
log.Fatalf("Encountered error: %s\n", err)
}
} |
@vdwees the presence of a delivery subject is what determines whether it would be a pull based consumer, if there is none then it is a pull consumer, otherwise is a push subscriber. |
@wallyqs Thanks, somehow I missed that detail. I can create the subscriber now. In an example you posted in an issue: #748 (comment), you use |
@vdwees the inbox gives you a guaranteed to be unique subject so that it does not collide with a subject that might be used for another purpose in the same account namespace, but does not have to be that way, a simple string is fine |
With latest #791, NextMsg on a PullSubscribe will return ErrBadSubscription because user are not allowed to call NextMsg but need to call Fetch(). |
Not entirely, in that PR I do it only for NextMsgWithContent. In upcoming PR, it will indeed cause NextMsg() to return ErrTypeSubscription (btw, this is ErrTypeSubscription, not ErrBadSubscription as I wrote earlier). So this will close when another PR that I will submit later lands. |
They will be described in the release notes, but gist: Added: - `DeliverSubject()` option to configure the deliver subject of a JetStream consumer created by the `js.Subscribe()` call (and variants) - `BindDeliverSubject()` option to subscribe directly to a JetStream consumer deliver subject (bypassing any lookup or JetStream consumer creation) - Fields `DeliverGroup` in `ConsumerConfig`, `PushBound` in `ConsumerInfo`. They help making prevent incorrect subscriptions to JetStream consumers - Field `Last` in `SequencePair` Changed: - With a `PullSubscription`, calling `NextMsg()` or `NextMsgWithContext()` will now return `ErrTypeSubscription`. You must use the `Fetch()` API - If the library created internally a JetStream consumer, the consumer will be deleted on `Unsubscribe()` or when the `Drain()` completes - Fail multiple instances of a subscription on the same durable push consumer (only one active at a time). Also, consumers now have the concept of `DeliverGroup`, which is the queue group name they are created for. Only queue member from the same group can attach to this consumer, and a non queue subscription cannot attach to it. Note that this requires server v2.3.5 - Attempting to create a queue subscription with a consumer configuration that has idle heartbeats and/or flow control will now result in an error Fixed: - Possible lock inversion - JetStream consumers could be incorrectly deleted on subscription's `Unsubscribe()` Resolves #785 Resolves #776 Resolves #775 Resolves #748 Resolves #747 Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
They will be described in the release notes, but gist: Added: - `DeliverSubject()` option to configure the deliver subject of a JetStream consumer created by the `js.Subscribe()` call (and variants) - `BindDeliverSubject()` option to subscribe directly to a JetStream consumer deliver subject (bypassing any lookup or JetStream consumer creation) - Fields `DeliverGroup` in `ConsumerConfig`, `PushBound` in `ConsumerInfo`. They help making prevent incorrect subscriptions to JetStream consumers - Field `Last` in `SequencePair` Changed: - With a `PullSubscription`, calling `NextMsg()` or `NextMsgWithContext()` will now return `ErrTypeSubscription`. You must use the `Fetch()` API - If the library created internally a JetStream consumer, the consumer will be deleted on `Unsubscribe()` or when the `Drain()` completes - Fail multiple instances of a subscription on the same durable push consumer (only one active at a time). Also, consumers now have the concept of `DeliverGroup`, which is the queue group name they are created for. Only queue member from the same group can attach to this consumer, and a non queue subscription cannot attach to it. Note that this requires server v2.3.5 - Attempting to create a queue subscription with a consumer configuration that has idle heartbeats and/or flow control will now result in an error Fixed: - Possible lock inversion - JetStream consumers could be incorrectly deleted on subscription's `Unsubscribe()` Resolves #785 Resolves #776 Resolves #775 Resolves #748 Resolves #747 Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
They will be described in the release notes, but gist: Added: - `DeliverSubject()` option to configure the deliver subject of a JetStream consumer created by the `js.Subscribe()` call (and variants) - `BindDeliverSubject()` option to subscribe directly to a JetStream consumer deliver subject (bypassing any lookup or JetStream consumer creation) - Fields `DeliverGroup` in `ConsumerConfig`, `PushBound` in `ConsumerInfo`. They help making prevent incorrect subscriptions to JetStream consumers - Field `Last` in `SequencePair` Changed: - With a `PullSubscription`, calling `NextMsg()` or `NextMsgWithContext()` will now return `ErrTypeSubscription`. You must use the `Fetch()` API - If the library created internally a JetStream consumer, the consumer will be deleted on `Unsubscribe()` or when the `Drain()` completes - Fail multiple instances of a subscription on the same durable push consumer (only one active at a time). Also, consumers now have the concept of `DeliverGroup`, which is the queue group name they are created for. Only queue member from the same group can attach to this consumer, and a non queue subscription cannot attach to it. Note that this requires server v2.3.5 - Attempting to create a queue subscription with a consumer configuration that has idle heartbeats and/or flow control will now result in an error Fixed: - Possible lock inversion - JetStream consumers could be incorrectly deleted on subscription's `Unsubscribe()` Resolves #785 Resolves #776 Resolves #775 Resolves #748 Resolves #747 Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Defect
NextMsgWithContext
does not return messages, but simply hangs until the ctx is closed. However, when usingFetch(1)
the same subscriber, messages are received.Versions of
nats.go
and thenats-server
if one was involved:nats.go v1.11.1-0.20210623165838-4b75fc59ae30
docker run -p 4222:4222 -ti nats:2.3.3-alpine -js
OS/Container environment:
Debian-like
Steps or code to reproduce the issue:
nats pub ORDERS.test "message {{.Count}} @ {{.TimeStamp}}" --count=3
Expected result:
I would expect that message 1 and 2 would get printed and the process would terminate.
I would expect that message 2 would get printed and the process would terminate.
Actual result:
Message 1 is printed, and the process hangs.
The text was updated successfully, but these errors were encountered: