diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index fccb2646da..4afbd952d3 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -1235,6 +1235,39 @@ func jsClientConnectURL(t testing.TB, url string, opts ...nats.Option) (*nats.Co return nc, js } +// jsStreamCreate is for sending a stream create for fields that nats.go does not know about yet. +func jsStreamCreate(t testing.TB, nc *nats.Conn, cfg *StreamConfig) *StreamConfig { + j, err := json.Marshal(cfg) + require_NoError(t, err) + + msg, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), j, time.Second*3) + require_NoError(t, err) + + var resp JSApiStreamUpdateResponse + require_NoError(t, json.Unmarshal(msg.Data, &resp)) + require_NotNil(t, resp.StreamInfo) + return &resp.Config +} + +// jsStreamUpdate is for sending a stream create for fields that nats.go does not know about yet. +func jsStreamUpdate(t testing.TB, nc *nats.Conn, cfg *StreamConfig) (*StreamConfig, error) { + j, err := json.Marshal(cfg) + require_NoError(t, err) + + msg, err := nc.Request(fmt.Sprintf(JSApiStreamUpdateT, cfg.Name), j, time.Second*3) + require_NoError(t, err) + + var resp JSApiStreamUpdateResponse + require_NoError(t, json.Unmarshal(msg.Data, &resp)) + + if resp.Error != nil { + return nil, resp.Error + } + + require_NotNil(t, resp.StreamInfo) + return &resp.Config, nil +} + func checkSubsPending(t *testing.T, sub *nats.Subscription, numExpected int) { t.Helper() checkFor(t, 10*time.Second, 20*time.Millisecond, func() error { diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 0014c6069f..720e7cd3ec 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -24745,11 +24745,12 @@ func TestJetStreamMessageTTL(t *testing.T) { nc, js := jsClientConnect(t, s) defer nc.Close() - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"test"}, + jsStreamCreate(t, nc, &StreamConfig{ + Name: "TEST", + Storage: FileStorage, + Subjects: []string{"test"}, + AllowMsgTTL: true, }) - require_NoError(t, err) msg := &nats.Msg{ Subject: "test", @@ -24784,11 +24785,12 @@ func TestJetStreamMessageTTLRestart(t *testing.T) { nc, js := jsClientConnect(t, s) defer nc.Close() - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"test"}, + jsStreamCreate(t, nc, &StreamConfig{ + Name: "TEST", + Storage: FileStorage, + Subjects: []string{"test"}, + AllowMsgTTL: true, }) - require_NoError(t, err) msg := &nats.Msg{ Subject: "test", @@ -24838,11 +24840,12 @@ func TestJetStreamMessageTTLRecovered(t *testing.T) { nc, js := jsClientConnect(t, s) defer nc.Close() - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"test"}, + jsStreamCreate(t, nc, &StreamConfig{ + Name: "TEST", + Storage: FileStorage, + Subjects: []string{"test"}, + AllowMsgTTL: true, }) - require_NoError(t, err) msg := &nats.Msg{ Subject: "test", @@ -24895,11 +24898,12 @@ func TestJetStreamMessageTTLInvalid(t *testing.T) { nc, js := jsClientConnect(t, s) defer nc.Close() - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"test"}, + jsStreamCreate(t, nc, &StreamConfig{ + Name: "TEST", + Storage: FileStorage, + Subjects: []string{"test"}, + AllowMsgTTL: true, }) - require_NoError(t, err) msg := &nats.Msg{ Subject: "test", @@ -24907,10 +24911,33 @@ func TestJetStreamMessageTTLInvalid(t *testing.T) { } msg.Header.Set("Nats-TTL", "500ms") - _, err = js.PublishMsg(msg) + _, err := js.PublishMsg(msg) require_Error(t, err) msg.Header.Set("Nats-TTL", "something") _, err = js.PublishMsg(msg) require_Error(t, err) } + +func TestJetStreamMessageTTLNotUpdatable(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, _ := jsClientConnect(t, s) + defer nc.Close() + + jsStreamCreate(t, nc, &StreamConfig{ + Name: "TEST", + Storage: FileStorage, + Subjects: []string{"test"}, + AllowMsgTTL: true, + }) + + _, err := jsStreamUpdate(t, nc, &StreamConfig{ + Name: "TEST", + Storage: FileStorage, + Subjects: []string{"test"}, + AllowMsgTTL: false, + }) + require_Error(t, err) +} diff --git a/server/stream.go b/server/stream.go index 313da553dd..cb55632b82 100644 --- a/server/stream.go +++ b/server/stream.go @@ -717,7 +717,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt fsCfg.SyncInterval = s.getOpts().SyncInterval fsCfg.SyncAlways = s.getOpts().SyncAlways fsCfg.Compression = config.Compression - fsCfg.EnforceTTLs = true // config.AllowMsgTTL + fsCfg.EnforceTTLs = config.AllowMsgTTL if err := mset.setupStore(fsCfg); err != nil { mset.stop(true, false) @@ -1782,6 +1782,11 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server, pedan } } + // Check on the allowed message TTL status. + if cfg.AllowMsgTTL != old.AllowMsgTTL { + return nil, NewJSStreamInvalidConfigError(fmt.Errorf("message TTL status can not be changed after stream creation")) + } + // Do some adjustments for being sealed. // Pedantic mode will allow those changes to be made, as they are determinictic and important to get a sealed stream. if cfg.Sealed {