Skip to content

Commit

Permalink
Ack() for sample iterator as well
Browse files Browse the repository at this point in the history
  • Loading branch information
splitice committed May 21, 2022
1 parent 519ba84 commit 220c443
Show file tree
Hide file tree
Showing 12 changed files with 299 additions and 222 deletions.
56 changes: 33 additions & 23 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,27 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) *instance { //nolint:r
return inst
}

func (i *Ingester) Ack(ctx context.Context, req *logproto.AckRequest) (*logproto.AckResponse, error) {
instanceID, err := tenant.TenantID(ctx)
if err != nil {
return &logproto.AckResponse{}, err
}

instance := i.GetOrCreateInstance(instanceID)
if err != nil {
return &logproto.AckResponse{}, err
}

instance.queryMtx.Lock()
queryIngester := instance.queries[req.Id]
instance.queryMtx.Unlock()
if queryIngester != nil {
queryIngester.ReleaseAck()
}

return &logproto.AckResponse{}, nil
}

// Query the ingests for log streams matching a set of matchers.
func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
// initialize stats collection for ingester queries.
Expand Down Expand Up @@ -621,28 +642,7 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
instance.queries[queryID] = queryIngester
instance.queryMtx.Unlock()

return sendBatches(ctx, it, queryServer, queryIngester, batchLimit)
}

func (i *Ingester) Ack(ctx context.Context, req *logproto.AckRequest) (*logproto.AckResponse, error) {
instanceID, err := tenant.TenantID(ctx)
if err != nil {
return &logproto.AckResponse{}, err
}

instance := i.GetOrCreateInstance(instanceID)
if err != nil {
return &logproto.AckResponse{}, err
}

instance.queryMtx.Lock()
queryIngester := instance.queries[req.Id]
instance.queryMtx.Unlock()
if queryIngester != nil {
queryIngester.ReleaseAck()
}

return &logproto.AckResponse{}, nil
return queryIngester.SendBatches(ctx, it, queryServer, batchLimit)
}

// QuerySample the ingesters for series from logs matching a set of matchers.
Expand Down Expand Up @@ -680,7 +680,17 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log

defer errUtil.LogErrorWithContext(ctx, "closing iterator", it.Close)

return sendSampleBatches(ctx, it, queryServer)
instance.queryMtx.Lock()
queryID := uint32(0)
for instance.queries[queryID] != nil {
queryID++
}
queryIngester := NewIngesterQuery(queryID, instance)

instance.queries[queryID] = queryIngester
instance.queryMtx.Unlock()

return queryIngester.SendSampleBatches(ctx, it, queryServer)
}

// asyncStoreMaxLookBack returns a max look back period only if active index type is one of async index stores like `boltdb-shipper` and `tsdb`.
Expand Down
87 changes: 86 additions & 1 deletion pkg/ingester/ingester_query.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package ingester

import "context"
import (
"context"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/util/math"
)

type IngesterQuery struct {
Id uint32
Expand Down Expand Up @@ -43,3 +50,81 @@ func (q *IngesterQuery) BackpressureWait(ctx context.Context) {
func (q *IngesterQuery) ReleaseAck() {
q.loopAck <- struct{}{}
}

// QuerierQueryServer is the GRPC server stream we use to send batch of entries.
type QuerierQueryServer interface {
Context() context.Context
Send(res *logproto.QueryResponse) error
}

func (q *IngesterQuery) SendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQueryServer, limit int32) error {
stats := stats.FromContext(ctx)

// send until the limit is reached.
for limit != 0 && !isDone(ctx) {
q.BackpressureWait(ctx)

fetchSize := uint32(queryBatchSize)
if limit > 0 {
fetchSize = math.MinUint32(queryBatchSize, uint32(limit))
}
batch, batchSize, err := iter.ReadBatch(i, fetchSize)
if err != nil {
q.End()
return err
}

if limit > 0 {
limit -= int32(batchSize)
}

if len(batch.Streams) == 0 {
q.End()
return nil
}

stats.AddIngesterBatch(int64(batchSize))
batch.Stats = stats.Ingester()
batch.Id = q.Id

if err := queryServer.Send(batch); err != nil {
q.End()
return err
}
stats.Reset()
}

q.End()
return nil
}

func (q *IngesterQuery) SendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer logproto.Querier_QuerySampleServer) error {
stats := stats.FromContext(ctx)
for !isDone(ctx) {
q.BackpressureWait(ctx)

batch, size, err := iter.ReadSampleBatch(it, queryBatchSampleSize)
if err != nil {
q.End()
return err
}
if len(batch.Series) == 0 {
q.End()
return nil
}

stats.AddIngesterBatch(int64(size))
batch.Stats = stats.Ingester()
batch.Id = q.Id

if err := queryServer.Send(batch); err != nil {
q.End()
return err
}

stats.Reset()

}
q.End()
return nil
}
8 changes: 4 additions & 4 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ func Test_DedupeIngester(t *testing.T) {
End: time.Unix(0, requests+1),
})
require.NoError(t, err)
iterators = append(iterators, iter.NewSampleQueryClientIterator(stream))
iterators = append(iterators, iter.NewSampleQueryClientIterator(stream, client))
}
it := iter.NewMergeSampleIterator(ctx, iterators)
var expectedLabels []string
Expand Down Expand Up @@ -738,7 +738,7 @@ func Test_DedupeIngester(t *testing.T) {
End: time.Unix(0, requests+1),
})
require.NoError(t, err)
iterators = append(iterators, iter.NewSampleQueryClientIterator(stream))
iterators = append(iterators, iter.NewSampleQueryClientIterator(stream, client))
}
it := iter.NewMergeSampleIterator(ctx, iterators)
for i := int64(0); i < requests; i++ {
Expand Down Expand Up @@ -849,7 +849,7 @@ func Test_DedupeIngesterParser(t *testing.T) {
End: time.Unix(0, int64(requests+1)),
})
require.NoError(t, err)
iterators = append(iterators, iter.NewSampleQueryClientIterator(stream))
iterators = append(iterators, iter.NewSampleQueryClientIterator(stream, client))
}
it := iter.NewMergeSampleIterator(ctx, iterators)

Expand All @@ -874,7 +874,7 @@ func Test_DedupeIngesterParser(t *testing.T) {
End: time.Unix(0, int64(requests+1)),
})
require.NoError(t, err)
iterators = append(iterators, iter.NewSampleQueryClientIterator(stream))
iterators = append(iterators, iter.NewSampleQueryClientIterator(stream, client))
}
it := iter.NewMergeSampleIterator(ctx, iterators)

Expand Down
72 changes: 0 additions & 72 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/deletion"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/math"
"github.com/grafana/loki/pkg/validation"
)

Expand Down Expand Up @@ -693,77 +692,6 @@ func isDone(ctx context.Context) bool {
}
}

// QuerierQueryServer is the GRPC server stream we use to send batch of entries.
type QuerierQueryServer interface {
Context() context.Context
Send(res *logproto.QueryResponse) error
}

func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQueryServer, q *IngesterQuery, limit int32) error {
stats := stats.FromContext(ctx)

// send until the limit is reached.
for limit != 0 && !isDone(ctx) {
q.BackpressureWait(ctx)

fetchSize := uint32(queryBatchSize)
if limit > 0 {
fetchSize = math.MinUint32(queryBatchSize, uint32(limit))
}
batch, batchSize, err := iter.ReadBatch(i, fetchSize)
if err != nil {
q.End()
return err
}

if limit > 0 {
limit -= int32(batchSize)
}

if len(batch.Streams) == 0 {
q.End()
return nil
}

stats.AddIngesterBatch(int64(batchSize))
batch.Stats = stats.Ingester()
batch.Id = q.Id

if err := queryServer.Send(batch); err != nil {
q.End()
return err
}
stats.Reset()
}

q.End()
return nil
}

func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer logproto.Querier_QuerySampleServer) error {
stats := stats.FromContext(ctx)
for !isDone(ctx) {
batch, size, err := iter.ReadSampleBatch(it, queryBatchSampleSize)
if err != nil {
return err
}
if len(batch.Series) == 0 {
return nil
}

stats.AddIngesterBatch(int64(size))
batch.Stats = stats.Ingester()

if err := queryServer.Send(batch); err != nil {
return err
}

stats.Reset()

}
return nil
}

func shouldConsiderStream(stream *stream, req *logproto.SeriesRequest) bool {
from, to := stream.Bounds()

Expand Down
3 changes: 1 addition & 2 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,15 +486,14 @@ func Test_Iterator(t *testing.T) {
// assert the order is preserved.
var res *logproto.QueryResponse
require.NoError(t,
sendBatches(context.TODO(), it,
q.SendBatches(context.TODO(), it,
fakeQueryServer(
func(qr *logproto.QueryResponse) error {
res = qr
q.ReleaseAck()
return nil
},
),
q,
int32(2)),
)
require.Equal(t, 2, len(res.Streams))
Expand Down
21 changes: 16 additions & 5 deletions pkg/iter/sample_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,9 +460,10 @@ func (i *sortSampleIterator) Close() error {
}

type sampleQueryClientIterator struct {
client QuerySampleClient
err error
curr SampleIterator
client QuerySampleClient
ingester logproto.QuerierClient
err error
curr SampleIterator
}

// QuerySampleClient is GRPC stream client with only method used by the SampleQueryClientIterator
Expand All @@ -473,9 +474,10 @@ type QuerySampleClient interface {
}

// NewQueryClientIterator returns an iterator over a QueryClient.
func NewSampleQueryClientIterator(client QuerySampleClient) SampleIterator {
func NewSampleQueryClientIterator(client QuerySampleClient, ingester logproto.QuerierClient) SampleIterator {
return &sampleQueryClientIterator{
client: client,
client: client,
ingester: ingester,
}
}

Expand All @@ -491,6 +493,15 @@ func (i *sampleQueryClientIterator) Next() bool {
}
stats.JoinIngesters(ctx, batch.Stats)
i.curr = NewSampleQueryResponseIterator(batch)
if i.ingester != nil {
_, err = i.ingester.Ack(ctx, &logproto.AckRequest{
Id: batch.Id,
})
if err != nil {
i.err = err
return false
}
}
}
return true
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/iter/sample_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func TestNewSampleQueryClientIterator(t *testing.T) {
{varSeries},
{carSeries},
},
})
}, nil)
for i := 1; i < 4; i++ {
require.True(t, it.Next(), i)
require.Equal(t, `{foo="var"}`, it.Labels(), i)
Expand Down
Loading

0 comments on commit 220c443

Please sign in to comment.