diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 501063c8955c..7c1ee7272d80 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -605,7 +605,13 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie defer errUtil.LogErrorWithContext(ctx, "closing iterator", it.Close) - return sendBatches(ctx, it, queryServer, req.Limit) + // sendBatches uses -1 to specify no limit. + batchLimit := int32(req.Limit) + if batchLimit == 0 { + batchLimit = -1 + } + + return sendBatches(ctx, it, queryServer, batchLimit) } // QuerySample the ingesters for series from logs matching a set of matchers. diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 1c43d9f40d7d..686ab27665ec 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -695,38 +695,23 @@ type QuerierQueryServer interface { Send(res *logproto.QueryResponse) error } -func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQueryServer, limit uint32) error { +func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQueryServer, limit int32) error { stats := stats.FromContext(ctx) - if limit == 0 { - // send all batches. - for !isDone(ctx) { - batch, size, err := iter.ReadBatch(i, queryBatchSize) - if err != nil { - return err - } - if len(batch.Streams) == 0 { - return nil - } - stats.AddIngesterBatch(int64(size)) - batch.Stats = stats.Ingester() - - if err := queryServer.Send(batch); err != nil { - return err - } - stats.Reset() - - } - return nil - } // send until the limit is reached. - sent := uint32(0) - for sent < limit && !isDone(queryServer.Context()) { - batch, batchSize, err := iter.ReadBatch(i, math.MinUint32(queryBatchSize, limit-sent)) + for limit != 0 && !isDone(ctx) { + fetchSize := uint32(queryBatchSize) + if limit > 0 { + fetchSize = math.MinUint32(queryBatchSize, uint32(limit)) + } + batch, batchSize, err := iter.ReadBatch(i, fetchSize) if err != nil { return err } - sent += batchSize + + if limit > 0 { + limit -= int32(batchSize) + } if len(batch.Streams) == 0 { return nil diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 97b79bea25db..82323d1b39fc 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -491,7 +491,7 @@ func Test_Iterator(t *testing.T) { return nil }, ), - uint32(2)), + int32(2)), ) require.Equal(t, 2, len(res.Streams)) // each entry translated into a unique stream