Skip to content

Commit

Permalink
[CHANGED] StreamInfo() will now return all subjects when requested
Browse files Browse the repository at this point in the history
If a subject filter is specified in the StreamInfoRequest{} option,
then all matching subjects will be returned and not be capped to
the server limit of 100,000. It is internally using pagination
that was added in the server PR: nats-io/nats-server#3454
  • Loading branch information
jnmoyne authored and kozlovic committed Sep 15, 2022
1 parent 866ce08 commit 65796fc
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 19 deletions.
44 changes: 44 additions & 0 deletions js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1157,3 +1157,47 @@ func TestJetStreamConsumerMemoryStorage(t *testing.T) {
t.Fatalf("Expected memory storage to be %v, got %+v", true, consInfo.Config.MemoryStorage)
}
}

func TestJetStreamStreamInfoWithSubjectDetails(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

var err error

_, err = js.AddStream(&StreamConfig{
Name: "TEST",
Subjects: []string{"test.*"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Publish on enough subjects to exercise the pagination
payload := make([]byte, 10)
for i := 0; i < 100001; i++ {
nc.Publish(fmt.Sprintf("test.%d", i), payload)
}

// Check that passing a filter returns the subject details
result, err := js.StreamInfo("TEST", &StreamInfoRequest{SubjectsFilter: ">"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if len(result.State.Subjects) != 100001 {
t.Fatalf("expected 100001 subjects in the stream, but got %d instead", len(result.State.Subjects))
}

// Check that passing no filter does not return any subject details
result, err = js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if len(result.State.Subjects) != 0 {
t.Fatalf("expected 0 subjects details from StreamInfo, but got %d instead", len(result.State.Subjects))
}
}
84 changes: 65 additions & 19 deletions jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ type apiPaged struct {
// apiPagedRequest includes parameters allowing specific pages to be requested
// from APIs responding with apiPaged.
type apiPagedRequest struct {
Offset int `json:"offset"`
Offset int `json:"offset,omitempty"`
}

// AccountInfo contains info about the JetStream usage from the current account.
Expand Down Expand Up @@ -696,12 +696,17 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
type (
// StreamInfoRequest contains additional option to return
StreamInfoRequest struct {
apiPagedRequest
// DeletedDetails when true includes information about deleted messages
DeletedDetails bool `json:"deleted_details,omitempty"`
// SubjectsFilter when set, returns information on the matched subjects
SubjectsFilter string `json:"subjects_filter,omitempty"`
}
streamInfoResponse = streamCreateResponse
streamInfoResponse = struct {
apiResponse
apiPaged
*StreamInfo
}
)

func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
Expand All @@ -715,30 +720,71 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
if cancel != nil {
defer cancel()
}

var i int
var subjectMessagesMap map[string]uint64
var req []byte
var requestPayload bool

var siOpts StreamInfoRequest
if o.streamInfoOpts != nil {
if req, err = json.Marshal(o.streamInfoOpts); err != nil {
requestPayload = true
siOpts = *o.streamInfoOpts
}

for {
if requestPayload {
siOpts.Offset = i
if req, err = json.Marshal(&siOpts); err != nil {
return nil, err
}
}

siSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream))

r, err := js.apiRequestWithContext(o.ctx, siSubj, req)
if err != nil {
return nil, err
}
}
siSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream))

r, err := js.apiRequestWithContext(o.ctx, siSubj, req)
if err != nil {
return nil, err
}
var resp streamInfoResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
}
if resp.Error != nil {
if errors.Is(resp.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
var resp streamInfoResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
}
return nil, resp.Error
}

return resp.StreamInfo, nil
if resp.Error != nil {
if errors.Is(resp.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
}
return nil, resp.Error
}

var total int
// for backwards compatibility
if resp.Total != 0 {
total = resp.Total
} else {
total = len(resp.State.Subjects)
}

if requestPayload && len(resp.StreamInfo.State.Subjects) > 0 {
if subjectMessagesMap == nil {
subjectMessagesMap = make(map[string]uint64, total)
}

for k, j := range resp.State.Subjects {
subjectMessagesMap[k] = j
i++
}
}

if i >= total {
if requestPayload {
resp.StreamInfo.State.Subjects = subjectMessagesMap
}
return resp.StreamInfo, nil
}
}
}

// StreamInfo shows config and current state for this stream.
Expand Down

0 comments on commit 65796fc

Please sign in to comment.