fix(block-builder): return from Process call early if max offset is reached (#15073)
ashwanthgoli authored Nov 25, 2024
1 parent 1ea49e3 commit 13ea254
Showing 5 changed files with 56 additions and 23 deletions.
28 changes: 23 additions & 5 deletions pkg/blockbuilder/controller.go
@@ -5,6 +5,8 @@ import (
"fmt"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/dskit/backoff"
@@ -68,11 +70,13 @@ type PartitionJobController struct {
part partition.Reader
backoff backoff.Config
decoder *kafka.Decoder
logger log.Logger
}

func NewPartitionJobController(
controller partition.Reader,
backoff backoff.Config,
logger log.Logger,
) (*PartitionJobController, error) {
decoder, err := kafka.NewDecoder()
if err != nil {
@@ -83,6 +87,11 @@ func NewPartitionJobController(
part: controller,
backoff: backoff,
decoder: decoder,
logger: log.With(logger,
"component", "job-controller",
"topic", controller.Topic(),
"partition", controller.Partition(),
),
}, nil
}

@@ -125,9 +134,9 @@ func (l *PartitionJobController) Process(ctx context.Context, offsets Offsets, c
err error
)

for boff.Ongoing() {
for lastOffset < offsets.Max && boff.Ongoing() {
var records []partition.Record
records, err = l.part.Poll(ctx)
records, err = l.part.Poll(ctx, int(offsets.Max-lastOffset))
if err != nil {
boff.Wait()
continue
@@ -143,11 +152,11 @@ func (l *PartitionJobController) Process(ctx context.Context, offsets Offsets, c

converted := make([]AppendInput, 0, len(records))
for _, record := range records {
offset := records[len(records)-1].Offset
if offset >= offsets.Max {
if record.Offset >= offsets.Max {
level.Debug(l.logger).Log("msg", "record offset exceeds job max offset. stop processing", "record offset", record.Offset, "max offset", offsets.Max)
break
}
lastOffset = offset
lastOffset = record.Offset

stream, labels, err := l.decoder.Decode(record.Content)
if err != nil {
@@ -163,7 +172,9 @@ func (l *PartitionJobController) Process(ctx context.Context, offsets Offsets, c
labelsStr: stream.Labels,
entries: stream.Entries,
})
}

if len(converted) > 0 {
select {
case ch <- converted:
case <-ctx.Done():
Expand Down Expand Up @@ -198,7 +209,14 @@ func (l *PartitionJobController) LoadJob(ctx context.Context) (bool, Job, error)
if err != nil {
return false, Job{}, err
}

if highestOffset < committedOffset {
level.Error(l.logger).Log("msg", "partition highest offset is less than committed offset", "highest", highestOffset, "committed", committedOffset)
return false, Job{}, fmt.Errorf("partition highest offset is less than committed offset")
}

if highestOffset == committedOffset {
level.Info(l.logger).Log("msg", "no pending records to process")
return false, Job{}, nil
}

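The heart of the fix is the Process loop above: it now stops once the job's max offset is reached, caps each poll at the number of records still owed to the job, checks every record against the bound instead of only the last record of the batch, and only forwards non-empty batches. LoadJob additionally rejects the inconsistent case where the partition's highest offset is below the committed offset, and returns no job when the two are equal. Below is a minimal, self-contained sketch of the resulting loop shape; the Offsets, Record, and Poll names mirror the diff, while the Poller interface, processBounded, and the explicit break are illustrative simplifications rather than Loki's actual code, which also threads a backoff and decodes records into AppendInput.

package blockbuilder

import "context"

// Minimal stand-ins for the types used by Process; the real definitions
// live in pkg/blockbuilder and pkg/kafka/partition.
type Offsets struct{ Min, Max int64 }

type Record struct {
	Offset  int64
	Content []byte
}

// Poller is a hypothetical narrow view of partition.Reader with the new
// bounded-poll signature introduced by this commit.
type Poller interface {
	Poll(ctx context.Context, maxPollRecords int) ([]Record, error)
}

// processBounded sketches the post-fix consumption loop: each poll is capped
// at the number of records still owed to the job, every record is guarded
// against the job's max offset, and the loop exits once that bound is reached.
func processBounded(ctx context.Context, p Poller, offsets Offsets, ch chan<- []Record) (int64, error) {
	lastOffset := offsets.Min
	for lastOffset < offsets.Max {
		records, err := p.Poll(ctx, int(offsets.Max-lastOffset))
		if err != nil {
			return lastOffset, err // the real code retries with a backoff instead
		}

		reachedMax := false
		batch := make([]Record, 0, len(records))
		for _, r := range records {
			if r.Offset >= offsets.Max { // per-record guard against overshooting the job
				reachedMax = true
				break
			}
			lastOffset = r.Offset
			batch = append(batch, r) // the real code decodes r.Content into an AppendInput here
		}

		// Only forward non-empty batches downstream, as in the diff.
		if len(batch) > 0 {
			select {
			case ch <- batch:
			case <-ctx.Done():
				return lastOffset, ctx.Err()
			}
		}
		if reachedMax {
			break
		}
	}
	return lastOffset, nil
}

Before this change the loop ran purely on boff.Ongoing() and compared only the last record's offset in each batch against the bound, so a job could keep polling well past its assigned range.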
39 changes: 26 additions & 13 deletions pkg/blockbuilder/slimgester.go
@@ -141,7 +141,7 @@ func (i *BlockBuilder) running(ctx context.Context) error {
default:
_, err := i.runOne(ctx)
if err != nil {
return err
level.Error(i.logger).Log("msg", "block builder run failed", "err", err)
}
}

Expand All @@ -157,7 +157,7 @@ func (i *BlockBuilder) running(ctx context.Context) error {
"err", err,
)
if err != nil {
return err
level.Error(i.logger).Log("msg", "block builder run failed", "err", err)
}
}
}
@@ -213,6 +213,8 @@ func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {
level.Debug(logger).Log(
"msg", "finished loading records",
"ctx_error", ctx.Err(),
"last_offset", lastOffset,
"total_records", lastOffset-job.Offsets.Min,
)
close(inputCh)
return nil
@@ -305,6 +307,7 @@ func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {
func() (res struct{}, err error) {
err = i.store.PutOne(ctx, chk.From, chk.Through, *chk)
if err != nil {
level.Error(logger).Log("msg", "failed to flush chunk", "err", err)
i.metrics.chunksFlushFailures.Inc()
return
}
@@ -320,6 +323,10 @@ func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {
Entries: uint32(chk.Data.Entries()),
}
err = indexer.Append(chk.UserID, chk.Metric, chk.ChunkRef.Fingerprint, index.ChunkMetas{meta})
if err != nil {
level.Error(logger).Log("msg", "failed to append chunk to index", "err", err)
}

return
},
); err != nil {
@@ -346,24 +353,30 @@ func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {

built, err := indexer.create(ctx, nodeName, tableRanges)
if err != nil {
level.Error(logger).Log("msg", "failed to build index", "err", err)
return false, err
}

u := newUploader(i.objStore)
for _, db := range built {
u := newUploader(i.objStore)
if err := u.Put(ctx, db); err != nil {
level.Error(util_log.Logger).Log(
"msg", "failed to upload tsdb",
"path", db.id.Path(),
)
if _, err := withBackoff(ctx, i.cfg.Backoff, func() (res struct{}, err error) {
err = u.Put(ctx, db)
if err != nil {
level.Error(util_log.Logger).Log(
"msg", "failed to upload tsdb",
"path", db.id.Path(),
)
return
}

level.Debug(logger).Log(
"msg", "uploaded tsdb",
"name", db.id.Name(),
)
return
}); err != nil {
return false, err
}

level.Debug(logger).Log(
"msg", "uploaded tsdb",
"name", db.id.Name(),
)
}

if lastOffset <= job.Offsets.Min {
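Two behavioural changes in slimgester.go sit alongside the extra logging: a failed runOne is now logged instead of being returned out of the running loop, so one bad run no longer stops the block builder, and TSDB uploads are retried through the existing withBackoff helper, with the uploader created once outside the loop. The helper itself is not part of this diff; the sketch below is a plausible shape for it, with the generic signature inferred from the call site (_, err := withBackoff(ctx, i.cfg.Backoff, func() (res struct{}, err error) {...})) rather than copied from Loki.

package blockbuilder

import (
	"context"

	"github.com/grafana/dskit/backoff"
)

// withBackoff retries fn under a dskit backoff config until it succeeds, the
// retries are exhausted, or the context is cancelled. This is a hypothetical
// sketch: only the call shape is taken from the diff.
func withBackoff[T any](ctx context.Context, cfg backoff.Config, fn func() (T, error)) (T, error) {
	var (
		zero    T
		lastErr error
	)
	b := backoff.New(ctx, cfg)
	for b.Ongoing() {
		res, err := fn()
		if err == nil {
			return res, nil
		}
		lastErr = err
		b.Wait() // sleep with a growing, jittered delay before the next attempt
	}
	if lastErr != nil {
		return zero, lastErr
	}
	return zero, b.Err()
}

Wrapping u.Put(ctx, db) this way turns a transient object-store failure into a retry instead of a failed run, and hoisting newUploader(i.objStore) out of the loop avoids rebuilding the uploader for every TSDB.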
7 changes: 4 additions & 3 deletions pkg/kafka/partition/reader.go
@@ -43,7 +43,7 @@ type Reader interface {
ConsumerGroup() string
FetchLastCommittedOffset(ctx context.Context) (int64, error)
FetchPartitionOffset(ctx context.Context, position SpecialOffset) (int64, error)
Poll(ctx context.Context) ([]Record, error)
Poll(ctx context.Context, maxPollRecords int) ([]Record, error)
Commit(ctx context.Context, offset int64) error
// Set the target offset for consumption. reads will begin from here.
SetOffsetForConsumption(offset int64)
@@ -257,9 +257,10 @@ func (r *StdReader) FetchPartitionOffset(ctx context.Context, position SpecialOf
}

// Poll retrieves the next batch of records from Kafka
func (r *StdReader) Poll(ctx context.Context) ([]Record, error) {
// Number of records fetched can be limited by configuring maxPollRecords to a non-zero value.
func (r *StdReader) Poll(ctx context.Context, maxPollRecords int) ([]Record, error) {
start := time.Now()
fetches := r.client.PollFetches(ctx)
fetches := r.client.PollRecords(ctx, maxPollRecords)
r.metrics.fetchWaitDuration.Observe(time.Since(start).Seconds())

// Record metrics
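The reader change is what makes the bounded loop possible: Poll now takes a maxPollRecords argument and switches from franz-go's PollFetches to PollRecords, which caps how many records a single poll may return (a non-positive value leaves it unbounded). A small illustration of the call follows; pollUpTo and its error handling are illustrative, not Loki's StdReader code, which also records metrics and handles per-partition errors.

package partitionreader

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

// pollUpTo shows the PollFetches -> PollRecords switch: the second argument
// bounds how many records one poll may return, and a non-positive value keeps
// the old unbounded behaviour.
func pollUpTo(ctx context.Context, client *kgo.Client, maxRecords int) ([]*kgo.Record, error) {
	fetches := client.PollRecords(ctx, maxRecords)
	if errs := fetches.Errors(); len(errs) > 0 {
		// Surface the first fetch error; callers decide whether to retry.
		return nil, fmt.Errorf("poll records: %w", errs[0].Err)
	}
	return fetches.Records(), nil
}

In the job controller this argument is int(offsets.Max-lastOffset), so a single poll can never hand back more records than the job still needs.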
4 changes: 2 additions & 2 deletions pkg/kafka/partition/reader_service.go
@@ -346,7 +346,7 @@ func (s *ReaderService) processNextFetchesUntilLagHonored(ctx context.Context, m
}

timedCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
records, err := s.reader.Poll(timedCtx)
records, err := s.reader.Poll(timedCtx, -1)
cancel()

if err != nil {
@@ -382,7 +382,7 @@ func (s *ReaderService) startFetchLoop(ctx context.Context) chan []Record {
case <-ctx.Done():
return
default:
res, err := s.reader.Poll(ctx)
res, err := s.reader.Poll(ctx, -1)
if err != nil {
level.Error(s.logger).Log("msg", "error polling records", "err", err)
continue
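Design note on the -1 above: the reader service's call sites have no per-job bound, so they pass a negative value; PollRecords treats any non-positive maxPollRecords as "no limit", which keeps these paths on the same behaviour they had with PollFetches.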
1 change: 1 addition & 0 deletions pkg/loki/modules.go
@@ -1822,6 +1822,7 @@ func (t *Loki) initBlockBuilder() (services.Service, error) {
controller, err := blockbuilder.NewPartitionJobController(
reader,
t.Cfg.BlockBuilder.Backoff,
logger,
)

if err != nil {
