From 824712b80877d0d19b27a6fa3f31ae31206d984c Mon Sep 17 00:00:00 2001 From: linxside <39219399+linxside@users.noreply.github.com> Date: Sun, 17 Mar 2024 11:22:07 +0100 Subject: [PATCH] fix: make nats consumer durable with cleanup after one week (#363) --- pkg/microservice/messaging/nats.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/microservice/messaging/nats.go b/pkg/microservice/messaging/nats.go index 2da32f84..4d800ca2 100644 --- a/pkg/microservice/messaging/nats.go +++ b/pkg/microservice/messaging/nats.go @@ -73,11 +73,13 @@ func (nb *natsBroker) CreatePersistentMessageReceiver(name, id, address, port, c messageStartTime := time.Now().Add(-15 * time.Minute).Round(time.Minute) cfg := jetstream.ConsumerConfig{ - Name: name + id, - DeliverPolicy: jetstream.DeliverByStartTimePolicy, - OptStartTime: &messageStartTime, - AckPolicy: jetstream.AckNonePolicy, - Metadata: map[string]string{"id": id, "address": address, "port": port, "type": cType}, + Name: name + id, + Durable: name + id, + InactiveThreshold: 8 * 24 * time.Hour, + DeliverPolicy: jetstream.DeliverByStartTimePolicy, + OptStartTime: &messageStartTime, + AckPolicy: jetstream.AckNonePolicy, + Metadata: map[string]string{"id": id, "address": address, "port": port, "type": cType}, } consumer, err := stream.CreateOrUpdateConsumer(ctx, cfg)