From 95c1946231f2f1ef836a0592ea85e12ed56e1c81 Mon Sep 17 00:00:00 2001 From: jnmoyne Date: Thu, 8 Sep 2022 05:40:20 -0700 Subject: [PATCH] Implements pagination for JS Stream Info requests --- server/jetstream_api.go | 44 ++++++++++++++++++++++++++++++++--------- server/norace_test.go | 32 ++++++++++++------------------ 2 files changed, 48 insertions(+), 28 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 377e224cd5..22954a7b3a 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -396,16 +396,18 @@ type JSApiStreamDeleteResponse struct { const JSApiStreamDeleteResponseType = "io.nats.jetstream.api.v1.stream_delete_response" -// Maximum number of subject details we will send in the stream info. +// JSMaxSubjectDetails The limit of the number of subject details we will send in a stream info response. const JSMaxSubjectDetails = 100_000 type JSApiStreamInfoRequest struct { + ApiPagedRequest DeletedDetails bool `json:"deleted_details,omitempty"` SubjectsFilter string `json:"subjects_filter,omitempty"` } type JSApiStreamInfoResponse struct { ApiResponse + ApiPaged *StreamInfo } @@ -1788,6 +1790,7 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s var details bool var subjects string + var offset int if !isEmptyRequest(msg) { var req JSApiStreamInfoRequest if err := json.Unmarshal(msg, &req); err != nil { @@ -1796,6 +1799,7 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s return } details, subjects = req.DeletedDetails, req.SubjectsFilter + offset = req.Offset } mset, err := acc.lookupStream(streamName) @@ -1825,17 +1829,39 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s // Check if they have asked for subject details. if subjects != _EMPTY_ { if mss := mset.store.SubjectsState(subjects); len(mss) > 0 { - if len(mss) > JSMaxSubjectDetails { - resp.StreamInfo = nil - resp.Error = NewJSStreamInfoMaxSubjectsError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return + // As go iterates over map in a non-consistent order, no choice but to buffer it a slice + + buffer := make([]string, 0, len(mss)) + for subj := range mss { + buffer = append(buffer, subj) + } + + // Sort it + sort.Strings(buffer) + + if offset > len(buffer) { + offset = len(buffer) + } + + end := offset + JSMaxSubjectDetails + if end > len(buffer) { + end = len(buffer) } - sd := make(map[string]uint64, len(mss)) - for subj, ss := range mss { - sd[subj] = ss.Msgs + + actualSize := end - offset + var sd map[string]uint64 + + if actualSize > 0 { + sd = make(map[string]uint64, actualSize) + for _, ss := range buffer[offset:end] { + sd[ss] = mss[ss].Msgs + } } + resp.StreamInfo.State.Subjects = sd + resp.Offset = offset + resp.Limit = JSMaxSubjectDetails + resp.Total = len(mss) } } diff --git a/server/norace_test.go b/server/norace_test.go index ecf36fd4f6..edc4d46b89 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -4289,37 +4289,31 @@ func TestNoRaceJetStreamStreamInfoSubjectDetailsLimits(t *testing.T) { t.Fatalf("Did not receive completion signal") } - getInfo := func(filter string) *StreamInfo { - t.Helper() - // Need to grab StreamInfo by hand for now. - req, err := json.Marshal(&JSApiStreamInfoRequest{SubjectsFilter: filter}) - require_NoError(t, err) - resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "TEST"), req, 5*time.Second) - require_NoError(t, err) - var si StreamInfo - err = json.Unmarshal(resp.Data, &si) - require_NoError(t, err) - return &si - } - - si := getInfo("X.*") + // Need to grab StreamInfo by hand for now. + req, err := json.Marshal(&JSApiStreamInfoRequest{SubjectsFilter: "X.*"}) + require_NoError(t, err) + resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "TEST"), req, 5*time.Second) + require_NoError(t, err) + var si StreamInfo + err = json.Unmarshal(resp.Data, &si) + require_NoError(t, err) if len(si.State.Subjects) != n { t.Fatalf("Expected to get %d subject details, got %d", n, len(si.State.Subjects)) } - // Now add one more message in which will exceed our internal limits for subject details. + // Now add one more message to check pagination _, err = js.Publish("foo", []byte("TOO MUCH")) require_NoError(t, err) - req, err := json.Marshal(&JSApiStreamInfoRequest{SubjectsFilter: nats.AllKeys}) + req, err = json.Marshal(&JSApiStreamInfoRequest{ApiPagedRequest: ApiPagedRequest{Offset: n}, SubjectsFilter: nats.AllKeys}) require_NoError(t, err) - resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "TEST"), req, 5*time.Second) + resp, err = nc.Request(fmt.Sprintf(JSApiStreamInfoT, "TEST"), req, 5*time.Second) require_NoError(t, err) var sir JSApiStreamInfoResponse err = json.Unmarshal(resp.Data, &sir) require_NoError(t, err) - if !IsNatsErr(sir.Error, JSStreamInfoMaxSubjectsErr) { - t.Fatalf("Did not get correct error response: %+v", sir.Error) + if len(sir.State.Subjects) != 1 { + t.Fatalf("Expected to get 1 extra subject detail, got %d", len(sir.State.Subjects)) } }