Skip to content

Commit

Permalink
Implements a new jsm function StreamContainedSubjects()` to get the…
Browse files Browse the repository at this point in the history
… list of (matching) subjects for the messages currently in the stream and the number of messages for each subject, using the pagination API for Stream Info (see nats-io/nats-server#3454)
  • Loading branch information
jnmoyne committed Sep 9, 2022
1 parent a1017ee commit 31624d1
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 1 deletion.
33 changes: 33 additions & 0 deletions js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1096,3 +1096,36 @@ func TestJetStreamConvertDirectMsgResponseToMsg(t *testing.T) {
t.Fatalf("Wrong header: %v", r.Header)
}
}

func TestJetStreamStreamInfoSubjectDetails(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

var err error

_, err = js.AddStream(&StreamConfig{
Name: "TEST",
Subjects: []string{"test.*"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Publish on enough subjects to exercise the pagination
payload := make([]byte, 10)
for i := 0; i < 100001; i++ {
nc.Publish(fmt.Sprintf("test.%d", i), payload)
}

result, err := js.StreamContainedSubjects("TEST", "test.*")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if len(result) != 100001 {
t.Fatalf("expected 100001 subjects in the stream, but got %d instead", len(result))
}
}
82 changes: 81 additions & 1 deletion jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type JetStreamManager interface {
// StreamInfo retrieves information from a stream.
StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error)

// StreamContainedSubjects queries the stream for the subjects it holds with optional filter
StreamContainedSubjects(stream string, filter ...string) (map[string]uint64, error)

// PurgeStream purges a stream messages.
PurgeStream(name string, opts ...JSOpt) error

Expand Down Expand Up @@ -682,12 +685,17 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
type (
// StreamInfoRequest contains additional option to return
StreamInfoRequest struct {
apiPagedRequest
// DeletedDetails when true includes information about deleted messages
DeletedDetails bool `json:"deleted_details,omitempty"`
// SubjectsFilter when set, returns information on the matched subjects
SubjectsFilter string `json:"subjects_filter,omitempty"`
}
streamInfoResponse = streamCreateResponse
streamInfoResponse = struct {
apiResponse
apiPaged
*StreamInfo
}
)

func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
Expand Down Expand Up @@ -727,6 +735,78 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
return resp.StreamInfo, nil
}

func (js *js) StreamContainedSubjects(stream string, filter ...string) (map[string]uint64, error) {
if err := checkStreamName(stream); err != nil {
return nil, err
}

if len(filter) > 1 {
return nil, fmt.Errorf("only 1 filter supported")
}

f := ">"
if len(filter) == 1 && filter[0] != "" {
f = filter[0]
}

var i int
var subjectMessagesMap map[string]uint64 = nil

o, cancel, err := getJSContextOpts(js.opts)
if err != nil {
return nil, err
}
if cancel != nil {
defer cancel()
}

for {
var req []byte

if req, err = json.Marshal(StreamInfoRequest{SubjectsFilter: f, apiPagedRequest: apiPagedRequest{Offset: i}}); err != nil {
return nil, err
}

r, err := js.apiRequestWithContext(o.ctx, defaultAPIPrefix+fmt.Sprintf(apiStreamInfoT, stream), req)
if err != nil {
return nil, err
}

var resp streamInfoResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
}
if resp.Error != nil {
if errors.Is(resp.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
}
return nil, resp.Error
}

// for backwards compatibility
var total int
if resp.Total != 0 {
total = resp.Total
} else {
total = len(resp.State.Subjects)
}

if subjectMessagesMap == nil {
subjectMessagesMap = make(map[string]uint64, total)
}

for k, j := range resp.State.Subjects {
subjectMessagesMap[k] = j
i++
}
if i == total {
break
}
}

return subjectMessagesMap, nil
}

// StreamInfo shows config and current state for this stream.
type StreamInfo struct {
Config StreamConfig `json:"config"`
Expand Down

0 comments on commit 31624d1

Please sign in to comment.