Skip to content

Commit

Permalink
Wire in AllowMsgTTL to stream config
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Dec 20, 2024
1 parent e9d14b0 commit 74e161d
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 18 deletions.
33 changes: 33 additions & 0 deletions server/jetstream_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1235,6 +1235,39 @@ func jsClientConnectURL(t testing.TB, url string, opts ...nats.Option) (*nats.Co
return nc, js
}

// jsStreamCreate is for sending a stream create for fields that nats.go does not know about yet.
func jsStreamCreate(t testing.TB, nc *nats.Conn, cfg *StreamConfig) *StreamConfig {
j, err := json.Marshal(cfg)
require_NoError(t, err)

msg, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), j, time.Second*3)
require_NoError(t, err)

var resp JSApiStreamUpdateResponse
require_NoError(t, json.Unmarshal(msg.Data, &resp))
require_NotNil(t, resp.StreamInfo)
return &resp.Config
}

// jsStreamUpdate is for sending a stream create for fields that nats.go does not know about yet.
func jsStreamUpdate(t testing.TB, nc *nats.Conn, cfg *StreamConfig) (*StreamConfig, error) {
j, err := json.Marshal(cfg)
require_NoError(t, err)

msg, err := nc.Request(fmt.Sprintf(JSApiStreamUpdateT, cfg.Name), j, time.Second*3)
require_NoError(t, err)

var resp JSApiStreamUpdateResponse
require_NoError(t, json.Unmarshal(msg.Data, &resp))

if resp.Error != nil {
return nil, resp.Error
}

require_NotNil(t, resp.StreamInfo)
return &resp.Config, nil
}

func checkSubsPending(t *testing.T, sub *nats.Subscription, numExpected int) {
t.Helper()
checkFor(t, 10*time.Second, 20*time.Millisecond, func() error {
Expand Down
61 changes: 44 additions & 17 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24745,11 +24745,12 @@ func TestJetStreamMessageTTL(t *testing.T) {
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"test"},
jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Subjects: []string{"test"},
AllowMsgTTL: true,
})
require_NoError(t, err)

msg := &nats.Msg{
Subject: "test",
Expand Down Expand Up @@ -24784,11 +24785,12 @@ func TestJetStreamMessageTTLRestart(t *testing.T) {
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"test"},
jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Subjects: []string{"test"},
AllowMsgTTL: true,
})
require_NoError(t, err)

msg := &nats.Msg{
Subject: "test",
Expand Down Expand Up @@ -24838,11 +24840,12 @@ func TestJetStreamMessageTTLRecovered(t *testing.T) {
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"test"},
jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Subjects: []string{"test"},
AllowMsgTTL: true,
})
require_NoError(t, err)

msg := &nats.Msg{
Subject: "test",
Expand Down Expand Up @@ -24895,22 +24898,46 @@ func TestJetStreamMessageTTLInvalid(t *testing.T) {
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"test"},
jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Subjects: []string{"test"},
AllowMsgTTL: true,
})
require_NoError(t, err)

msg := &nats.Msg{
Subject: "test",
Header: nats.Header{},
}

msg.Header.Set("Nats-TTL", "500ms")
_, err = js.PublishMsg(msg)
_, err := js.PublishMsg(msg)
require_Error(t, err)

msg.Header.Set("Nats-TTL", "something")
_, err = js.PublishMsg(msg)
require_Error(t, err)
}

func TestJetStreamMessageTTLNotUpdatable(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, _ := jsClientConnect(t, s)
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Subjects: []string{"test"},
AllowMsgTTL: true,
})

_, err := jsStreamUpdate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Subjects: []string{"test"},
AllowMsgTTL: false,
})
require_Error(t, err)
}
7 changes: 6 additions & 1 deletion server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
fsCfg.SyncInterval = s.getOpts().SyncInterval
fsCfg.SyncAlways = s.getOpts().SyncAlways
fsCfg.Compression = config.Compression
fsCfg.EnforceTTLs = true // config.AllowMsgTTL
fsCfg.EnforceTTLs = config.AllowMsgTTL

if err := mset.setupStore(fsCfg); err != nil {
mset.stop(true, false)
Expand Down Expand Up @@ -1782,6 +1782,11 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server, pedan
}
}

// Check on the allowed message TTL status.
if cfg.AllowMsgTTL != old.AllowMsgTTL {
return nil, NewJSStreamInvalidConfigError(fmt.Errorf("message TTL status can not be changed after stream creation"))
}

// Do some adjustments for being sealed.
// Pedantic mode will allow those changes to be made, as they are determinictic and important to get a sealed stream.
if cfg.Sealed {
Expand Down

0 comments on commit 74e161d

Please sign in to comment.