wip: blockbuilder timespan #15522

Draft · wants to merge 6 commits into base: main
3 changes: 3 additions & 0 deletions pkg/blockbuilder/builder/appender.go
@@ -20,6 +20,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores"
"github.com/grafana/loki/v3/pkg/util"
util_log "github.com/grafana/loki/v3/pkg/util/log"

"github.com/grafana/loki/pkg/push"
)
@@ -411,6 +412,8 @@ func (s *stream) closeChunk() (*chunkenc.MemChunk, error) {
return nil, fmt.Errorf("closing chunk: %w", err)
}

level.Debug(util_log.Logger).Log("msg", "chunk closed", "entries", s.chunk.Size(), "blocks", s.chunk.BlockCount(), "stream", s.ls.String())

s.metrics.samplesPerChunk.Observe(float64(s.chunk.Size()))
s.metrics.blocksPerChunk.Observe(float64(s.chunk.BlockCount()))
s.metrics.chunksCreatedTotal.Inc()
150 changes: 91 additions & 59 deletions pkg/blockbuilder/builder/builder.go
@@ -15,11 +15,13 @@ import (
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kgo"
"golang.org/x/sync/errgroup"

"github.com/grafana/loki/v3/pkg/blockbuilder/types"
"github.com/grafana/loki/v3/pkg/compression"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/partition"
"github.com/grafana/loki/v3/pkg/kafka/client"
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores"
@@ -111,13 +113,14 @@ type BlockBuilder struct {

id string
cfg Config
kafkaCfg kafka.Config
periodicConfigs []config.PeriodConfig
metrics *builderMetrics
logger log.Logger

decoder *kafka.Decoder
readerFactory func(partition int32) (partition.Reader, error)
metrics *builderMetrics
logger log.Logger
registerer prometheus.Registerer

decoder *kafka.Decoder
store stores.ChunkWriter
objStore *MultiStore

@@ -128,34 +131,36 @@ type BlockBuilder struct {
func NewBlockBuilder(
id string,
cfg Config,
kafkaCfg kafka.Config,
periodicConfigs []config.PeriodConfig,
readerFactory func(partition int32) (partition.Reader, error),
store stores.ChunkWriter,
objStore *MultiStore,
logger log.Logger,
reg prometheus.Registerer,
registerer prometheus.Registerer,
) (*BlockBuilder,
error) {
decoder, err := kafka.NewDecoder()
if err != nil {
return nil, err
}

t, err := types.NewGRPCTransportFromAddress(cfg.SchedulerAddress, cfg.SchedulerGRPCClientConfig, reg)
t, err := types.NewGRPCTransportFromAddress(cfg.SchedulerAddress, cfg.SchedulerGRPCClientConfig, registerer)
if err != nil {
return nil, fmt.Errorf("create grpc transport: %w", err)
}

i := &BlockBuilder{
id: id,
cfg: cfg,
kafkaCfg: kafkaCfg,
periodicConfigs: periodicConfigs,
metrics: newBuilderMetrics(reg),
metrics: newBuilderMetrics(registerer),
logger: logger,
registerer: registerer,
decoder: decoder,
readerFactory: readerFactory,
store: store,
objStore: objStore,
inflightJobs: make(map[string]*types.Job),
BuilderTransport: t,
}

@@ -164,20 +169,26 @@ func NewBlockBuilder(
}

func (i *BlockBuilder) running(ctx context.Context) error {
wg := sync.WaitGroup{}

errgrp, ctx := errgroup.WithContext(ctx)
for j := 0; j < i.cfg.WorkerParallelism; j++ {
wg.Add(1)
go func(id string) {
defer wg.Done()
workerID := fmt.Sprintf("block-builder-worker-%d", j)
errgrp.Go(func() error {
c, err := client.NewReaderClient(
i.kafkaCfg,
client.NewReaderClientMetrics(workerID, i.registerer),
log.With(i.logger, "component", workerID),
)
if err != nil {
return err
}

var waitFor time.Duration
for {
select {
case <-ctx.Done():
return
return nil
case <-time.After(waitFor):
gotJob, err := i.runOne(ctx, id)
gotJob, err := i.runOne(ctx, c, workerID)
if err != nil {
level.Error(i.logger).Log("msg", "block builder run failed", "err", err)
}
@@ -190,30 +201,27 @@ func (i *BlockBuilder) running(ctx context.Context) error {
}
}
}
}(fmt.Sprintf("worker-%d", j))
}

wg.Add(1)
go func() {
defer wg.Done()
})
}

errgrp.Go(func() error {
ticker := time.NewTicker(i.cfg.SyncInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
return nil
case <-ticker.C:
if err := i.syncJobs(ctx); err != nil {
level.Error(i.logger).Log("msg", "failed to sync jobs", "err", err)
}
}
}
}()
})

wg.Wait()
return nil
return errgrp.Wait()
}

func (i *BlockBuilder) syncJobs(ctx context.Context) error {
@@ -232,7 +240,7 @@ func (i *BlockBuilder) syncJobs(ctx context.Context) error {
return nil
}

func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error) {
func (i *BlockBuilder) runOne(ctx context.Context, c *kgo.Client, workerID string) (bool, error) {
// assuming GetJob blocks/polls until a job is available
resp, err := i.SendGetJobRequest(ctx, &types.GetJobRequest{
BuilderID: workerID,
@@ -265,11 +273,14 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error
Job: job,
Success: true,
}
if _, err = i.processJob(ctx, job, logger); err != nil {

_, lastConsumedRecordTS, err := i.processJob(ctx, c, job, logger)
if err != nil {
level.Error(i.logger).Log("msg", "failed to process job", "err", err)
completion.Success = false
}

completion.Job.UpdateLastConsumedRecordTS(lastConsumedRecordTS)
if _, err := withBackoff(
ctx,
i.cfg.Backoff,
@@ -291,7 +302,7 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error
return true, err
}

func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger log.Logger) (lastOffsetConsumed int64, err error) {
func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types.Job, logger log.Logger) (lastOffset int64, lastConsumedRecordTS int64, err error) {
level.Debug(logger).Log("msg", "beginning job")

indexer := newTsdbCreator()
@@ -304,7 +315,6 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
i.metrics,
)

var lastOffset int64
p := newPipeline(ctx)

// Pipeline stage 1: Process the job offsets and write records to inputCh
@@ -315,7 +325,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
"load records",
1,
func(ctx context.Context) error {
lastOffset, err = i.loadRecords(ctx, job.Partition(), job.Offsets(), inputCh)
lastOffset, lastConsumedRecordTS, err = i.loadRecords(ctx, c, job.Partition(), job.Offsets(), inputCh)
return err
},
func(ctx context.Context) error {
@@ -452,7 +462,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
"err", err,
)
if err != nil {
return 0, err
return 0, -1, err
}

var (
Expand All @@ -463,7 +473,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
built, err := indexer.create(ctx, nodeName, tableRanges)
if err != nil {
level.Error(logger).Log("msg", "failed to build index", "err", err)
return 0, err
return 0, -1, err
}

u := newUploader(i.objStore)
@@ -484,86 +494,108 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
)
return
}); err != nil {
return 0, err
return 0, -1, err
}
}

if lastOffset <= job.Offsets().Min {
return lastOffset, nil
return
}

// log success
level.Info(logger).Log(
"msg", "successfully processed job",
"last_offset", lastOffset,
"last_consumed_record_ts", time.UnixMilli(lastConsumedRecordTS),
)

return lastOffset, nil
return
}

func (i *BlockBuilder) loadRecords(ctx context.Context, partitionID int32, offsets types.Offsets, ch chan<- []AppendInput) (int64, error) {
f, err := i.readerFactory(partitionID)
if err != nil {
return 0, err
}

f.SetOffsetForConsumption(offsets.Min)
func (i *BlockBuilder) loadRecords(ctx context.Context, c *kgo.Client, partitionID int32, offsets types.Offsets, ch chan<- []AppendInput) (int64, int64, error) {
// Use NoResetOffset to avoid resetting the offset to the beginning of the partition when the requested offset is out of range.
// This could happen if the requested records are already outside the retention period. We should fail the job in such cases, leaving the scheduler to make a decision.
c.AddConsumePartitions(map[string]map[int32]kgo.Offset{
i.kafkaCfg.Topic: {partitionID: kgo.NoResetOffset().At(offsets.Min)},
})
defer c.RemoveConsumePartitions(map[string][]int32{
i.kafkaCfg.Topic: {partitionID},
})

var (
lastOffset = offsets.Min - 1
boff = backoff.New(ctx, i.cfg.Backoff)
lastConsumedOffset = offsets.Min - 1
lastConsumeRecordTS int64 = -1
lastSeenOffset = offsets.Min - 1
boff = backoff.New(ctx, i.cfg.Backoff)
)

for lastOffset < offsets.Max && boff.Ongoing() {
var records []partition.Record
records, err = f.Poll(ctx, int(offsets.Max-lastOffset))
if err != nil {
for lastSeenOffset < offsets.Max && boff.Ongoing() {
if err := context.Cause(ctx); err != nil {
return 0, -1, err
}

fs := c.PollRecords(ctx, int(offsets.Max-lastConsumedOffset))
// TODO: better error handling for non-retryable errors
// we don't have to iterate over all errors since we only fetch a single partition
if err := fs.Err(); err != nil {
level.Error(i.logger).Log("msg", "failed to poll records", "err", err)
boff.Wait()
continue
}

if len(records) == 0 {
if fs.Empty() {
// No more records available
break
continue
}

// Reset backoff on successful poll
boff.Reset()

converted := make([]AppendInput, 0, len(records))
for _, record := range records {
converted := make([]AppendInput, 0, fs.NumRecords())
var totalEntries int64
for iter := fs.RecordIter(); !iter.Done(); {
record := iter.Next()
lastSeenOffset = record.Offset

if record.Offset >= offsets.Max {
level.Debug(i.logger).Log("msg", "record offset exceeds job max offset. stop processing", "record_offset", record.Offset, "max_offset", offsets.Max)
break
}
lastOffset = record.Offset

stream, labels, err := i.decoder.Decode(record.Content)
stream, labels, err := i.decoder.Decode(record.Value)
if err != nil {
return 0, fmt.Errorf("failed to decode record: %w", err)
return 0, -1, fmt.Errorf("failed to decode record: %w", err)
}

lastConsumedOffset = record.Offset
lastConsumeRecordTS = record.Timestamp.UnixMilli()

if len(stream.Entries) == 0 {
continue
}

totalEntries += int64(len(stream.Entries))

converted = append(converted, AppendInput{
tenant: record.TenantID,
tenant: string(record.Key),
labels: labels,
labelsStr: stream.Labels,
entries: stream.Entries,
})
}

level.Debug(i.logger).Log("msg", "loaded records", "records", len(converted), "entries", totalEntries)

if len(converted) > 0 {
select {
case ch <- converted:
case <-ctx.Done():
return 0, ctx.Err()
return 0, -1, ctx.Err()
}
}
}

return lastOffset, boff.Err()
return lastConsumedOffset, lastConsumeRecordTS, boff.Err()
}

func withBackoff[T any](