Skip to content

Commit

Permalink
add kv status interface
Browse files Browse the repository at this point in the history
Signed-off-by: R.I.Pienaar <rip@devco.net>
  • Loading branch information
ripienaar committed Oct 11, 2021
1 parent 41cb5bc commit e1df7e5
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 2 deletions.
57 changes: 56 additions & 1 deletion kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,35 @@ type KeyValue interface {
Bucket() string
// PurgeDeletes will remove all current delete markers.
PurgeDeletes(opts ...WatchOpt) error
// Status retrieves the status and configuration of a bucket
Status() (KeyValueStatus, error)
}

type KeyValueStatus interface {
// Bucket the name of the bucket
Bucket() string

// Values is how many messages are in the bucket, including historical values
Values() uint64

// History returns the configured history kept per key
History() int64

// TTL is how long the bucket keeps values for
TTL() time.Duration

// Replicas is how many storage replicas are kept
Replicas() int

// StreamName is the name of the stream used to store the data
StreamName() string
}

// KeyWatcher is what is returned when doing a watch.
type KeyWatcher interface {
// Updates returns a channel to read any updates to entries.
Updates() <-chan KeyValueEntry
// Stop() will stop this watcher.
// Stop will stop this watcher.
Stop() error
}

Expand Down Expand Up @@ -642,3 +664,36 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
func (kv *kvs) Bucket() string {
return kv.name
}

type kvStatus struct {
nfo *StreamInfo
bucket string
}

// Bucket the name of the bucket
func (s *kvStatus) Bucket() string { return s.bucket }

// Values is how many messages are in the bucket, including historical values
func (s *kvStatus) Values() uint64 { return s.nfo.State.Msgs }

// History returns the configured history kept per key
func (s *kvStatus) History() int64 { return s.nfo.Config.MaxMsgsPerSubject }

// TTL is how long the bucket keeps values for
func (s *kvStatus) TTL() time.Duration { return s.nfo.Config.MaxAge }

// Replicas is how many storage replicas are kept
func (s *kvStatus) Replicas() int { return s.nfo.Config.Replicas }

// StreamName is the name of the stream used to store the data
func (s *kvStatus) StreamName() string { return s.nfo.Config.Name }

// Status retrieves the status and configuration of a bucket
func (kv *kvs) Status() (KeyValueStatus, error) {
nfo, err := kv.js.StreamInfo(kv.stream)
if err != nil {
return nil, err
}

return &kvStatus{nfo: nfo, bucket: kv.name}, nil
}
24 changes: 23 additions & 1 deletion test/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestKeyValueBasics(t *testing.T) {
nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST"})
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST", History: 5, TTL: time.Hour})
expectOk(t, err)

if kv.Bucket() != "TEST" {
Expand Down Expand Up @@ -78,6 +78,28 @@ func TestKeyValueBasics(t *testing.T) {
expectOk(t, err)
_, err = kv.Update("age", []byte("33"), r)
expectOk(t, err)

// Status
status, err := kv.Status()
expectOk(t, err)
if status.Replicas() != 1 {
t.Fatalf("expected 1 replica got %d", status.Replicas())
}
if status.History() != 5 {
t.Fatalf("expected history of 5 got %d", status.History())
}
if status.Bucket() != "TEST" {
t.Fatalf("expected bucket TEST got %v", status.Bucket())
}
if status.TTL() != time.Hour {
t.Fatalf("expected 1 hour TTL got %v", status.TTL())
}
if status.Values() != 7 {
t.Fatalf("expected 7 values got %d", status.Values())
}
if status.StreamName() != "KV_TEST" {
t.Fatalf("expected KV_TEST stream got %v", status.StreamName())
}
}

func TestKeyValueHistory(t *testing.T) {
Expand Down

0 comments on commit e1df7e5

Please sign in to comment.