diff --git a/jetstream/kv.go b/jetstream/kv.go index 3bc689039..f1596e544 100644 --- a/jetstream/kv.go +++ b/jetstream/kv.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "reflect" "regexp" "strconv" "strings" @@ -488,8 +489,26 @@ func (js *jetStream) CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (Ke // errors are joined so that backwards compatibility is retained // and previous checks for ErrStreamNameAlreadyInUse will still work. err = errors.Join(fmt.Errorf("%w: %s", ErrBucketExists, cfg.Bucket), err) + + // If we have a failure to add, it could be because we have + // a config change if the KV was created against before a bug fix + // that changed the value of discard policy. + // We will check if the stream exists and if the only difference + // is the discard policy, we will update the stream. + // The same logic applies for KVs created pre 2.9.x and + // the AllowDirect setting. + if stream, _ = js.Stream(ctx, scfg.Name); stream != nil { + cfg := stream.CachedInfo().Config + cfg.Discard = scfg.Discard + cfg.AllowDirect = scfg.AllowDirect + if reflect.DeepEqual(cfg, scfg) { + stream, err = js.UpdateStream(ctx, scfg) + } + } + } + if err != nil { + return nil, err } - return nil, err } pushJS, err := js.legacyJetStream() if err != nil { diff --git a/jetstream/test/kv_test.go b/jetstream/test/kv_test.go index b6ad3a611..8b455e425 100644 --- a/jetstream/test/kv_test.go +++ b/jetstream/test/kv_test.go @@ -1634,3 +1634,58 @@ func TestKeyValueCompression(t *testing.T) { t.Fatalf("Expected stream to be compressed with S2") } } + +func TestKeyValueCreateRepairOldKV(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + ctx := context.Background() + + // create a standard kv + _, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: "A", + }) + if err != nil { + t.Fatalf("Error creating kv: %v", err) + } + + // get stream config and set discard policy to old and AllowDirect to false + stream, err := js.Stream(ctx, "KV_A") + if err != nil { + t.Fatalf("Error getting stream info: %v", err) + } + streamCfg := stream.CachedInfo().Config + streamCfg.Discard = jetstream.DiscardOld + streamCfg.AllowDirect = false + + // create a new kv with the same name - client should fix the config + _, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: "A", + }) + if err != nil { + t.Fatalf("Error creating kv: %v", err) + } + + // get stream config again and check if the discard policy is set to new + stream, err = js.Stream(ctx, "KV_A") + if err != nil { + t.Fatalf("Error getting stream info: %v", err) + } + if stream.CachedInfo().Config.Discard != jetstream.DiscardNew { + t.Fatalf("Expected stream to have discard policy set to new") + } + if !stream.CachedInfo().Config.AllowDirect { + t.Fatalf("Expected stream to have AllowDirect set to true") + } + + // attempting to create a new kv with the same name and different settings should fail + _, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: "A", + Description: "New KV", + }) + if !errors.Is(err, jetstream.ErrBucketExists) { + t.Fatalf("Expected error to be ErrBucketExists, got: %v", err) + } +}