[QUESTION] - Is it possible to Start, Stop, Pause, Resume consumer? #1504
-
I was wondering if I can Start/Stop and Pause/Resume the consumer on demand? For example:

```go
package main

import (
	"context"
	"fmt"
	"runtime"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	js, _ := jetstream.New(nc)

	cfg := jetstream.StreamConfig{
		Name:      "EVENTS",
		Retention: jetstream.WorkQueuePolicy,
		Subjects:  []string{"events.>"},
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Look the stream up first and create it only if it does not exist yet.
	stream, err := js.Stream(ctx, "EVENTS")
	if err != nil || stream == nil {
		stream, err = js.CreateStream(ctx, cfg)
		if err != nil {
			panic(err)
		}
		fmt.Println("created the stream")
	}

	cons, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
		Name:          "processor-us",
		FilterSubject: "events.>",
	})
	if err != nil {
		panic(err)
	}

	cons.Consume(func(msg jetstream.Msg) {
		fmt.Printf("eu sub got: %s\n", msg.Subject())
		msg.Ack()
	})

	// What I would like to be able to call:
	// cons.Pause()
	// cons.Resume()
	// cons.Stop()

	runtime.Goexit()
}
```
Replies: 3 comments
-
There is |
-
Note that the ability to pause/resume consumers will be available as part of NATS server version 2.11 and above.
You should use `Messages()` instead of `Consume()`. That way, when you want to pause, you simply stop calling it in a loop, and when you want to resume, you start that loop again.
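The loop-based pause/resume idea above can be sketched without any NATS dependency. This is only an illustration under stated assumptions: `Gate`, `pullLoop`, and `sliceSource` are hypothetical helpers, not part of nats.go, and the `next` function stands in for whatever call pulls the next message from the iterator returned by `Messages()`. The loop asks the gate for permission before every pull, so pausing just means the loop stops pulling until it is resumed.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Gate is a hypothetical helper: a pull loop calls Wait before every pull
// and blocks there for as long as the gate is paused.
type Gate struct {
	mu     sync.Mutex
	cond   *sync.Cond
	paused bool
}

func NewGate() *Gate {
	g := &Gate{}
	g.cond = sync.NewCond(&g.mu)
	return g
}

// Pause makes the next call to Wait block.
func (g *Gate) Pause() {
	g.mu.Lock()
	g.paused = true
	g.mu.Unlock()
}

// Resume releases any loop currently blocked in Wait.
func (g *Gate) Resume() {
	g.mu.Lock()
	g.paused = false
	g.mu.Unlock()
	g.cond.Broadcast()
}

// Wait returns immediately when running and blocks while paused.
func (g *Gate) Wait() {
	g.mu.Lock()
	for g.paused {
		g.cond.Wait()
	}
	g.mu.Unlock()
}

var errDone = errors.New("no more messages")

// pullLoop pulls and handles messages until next returns an error.
// next is an assumption: a stand-in for pulling from a message iterator.
func pullLoop(g *Gate, next func() (string, error), handle func(string)) {
	for {
		g.Wait() // paused: stop pulling; resumed: carry on
		msg, err := next()
		if err != nil {
			return
		}
		handle(msg)
	}
}

// sliceSource is a fake message source that yields items, then errDone.
func sliceSource(items []string) func() (string, error) {
	i := 0
	return func() (string, error) {
		if i >= len(items) {
			return "", errDone
		}
		item := items[i]
		i++
		return item, nil
	}
}

func main() {
	g := NewGate()
	next := sliceSource([]string{"events.a", "events.b", "events.c"})
	pausedAt := make(chan struct{})
	done := make(chan struct{})
	var got []string

	go func() {
		defer close(done)
		pullLoop(g, next, func(subject string) {
			got = append(got, subject)
			if len(got) == 2 {
				g.Pause() // pause on demand after the second message
				close(pausedAt)
			}
		})
	}()

	<-pausedAt
	fmt.Println("paused after:", got)
	g.Resume()
	<-done
	fmt.Println("finished with:", got)
}
```

With a real consumer, `next` would presumably wrap the iterator and `handle` would ack the message. Note that a pull which is already blocked waiting for a message is not interrupted by `Pause()` in this sketch; the pause takes effect before the following pull.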