From 26977473aa0cd3f6df8f9d1743357a16fa72b800 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Wed, 17 Apr 2024 21:51:22 +0200 Subject: [PATCH 1/2] [FIXED] Add discard policy repair logic in CreateKeyValue Signed-off-by: Piotr Piotrowski --- jetstream/kv.go | 18 +++++++++++++- jetstream/test/kv_test.go | 51 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/jetstream/kv.go b/jetstream/kv.go index 3bc689039..9d0853a36 100644 --- a/jetstream/kv.go +++ b/jetstream/kv.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "reflect" "regexp" "strconv" "strings" @@ -488,8 +489,23 @@ func (js *jetStream) CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (Ke // errors are joined so that backwards compatibility is retained // and previous checks for ErrStreamNameAlreadyInUse will still work. err = errors.Join(fmt.Errorf("%w: %s", ErrBucketExists, cfg.Bucket), err) + + // If we have a failure to add, it could be because we have + // a config change if the KV was created against before a bug fix + // that changed the value of discard policy. + // We will check if the stream exists and if the only difference + // is the discard policy, we will update the stream. + if stream, _ = js.Stream(ctx, scfg.Name); stream != nil { + cfg := stream.CachedInfo().Config + cfg.Discard = scfg.Discard + if reflect.DeepEqual(cfg, scfg) { + stream, err = js.UpdateStream(ctx, scfg) + } + } + } + if err != nil { + return nil, err } - return nil, err } pushJS, err := js.legacyJetStream() if err != nil { diff --git a/jetstream/test/kv_test.go b/jetstream/test/kv_test.go index b6ad3a611..f26b26c1d 100644 --- a/jetstream/test/kv_test.go +++ b/jetstream/test/kv_test.go @@ -1634,3 +1634,54 @@ func TestKeyValueCompression(t *testing.T) { t.Fatalf("Expected stream to be compressed with S2") } } + +func TestKeyValueCreateRepairDiscardPolicy(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + ctx := context.Background() + + // create a standard kv + _, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: "A", + }) + if err != nil { + t.Fatalf("Error creating kv: %v", err) + } + + // get stream config and set discard policy to old + stream, err := js.Stream(ctx, "KV_A") + if err != nil { + t.Fatalf("Error getting stream info: %v", err) + } + streamCfg := stream.CachedInfo().Config + streamCfg.Discard = jetstream.DiscardOld + + // create a new kv with the same name - client should fix the discard policy + _, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: "A", + }) + if err != nil { + t.Fatalf("Error creating kv: %v", err) + } + + // get stream config again and check if the discard policy is set to new + stream, err = js.Stream(ctx, "KV_A") + if err != nil { + t.Fatalf("Error getting stream info: %v", err) + } + if stream.CachedInfo().Config.Discard != jetstream.DiscardNew { + t.Fatalf("Expected stream to have discard policy set to new") + } + + // attempting to create a new kv with the same name and different settings should fail + _, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: "A", + Description: "New KV", + }) + if !errors.Is(err, jetstream.ErrBucketExists) { + t.Fatalf("Expected error to be ErrBucketExists, got: %v", err) + } +} From d48f61b767fed8243acc8e4a71bd168e157e4826 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Thu, 18 Apr 2024 09:55:17 +0200 Subject: [PATCH 2/2] Repair allow direct Signed-off-by: Piotr Piotrowski --- jetstream/kv.go | 3 +++ jetstream/test/kv_test.go | 10 +++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/jetstream/kv.go b/jetstream/kv.go index 9d0853a36..f1596e544 100644 --- a/jetstream/kv.go +++ b/jetstream/kv.go @@ -495,9 +495,12 @@ func (js *jetStream) CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (Ke // that changed the value of discard policy. // We will check if the stream exists and if the only difference // is the discard policy, we will update the stream. + // The same logic applies for KVs created pre 2.9.x and + // the AllowDirect setting. if stream, _ = js.Stream(ctx, scfg.Name); stream != nil { cfg := stream.CachedInfo().Config cfg.Discard = scfg.Discard + cfg.AllowDirect = scfg.AllowDirect if reflect.DeepEqual(cfg, scfg) { stream, err = js.UpdateStream(ctx, scfg) } diff --git a/jetstream/test/kv_test.go b/jetstream/test/kv_test.go index f26b26c1d..8b455e425 100644 --- a/jetstream/test/kv_test.go +++ b/jetstream/test/kv_test.go @@ -1635,7 +1635,7 @@ func TestKeyValueCompression(t *testing.T) { } } -func TestKeyValueCreateRepairDiscardPolicy(t *testing.T) { +func TestKeyValueCreateRepairOldKV(t *testing.T) { s := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, s) @@ -1651,15 +1651,16 @@ func TestKeyValueCreateRepairDiscardPolicy(t *testing.T) { t.Fatalf("Error creating kv: %v", err) } - // get stream config and set discard policy to old + // get stream config and set discard policy to old and AllowDirect to false stream, err := js.Stream(ctx, "KV_A") if err != nil { t.Fatalf("Error getting stream info: %v", err) } streamCfg := stream.CachedInfo().Config streamCfg.Discard = jetstream.DiscardOld + streamCfg.AllowDirect = false - // create a new kv with the same name - client should fix the discard policy + // create a new kv with the same name - client should fix the config _, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ Bucket: "A", }) @@ -1675,6 +1676,9 @@ func TestKeyValueCreateRepairDiscardPolicy(t *testing.T) { if stream.CachedInfo().Config.Discard != jetstream.DiscardNew { t.Fatalf("Expected stream to have discard policy set to new") } + if !stream.CachedInfo().Config.AllowDirect { + t.Fatalf("Expected stream to have AllowDirect set to true") + } // attempting to create a new kv with the same name and different settings should fail _, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{