diff --git a/pkg/blockbuilder/builder/appender.go b/pkg/blockbuilder/builder/appender.go index d956eb7931c43..9a00975b10067 100644 --- a/pkg/blockbuilder/builder/appender.go +++ b/pkg/blockbuilder/builder/appender.go @@ -20,6 +20,7 @@ import ( "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores" "github.com/grafana/loki/v3/pkg/util" + util_log "github.com/grafana/loki/v3/pkg/util/log" "github.com/grafana/loki/pkg/push" ) @@ -411,6 +412,8 @@ func (s *stream) closeChunk() (*chunkenc.MemChunk, error) { return nil, fmt.Errorf("closing chunk: %w", err) } + level.Debug(util_log.Logger).Log("msg", "chunk closed", "entries", s.chunk.Size(), "blocks", s.chunk.BlockCount(), "stream", s.ls.String()) + s.metrics.samplesPerChunk.Observe(float64(s.chunk.Size())) s.metrics.blocksPerChunk.Observe(float64(s.chunk.BlockCount())) s.metrics.chunksCreatedTotal.Inc() diff --git a/pkg/blockbuilder/builder/builder.go b/pkg/blockbuilder/builder/builder.go index ad981e3183d0e..9d4b9ab2ce9dc 100644 --- a/pkg/blockbuilder/builder/builder.go +++ b/pkg/blockbuilder/builder/builder.go @@ -15,11 +15,13 @@ import ( "github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" + "github.com/twmb/franz-go/pkg/kgo" + "golang.org/x/sync/errgroup" "github.com/grafana/loki/v3/pkg/blockbuilder/types" "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/kafka" - "github.com/grafana/loki/v3/pkg/kafka/partition" + "github.com/grafana/loki/v3/pkg/kafka/client" "github.com/grafana/loki/v3/pkg/storage/chunk" "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores" @@ -111,13 +113,14 @@ type BlockBuilder struct { id string cfg Config + kafkaCfg kafka.Config periodicConfigs []config.PeriodConfig - metrics *builderMetrics - logger log.Logger - decoder *kafka.Decoder - readerFactory func(partition int32) (partition.Reader, error) + metrics *builderMetrics + logger log.Logger + registerer prometheus.Registerer + decoder *kafka.Decoder store stores.ChunkWriter objStore *MultiStore @@ -128,12 +131,12 @@ type BlockBuilder struct { func NewBlockBuilder( id string, cfg Config, + kafkaCfg kafka.Config, periodicConfigs []config.PeriodConfig, - readerFactory func(partition int32) (partition.Reader, error), store stores.ChunkWriter, objStore *MultiStore, logger log.Logger, - reg prometheus.Registerer, + registerer prometheus.Registerer, ) (*BlockBuilder, error) { decoder, err := kafka.NewDecoder() @@ -141,7 +144,7 @@ func NewBlockBuilder( return nil, err } - t, err := types.NewGRPCTransportFromAddress(cfg.SchedulerAddress, cfg.SchedulerGRPCClientConfig, reg) + t, err := types.NewGRPCTransportFromAddress(cfg.SchedulerAddress, cfg.SchedulerGRPCClientConfig, registerer) if err != nil { return nil, fmt.Errorf("create grpc transport: %w", err) } @@ -149,13 +152,15 @@ func NewBlockBuilder( i := &BlockBuilder{ id: id, cfg: cfg, + kafkaCfg: kafkaCfg, periodicConfigs: periodicConfigs, - metrics: newBuilderMetrics(reg), + metrics: newBuilderMetrics(registerer), logger: logger, + registerer: registerer, decoder: decoder, - readerFactory: readerFactory, store: store, objStore: objStore, + inflightJobs: make(map[string]*types.Job), BuilderTransport: t, } @@ -164,20 +169,26 @@ func NewBlockBuilder( } func (i *BlockBuilder) running(ctx context.Context) error { - wg := sync.WaitGroup{} - + errgrp, ctx := errgroup.WithContext(ctx) for j := 0; j < i.cfg.WorkerParallelism; j++ { - wg.Add(1) - go func(id 
string) { - defer wg.Done() + workerID := fmt.Sprintf("block-builder-worker-%d", j) + errgrp.Go(func() error { + c, err := client.NewReaderClient( + i.kafkaCfg, + client.NewReaderClientMetrics(workerID, i.registerer), + log.With(i.logger, "component", workerID), + ) + if err != nil { + return err + } var waitFor time.Duration for { select { case <-ctx.Done(): - return + return nil case <-time.After(waitFor): - gotJob, err := i.runOne(ctx, id) + gotJob, err := i.runOne(ctx, c, workerID) if err != nil { level.Error(i.logger).Log("msg", "block builder run failed", "err", err) } @@ -190,30 +201,27 @@ func (i *BlockBuilder) running(ctx context.Context) error { } } } - }(fmt.Sprintf("worker-%d", j)) - } - wg.Add(1) - go func() { - defer wg.Done() + }) + } + errgrp.Go(func() error { ticker := time.NewTicker(i.cfg.SyncInterval) defer ticker.Stop() for { select { case <-ctx.Done(): - return + return nil case <-ticker.C: if err := i.syncJobs(ctx); err != nil { level.Error(i.logger).Log("msg", "failed to sync jobs", "err", err) } } } - }() + }) - wg.Wait() - return nil + return errgrp.Wait() } func (i *BlockBuilder) syncJobs(ctx context.Context) error { @@ -232,7 +240,7 @@ func (i *BlockBuilder) syncJobs(ctx context.Context) error { return nil } -func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error) { +func (i *BlockBuilder) runOne(ctx context.Context, c *kgo.Client, workerID string) (bool, error) { // assuming GetJob blocks/polls until a job is available resp, err := i.SendGetJobRequest(ctx, &types.GetJobRequest{ BuilderID: workerID, @@ -265,11 +273,14 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error Job: job, Success: true, } - if _, err = i.processJob(ctx, job, logger); err != nil { + + _, lastConsumedRecordTS, err := i.processJob(ctx, c, job, logger) + if err != nil { level.Error(i.logger).Log("msg", "failed to process job", "err", err) completion.Success = false } + completion.Job.UpdateLastConsumedRecordTS(lastConsumedRecordTS) if _, err := withBackoff( ctx, i.cfg.Backoff, @@ -291,7 +302,7 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error return true, err } -func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger log.Logger) (lastOffsetConsumed int64, err error) { +func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types.Job, logger log.Logger) (lastOffset int64, lastConsumedRecordTS int64, err error) { level.Debug(logger).Log("msg", "beginning job") indexer := newTsdbCreator() @@ -304,7 +315,6 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo i.metrics, ) - var lastOffset int64 p := newPipeline(ctx) // Pipeline stage 1: Process the job offsets and write records to inputCh @@ -315,7 +325,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo "load records", 1, func(ctx context.Context) error { - lastOffset, err = i.loadRecords(ctx, job.Partition(), job.Offsets(), inputCh) + lastOffset, lastConsumedRecordTS, err = i.loadRecords(ctx, c, job.Partition(), job.Offsets(), inputCh) return err }, func(ctx context.Context) error { @@ -452,7 +462,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo "err", err, ) if err != nil { - return 0, err + return 0, -1, err } var ( @@ -463,7 +473,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo built, err := indexer.create(ctx, nodeName, tableRanges) if err != nil { 
 		level.Error(logger).Log("msg", "failed to build index", "err", err)
-		return 0, err
+		return 0, -1, err
 	}
 
 	u := newUploader(i.objStore)
@@ -484,86 +494,108 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
 				)
 				return
 			}); err != nil {
-			return 0, err
+			return 0, -1, err
 		}
 	}
 
 	if lastOffset <= job.Offsets().Min {
-		return lastOffset, nil
+		return
 	}
 
 	// log success
 	level.Info(logger).Log(
 		"msg", "successfully processed job",
 		"last_offset", lastOffset,
+		"last_consumed_record_ts", time.UnixMilli(lastConsumedRecordTS),
 	)
 
-	return lastOffset, nil
+	return
 }
 
-func (i *BlockBuilder) loadRecords(ctx context.Context, partitionID int32, offsets types.Offsets, ch chan<- []AppendInput) (int64, error) {
-	f, err := i.readerFactory(partitionID)
-	if err != nil {
-		return 0, err
-	}
-
-	f.SetOffsetForConsumption(offsets.Min)
+func (i *BlockBuilder) loadRecords(ctx context.Context, c *kgo.Client, partitionID int32, offsets types.Offsets, ch chan<- []AppendInput) (int64, int64, error) {
+	// Use NoResetOffset to avoid resetting the offset to the beginning of the partition when the requested offset is out of range.
+	// This could happen if the requested records are already outside of the retention period. We should fail the job in such cases, leaving the scheduler to make a decision.
+	c.AddConsumePartitions(map[string]map[int32]kgo.Offset{
+		i.kafkaCfg.Topic: {partitionID: kgo.NoResetOffset().At(offsets.Min)},
+	})
+	defer c.RemoveConsumePartitions(map[string][]int32{
+		i.kafkaCfg.Topic: {partitionID},
+	})
 
 	var (
-		lastOffset = offsets.Min - 1
-		boff       = backoff.New(ctx, i.cfg.Backoff)
+		lastConsumedOffset        = offsets.Min - 1
+		lastConsumeRecordTS int64 = -1
+		lastSeenOffset            = offsets.Min - 1
+		boff                      = backoff.New(ctx, i.cfg.Backoff)
 	)
 
-	for lastOffset < offsets.Max && boff.Ongoing() {
-		var records []partition.Record
-		records, err = f.Poll(ctx, int(offsets.Max-lastOffset))
-		if err != nil {
+	for lastSeenOffset < offsets.Max && boff.Ongoing() {
+		if err := context.Cause(ctx); err != nil {
+			return 0, -1, err
+		}
+
+		fs := c.PollRecords(ctx, int(offsets.Max-lastConsumedOffset))
+		// TODO: better error handling for non-retryable errors
+		// we don't have to iterate over all errors since we only fetch a single partition
+		if err := fs.Err(); err != nil {
+			level.Error(i.logger).Log("msg", "failed to poll records", "err", err)
 			boff.Wait()
 			continue
 		}
 
-		if len(records) == 0 {
+		if fs.Empty() {
 			// No more records available
-			break
+			continue
 		}
 
 		// Reset backoff on successful poll
		boff.Reset()
 
-		converted := make([]AppendInput, 0, len(records))
-		for _, record := range records {
+		converted := make([]AppendInput, 0, fs.NumRecords())
+		var totalEntries int64
+		for iter := fs.RecordIter(); !iter.Done(); {
+			record := iter.Next()
+			lastSeenOffset = record.Offset
+
 			if record.Offset >= offsets.Max {
 				level.Debug(i.logger).Log("msg", "record offset exceeds job max offset. stop processing", "record offset", record.Offset, "max offset", offsets.Max)
 				break
 			}
-			lastOffset = record.Offset
-			stream, labels, err := i.decoder.Decode(record.Content)
+			stream, labels, err := i.decoder.Decode(record.Value)
 			if err != nil {
-				return 0, fmt.Errorf("failed to decode record: %w", err)
+				return 0, -1, fmt.Errorf("failed to decode record: %w", err)
 			}
+
+			lastConsumedOffset = record.Offset
+			lastConsumeRecordTS = record.Timestamp.UnixMilli()
+
 			if len(stream.Entries) == 0 {
 				continue
 			}
+			totalEntries += int64(len(stream.Entries))
+
 			converted = append(converted, AppendInput{
-				tenant:    record.TenantID,
+				tenant:    string(record.Key),
 				labels:    labels,
 				labelsStr: stream.Labels,
 				entries:   stream.Entries,
 			})
 		}
 
+		level.Debug(i.logger).Log("msg", "loaded records", "records", len(converted), "entries", totalEntries)
+
 		if len(converted) > 0 {
 			select {
 			case ch <- converted:
 			case <-ctx.Done():
-				return 0, ctx.Err()
+				return 0, -1, ctx.Err()
 			}
 		}
 	}
 
-	return lastOffset, boff.Err()
+	return lastConsumedOffset, lastConsumeRecordTS, boff.Err()
 }
 
 func withBackoff[T any](
diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go
index 8055fef5ddc95..7794769440945 100644
--- a/pkg/blockbuilder/scheduler/scheduler.go
+++ b/pkg/blockbuilder/scheduler/scheduler.go
@@ -35,7 +35,7 @@ type Config struct {
 func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 	f.DurationVar(&cfg.Interval, prefix+"interval", 5*time.Minute, "How often the scheduler should plan jobs.")
 	f.StringVar(&cfg.ConsumerGroup, prefix+"consumer-group", "block-scheduler", "Consumer group used by block scheduler to track the last consumed offset.")
-	f.DurationVar(&cfg.LookbackPeriod, prefix+"lookback-period", 0, "Lookback period used by the scheduler to plan jobs when the consumer group has no commits. 0 consumes from the start of the partition.")
+	f.DurationVar(&cfg.LookbackPeriod, prefix+"lookback-period", 48*time.Hour, "Lookback period used by the scheduler to plan jobs when the consumer group has no commits. 0 consumes from the start of the partition.")
 	f.StringVar(
 		&cfg.Strategy,
 		prefix+"strategy",
@@ -80,6 +80,7 @@ func (cfg *Config) Validate() error {
 		if cfg.TargetRecordCount <= 0 {
 			return errors.New("target record count must be a non-zero value")
 		}
+	case TimeSpanStrategy:
 	default:
 		return fmt.Errorf("invalid strategy: %s", cfg.Strategy)
 	}
@@ -106,6 +107,8 @@ func NewScheduler(cfg Config, queue *JobQueue, offsetManager partition.OffsetMan
 	switch cfg.Strategy {
 	case RecordCountStrategy:
 		planner = NewRecordCountPlanner(offsetManager, cfg.TargetRecordCount, cfg.LookbackPeriod, logger)
+	case TimeSpanStrategy:
+		planner = NewTimeSpanPlanner(time.Hour, cfg.LookbackPeriod, offsetManager, func() time.Time { return time.Now().UTC() }, logger)
 	default:
 		return nil, fmt.Errorf("invalid strategy: %s", cfg.Strategy)
 	}
@@ -231,13 +234,19 @@ func (s *BlockScheduler) HandleCompleteJob(ctx context.Context, job *types.Job,
 	logger := log.With(s.logger, "job", job.ID())
 
 	if success {
+		if job.LastConsumedRecordTS() == 0 {
+			level.Error(logger).Log("msg", "job has no last consumed record timestamp")
+			return nil
+		}
+
 		if err = s.offsetManager.Commit(
 			ctx,
 			job.Partition(),
 			job.Offsets().Max-1, // max is exclusive, so commit max-1
+			partition.MarshallCommitMeta(job.LastConsumedRecordTS()),
 		); err == nil {
 			s.queue.MarkComplete(job.ID(), types.JobStatusComplete)
-			level.Info(logger).Log("msg", "job completed successfully")
+			level.Info(logger).Log("msg", "job completed successfully", "offset", job.Offsets().Max-1, "last_consumed_record_ts", time.UnixMilli(job.LastConsumedRecordTS()))
 			return nil
 		}
 
diff --git a/pkg/blockbuilder/scheduler/strategy.go b/pkg/blockbuilder/scheduler/strategy.go
index 78468bbea97ae..8d029635a105a 100644
--- a/pkg/blockbuilder/scheduler/strategy.go
+++ b/pkg/blockbuilder/scheduler/strategy.go
@@ -10,11 +10,17 @@ import (
 	"github.com/twmb/franz-go/pkg/kadm"
 
 	"github.com/grafana/loki/v3/pkg/blockbuilder/types"
+	"github.com/grafana/loki/v3/pkg/kafka/partition"
+)
+
+const (
+	DefaultBufferPeriod = time.Minute * 30
 )
 
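// Illustrative sketch, not part of the patch: how the TimeSpanPlanner added further
// down derives the upper timestamp bound for a job. The 1h target period and the 30m
// DefaultBufferPeriod come from this diff; the function name and sample values are
// hypothetical. Assumes the "time" package and Go 1.21+ for the built-in min.
func exampleConsumeUpToTS(now time.Time, lastConsumedRecordTS int64, targetPeriod time.Duration) int64 {
	// Stay clear of the most recent logs, then truncate to the planning interval.
	consumeBoundary := now.Add(-30 * time.Minute).Truncate(targetPeriod).UnixMilli()
	// Advance at most one target period past the last consumed record,
	// but never beyond the boundary.
	return min(lastConsumedRecordTS+targetPeriod.Milliseconds(), consumeBoundary)
}

// For example, with now = 10:45 UTC and the last consumed record at 07:45 UTC,
// the boundary is 10:00 and the job is planned up to records older than 08:45.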
 // OffsetReader is an interface to list offsets for all partitions of a topic from Kafka.
 type OffsetReader interface {
 	GroupLag(context.Context, time.Duration) (map[int32]kadm.GroupMemberLag, error)
+	FetchPartitionOffset(context.Context, int32, partition.SpecialOffset) (int64, error)
 }
 
 type Planner interface {
@@ -24,10 +30,12 @@
 
 const (
 	RecordCountStrategy = "record-count"
+	TimeSpanStrategy    = "time-span"
 )
 
 var validStrategies = []string{
 	RecordCountStrategy,
+	TimeSpanStrategy,
 }
 
 // tries to consume upto targetRecordCount records per partition
@@ -104,3 +112,108 @@ func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int)
 
 	return jobs, nil
 }
+
+type TimeSpanPlanner struct {
+	targetPeriod time.Duration
+	now          func() time.Time
+
+	lookbackPeriod time.Duration
+	offsetReader   OffsetReader
+
+	logger log.Logger
+}
+
+func NewTimeSpanPlanner(interval time.Duration, lookbackPeriod time.Duration, offsetReader OffsetReader, now func() time.Time, logger log.Logger) *TimeSpanPlanner {
+	return &TimeSpanPlanner{
+		targetPeriod:   interval,
+		lookbackPeriod: lookbackPeriod,
+		offsetReader:   offsetReader,
+		now:            now,
+		logger:         logger,
+	}
+}
+
+func (p *TimeSpanPlanner) Name() string {
+	return TimeSpanStrategy
+}
+
+// creates a single job per partition
+// does not consume records within the buffer period - defaults to 30m
+func (p *TimeSpanPlanner) Plan(ctx context.Context, maxJobsPerPartition int) ([]*JobWithMetadata, error) {
+	// Add some buffer time to avoid consuming recent logs.
+	// Truncate to the nearest interval.
+	consumeBoundary := p.now().Add(-DefaultBufferPeriod).Truncate(p.targetPeriod).UnixMilli()
+	level.Info(p.logger).Log("msg", "start planning", "consumeBoundary", time.UnixMilli(consumeBoundary))
+
+	offsets, err := p.offsetReader.GroupLag(ctx, p.lookbackPeriod)
+	if err != nil {
+		level.Error(p.logger).Log("msg", "failed to get group lag", "err", err)
+		return nil, err
+	}
+
+	jobs := make([]*JobWithMetadata, 0, len(offsets))
+	for _, partitionOffset := range offsets {
+		if partitionOffset.End.Offset <= partitionOffset.Start.Offset {
+			continue
+		}
+
+		// 1. kadm.GroupMemberLag contains a valid Commit.At even when the consumer group never committed any offset;
+		//    no additional validation is needed here.
+		// 2. The committed offset could be behind the start offset if we are falling behind the retention period.
+ startOffset := max(partitionOffset.Commit.At+1, partitionOffset.Start.Offset) + + var lastConsumedRecordTS int64 + if partitionOffset.Commit.Metadata != "" { + lastConsumedRecordTS, err = partition.UnmarshallCommitMeta(partitionOffset.Commit.Metadata) + if err != nil { + level.Error(p.logger).Log("msg", "failed to unmarshal commit metadata", "err", err) + break + } + } else { + level.Error(p.logger).Log("msg", "no commit metadata found", "partition", partitionOffset.Partition) + break + } + + consumeUptoTS := min(lastConsumedRecordTS+p.targetPeriod.Milliseconds(), consumeBoundary) + consumeUptoOffset, err := p.offsetReader.FetchPartitionOffset(ctx, partitionOffset.Partition, partition.SpecialOffset(consumeUptoTS)) + if err != nil { + level.Error(p.logger).Log("msg", "failed to list offsets after timestamp", "err", err) + return nil, err + } + + // no records >= consumeUptoTS + if consumeUptoOffset == -1 { + level.Info(p.logger).Log("msg", "no records to eligible to process", "partition", partitionOffset.Partition, + "commitOffset", partitionOffset.Commit.At, + "latestOffset", partitionOffset.End.Offset, + "consumeUptoTS", time.UnixMilli(consumeUptoTS)) + continue + } + + endOffset := consumeUptoOffset + if startOffset >= endOffset { + level.Info(p.logger).Log("msg", "no records to process", "partition", partitionOffset.Partition, + "commitOffset", partitionOffset.Commit.At, + "startOffset", startOffset, + "endOffset", endOffset) + continue + } + + job := NewJobWithMetadata( + types.NewJob(partitionOffset.Partition, types.Offsets{ + Min: startOffset, + Max: endOffset, + }), int(endOffset-startOffset), + ) + + level.Debug(p.logger).Log("msg", "created job", "partition", partitionOffset.Partition, "consumeUptoTS", time.UnixMilli(consumeUptoTS), "min", startOffset, "max", endOffset) + jobs = append(jobs, job) + } + + // Sort jobs by partition number to ensure consistent ordering + sort.Slice(jobs, func(i, j int) bool { + return jobs[i].Job.Partition() < jobs[j].Job.Partition() + }) + + return jobs, nil +} diff --git a/pkg/blockbuilder/types/grpc_transport.go b/pkg/blockbuilder/types/grpc_transport.go index b53fdeeb4a7d4..2453a446a80cb 100644 --- a/pkg/blockbuilder/types/grpc_transport.go +++ b/pkg/blockbuilder/types/grpc_transport.go @@ -128,6 +128,7 @@ func protoToJob(p *proto.Job) *Job { Min: p.GetOffsets().GetMin(), Max: p.GetOffsets().GetMax(), }, + lastConsumedRecordTS: p.GetLastConsumedRecordTS(), } } @@ -143,5 +144,6 @@ func jobToProto(j *Job) *proto.Job { Min: j.offsets.Min, Max: j.offsets.Max, }, + LastConsumedRecordTS: j.LastConsumedRecordTS(), } } diff --git a/pkg/blockbuilder/types/job.go b/pkg/blockbuilder/types/job.go index ca23aa003b96e..82cb309c3da15 100644 --- a/pkg/blockbuilder/types/job.go +++ b/pkg/blockbuilder/types/job.go @@ -6,8 +6,9 @@ import "fmt" type Job struct { id string // Partition and offset information - partition int32 - offsets Offsets + partition int32 + offsets Offsets + lastConsumedRecordTS int64 } func (j *Job) ID() string { @@ -22,6 +23,14 @@ func (j *Job) Offsets() Offsets { return j.offsets } +func (j *Job) LastConsumedRecordTS() int64 { + return j.lastConsumedRecordTS +} + +func (j *Job) UpdateLastConsumedRecordTS(ts int64) { + j.lastConsumedRecordTS = ts +} + // JobStatus represents the current state of a job type JobStatus int @@ -68,5 +77,5 @@ func NewJob(partition int32, offsets Offsets) *Job { // GenerateJobID creates a deterministic job ID from partition and offsets func GenerateJobID(partition int32, offsets Offsets) string { - return 
fmt.Sprintf("job-%d-%d-%d", partition, offsets.Min, offsets.Max) + return fmt.Sprintf("job-%d-%d", partition, offsets.Min) } diff --git a/pkg/blockbuilder/types/proto/blockbuilder.pb.go b/pkg/blockbuilder/types/proto/blockbuilder.pb.go index 331b210a34afa..3099b3d553346 100644 --- a/pkg/blockbuilder/types/proto/blockbuilder.pb.go +++ b/pkg/blockbuilder/types/proto/blockbuilder.pb.go @@ -362,9 +362,10 @@ func (m *Offsets) GetMax() int64 { // Job represents a block building job type Job struct { - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` - Partition int32 `protobuf:"varint,2,opt,name=partition,proto3" json:"partition,omitempty"` - Offsets *Offsets `protobuf:"bytes,3,opt,name=offsets,proto3" json:"offsets,omitempty"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Partition int32 `protobuf:"varint,2,opt,name=partition,proto3" json:"partition,omitempty"` + Offsets *Offsets `protobuf:"bytes,3,opt,name=offsets,proto3" json:"offsets,omitempty"` + LastConsumedRecordTS int64 `protobuf:"varint,4,opt,name=LastConsumedRecordTS,proto3" json:"LastConsumedRecordTS,omitempty"` } func (m *Job) Reset() { *m = Job{} } @@ -420,6 +421,13 @@ func (m *Job) GetOffsets() *Offsets { return nil } +func (m *Job) GetLastConsumedRecordTS() int64 { + if m != nil { + return m.LastConsumedRecordTS + } + return 0 +} + func init() { proto.RegisterType((*GetJobRequest)(nil), "blockbuilder.types.GetJobRequest") proto.RegisterType((*GetJobResponse)(nil), "blockbuilder.types.GetJobResponse") @@ -436,36 +444,38 @@ func init() { } var fileDescriptor_04968622516f7b79 = []byte{ - // 455 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x93, 0x3f, 0x8e, 0xd3, 0x40, - 0x14, 0xc6, 0x3d, 0xb6, 0xd8, 0x90, 0x17, 0x11, 0x96, 0x41, 0x88, 0x28, 0xc0, 0x68, 0x19, 0x24, - 0x58, 0x0a, 0x6c, 0x29, 0xc0, 0x05, 0xa0, 0x40, 0x2c, 0x05, 0xc2, 0xa1, 0xda, 0x06, 0xfc, 0x67, - 0x92, 0x9d, 0xd8, 0xf1, 0x18, 0xcf, 0x18, 0x65, 0x3b, 0x8e, 0xc0, 0x05, 0xe8, 0x39, 0x0a, 0x65, - 0xca, 0x2d, 0x89, 0xd3, 0x50, 0xee, 0x11, 0x50, 0xc6, 0x76, 0xc0, 0x5a, 0x2b, 0x6c, 0x43, 0x65, - 0xfb, 0xf3, 0xcf, 0xef, 0xfb, 0xfc, 0xde, 0x1b, 0x70, 0xd2, 0x68, 0xea, 0xf8, 0xb1, 0x08, 0x22, - 0x3f, 0xe7, 0x71, 0xc8, 0x32, 0x47, 0x9d, 0xa6, 0x4c, 0x3a, 0x69, 0x26, 0x94, 0x68, 0xbc, 0xb0, - 0xb5, 0x84, 0x71, 0x43, 0xd3, 0x30, 0xb5, 0xe1, 0xda, 0x2b, 0xa6, 0x8e, 0x84, 0xef, 0xb2, 0x4f, - 0x39, 0x93, 0x0a, 0xdf, 0x03, 0xa8, 0x88, 0x0f, 0x3c, 0x1c, 0xa0, 0x03, 0x74, 0xd8, 0x75, 0xbb, - 0x95, 0xf2, 0x3a, 0xa4, 0x6f, 0xa0, 0x5f, 0xf3, 0x32, 0x15, 0x89, 0x64, 0xf8, 0x31, 0x58, 0x33, - 0xe1, 0x6b, 0xb2, 0x37, 0xba, 0x6d, 0x5f, 0xf4, 0xb0, 0x37, 0xf4, 0x86, 0xc1, 0x7d, 0x30, 0x45, - 0x34, 0x30, 0x0f, 0xd0, 0xe1, 0x55, 0xd7, 0x14, 0x11, 0x5d, 0x00, 0x7e, 0x29, 0xe6, 0x69, 0xcc, - 0x14, 0xbb, 0x74, 0x82, 0xda, 0xcf, 0xbc, 0x84, 0xdf, 0x00, 0x3a, 0x32, 0x0f, 0x02, 0x26, 0xe5, - 0xc0, 0xd2, 0xa6, 0xf5, 0x23, 0xbd, 0x05, 0x37, 0x1b, 0xce, 0xe5, 0xbf, 0xd0, 0x63, 0xe8, 0x8f, - 0x4f, 0x93, 0xe0, 0x7f, 0x84, 0xa1, 0x37, 0xe0, 0xfa, 0xb6, 0x76, 0x65, 0xf7, 0x04, 0x3a, 0x6f, - 0x27, 0x13, 0xc9, 0x94, 0xc4, 0xfb, 0x60, 0xcd, 0x79, 0xa2, 0x0d, 0x2c, 0x77, 0x73, 0xab, 0x15, - 0x6f, 0xa1, 0x4b, 0x6f, 0x14, 0x6f, 0x41, 0x67, 0x60, 0x1d, 0x95, 0x5d, 0xdc, 0x46, 0x31, 0x79, - 0x88, 0xef, 0x42, 0x37, 0xf5, 0x32, 0xc5, 0x15, 0x17, 0x89, 0xc6, 0xaf, 0xb8, 0x7f, 0x04, 0xfc, - 0x1c, 0x3a, 0xa2, 0xf4, 0xd0, 0x3d, 0xe8, 0x8d, 0xee, 0xb4, 0xa5, 0xac, 0x62, 0xb8, 0x35, 0x3b, - 0xfa, 0x66, 0xc2, 0xfe, 
0x38, 0x38, 0x61, 0x61, 0x1e, 0xb3, 0x6c, 0xcc, 0xb2, 0xcf, 0x3c, 0x60, - 0xf8, 0x1d, 0xec, 0x95, 0xc3, 0xc7, 0xf7, 0xdb, 0x8a, 0x34, 0x16, 0x69, 0x48, 0x77, 0x21, 0x55, - 0x03, 0x0c, 0xfc, 0x11, 0x7a, 0x7f, 0x0d, 0x02, 0x3f, 0x6c, 0xfb, 0xe8, 0xe2, 0x8e, 0x0c, 0x1f, - 0xfd, 0x93, 0xdb, 0x3a, 0xbc, 0x87, 0x4e, 0xd5, 0x77, 0xdc, 0x1a, 0xa9, 0x39, 0xf0, 0xe1, 0x83, - 0x9d, 0x4c, 0x5d, 0xf5, 0xc5, 0x6c, 0xb9, 0x22, 0xc6, 0xd9, 0x8a, 0x18, 0xe7, 0x2b, 0x82, 0xbe, - 0x14, 0x04, 0x7d, 0x2f, 0x08, 0xfa, 0x51, 0x10, 0xb4, 0x2c, 0x08, 0xfa, 0x59, 0x10, 0xf4, 0xab, - 0x20, 0xc6, 0x79, 0x41, 0xd0, 0xd7, 0x35, 0x31, 0x96, 0x6b, 0x62, 0x9c, 0xad, 0x89, 0x71, 0xfc, - 0x6c, 0xca, 0xd5, 0x49, 0xee, 0xdb, 0x81, 0x98, 0x3b, 0xd3, 0xcc, 0x9b, 0x78, 0x89, 0xe7, 0xc4, - 0x22, 0xe2, 0x3b, 0x8f, 0xb2, 0xbf, 0xa7, 0x2f, 0x4f, 0x7f, 0x07, 0x00, 0x00, 0xff, 0xff, 0x68, - 0x46, 0x93, 0x30, 0xf1, 0x03, 0x00, 0x00, + // 481 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x93, 0x4f, 0x6e, 0xd3, 0x40, + 0x14, 0xc6, 0x3d, 0x36, 0x34, 0xe4, 0x45, 0x84, 0x32, 0x80, 0xb0, 0x02, 0x8c, 0x8a, 0x91, 0xa0, + 0x2c, 0xb0, 0xa5, 0x00, 0x17, 0xa0, 0x0b, 0x44, 0x41, 0x42, 0x38, 0x5d, 0x75, 0x03, 0xfe, 0x33, + 0x49, 0xa7, 0x76, 0x3c, 0xc6, 0x33, 0x46, 0xe9, 0x8e, 0x23, 0x70, 0x01, 0x58, 0x73, 0x14, 0x96, + 0x59, 0x76, 0x49, 0x9c, 0x0d, 0xcb, 0x1e, 0x01, 0x65, 0x6c, 0x07, 0xac, 0x5a, 0xa1, 0x1b, 0x56, + 0xb6, 0xbf, 0xf9, 0xf9, 0x7d, 0x9f, 0xdf, 0x7b, 0x06, 0x27, 0x8d, 0x26, 0x8e, 0x1f, 0xf3, 0x20, + 0xf2, 0x73, 0x16, 0x87, 0x34, 0x73, 0xe4, 0x49, 0x4a, 0x85, 0x93, 0x66, 0x5c, 0xf2, 0xc6, 0x81, + 0xad, 0x24, 0x8c, 0x1b, 0x9a, 0x82, 0x2d, 0x1b, 0xae, 0xbe, 0xa4, 0x72, 0x9f, 0xfb, 0x2e, 0xfd, + 0x98, 0x53, 0x21, 0xf1, 0x3d, 0x80, 0x8a, 0x78, 0xcf, 0x42, 0x13, 0xed, 0xa0, 0xdd, 0xae, 0xdb, + 0xad, 0x94, 0x57, 0xa1, 0xf5, 0x1a, 0xfa, 0x35, 0x2f, 0x52, 0x9e, 0x08, 0x8a, 0x1f, 0x83, 0x71, + 0xcc, 0x7d, 0x45, 0xf6, 0x86, 0xb7, 0xed, 0xf3, 0x1e, 0xf6, 0x8a, 0x5e, 0x31, 0xb8, 0x0f, 0x3a, + 0x8f, 0x4c, 0x7d, 0x07, 0xed, 0x5e, 0x71, 0x75, 0x1e, 0x59, 0x33, 0xc0, 0x7b, 0x7c, 0x9a, 0xc6, + 0x54, 0xd2, 0x0b, 0x27, 0xa8, 0xfd, 0xf4, 0x0b, 0xf8, 0x99, 0xd0, 0x11, 0x79, 0x10, 0x50, 0x21, + 0x4c, 0x43, 0x99, 0xd6, 0x8f, 0xd6, 0x2d, 0xb8, 0xd1, 0x70, 0x2e, 0xbf, 0xc5, 0x3a, 0x84, 0xfe, + 0xe8, 0x24, 0x09, 0xfe, 0x47, 0x18, 0xeb, 0x3a, 0x5c, 0x5b, 0xd7, 0xae, 0xec, 0x9e, 0x40, 0xe7, + 0xed, 0x78, 0x2c, 0xa8, 0x14, 0x78, 0x1b, 0x8c, 0x29, 0x4b, 0x94, 0x81, 0xe1, 0xae, 0x6e, 0x95, + 0xe2, 0xcd, 0x54, 0xe9, 0x95, 0xe2, 0xcd, 0xac, 0x6f, 0x08, 0x8c, 0xfd, 0xb2, 0x8d, 0xeb, 0x2c, + 0x3a, 0x0b, 0xf1, 0x5d, 0xe8, 0xa6, 0x5e, 0x26, 0x99, 0x64, 0x3c, 0x51, 0xfc, 0x65, 0xf7, 0x8f, + 0x80, 0x9f, 0x43, 0x87, 0x97, 0x26, 0xaa, 0x09, 0xbd, 0xe1, 0x9d, 0xb6, 0x98, 0x55, 0x0e, 0xb7, + 0x66, 0xf1, 0x10, 0x6e, 0xbe, 0xf1, 0x84, 0xdc, 0xe3, 0x89, 0xc8, 0xa7, 0x34, 0x74, 0x69, 0xc0, + 0xb3, 0xf0, 0x60, 0x64, 0x5e, 0x52, 0x79, 0x5a, 0xcf, 0x86, 0x5f, 0x75, 0xd8, 0x1e, 0x05, 0x47, + 0x34, 0xcc, 0x63, 0x9a, 0x8d, 0x68, 0xf6, 0x89, 0x05, 0x14, 0xbf, 0x83, 0xad, 0x72, 0x63, 0xf0, + 0xfd, 0x36, 0xe3, 0xc6, 0xf6, 0x0d, 0xac, 0x4d, 0x48, 0xd5, 0x35, 0x0d, 0x7f, 0x80, 0xde, 0x5f, + 0xd3, 0xc3, 0x0f, 0xdb, 0x5e, 0x3a, 0xbf, 0x58, 0x83, 0x47, 0xff, 0xe4, 0xd6, 0x0e, 0x07, 0xd0, + 0xa9, 0x86, 0x85, 0x5b, 0x23, 0x35, 0xb7, 0x64, 0xf0, 0x60, 0x23, 0x53, 0x57, 0x7d, 0x71, 0x3c, + 0x5f, 0x10, 0xed, 0x74, 0x41, 0xb4, 0xb3, 0x05, 0x41, 0x9f, 0x0b, 0x82, 0xbe, 0x17, 0x04, 0xfd, + 0x28, 0x08, 0x9a, 0x17, 0x04, 0xfd, 0x2c, 0x08, 0xfa, 
0x55, 0x10, 0xed, 0xac, 0x20, 0xe8, 0xcb, + 0x92, 0x68, 0xf3, 0x25, 0xd1, 0x4e, 0x97, 0x44, 0x3b, 0x7c, 0x36, 0x61, 0xf2, 0x28, 0xf7, 0xed, + 0x80, 0x4f, 0x9d, 0x49, 0xe6, 0x8d, 0xbd, 0xc4, 0x73, 0x62, 0x1e, 0xb1, 0x8d, 0xff, 0xbf, 0xbf, + 0xa5, 0x2e, 0x4f, 0x7f, 0x07, 0x00, 0x00, 0xff, 0xff, 0x4c, 0xa0, 0x1c, 0x17, 0x26, 0x04, 0x00, + 0x00, } func (this *GetJobRequest) Equal(that interface{}) bool { @@ -673,6 +683,9 @@ func (this *Job) Equal(that interface{}) bool { if !this.Offsets.Equal(that1.Offsets) { return false } + if this.LastConsumedRecordTS != that1.LastConsumedRecordTS { + return false + } return true } func (this *GetJobRequest) GoString() string { @@ -758,13 +771,14 @@ func (this *Job) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 7) + s := make([]string, 0, 8) s = append(s, "&proto.Job{") s = append(s, "Id: "+fmt.Sprintf("%#v", this.Id)+",\n") s = append(s, "Partition: "+fmt.Sprintf("%#v", this.Partition)+",\n") if this.Offsets != nil { s = append(s, "Offsets: "+fmt.Sprintf("%#v", this.Offsets)+",\n") } + s = append(s, "LastConsumedRecordTS: "+fmt.Sprintf("%#v", this.LastConsumedRecordTS)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -1203,6 +1217,11 @@ func (m *Job) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.LastConsumedRecordTS != 0 { + i = encodeVarintBlockbuilder(dAtA, i, uint64(m.LastConsumedRecordTS)) + i-- + dAtA[i] = 0x20 + } if m.Offsets != nil { { size, err := m.Offsets.MarshalToSizedBuffer(dAtA[:i]) @@ -1357,6 +1376,9 @@ func (m *Job) Size() (n int) { l = m.Offsets.Size() n += 1 + l + sovBlockbuilder(uint64(l)) } + if m.LastConsumedRecordTS != 0 { + n += 1 + sovBlockbuilder(uint64(m.LastConsumedRecordTS)) + } return n } @@ -1447,6 +1469,7 @@ func (this *Job) String() string { `Id:` + fmt.Sprintf("%v", this.Id) + `,`, `Partition:` + fmt.Sprintf("%v", this.Partition) + `,`, `Offsets:` + strings.Replace(this.Offsets.String(), "Offsets", "Offsets", 1) + `,`, + `LastConsumedRecordTS:` + fmt.Sprintf("%v", this.LastConsumedRecordTS) + `,`, `}`, }, "") return s @@ -2228,6 +2251,25 @@ func (m *Job) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field LastConsumedRecordTS", wireType) + } + m.LastConsumedRecordTS = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.LastConsumedRecordTS |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipBlockbuilder(dAtA[iNdEx:]) diff --git a/pkg/blockbuilder/types/proto/blockbuilder.proto b/pkg/blockbuilder/types/proto/blockbuilder.proto index 607cfc1666b8f..e4779245052ec 100644 --- a/pkg/blockbuilder/types/proto/blockbuilder.proto +++ b/pkg/blockbuilder/types/proto/blockbuilder.proto @@ -55,4 +55,5 @@ message Job { string id = 1; int32 partition = 2; Offsets offsets = 3; + int64 LastConsumedRecordTS = 4; } diff --git a/pkg/kafka/partition/committer.go b/pkg/kafka/partition/committer.go index bbe109a5d5583..db92a0c814928 100644 --- a/pkg/kafka/partition/committer.go +++ b/pkg/kafka/partition/committer.go @@ -118,7 +118,7 @@ func (c *partitionCommitter) Commit(ctx context.Context, offset int64) error { startTime := time.Now() c.commitRequestsTotal.Inc() - if err := c.offsetManager.Commit(ctx, c.partition, offset); err != nil { + if err := c.offsetManager.Commit(ctx, c.partition, offset, 
""); err != nil { level.Error(c.logger).Log("msg", "failed to commit offset", "err", err, "offset", offset) c.commitFailuresTotal.Inc() c.commitRequestsLatency.Observe(time.Since(startTime).Seconds()) diff --git a/pkg/kafka/partition/kafkautil.go b/pkg/kafka/partition/kafkautil.go index 8318581e302f8..47591da8a79f3 100644 --- a/pkg/kafka/partition/kafkautil.go +++ b/pkg/kafka/partition/kafkautil.go @@ -67,6 +67,7 @@ func GetGroupLag(ctx context.Context, admClient *kadm.Client, topic, group strin Partition: o.Partition, At: o.Offset, LeaderEpoch: o.LeaderEpoch, + Metadata: MarshallCommitMeta(fallbackOffsetMillis), }}) } } @@ -78,3 +79,36 @@ func GetGroupLag(ctx context.Context, admClient *kadm.Client, topic, group strin } return kadm.CalculateGroupLagWithStartOffsets(descrGroup, offsets, startOffsets, endOffsets), nil } + +const ( + kafkaCommitMetaV1 = 1 +) + +// commitRecTs: timestamp of the record which was committed (and not the commit time). +func MarshallCommitMeta(commitRecTs int64) string { + return fmt.Sprintf("%d,%d", kafkaCommitMetaV1, commitRecTs) +} + +// commitRecTs: timestamp of the record which was committed (and not the commit time). +func UnmarshallCommitMeta(s string) (commitRecTs int64, err error) { + if s == "" { + return + } + var ( + version int + metaStr string + ) + _, err = fmt.Sscanf(s, "%d,%s", &version, &metaStr) + if err != nil { + return 0, fmt.Errorf("invalid commit metadata format: parse meta version: %w", err) + } + + if version != kafkaCommitMetaV1 { + return 0, fmt.Errorf("unsupported commit meta version %d", version) + } + _, err = fmt.Sscanf(metaStr, "%d", &commitRecTs) + if err != nil { + return 0, fmt.Errorf("invalid commit metadata format: %w", err) + } + return +} diff --git a/pkg/kafka/partition/offset_manager.go b/pkg/kafka/partition/offset_manager.go index efec3a47a4168..2e165f26a5e53 100644 --- a/pkg/kafka/partition/offset_manager.go +++ b/pkg/kafka/partition/offset_manager.go @@ -27,7 +27,7 @@ type OffsetManager interface { GroupLag(ctx context.Context, lookbackPeriod time.Duration) (map[int32]kadm.GroupMemberLag, error) FetchLastCommittedOffset(ctx context.Context, partition int32) (int64, error) FetchPartitionOffset(ctx context.Context, partition int32, position SpecialOffset) (int64, error) - Commit(ctx context.Context, partition int32, offset int64) error + Commit(ctx context.Context, partition int32, offset int64, metadata string) error } var _ OffsetManager = &KafkaOffsetManager{} @@ -207,12 +207,18 @@ func (r *KafkaOffsetManager) GroupLag(ctx context.Context, lookbackPeriod time.D } // Commit commits an offset to the consumer group -func (r *KafkaOffsetManager) Commit(ctx context.Context, partitionID int32, offset int64) error { +func (r *KafkaOffsetManager) Commit(ctx context.Context, partitionID int32, offset int64, metadata string) error { admin := kadm.NewClient(r.client) // Commit the last consumed offset. 
toCommit := kadm.Offsets{} - toCommit.AddOffset(r.cfg.Topic, partitionID, offset, -1) + toCommit.Add(kadm.Offset{ + Topic: r.cfg.Topic, + Partition: partitionID, + At: offset, + LeaderEpoch: -1, + Metadata: metadata, + }) committed, err := admin.CommitOffsets(ctx, r.ConsumerGroup(), toCommit) if err != nil { diff --git a/pkg/kafka/partition/reader.go b/pkg/kafka/partition/reader.go index d4a83c7102572..5c3ecc62a5be2 100644 --- a/pkg/kafka/partition/reader.go +++ b/pkg/kafka/partition/reader.go @@ -19,7 +19,7 @@ import ( "github.com/grafana/loki/v3/pkg/kafka/client" ) -type SpecialOffset int +type SpecialOffset int64 const ( KafkaStartOffset SpecialOffset = -2 diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index e3eaa691a58e0..8ab4e5bfbc603 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1836,21 +1836,11 @@ func (t *Loki) initBlockBuilder() (services.Service, error) { return nil, err } - readerMetrics := partition.NewReaderMetrics(prometheus.DefaultRegisterer) - readerFactory := func(partitionID int32) (partition.Reader, error) { - return partition.NewKafkaReader( - t.Cfg.KafkaConfig, - partitionID, - logger, - readerMetrics, - ) - } - bb, err := blockbuilder.NewBlockBuilder( id, t.Cfg.BlockBuilder, + t.Cfg.KafkaConfig, t.Cfg.SchemaConfig.Configs, - readerFactory, t.Store, objectStore, logger,
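// Illustrative sketch, not part of the patch: the franz-go consumption pattern the new
// loadRecords relies on, reduced to its essentials. Topic, partition, offsets and the
// broker address are placeholders; fetch-error handling, backoff and record decoding
// are omitted.
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func consumeRange(ctx context.Context, c *kgo.Client, topic string, partitionID int32, minOff, maxOff int64) {
	// NoResetOffset: if minOff already fell out of retention, fail the fetch rather
	// than silently restarting from the beginning of the partition.
	c.AddConsumePartitions(map[string]map[int32]kgo.Offset{
		topic: {partitionID: kgo.NoResetOffset().At(minOff)},
	})
	defer c.RemoveConsumePartitions(map[string][]int32{topic: {partitionID}})

	for lastSeen := minOff - 1; lastSeen < maxOff-1; {
		fs := c.PollRecords(ctx, int(maxOff-lastSeen))
		for iter := fs.RecordIter(); !iter.Done(); {
			rec := iter.Next()
			lastSeen = rec.Offset
			if rec.Offset >= maxOff {
				return // job boundary (exclusive max) reached
			}
			fmt.Println("tenant", string(rec.Key), "offset", rec.Offset, "ts", rec.Timestamp)
		}
	}
}

func main() {
	c, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		panic(err)
	}
	defer c.Close()
	consumeRange(context.Background(), c, "loki.push", 0, 100, 200)
}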