From 725a1d2e5dd3b7ac44d550d9168447e49d49c701 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Mon, 11 Oct 2021 12:35:51 +0200 Subject: [PATCH] nats.go kv and obj store This moves the KV backend to nats.go implementation and adds a new experimental command "nats obj" Signed-off-by: R.I.Pienaar --- go.mod | 2 +- go.sum | 3 +- nats/kv_command.go | 361 +++++++++++++++++++++++------------ nats/kv_command_test.go | 86 ++++----- nats/main.go | 1 + nats/object_command.go | 408 ++++++++++++++++++++++++++++++++++++++++ nats/pub_command.go | 2 +- nats/reply_command.go | 2 +- nats/util.go | 44 ++++- 9 files changed, 729 insertions(+), 180 deletions(-) create mode 100644 nats/object_command.go diff --git a/go.mod b/go.mod index e2834ece..45cb6408 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/klauspost/compress v1.13.4 github.com/nats-io/jsm.go v0.0.27-0.20211006163108-9aae04fb57e9 github.com/nats-io/nats-server/v2 v2.6.2-0.20211006145508-3f12216fcc34 - github.com/nats-io/nats.go v1.12.3 + github.com/nats-io/nats.go v1.13.1-0.20211013144704-0f47b1ceedfa github.com/nats-io/nuid v1.0.1 github.com/prometheus/client_golang v1.11.0 github.com/prometheus/common v0.26.0 diff --git a/go.sum b/go.sum index 266ffd38..3fcd36d9 100644 --- a/go.sum +++ b/go.sum @@ -118,8 +118,9 @@ github.com/nats-io/jwt/v2 v2.1.0 h1:1UbfD5g1xTdWmSeRV8bh/7u+utTiBsRtWhLl1PixZp4= github.com/nats-io/jwt/v2 v2.1.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= github.com/nats-io/nats-server/v2 v2.6.2-0.20211006145508-3f12216fcc34 h1:Qq2jwQrv/hyU6MmYZR5NYJ6wKVwRULJpn0CoDMtMUZg= github.com/nats-io/nats-server/v2 v2.6.2-0.20211006145508-3f12216fcc34/go.mod h1:ubcDOPViqaQcNvJVzoX9FIDxAxyJDTItw07lqFCzC80= -github.com/nats-io/nats.go v1.12.3 h1:te0GLbRsjtejEkZKKiuk46tbfIn6FfCSv3WWSo1+51E= github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.13.1-0.20211013144704-0f47b1ceedfa h1:wz8dAjZOLCe+lQNooGB+giVUc0VNZmvhhE8ptITjqKY= +github.com/nats-io/nats.go v1.13.1-0.20211013144704-0f47b1ceedfa/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/nats/kv_command.go b/nats/kv_command.go index 9105bde6..d1bcea10 100644 --- a/nats/kv_command.go +++ b/nats/kv_command.go @@ -1,8 +1,19 @@ +// Copyright 2020 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package main import ( - "context" - "encoding/json" "fmt" "io/ioutil" "os" @@ -10,7 +21,6 @@ import ( "github.com/dustin/go-humanize" "github.com/fatih/color" - "github.com/nats-io/jsm.go/kv" "github.com/nats-io/nats.go" "gopkg.in/alecthomas/kingpin.v2" ) @@ -20,7 +30,6 @@ type kvCommand struct { key string val string raw bool - asJson bool history uint64 ttl time.Duration replicas uint @@ -28,6 +37,8 @@ type kvCommand struct { maxValueSize int32 maxBucketSize int64 cluster string + revision uint64 + description string } func configureKVCommand(app *kingpin.Application) { @@ -38,24 +49,44 @@ func configureKVCommand(app *kingpin.Application) { The JetStream Key-Value store uses streams to store key-value pairs for an indefinite period or a per-bucket configured TTL. -The Key-Value store supports read-after-write safety when not using -any caches or read replicas. +The Key-Value store supports read-after-write safety. NOTE: This is an experimental feature. ` kv := app.Command("kv", help) - get := kv.Command("get", "Gets a value for a key").Action(c.getAction) - get.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) - get.Arg("key", "The key to act on").Required().StringVar(&c.key) - get.Flag("raw", "Show only the value string").BoolVar(&c.raw) + add := kv.Command("add", "Adds a new KV Store Bucket").Alias("new").Action(c.addAction) + add.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) + add.Flag("history", "How many historic values to keep per key").Default("1").Uint64Var(&c.history) + add.Flag("ttl", "How long to keep values for").DurationVar(&c.ttl) + add.Flag("replicas", "How many replicas of the data to store").Default("1").UintVar(&c.replicas) + add.Flag("cluster", "Place the bucket in a specific cluster").StringVar(&c.cluster) + add.Flag("max-value-size", "Maximum size for any single value").Int32Var(&c.maxValueSize) + add.Flag("max-bucket-size", "Maximum size for the bucket").Int64Var(&c.maxBucketSize) + add.Flag("description", "A description for the bucket").StringVar(&c.description) put := kv.Command("put", "Puts a value into a key").Action(c.putAction) put.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) put.Arg("key", "The key to act on").Required().StringVar(&c.key) put.Arg("value", "The value to store, when empty reads STDIN").StringVar(&c.val) + get := kv.Command("get", "Gets a value for a key").Action(c.getAction) + get.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) + get.Arg("key", "The key to act on").Required().StringVar(&c.key) + get.Flag("raw", "Show only the value string").BoolVar(&c.raw) + + create := kv.Command("create", "Puts a value into a key only if the key is new or it's last operation was a delete").Action(c.createAction) + create.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) + create.Arg("key", "The key to act on").Required().StringVar(&c.key) + create.Arg("value", "The value to store, when empty reads STDIN").StringVar(&c.val) + + update := kv.Command("update", "Updates a key with a new value if the previous value matches the given revision").Action(c.updateAction) + update.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) + update.Arg("key", "The key to act on").Required().StringVar(&c.key) + update.Arg("value", "The value to store, when empty reads STDIN").StringVar(&c.val) + update.Arg("revision", "The revision of the previous value in the bucket").Uint64Var(&c.revision) + del := kv.Command("del", "Deletes a key from the bucket, preserving history").Action(c.deleteAction) del.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) del.Arg("key", "The key to act on").Required().StringVar(&c.key) @@ -69,16 +100,6 @@ NOTE: This is an experimental feature. history := kv.Command("history", "Shows the full history for a key").Action(c.historyAction) history.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) history.Arg("key", "The key to act on").Required().StringVar(&c.key) - history.Flag("json", "JSON format output").Short('j').BoolVar(&c.asJson) - - add := kv.Command("add", "Adds a new KV store").Alias("new").Action(c.addAction) - add.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) - add.Flag("history", "How many historic values to keep per key").Default("1").Uint64Var(&c.history) - add.Flag("ttl", "How long to keep values for").DurationVar(&c.ttl) - add.Flag("replicas", "How many replicas of the data to store").Default("1").UintVar(&c.replicas) - add.Flag("cluster", "Place the bucket in a specific cluster").StringVar(&c.cluster) - add.Flag("max-value-size", "Maximum size for any single value").Int32Var(&c.maxValueSize) - add.Flag("max-bucket-size", "Maximum size for the bucket").Int64Var(&c.maxBucketSize) status := kv.Command("status", "View the status of a KV store").Alias("view").Alias("info").Action(c.statusAction) status.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) @@ -87,13 +108,17 @@ NOTE: This is an experimental feature. watch.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) watch.Arg("key", "The key to act on").StringVar(&c.key) - dump := kv.Command("dump", "Dumps the contents of the bucket as JSON").Action(c.dumpAction) - dump.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) - rm := kv.Command("rm", "Removes a bucket").Action(c.rmAction) rm.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) rm.Flag("force", "Act without confirmation").Short('f').BoolVar(&c.force) + rmHistory := kv.Command("compact", "Removes all historic values from the store where the last value is a delete").Action(c.compactAction) + rmHistory.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) + rmHistory.Flag("force", "Act without confirmation").Short('f').BoolVar(&c.force) + + upgrade := kv.Command("upgrade", "Upgrades a early tech-preview bucket to current format").Action(c.upgradeAction) + upgrade.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) + cheats["kv"] = `# to create a replicated KV bucket nats kv add CONFIG --replicas 3 @@ -103,39 +128,76 @@ nats kv put CONFIG username bob # to read just the value with no additional details nats kv get CONFIG username --raw -# to see all values in the bucket -nats kv dump CONFIG - # to see the bucket status nats kv status CONFIG ` } -func (c *kvCommand) historyAction(_ *kingpin.ParseContext) error { - _, store, err := c.loadBucket() +func (c *kvCommand) strForOp(op nats.KeyValueOp) string { + switch op { + case nats.KeyValuePut: + return "PUT" + case nats.KeyValuePurge: + return "PURGE" + case nats.KeyValueDelete: + return "DELETE" + default: + return "UNKNOWN" + } +} + +func (c *kvCommand) upgradeAction(_ *kingpin.ParseContext) error { + _, js, store, err := c.loadBucket() if err != nil { return err } - history, err := store.History(context.Background(), c.key) + status, err := store.Status() if err != nil { return err } - if c.asJson { - printJSON(history) - return nil + nfo := status.(*nats.KeyValueBucketStatus).StreamInfo() + if nfo.Config.AllowRollup { + fmt.Println("Bucket is already using the correct configuration") + } + + nfo.Config.AllowRollup = true + nfo, err = js.UpdateStream(&nfo.Config) + if err != nil { + return err + } + + if !nfo.Config.AllowRollup { + fmt.Printf("Configuration upgrade failed, please edit stream %s to allow RollUps", nfo.Config.Name) + os.Exit(1) + } + + fmt.Printf("Bucket %s has been upgraded\n\n", status.Bucket()) + + return c.showStatus(store) +} + +func (c *kvCommand) historyAction(_ *kingpin.ParseContext) error { + _, _, store, err := c.loadBucket() + if err != nil { + return err + } + + history, err := store.History(c.key) + if err != nil { + return err } table := newTableWriter(fmt.Sprintf("History for %s > %s", c.bucket, c.key)) - table.AddHeaders("Seq", "Op", "Created", "Length", "Value") + table.AddHeaders("Key", "Revision", "Op", "Created", "Length", "Value") for _, r := range history { val := base64IfNotPrintable(r.Value()) if len(val) > 40 { val = fmt.Sprintf("%s...%s", val[0:15], val[len(val)-15:]) } - table.AddRow(r.Sequence(), r.Operation(), r.Created().Format(time.RFC822), humanize.Comma(int64(len(r.Value()))), val) + table.AddRow(r.Key(), r.Revision(), c.strForOp(r.Operation()), r.Created().Format(time.RFC822), humanize.Comma(int64(len(r.Value()))), val) } fmt.Println(table.Render()) @@ -143,8 +205,29 @@ func (c *kvCommand) historyAction(_ *kingpin.ParseContext) error { return nil } +func (c *kvCommand) compactAction(_ *kingpin.ParseContext) error { + _, _, store, err := c.loadBucket() + if err != nil { + return err + } + + if !c.force { + ok, err := askConfirmation(fmt.Sprintf("Purge all historic values and audit trails for deleted keys in bucket %s?", c.bucket), false) + if err != nil { + return err + } + + if !ok { + fmt.Println("Skipping delete") + return nil + } + } + + return store.PurgeDeletes() +} + func (c *kvCommand) deleteAction(_ *kingpin.ParseContext) error { - _, store, err := c.loadBucket() + _, _, store, err := c.loadBucket() if err != nil { return err } @@ -170,14 +253,21 @@ func (c *kvCommand) addAction(_ *kingpin.ParseContext) error { return err } - store, err := kv.NewBucket(nc, c.bucket, - kv.WithTTL(c.ttl), - kv.WithHistory(c.history), - kv.WithReplicas(c.replicas), - kv.WithPlacementCluster(c.cluster), - kv.WithMaxBucketSize(c.maxBucketSize), - kv.WithMaxValueSize(c.maxValueSize), - ) + js, err := nc.JetStream() + if err != nil { + return err + } + + store, err := js.CreateKeyValue(&nats.KeyValueConfig{ + Bucket: c.bucket, + Description: c.description, + MaxValueSize: c.maxValueSize, + History: uint8(c.history), + TTL: c.ttl, + MaxBytes: c.maxBucketSize, + Storage: nats.FileStorage, // TODO + Replicas: int(c.replicas), + }) if err != nil { return err } @@ -186,7 +276,7 @@ func (c *kvCommand) addAction(_ *kingpin.ParseContext) error { } func (c *kvCommand) getAction(_ *kingpin.ParseContext) error { - _, store, err := c.loadBucket() + _, _, store, err := c.loadBucket() if err != nil { return err } @@ -218,21 +308,17 @@ func (c *kvCommand) getAction(_ *kingpin.ParseContext) error { } func (c *kvCommand) putAction(_ *kingpin.ParseContext) error { - _, store, err := c.loadBucket() + _, _, store, err := c.loadBucket() if err != nil { return err } - if c.val == "" { - var val []byte - val, err = ioutil.ReadAll(os.Stdin) - if err != nil { - return err - } - _, err = store.Put(c.key, val) - } else { - _, err = store.Put(c.key, []byte(c.val)) + val, err := c.valOrReadVal() + if err != nil { + return err } + + _, err = store.Put(c.key, val) if err != nil { return err } @@ -242,95 +328,111 @@ func (c *kvCommand) putAction(_ *kingpin.ParseContext) error { return err } -func (c *kvCommand) loadBucket() (*nats.Conn, kv.KV, error) { - nc, _, err := prepareHelper("", natsOpts()...) +func (c *kvCommand) createAction(_ *kingpin.ParseContext) error { + _, _, store, err := c.loadBucket() if err != nil { - return nil, nil, err + return err } - store, err := kv.NewClient(nc, c.bucket) + val, err := c.valOrReadVal() if err != nil { - return nil, nil, err + return err } - return nc, store, err -} - -func (c *kvCommand) statusAction(_ *kingpin.ParseContext) error { - _, store, err := c.loadBucket() + _, err = store.Create(c.key, val) if err != nil { return err } - return c.showStatus(store) + fmt.Println(c.val) + + return err } -func (c *kvCommand) watchAction(_ *kingpin.ParseContext) error { - _, store, err := c.loadBucket() +func (c *kvCommand) updateAction(_ *kingpin.ParseContext) error { + _, _, store, err := c.loadBucket() if err != nil { return err } - watch, err := store.Watch(context.Background(), c.key) + val, err := c.valOrReadVal() if err != nil { return err } - defer watch.Close() - for res := range watch.Channel() { - if res != nil { - if res.Operation() == kv.DeleteOperation { - fmt.Printf("[%s] %s %s > %s\n", res.Created().Format("2006-01-02 15:04:05"), color.RedString("DEL"), res.Bucket(), res.Key()) - } else { - fmt.Printf("[%s] %s %s > %s: %s\n", res.Created().Format("2006-01-02 15:04:05"), color.GreenString("PUT"), res.Bucket(), res.Key(), res.Value()) - } - } + _, err = store.Update(c.key, val, c.revision) + if err != nil { + return err } - return nil + fmt.Println(c.val) + + return err } -func (c *kvCommand) dumpAction(_ *kingpin.ParseContext) error { - _, store, err := c.loadBucket() +func (c *kvCommand) valOrReadVal() ([]byte, error) { + if c.val != "" { + return []byte(c.val), nil + } + + return ioutil.ReadAll(os.Stdin) +} + +func (c *kvCommand) loadBucket() (*nats.Conn, nats.JetStreamContext, nats.KeyValue, error) { + nc, js, err := prepareJSHelper("", natsOpts()...) if err != nil { - return err + return nil, nil, nil, err } - vals := make(map[string]kv.Entry) - watch, err := store.Watch(context.Background(), "") + store, err := js.KeyValue(c.bucket) if err != nil { - return err + return nil, nil, nil, err } - for val := range watch.Channel() { - if val == nil { - break - } + return nc, js, store, err +} - switch val.Operation() { - case kv.PutOperation: - vals[val.Key()] = val - case kv.DeleteOperation: - delete(vals, val.Key()) - } +func (c *kvCommand) statusAction(_ *kingpin.ParseContext) error { + _, _, store, err := c.loadBucket() + if err != nil { + return err + } - if val.Delta() == 0 { - break - } + return c.showStatus(store) +} + +func (c *kvCommand) watchAction(_ *kingpin.ParseContext) error { + _, _, store, err := c.loadBucket() + if err != nil { + return err } - j, err := json.MarshalIndent(vals, "", " ") + watch, err := store.Watch(c.key) if err != nil { return err } + defer watch.Stop() - fmt.Println(string(j)) + for res := range watch.Updates() { + if res == nil { + continue + } + + switch res.Operation() { + case nats.KeyValueDelete: + fmt.Printf("[%s] %s %s > %s\n", res.Created().Format("2006-01-02 15:04:05"), color.RedString(c.strForOp(res.Operation())), res.Bucket(), res.Key()) + case nats.KeyValuePurge: + fmt.Printf("[%s] %s %s > %s\n", res.Created().Format("2006-01-02 15:04:05"), color.RedString(c.strForOp(res.Operation())), res.Bucket(), res.Key()) + case nats.KeyValuePut: + fmt.Printf("[%s] %s %s > %s: %s\n", res.Created().Format("2006-01-02 15:04:05"), color.GreenString(c.strForOp(res.Operation())), res.Bucket(), res.Key(), res.Value()) + } + } return nil } func (c *kvCommand) purgeAction(_ *kingpin.ParseContext) error { - _, store, err := c.loadBucket() + _, _, store, err := c.loadBucket() if err != nil { return err } @@ -363,44 +465,55 @@ func (c *kvCommand) rmAction(_ *kingpin.ParseContext) error { } } - _, store, err := c.loadBucket() + _, js, err := prepareJSHelper("", natsOpts()...) if err != nil { return err } - return store.Destroy() + return js.DeleteKeyValue(c.bucket) } -func (c *kvCommand) showStatus(store kv.KV) error { +func (c *kvCommand) showStatus(store nats.KeyValue) error { status, err := store.Status() if err != nil { return err } - if c.asJson { - printJSON(status) - return nil - } - - fmt.Printf("%s Key-Value Store Status\n", c.bucket) + fmt.Printf("%s Key-Value Store Status\n", status.Bucket()) fmt.Println() - fmt.Printf(" Bucket Name: %s\n", c.bucket) + fmt.Printf(" Bucket Name: %s\n", status.Bucket()) fmt.Printf(" History Kept: %d\n", status.History()) - if status.MaxBucketSize() == -1 { - fmt.Printf(" Maximum Bucket Size: unlimited\n") - } else { - fmt.Printf(" Maximum Bucket Size: %d\n", status.MaxBucketSize()) - } - if status.MaxValueSize() == -1 { - fmt.Printf(" Maximum Value Size: unlimited\n") - } else { - fmt.Printf(" Maximum Value Size: %d\n", status.MaxValueSize()) - } - if status.BucketLocation() != "" { - fmt.Printf(" Bucket Location: %s\n", status.BucketLocation()) - } fmt.Printf(" Values Stored: %d\n", status.Values()) - fmt.Printf(" Backing Store Name: %s\n", status.BackingStore()) + fmt.Printf(" Backing Store Kind: %s\n", status.BackingStore()) + + if status.BackingStore() == "JetStream" { + nfo := status.(*nats.KeyValueBucketStatus).StreamInfo() + if nfo.Config.Description != "" { + fmt.Printf(" Description: %s\n", nfo.Config.Description) + } + if nfo.Config.MaxBytes == -1 { + fmt.Printf(" Maximum Bucket Size: unlimited\n") + } else { + fmt.Printf(" Maximum Bucket Size: %d\n", nfo.Config.MaxBytes) + } + if nfo.Config.MaxBytes == -1 { + fmt.Printf(" Maximum Value Size: unlimited\n") + } else { + fmt.Printf(" Maximum Value Size: %d\n", nfo.Config.MaxMsgSize) + } + fmt.Printf(" JetStream Stream: %s\n", nfo.Config.Name) + if nfo.Cluster != nil { + fmt.Printf(" Cluster Location: %s\n", nfo.Cluster.Name) + } + + if !nfo.Config.AllowRollup { + fmt.Println() + fmt.Println("Warning the bucket does not support roll-ups") + fmt.Println("and needs a configuration upgrade.") + fmt.Println() + fmt.Printf("Please run: nats kv upgrade %s\n\n", status.Bucket()) + } + } return nil } diff --git a/nats/kv_command_test.go b/nats/kv_command_test.go index 0e830abc..65cd806e 100644 --- a/nats/kv_command_test.go +++ b/nats/kv_command_test.go @@ -2,13 +2,11 @@ package main import ( "bytes" - "encoding/json" "fmt" "strings" "testing" "time" - "github.com/nats-io/jsm.go/kv" "github.com/nats-io/nats.go" ) @@ -16,10 +14,19 @@ func init() { skipContexts = true } -func createTestBucket(t *testing.T, nc *nats.Conn, opts ...kv.Option) kv.KV { +func createTestBucket(t *testing.T, nc *nats.Conn, cfg *nats.KeyValueConfig) nats.KeyValue { t.Helper() - store, err := kv.NewBucket(nc, "T", opts...) + if cfg == nil { + cfg = &nats.KeyValueConfig{Bucket: "T"} + } + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("js failed: %s", err) + } + + store, err := js.CreateKeyValue(cfg) if err != nil { t.Fatalf("new failed: %s", err) } @@ -27,7 +34,7 @@ func createTestBucket(t *testing.T, nc *nats.Conn, opts ...kv.Option) kv.KV { return store } -func mustPut(t *testing.T, store kv.KV, key string, value string) uint64 { +func mustPut(t *testing.T, store nats.KeyValue, key string, value string) uint64 { t.Helper() seq, err := store.Put(key, []byte(value)) @@ -42,7 +49,7 @@ func TestCLIKVGet(t *testing.T) { srv, nc, _ := setupJStreamTest(t) defer srv.Shutdown() - store := createTestBucket(t, nc) + store := createTestBucket(t, nc, nil) mustPut(t, store, "X.Y", "Y") out := runNatsCli(t, fmt.Sprintf("--server='%s' kv get T X.Y --raw", srv.ClientURL())) @@ -55,7 +62,7 @@ func TestCLIKVPut(t *testing.T) { srv, nc, _ := setupJStreamTest(t) defer srv.Shutdown() - store := createTestBucket(t, nc) + store := createTestBucket(t, nc, nil) out := runNatsCli(t, fmt.Sprintf("--server='%s' kv put T X VAL", srv.ClientURL())) if strings.TrimSpace(string(out)) != "VAL" { @@ -75,19 +82,19 @@ func TestCLIKVDel(t *testing.T) { srv, nc, _ := setupJStreamTest(t) defer srv.Shutdown() - store := createTestBucket(t, nc) + store := createTestBucket(t, nc, nil) mustPut(t, store, "X", "VAL") runNatsCli(t, fmt.Sprintf("--server='%s' kv del T X -f", srv.ClientURL())) _, err := store.Get("X") - if err != kv.ErrUnknownKey { + if err != nats.ErrKeyNotFound { t.Fatalf("get did not fail: %v", err) } } func TestCLIAdd(t *testing.T) { - srv, nc, mgr := setupJStreamTest(t) + srv, _, mgr := setupJStreamTest(t) defer srv.Shutdown() runNatsCli(t, fmt.Sprintf("--server='%s' kv add T --history 5 --ttl 2m", srv.ClientURL())) @@ -99,48 +106,25 @@ func TestCLIAdd(t *testing.T) { t.Fatalf("stream was not created") } - store, err := kv.NewClient(nc, "T") - if err != nil { - t.Fatalf("client failed: %s", err) - } - - status, err := store.Status() - if err != nil { - t.Fatalf("status failed: %s", err) - } - - if status.History() != 5 { - t.Fatalf("history is %d", status.History()) - } - - if status.TTL() != 2*time.Minute { - t.Fatalf("ttl is %v", status.TTL()) - } -} - -func TestCLIDump(t *testing.T) { - srv, nc, _ := setupJStreamTest(t) - defer srv.Shutdown() + stream, _ := mgr.LoadStream("KV_T") - store := createTestBucket(t, nc) - mustPut(t, store, "X", "VALX") - mustPut(t, store, "Y", "VALY") + // TODO: needs status api + // js, err := nc.JetStream() + // if err != nil { + // t.Fatalf("js failed: %s", err) + // } + // + // status, err := store.Status() + // if err != nil { + // t.Fatalf("status failed: %s", err) + // } - out := runNatsCli(t, fmt.Sprintf("--server='%s' kv dump T", srv.ClientURL())) - var dumped map[string]kv.GenericEntry - err := json.Unmarshal(out, &dumped) - if err != nil { - t.Fatalf("json unmarshal failed: %s", err) + if stream.MaxMsgsPerSubject() != 5 { + t.Fatalf("history is %d", stream.MaxMsgsPerSubject()) } - if len(dumped) != 2 { - t.Fatalf("expected 2 entries got %d", len(dumped)) - } - if dumped["X"].Key != "X" && !bytes.Equal(dumped["X"].Val, []byte("VALX")) { - t.Fatalf("did not get right res: %+v", dumped["X"]) - } - if dumped["Y"].Key != "Y" && !bytes.Equal(dumped["Y"].Val, []byte("VALY")) { - t.Fatalf("did not get right res: %+v", dumped["Y"]) + if stream.MaxAge() != 2*time.Minute { + t.Fatalf("ttl is %v", stream.MaxAge()) } } @@ -148,14 +132,14 @@ func TestCLIPurge(t *testing.T) { srv, nc, _ := setupJStreamTest(t) defer srv.Shutdown() - store := createTestBucket(t, nc) + store := createTestBucket(t, nc, nil) mustPut(t, store, "X", "VALX") mustPut(t, store, "Y", "VALY") runNatsCli(t, fmt.Sprintf("--server='%s' kv purge T X -f", srv.ClientURL())) _, err := store.Get("X") - if err != kv.ErrUnknownKey { + if err != nats.ErrKeyNotFound { t.Fatalf("expected unknown key got: %v", err) } v, err := store.Get("Y") @@ -171,7 +155,7 @@ func TestCLIRM(t *testing.T) { srv, nc, mgr := setupJStreamTest(t) defer srv.Shutdown() - createTestBucket(t, nc) + createTestBucket(t, nc, nil) runNatsCli(t, fmt.Sprintf("--server='%s' kv rm T -f", srv.ClientURL())) diff --git a/nats/main.go b/nats/main.go index 761e674c..ebdf305e 100644 --- a/nats/main.go +++ b/nats/main.go @@ -102,6 +102,7 @@ See 'nats cheat' for a quick cheatsheet of commands configureEventsCommand(ncli) configureGovernorCommand(ncli) configureKVCommand(ncli) + configureObjectCommand(ncli) configureLatencyCommand(ncli) configurePubCommand(ncli) configureRTTCommand(ncli) diff --git a/nats/object_command.go b/nats/object_command.go new file mode 100644 index 00000000..23ef1145 --- /dev/null +++ b/nats/object_command.go @@ -0,0 +1,408 @@ +// Copyright 2020 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "encoding/base64" + "fmt" + "io" + "log" + "os" + "path/filepath" + "strings" + "time" + + "github.com/dustin/go-humanize" + "github.com/fatih/color" + "github.com/nats-io/nats.go" + "gopkg.in/alecthomas/kingpin.v2" +) + +type objCommand struct { + bucket string + file string + overrideName string + hdrs []string + force bool + + description string + replicas uint + ttl time.Duration +} + +func configureObjectCommand(app *kingpin.Application) { + c := &objCommand{} + + help := `Interacts with a JetStream based Object store + +The JetStream Object store uses streams to store large objects +for an indefinite period or a per-bucket configured TTL. + +NOTE: This is an experimental feature. +` + + obj := app.Command("object", help).Alias("obj") + + add := obj.Command("add", "Adds a new Object Store Bucket").Action(c.addAction) + add.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) + add.Flag("ttl", "How long to keep objects for").DurationVar(&c.ttl) + add.Flag("replicas", "How many replicas of the data to store").Default("1").UintVar(&c.replicas) + add.Flag("description", "A description for the bucket").StringVar(&c.description) + + put := obj.Command("put", "Puts a file into the store").Action(c.putAction) + put.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) + put.Arg("file", "The file to put").Required().ExistingFileVar(&c.file) + put.Flag("name", "Override the name supplied to the object store").StringVar(&c.overrideName) + put.Flag("description", "Sets an optional description for the object").StringVar(&c.description) + put.Flag("header", "Adds headers to the object").Short('H').StringsVar(&c.hdrs) + + rm := obj.Command("rm", "Removes a file from the store").Action(c.rmAction) + rm.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) + rm.Arg("file", "The file to retrieve").Required().StringVar(&c.file) + rm.Flag("force", "Act without confirmation").Short('f').BoolVar(&c.force) + + get := obj.Command("get", "Retrieves a file from the store").Action(c.getAction) + get.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) + get.Arg("file", "The file to retrieve").Required().StringVar(&c.file) + get.Flag("output", "Override the output file name").Short('O').StringVar(&c.overrideName) + + info := obj.Command("info", "Get information about a bucket or object").Action(c.infoAction) + info.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) + info.Arg("file", "The file to retrieve").StringVar(&c.file) + + ls := obj.Command("ls", "List contents of the bucket").Action(c.lsAction) + ls.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) + + seal := obj.Command("seal", "Seals a bucket preventing further updates").Action(c.sealAction) + seal.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) + seal.Flag("force", "Force sealing without prompting").Short('f').BoolVar(&c.force) + + watch := obj.Command("watch", "Watch a bucket for changes").Action(c.watchAction) + watch.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) +} + +func (c *objCommand) watchAction(_ *kingpin.ParseContext) error { + _, _, obj, err := c.loadBucket() + if err != nil { + return err + } + + w, err := obj.Watch(nats.IncludeHistory()) + if err != nil { + return err + } + defer w.Stop() + + for i := range w.Updates() { + if i == nil { + continue + } + + if i.Deleted { + fmt.Printf("[%s] %s %s > %s\n", i.ModTime.Format("2006-01-02 15:04:05"), color.RedString("DEL"), i.Bucket, i.Name) + } else { + fmt.Printf("[%s] %s %s > %s: %s bytes in %s chunks\n", i.ModTime.Format("2006-01-02 15:04:05"), color.GreenString("PUT"), i.Bucket, i.Name, humanize.IBytes(i.Size), humanize.Comma(int64(i.Chunks))) + } + } + + return nil +} + +func (c *objCommand) sealAction(_ *kingpin.ParseContext) error { + if !c.force { + ok, err := askConfirmation(fmt.Sprintf("Really seal Bucket %s, sealed buckets can not be unsealed or modified", c.bucket), false) + kingpin.FatalIfError(err, "could not obtain confirmation") + + if !ok { + return nil + } + } + + _, _, obj, err := c.loadBucket() + if err != nil { + return err + } + + err = obj.Seal() + if err != nil { + return err + } + + fmt.Printf("%s has been sealed\n", c.bucket) + + return c.showBucketInfo(obj) +} + +func (c *objCommand) rmAction(_ *kingpin.ParseContext) error { + _, _, obj, err := c.loadBucket() + if err != nil { + return err + } + + if !c.force { + nfo, err := obj.GetInfo(c.file) + if err != nil { + return err + } + + ok, err := askConfirmation(fmt.Sprintf("Delete %s byte file %s > %s?", humanize.IBytes(nfo.Size), c.bucket, c.file), false) + if err != nil { + return err + } + + if !ok { + fmt.Println("Skipping delete") + return nil + } + + } + + err = obj.Delete(c.file) + if err != nil { + return err + } + + fmt.Printf("Removed %s > %s\n", c.bucket, c.file) + + return c.showBucketInfo(obj) +} + +func (c *objCommand) infoAction(_ *kingpin.ParseContext) error { + if c.file == "" { + return fmt.Errorf("only object info is supported") + } + + _, _, obj, err := c.loadBucket() + if err != nil { + return err + } + + nfo, err := obj.GetInfo(c.file) + if err != nil { + return err + } + + c.showObjectInfo(nfo) + + return nil +} + +func (c *objCommand) showBucketInfo(store nats.ObjectStore) error { + status, err := store.Status() + if err != nil { + return err + } + + fmt.Printf("%s Object Store Status\n", status.Bucket()) + fmt.Println() + fmt.Printf(" Bucket Name: %s\n", status.Bucket()) + fmt.Printf(" Description: %s\n", status.Description()) + fmt.Printf(" Replicas: %d\n", status.Replicas()) + fmt.Printf(" TTL: %s\n", humanizeDuration(status.TTL())) + fmt.Printf(" Sealed: %v\n", status.Sealed()) + fmt.Printf(" Size: %s\n", humanize.IBytes(status.Size())) + fmt.Printf(" Backing Store Kind: %s\n", status.BackingStore()) + if status.BackingStore() == "JetStream" { + nfo := status.(*nats.ObjectBucketStatus).StreamInfo() + fmt.Printf(" JetStream Stream: %s\n", nfo.Config.Name) + if nfo.Cluster != nil { + fmt.Printf(" Cluster Location: %s\n", nfo.Cluster.Name) + } + } + + return nil +} + +func (c *objCommand) showObjectInfo(nfo *nats.ObjectInfo) { + digest := strings.Split(nfo.Digest, "=") + digestBytes, _ := base64.StdEncoding.DecodeString(digest[1]) + + fmt.Printf("Object information for %s > %s\n\n", nfo.Bucket, nfo.Name) + if nfo.Description != "" { + fmt.Printf(" Description: %s\n", nfo.Description) + } + fmt.Printf(" Size: %s\n", humanize.IBytes(nfo.Size)) + fmt.Printf(" Modification Time: %s\n", nfo.ModTime.Format(time.RFC822Z)) + fmt.Printf(" Chunks: %s\n", humanize.Comma(int64(nfo.Chunks))) + fmt.Printf(" Digest: %s %x\n", digest[0], digestBytes) + fmt.Printf(" ID: %s\n", nfo.NUID) + if nfo.Deleted { + fmt.Printf(" Deleted: %v\n", nfo.Deleted) + } + if len(nfo.Headers) > 0 { + fmt.Printf(" Headers:\n") + for k, v := range nfo.Headers { + fmt.Printf(" %s = %s\n", k, v) + } + } +} + +func (c *objCommand) lsAction(_ *kingpin.ParseContext) error { + _, _, obj, err := c.loadBucket() + if err != nil { + return err + } + + contents, err := obj.List() + if err != nil { + return err + } + + if len(contents) == 0 { + fmt.Println("No entries found") + return nil + } + + table := newTableWriter("Bucket Contents") + table.AddHeaders("Name", "Size", "Time", "ID") + + for _, i := range contents { + table.AddRow(i.Name, humanize.IBytes(i.Size), i.ModTime.Format(time.RFC3339), i.NUID) + } + + fmt.Println(table.Render()) + + return nil +} + +func (c *objCommand) putAction(_ *kingpin.ParseContext) error { + _, _, obj, err := c.loadBucket() + if err != nil { + return err + } + + name := c.file + if c.overrideName != "" { + name = c.overrideName + } + + hdr, err := parseStringsToHeader(c.hdrs, 0) + if err != nil { + return err + } + + meta := &nats.ObjectMeta{ + Name: filepath.Clean(name), + Description: c.description, + Headers: hdr, + } + + r, err := os.Open(c.file) + if err != nil { + return err + } + + nfo, err := obj.Put(meta, r) + if err != nil { + return err + } + + c.showObjectInfo(nfo) + + return nil +} + +func (c *objCommand) getAction(_ *kingpin.ParseContext) error { + _, _, obj, err := c.loadBucket() + if err != nil { + return err + } + + res, err := obj.Get(c.file) + if err != nil { + return err + } + + nfo, err := res.Info() + if err != nil { + return err + } + + if nfo.Deleted { + return fmt.Errorf("file has been deleted") + } + + out := filepath.Base(nfo.Name) + if c.overrideName != "" { + out = c.overrideName + } + + out, err = filepath.Abs(out) + if err != nil { + return err + } + + of, err := os.Create(out) + if err != nil { + return err + } + defer of.Close() + + // TODO: progress + wc, err := io.Copy(of, res) + if err != nil { + of.Close() + os.Remove(of.Name()) + return err + } + + if wc > 0 && uint64(wc) != nfo.Size { + return fmt.Errorf("wrote %s, expected %s", humanize.IBytes(uint64(wc)), humanize.IBytes(nfo.Size)) + } + + of.Close() + err = os.Chtimes(of.Name(), time.Now(), nfo.ModTime) + if err != nil { + log.Printf("Could not set modification time: %s", err) + } + + // TODO: render, verify + fmt.Printf("wrote: %s to %s\n", humanize.IBytes(uint64(wc)), of.Name()) + + return nil +} + +func (c *objCommand) addAction(_ *kingpin.ParseContext) error { + _, js, err := prepareJSHelper("", natsOpts()...) + if err != nil { + return err + } + + obj, err := js.CreateObjectStore(&nats.ObjectStoreConfig{ + Bucket: c.bucket, + Description: c.description, + TTL: c.ttl, + Storage: nats.FileStorage, + Replicas: int(c.replicas), + }) + if err != nil { + return err + } + + return c.showBucketInfo(obj) +} + +func (c *objCommand) loadBucket() (*nats.Conn, nats.JetStreamContext, nats.ObjectStore, error) { + nc, js, err := prepareJSHelper("", natsOpts()...) + if err != nil { + return nil, nil, nil, err + } + + store, err := js.ObjectStore(c.bucket) + if err != nil { + return nil, nil, nil, err + } + + return nc, js, store, err +} diff --git a/nats/pub_command.go b/nats/pub_command.go index c4290a27..38dd4c97 100644 --- a/nats/pub_command.go +++ b/nats/pub_command.go @@ -96,7 +96,7 @@ func (c *pubCmd) prepareMsg(body []byte, seq int) (*nats.Msg, error) { msg.Reply = c.replyTo msg.Data = body - return msg, parseStringsToHeader(c.hdrs, seq, msg) + return msg, parseStringsToMsgHeader(c.hdrs, seq, msg) } func (c *pubCmd) doReq(nc *nats.Conn, progress *uiprogress.Bar) error { diff --git a/nats/reply_command.go b/nats/reply_command.go index 57b110e4..623d93cb 100644 --- a/nats/reply_command.go +++ b/nats/reply_command.go @@ -121,7 +121,7 @@ func (c *replyCmd) reply(_ *kingpin.ParseContext) error { msg := nats.NewMsg(m.Reply) if nc.HeadersSupported() && len(c.hdrs) > 0 { - parseStringsToHeader(c.hdrs, i, msg) + parseStringsToMsgHeader(c.hdrs, i, msg) } switch { diff --git a/nats/util.go b/nats/util.go index 143efb46..52d69cb6 100644 --- a/nats/util.go +++ b/nats/util.go @@ -404,6 +404,28 @@ func newNatsConn(servers string, opts ...nats.Option) (*nats.Conn, error) { return nats.Connect(servers, opts...) } +func prepareJSHelper(servers string, opts ...nats.Option) (*nats.Conn, nats.JetStreamContext, error) { + nc, _, err := prepareHelper("", natsOpts()...) + if err != nil { + return nil, nil, err + } + + var jso []nats.JSOpt + if jsDomain != "" { + jso = append(jso, nats.Domain(jsDomain)) + } + if jsApiPrefix != "" { + jso = append(jso, nats.APIPrefix(jsApiPrefix)) + } + + js, err := nc.JetStream(jso...) + if err != nil { + return nil, nil, err + } + + return nc, js, nil +} + func prepareHelper(servers string, opts ...nats.Option) (*nats.Conn, *jsm.Manager, error) { if config == nil { if ctxError != nil { @@ -633,7 +655,27 @@ func randomString(shortest uint, longest uint) string { return string(b) } -func parseStringsToHeader(hdrs []string, seq int, msg *nats.Msg) error { +func parseStringsToHeader(hdrs []string, seq int) (nats.Header, error) { + res := nats.Header{} + + for _, hdr := range hdrs { + parts := strings.SplitN(hdr, ":", 2) + if len(parts) != 2 { + return nil, fmt.Errorf("invalid header %q", hdr) + } + + val, err := pubReplyBodyTemplate(strings.TrimSpace(parts[1]), seq) + if err != nil { + return nil, fmt.Errorf("failed to parse Header template for %s: %s", parts[0], err) + } + + res.Add(strings.TrimSpace(parts[0]), string(val)) + } + + return res, nil +} + +func parseStringsToMsgHeader(hdrs []string, seq int, msg *nats.Msg) error { for _, hdr := range hdrs { parts := strings.SplitN(hdr, ":", 2) if len(parts) != 2 {