Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] JetStream: Nak delay and BackOff lists #894

Merged
merged 3 commits into from
Feb 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go_test.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.16

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b
github.com/nats-io/nats-server/v2 v2.7.2-0.20220201004847-b23ed778684c
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
4 changes: 2 additions & 2 deletions go_test.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b h1:h8EYD8Q7yUbjXmMT6z1XI7SAV+aiHhkNEc1O+WImMh4=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220201004847-b23ed778684c h1:TfLMCHvaj2YSNrgiEWQiXA344lWqPmX3xOLtZj/ywlA=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220201004847-b23ed778684c/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
Expand Down
76 changes: 52 additions & 24 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,8 +804,9 @@ func ExpectLastMsgId(id string) PubOpt {
}

type ackOpts struct {
ttl time.Duration
ctx context.Context
ttl time.Duration
ctx context.Context
nakDelay time.Duration
}

// AckOpt are the options that can be passed when acknowledge a message.
Expand Down Expand Up @@ -880,29 +881,37 @@ func Context(ctx context.Context) ContextOpt {
return ContextOpt{ctx}
}

type nakDelay time.Duration

func (d nakDelay) configureAck(opts *ackOpts) error {
opts.nakDelay = time.Duration(d)
return nil
}

// Subscribe

// ConsumerConfig is the configuration of a JetStream consumer.
type ConsumerConfig struct {
Durable string `json:"durable_name,omitempty"`
Description string `json:"description,omitempty"`
DeliverSubject string `json:"deliver_subject,omitempty"`
DeliverGroup string `json:"deliver_group,omitempty"`
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
AckPolicy AckPolicy `json:"ack_policy"`
AckWait time.Duration `json:"ack_wait,omitempty"`
MaxDeliver int `json:"max_deliver,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
ReplayPolicy ReplayPolicy `json:"replay_policy"`
RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec
SampleFrequency string `json:"sample_freq,omitempty"`
MaxWaiting int `json:"max_waiting,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
HeadersOnly bool `json:"headers_only,omitempty"`
Durable string `json:"durable_name,omitempty"`
Description string `json:"description,omitempty"`
DeliverSubject string `json:"deliver_subject,omitempty"`
DeliverGroup string `json:"deliver_group,omitempty"`
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
AckPolicy AckPolicy `json:"ack_policy"`
AckWait time.Duration `json:"ack_wait,omitempty"`
MaxDeliver int `json:"max_deliver,omitempty"`
BackOff []time.Duration `json:"backoff,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
ReplayPolicy ReplayPolicy `json:"replay_policy"`
RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec
SampleFrequency string `json:"sample_freq,omitempty"`
MaxWaiting int `json:"max_waiting,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
HeadersOnly bool `json:"headers_only,omitempty"`

// Pull based options.
MaxRequestBatch int `json:"max_batch,omitempty"`
Expand Down Expand Up @@ -2638,14 +2647,22 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {
wait = js.opts.wait
}

var body []byte
// This will be > 0 only when called from NakWithDelay()
if o.nakDelay > 0 {
body = []byte(fmt.Sprintf("%s {\"delay\": %d}", ackType, o.nakDelay.Nanoseconds()))
} else {
body = ackType
}

if sync {
if usesCtx {
_, err = nc.RequestWithContext(ctx, m.Reply, ackType)
_, err = nc.RequestWithContext(ctx, m.Reply, body)
} else {
_, err = nc.Request(m.Reply, ackType, wait)
_, err = nc.Request(m.Reply, body, wait)
}
} else {
err = nc.Publish(m.Reply, ackType)
err = nc.Publish(m.Reply, body)
}

// Mark that the message has been acked unless it is AckProgress
Expand Down Expand Up @@ -2676,6 +2693,17 @@ func (m *Msg) Nak(opts ...AckOpt) error {
return m.ackReply(ackNak, false, opts...)
}

// Nak negatively acknowledges a message. This tells the server to redeliver
// the message after the give `delay` duration. You can configure the number
// of redeliveries by passing nats.MaxDeliver when you Subscribe.
// The default is infinite redeliveries.
func (m *Msg) NakWithDelay(delay time.Duration, opts ...AckOpt) error {
if delay > 0 {
opts = append(opts, nakDelay(delay))
}
return m.ackReply(ackNak, false, opts...)
}

// Term tells the server to not redeliver this message, regardless of the value
// of nats.MaxDeliver.
func (m *Msg) Term(opts ...AckOpt) error {
Expand Down
96 changes: 95 additions & 1 deletion test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2465,7 +2465,7 @@ func TestJetStreamSubscribe_AckPolicy(t *testing.T) {
// Create the stream using our client API.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Subjects: []string{"foo", "bar"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
Expand Down Expand Up @@ -2741,6 +2741,100 @@ func TestJetStreamSubscribe_AckPolicy(t *testing.T) {

checkAcks(t, sub)
})

t.Run("Nak with delay", func(t *testing.T) {
js.Publish("bar", []byte("msg"))
sub, err := js.SubscribeSync("bar", nats.Durable("nak_dur"))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on NextMsg: %v", err)
}
if err := msg.NakWithDelay(500 * time.Millisecond); err != nil {
t.Fatalf("Error on Nak: %v", err)
}
// We should not get redelivery before 500ms+
if _, err = sub.NextMsg(250 * time.Millisecond); err != nats.ErrTimeout {
t.Fatalf("Expected timeout, got %v", err)
}
msg, err = sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on NextMsg: %v", err)
}
if err := msg.NakWithDelay(0); err != nil {
t.Fatalf("Error on Nak: %v", err)
}
msg, err = sub.NextMsg(250 * time.Millisecond)
if err != nil {
t.Fatalf("Expected timeout, got %v", err)
}
msg.Ack()
})

t.Run("BackOff redeliveries", func(t *testing.T) {
inbox := nats.NewInbox()
sub, err := nc.SubscribeSync(inbox)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
defer sub.Unsubscribe()
cc := nats.ConsumerConfig{
Durable: "backoff",
AckPolicy: nats.AckExplicitPolicy,
DeliverPolicy: nats.DeliverAllPolicy,
FilterSubject: "bar",
DeliverSubject: inbox,
BackOff: []time.Duration{50 * time.Millisecond, 250 * time.Millisecond},
}
// First, try with a MaxDeliver that is < len(BackOff), which the
// server should reject.
cc.MaxDeliver = 1
_, err = js.AddConsumer("TEST", &cc)
if err == nil || !strings.Contains(err.Error(), "max deliver is required to be > length of backoff values") {
t.Fatalf("Expected backoff/max deliver error, got %v", err)
}
// Now put a valid value
cc.MaxDeliver = 4
ci, err := js.AddConsumer("TEST", &cc)
if err != nil {
t.Fatalf("Error on add consumer: %v", err)
}
if !reflect.DeepEqual(ci.Config.BackOff, cc.BackOff) {
t.Fatalf("Expected backoff to be %v, got %v", cc.BackOff, ci.Config.BackOff)
}
// Consume the first delivery
_, err = sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on nextMsg: %v", err)
}
// We should get a redelivery at around 50ms
start := time.Now()
_, err = sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on nextMsg: %v", err)
}
if dur := time.Since(start); dur < 25*time.Millisecond || dur > 100*time.Millisecond {
t.Fatalf("Expected to be redelivered at around 50ms, took %v", dur)
}
// Now it should be every 250ms or so
for i := 0; i < 2; i++ {
start = time.Now()
_, err = sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on nextMsg for iter=%v: %v", i+1, err)
}
if dur := time.Since(start); dur < 200*time.Millisecond || dur > 300*time.Millisecond {
t.Fatalf("Expected to be redelivered at around 250ms, took %v", dur)
}
}
// At this point, we should have go reach MaxDeliver
_, err = sub.NextMsg(300 * time.Millisecond)
if err != nats.ErrTimeout {
t.Fatalf("Expected timeout, got %v", err)
}
})
}

func TestJetStreamPullSubscribe_AckPending(t *testing.T) {
Expand Down