Ingester block lifetime changes (#628)
* WIP: working first draft

* Moved wal iteration/completion to tempodb and honor existing block config options

* Get and specify length when flushing ingester blocks to backend, to avoid high memory usage in the MinIO client (S3)

* Rename local.LocalBackend to local.Backend

* Rename local.LocalBackend to local.Backend

* Rename local.LocalBackend to local.Backend

* Delete obsolete complete block

* Rename ingester.IngesterBlock to ingester.LocalBlock

* Update changelog

* lint

* Rename CompactorBlock to StreamingBlock

* Break complete flush op into own method, remove dead code

* Fix StreamingBlock to finish writing data file before writing meta to fix #633

* Don't recreate obsolete /wal/completed folder after clearing it out on startup

* Move ReadReader to the main backend.Reader interface, leave unimplemented for all backends except local for now (see the interface sketch after this list)

* Move block copy functionality from BackendBlock to shared method

* Move LocalBlock to wal package

* lint

* Restore test for wal->backend block completion after moving to /tempodb. Remove commented code

* Move local backend ownership from ingester to wal, code cleanup
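
The backend.Reader change called out above is not among the diffs excerpted below, so here is a minimal sketch of the reshaped interface. It is an assumption based on the surrounding code, not Tempo's actual definition: other interface methods are omitted, and the exact parameters and returned size are guesses.

// Sketch only: an assumed shape for the tempodb/backend Reader interface
// after ReadReader moves into it. Not Tempo's actual definition.
package backend

import (
	"context"
	"io"

	"github.com/google/uuid"
)

type Reader interface {
	// Read returns the named object of a block in a single byte slice.
	Read(ctx context.Context, name string, blockID uuid.UUID, tenantID string) ([]byte, error)

	// ReadReader streams the named object and reports its size so callers can
	// hand an explicit content length to object-store clients instead of
	// buffering. Assumed to be implemented only by the local backend for now;
	// the other backends can return a "not implemented" error.
	ReadReader(ctx context.Context, name string, blockID uuid.UUID, tenantID string) (io.ReadCloser, int64, error)
}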
mdisibio authored Apr 9, 2021
1 parent b6ffb50 commit 93c378a
Showing 31 changed files with 586 additions and 768 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -22,6 +22,7 @@
* [BUGFIX] Fixes issue where wal was deleted before successful flush and adds exponential backoff for flush errors [#593](https://github.com/grafana/tempo/pull/593)
* [BUGFIX] Fixes issue where Tempo would not parse odd length trace ids [#605](https://github.com/grafana/tempo/pull/605)
* [BUGFIX] Sort traces on flush to reduce unexpected recombination work by compactors [#606](https://github.com/grafana/tempo/pull/606)
* [BUGFIX] Ingester fully persists blocks locally to reduce the amount of work done after restart [#628](https://github.com/grafana/tempo/pull/628)
* [ENHANCEMENT] Add kafka receiver. [#613](https://github.com/grafana/tempo/pull/613)
* [ENHANCEMENT] Upgrade OTel collector to `v0.21.0`. [#627](https://github.com/grafana/tempo/pull/627)

2 changes: 1 addition & 1 deletion modules/compactor/config.go
@@ -21,7 +21,7 @@ type Config struct {
func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
cfg.Compactor = tempodb.CompactorConfig{
ChunkSizeBytes: 10 * 1024 * 1024, // 10 MiB
FlushSizeBytes: 30 * 1024 * 1024, // 30 MiB
FlushSizeBytes: tempodb.DefaultFlushSizeBytes,
CompactedBlockRetention: time.Hour,
RetentionConcurrency: tempodb.DefaultRetentionConcurrency,
}
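
The compactor now takes its flush-size default from a shared constant in tempodb instead of a local literal. A hypothetical definition follows; the 30 MiB figure is carried over from the literal removed above, and the constant's actual value is not visible in this excerpt.

// Hypothetical sketch of the shared default in tempodb. The value is an
// assumption based on the previous 30 MiB literal in the compactor config.
const DefaultFlushSizeBytes = 30 * 1024 * 1024 // 30 MiB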
123 changes: 62 additions & 61 deletions modules/ingester/flush.go
@@ -9,6 +9,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/pkg/errors"

"github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
@@ -173,57 +174,17 @@ func (i *Ingester) flushLoop(j int) {
op := o.(*flushOp)
op.attempts++

retry := false
var retry bool
var err error

if op.kind == opKindComplete {
// No point in proceeding if shutdown has been initiated since
// we won't be able to queue up the next flush op
if i.flushQueues.IsStopped() {
handleAbandonedOp(op)
continue
}

level.Debug(log.Logger).Log("msg", "completing block", "userid", op.userID)
instance, err := i.getOrCreateInstance(op.userID)
if err != nil {
handleFailedOp(op, err)
continue
}

err = instance.CompleteBlock(op.blockID)
if err != nil {
handleFailedOp(op, err)

if op.attempts >= maxCompleteAttempts {
level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "Block exceeded max completion errors. Deleting. POSSIBLE DATA LOSS",
"userID", op.userID, "attempts", op.attempts, "block", op.blockID.String())

err = instance.ClearCompletingBlock(op.blockID)
if err != nil {
// Failure to delete the WAL doesn't prevent flushing the block
handleFailedOp(op, err)
}
} else {
retry = true
}
} else {
// add a flushOp for the block we just completed
// No delay
i.enqueue(&flushOp{
kind: opKindFlush,
userID: instance.instanceID,
blockID: op.blockID,
}, false)
}

retry, err = i.handleComplete(op)
} else {
level.Info(log.Logger).Log("msg", "flushing block", "userid", op.userID, "block", op.blockID.String())
retry, err = i.handleFlush(op.userID, op.blockID)
}

err := i.flushBlock(op.userID, op.blockID)
if err != nil {
handleFailedOp(op, err)
retry = true
}
if err != nil {
handleFailedOp(op, err)
}

if retry {
@@ -245,14 +206,62 @@ func handleAbandonedOp(op *flushOp) {
"op", op.kind, "block", op.blockID.String(), "attempts", op.attempts)
}

func (i *Ingester) flushBlock(userID string, blockID uuid.UUID) error {
func (i *Ingester) handleComplete(op *flushOp) (retry bool, err error) {
// No point in proceeding if shutdown has been initiated since
// we won't be able to queue up the next flush op
if i.flushQueues.IsStopped() {
handleAbandonedOp(op)
return false, nil
}

level.Debug(log.Logger).Log("msg", "completing block", "userid", op.userID)
instance, err := i.getOrCreateInstance(op.userID)
if err != nil {
return false, err
}

err = instance.CompleteBlock(op.blockID)
if err != nil {
handleFailedOp(op, err)

if op.attempts >= maxCompleteAttempts {
level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "Block exceeded max completion errors. Deleting. POSSIBLE DATA LOSS",
"userID", op.userID, "attempts", op.attempts, "block", op.blockID.String())

// Delete WAL and move on
err = instance.ClearCompletingBlock(op.blockID)
return false, err
}

return true, nil
}

err = instance.ClearCompletingBlock(op.blockID)
if err != nil {
return false, errors.Wrap(err, "error clearing completing block")
}

// add a flushOp for the block we just completed
// No delay
i.enqueue(&flushOp{
kind: opKindFlush,
userID: instance.instanceID,
blockID: op.blockID,
}, false)

return false, nil
}

func (i *Ingester) handleFlush(userID string, blockID uuid.UUID) (retry bool, err error) {
level.Info(log.Logger).Log("msg", "flushing block", "userid", userID, "block", blockID.String())

instance, err := i.getOrCreateInstance(userID)
if err != nil {
return err
return true, err
}

if instance == nil {
return fmt.Errorf("instance id %s not found", userID)
return false, fmt.Errorf("instance id %s not found", userID)
}

if block := instance.GetBlockToBeFlushed(blockID); block != nil {
@@ -264,23 +273,15 @@ func (i *Ingester) flushBlock(userID string, blockID uuid.UUID) error {
err = i.store.WriteBlock(ctx, block)
metricFlushDuration.Observe(time.Since(start).Seconds())
if err != nil {
return err
}

// Delete original wal only after successful flush
err = instance.ClearCompletingBlock(blockID)
if err != nil {
// Error deleting wal doesn't fail the flush
level.Error(log.Logger).Log("msg", "Error clearing wal", "userID", userID, "blockID", blockID.String(), "err", err)
metricFailedFlushes.Inc()
return true, err
}

metricBlocksFlushed.Inc()
} else {
return fmt.Errorf("error getting block to flush")
return false, fmt.Errorf("error getting block to flush")
}

return nil
return false, nil
}

func (i *Ingester) enqueue(op *flushOp, jitter bool) {
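
handleFlush above hands the completed block to i.store.WriteBlock, and per the commit message the backend write now supplies an explicit object length. Below is a minimal sketch of why that matters for the S3 backend, assuming the minio-go v7 client; the endpoint, credentials, bucket name, object key, and file source are placeholders, not Tempo's configuration or layout.

package main

import (
	"context"
	"log"
	"os"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
	client, err := minio.New("localhost:9000", &minio.Options{
		Creds: credentials.NewStaticV4("ACCESS_KEY", "SECRET_KEY", ""),
	})
	if err != nil {
		log.Fatal(err)
	}

	f, err := os.Open("data") // placeholder for a completed block's data file
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	fi, err := f.Stat()
	if err != nil {
		log.Fatal(err)
	}

	// Supplying the real size lets the client stream the upload. With an
	// unknown size (-1) it must buffer multipart parts in memory, which is
	// the high memory usage the commit message calls out.
	_, err = client.PutObject(context.Background(), "tempo", "single-tenant/blocks/data", f, fi.Size(), minio.PutObjectOptions{})
	if err != nil {
		log.Fatal(err)
	}
}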
54 changes: 49 additions & 5 deletions modules/ingester/ingester.go
@@ -2,27 +2,27 @@ package ingester

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/go-kit/kit/log/level"
"github.com/opentracing/opentracing-go"
ot_log "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"

"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/modules/storage"
"github.com/grafana/tempo/pkg/flushqueues"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/validation"
"github.com/grafana/tempo/tempodb/backend/local"
tempodb_wal "github.com/grafana/tempo/tempodb/wal"
)

@@ -48,6 +48,7 @@ type Ingester struct {

lifecycler *ring.Lifecycler
store storage.Store
local *local.Backend

flushQueues *flushqueues.ExclusiveQueues
flushQueuesDone sync.WaitGroup
@@ -66,16 +67,18 @@ func New(cfg Config, store storage.Store, limits *overrides.Overrides) (*Ingeste
flushQueues: flushqueues.New(cfg.ConcurrentFlushes, metricFlushQueueLength),
}

i.local = store.WAL().LocalBackend()

i.flushQueuesDone.Add(cfg.ConcurrentFlushes)
for j := 0; j < cfg.ConcurrentFlushes; j++ {
go i.flushLoop(j)
}

var err error
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", cfg.OverrideRingKey, true, prometheus.DefaultRegisterer)
lc, err := ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", cfg.OverrideRingKey, true, prometheus.DefaultRegisterer)
if err != nil {
return nil, fmt.Errorf("NewLifecycler failed %w", err)
}
i.lifecycler = lc

// Now that the lifecycler has been created, we can create the limiter
// which depends on it.
@@ -94,6 +97,11 @@ func (i *Ingester) starting(ctx context.Context) error {
return fmt.Errorf("failed to replay wal %w", err)
}

err = i.rediscoverLocalBlocks()
if err != nil {
return fmt.Errorf("failed to rediscover local blocks %w", err)
}

// Now that user states have been created, we can start the lifecycler.
// Important: we want to keep lifecycler running until we ask it to stop, so we need to give it independent context
if err := i.lifecycler.StartAsync(context.Background()); err != nil {
@@ -236,7 +244,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) (*instance, error) {
inst, ok = i.instances[instanceID]
if !ok {
var err error
inst, err = newInstance(instanceID, i.limiter, i.store)
inst, err = newInstance(instanceID, i.limiter, i.store, i.local)
if err != nil {
return nil, err
}
@@ -351,3 +359,39 @@ func (i *Ingester) replayBlock(b *tempodb_wal.ReplayBlock) error {

return nil
}

func (i *Ingester) rediscoverLocalBlocks() error {
ctx := context.TODO()

tenants, err := i.local.Tenants(ctx)
if err != nil {
return errors.Wrap(err, "getting local tenants")
}

level.Info(log.Logger).Log("msg", "reloading local blocks", "tenants", len(tenants))

for _, t := range tenants {
inst, err := i.getOrCreateInstance(t)
if err != nil {
return err
}

err = inst.rediscoverLocalBlocks(ctx)
if err != nil {
return errors.Wrapf(err, "getting local blocks for tenant %v", t)
}

// Requeue needed flushes
for _, b := range inst.completeBlocks {
if b.FlushedTime().IsZero() {
i.enqueue(&flushOp{
kind: opKindFlush,
userID: t,
blockID: b.BlockMeta().BlockID,
}, true)
}
}
}

return nil
}
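
rediscoverLocalBlocks only requeues blocks whose FlushedTime() is zero, which implies each locally persisted block carries a flushed marker that survives restarts. The snippet below is a hypothetical, self-contained illustration of that mechanism; the marker file name and timestamp format are assumptions, and Tempo's actual on-disk layout (in tempodb/wal) is not part of this excerpt.

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// flushedFile is a hypothetical marker name, not necessarily Tempo's.
const flushedFile = "flushed"

// markFlushed records the flush time next to a local block so the state
// survives an ingester restart.
func markFlushed(blockDir string) error {
	ts := time.Now().UTC().Format(time.RFC3339)
	return os.WriteFile(filepath.Join(blockDir, flushedFile), []byte(ts), 0o644)
}

// flushedTime returns the recorded flush time, or the zero time if the block
// has not been flushed yet (mirroring the FlushedTime().IsZero() check above).
func flushedTime(blockDir string) time.Time {
	b, err := os.ReadFile(filepath.Join(blockDir, flushedFile))
	if err != nil {
		return time.Time{}
	}
	t, err := time.Parse(time.RFC3339, string(b))
	if err != nil {
		return time.Time{}
	}
	return t
}

func main() {
	dir, err := os.MkdirTemp("", "block")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	fmt.Println("needs flush:", flushedTime(dir).IsZero()) // true: would be requeued
	if err := markFlushed(dir); err != nil {
		panic(err)
	}
	fmt.Println("needs flush:", flushedTime(dir).IsZero()) // false: skipped on restart
}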