Switch compactor to new BasicLifecycler with AutoForgetDelegate (#1178)
* Switch compactor to new BasicLifecycler with AutoForgetDelegate, and other changes

* cleanup

* changelog

* Use const
mdisibio authored Dec 16, 2021
1 parent f4100f5 commit 9f0a021
Showing 3 changed files with 137 additions and 51 deletions.
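
Before the diffs, a brief orientation on the main change: the compactor now registers in the ring through dskit's BasicLifecycler, with the compactor itself acting as the lifecycler delegate. A minimal sketch of the wiring (names taken from the diff below; surrounding setup and error handling trimmed):

// Sketch only: mirrors modules/compactor/compactor.go in this commit.
// The compactor (c) implements ring.BasicLifecyclerDelegate; the wrappers make the
// instance leave the ring on shutdown and auto-forget unhealthy peers.
delegate := ring.BasicLifecyclerDelegate(c)
delegate = ring.NewLeaveOnStoppingDelegate(delegate, log.Logger)
// Forget an instance once it has been unhealthy for two heartbeat-timeout periods.
delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.ShardingRing.HeartbeatTimeout, delegate, log.Logger)

lifecycler, err := ring.NewBasicLifecycler(bcfg, ring.CompactorRingKey, cfg.OverrideRingKey, lifecyclerStore, delegate, log.Logger, reg)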
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -25,6 +25,7 @@
* [ENHANCEMENT] jsonnet: set rollingUpdate.maxSurge to 3 for distributor, frontend and queriers [#1164](https://github.com/grafana/tempo/pull/1164) (@kvrhdn)
* [ENHANCEMENT] Reduce search data file sizes by optimizing contents [#1165](https://github.com/grafana/tempo/pull/1165) (@mdisibio)
* [ENHANCEMENT] Add `tempo_ingester_live_traces` metric [#1170](https://github.com/grafana/tempo/pull/1170) (@mdisibio)
* [ENHANCEMENT] Update compactor ring to automatically forget unhealthy entries [#1178](https://github.com/grafana/tempo/pull/1178) (@mdisibio)
* [BUGFIX] Add process name to vulture traces to work around display issues [#1127](https://github.com/grafana/tempo/pull/1127) (@mdisibio)
* [BUGFIX] Fixed issue where compaction sometimes dropped spans. [#1130](https://github.com/grafana/tempo/pull/1130) (@joe-elliott)
* [BUGFIX] Ensure that the admin client jsonnet has correct S3 bucket property. (@hedss)
169 changes: 118 additions & 51 deletions modules/compactor/compactor.go
@@ -8,6 +8,7 @@ import (

"github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
@@ -19,7 +20,17 @@ import (
)

const (
waitOnStartup = 90 * time.Second
// ringAutoForgetUnhealthyPeriods is the number of consecutive heartbeat-timeout periods after which
// an unhealthy instance in the ring is automatically removed.
ringAutoForgetUnhealthyPeriods = 2

// We use a safe default instead of exposing a config option to the user
// in order to simplify the config.
ringNumTokens = 512
)

var (
ringOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
)

type Compactor struct {
@@ -30,7 +41,7 @@ type Compactor struct {
overrides *overrides.Overrides

// Ring used for sharding compactions.
ringLifecycler *ring.Lifecycler
ringLifecycler *ring.BasicLifecycler
Ring *ring.Ring

subservices *services.Manager
@@ -45,50 +56,94 @@ func New(cfg Config, store storage.Store, overrides *overrides.Overrides, reg pr
overrides: overrides,
}

subservices := []services.Service(nil)
if c.isSharded() {
lifecyclerCfg := c.cfg.ShardingRing.ToLifecyclerConfig()
lifecycler, err := ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", cfg.OverrideRingKey, false, log.Logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
reg = prometheus.WrapRegistererWithPrefix("cortex_", reg)

lifecyclerStore, err := kv.NewClient(
cfg.ShardingRing.KVStore,
ring.GetCodec(),
kv.RegistererWithKVName(reg, ring.CompactorRingKey+"-lifecycler"),
log.Logger,
)
if err != nil {
return nil, errors.Wrap(err, "unable to initialize compactor ring lifecycler")
return nil, err
}
c.ringLifecycler = lifecycler
subservices = append(subservices, c.ringLifecycler)

ring, err := ring.New(lifecyclerCfg.RingConfig, "compactor", cfg.OverrideRingKey, log.Logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
delegate := ring.BasicLifecyclerDelegate(c)
delegate = ring.NewLeaveOnStoppingDelegate(delegate, log.Logger)
delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.ShardingRing.HeartbeatTimeout, delegate, log.Logger)

bcfg, err := toBasicLifecyclerConfig(cfg.ShardingRing, log.Logger)
if err != nil {
return nil, errors.Wrap(err, "unable to initialize compactor ring")
return nil, err
}
c.Ring = ring
subservices = append(subservices, c.Ring)

c.subservices, err = services.NewManager(subservices...)
c.ringLifecycler, err = ring.NewBasicLifecycler(bcfg, ring.CompactorRingKey, cfg.OverrideRingKey, lifecyclerStore, delegate, log.Logger, reg)
if err != nil {
return nil, fmt.Errorf("failed to create subservices %w", err)
return nil, errors.Wrap(err, "unable to initialize compactor ring lifecycler")
}

c.Ring, err = ring.New(c.cfg.ShardingRing.ToLifecyclerConfig().RingConfig, ring.CompactorRingKey, cfg.OverrideRingKey, log.Logger, reg)
if err != nil {
return nil, errors.Wrap(err, "unable to initialize compactor ring")
}
c.subservicesWatcher = services.NewFailureWatcher()
c.subservicesWatcher.WatchManager(c.subservices)
}

c.Service = services.NewBasicService(c.starting, c.running, c.stopping)

return c, nil
}

func (c *Compactor) starting(ctx context.Context) error {
if c.subservices != nil {
func (c *Compactor) starting(ctx context.Context) (err error) {
// If this function returns an error we want to unregister the instance
// from the ring. We do that while making sure dependencies are gracefully stopped
// if they were already started.
defer func() {
if err == nil || c.subservices == nil {
return
}

if stopErr := services.StopManagerAndAwaitStopped(context.Background(), c.subservices); stopErr != nil {
level.Error(log.Logger).Log("msg", "failed to gracefully stop compactor dependencies", "err", stopErr)
}
}()

if c.isSharded() {
c.subservices, err = services.NewManager(c.ringLifecycler, c.Ring)
if err != nil {
return fmt.Errorf("failed to create subservices %w", err)
}
c.subservicesWatcher = services.NewFailureWatcher()
c.subservicesWatcher.WatchManager(c.subservices)

err := services.StartManagerAndAwaitHealthy(ctx, c.subservices)
if err != nil {
return fmt.Errorf("failed to start subservices %w", err)
}

ctx := context.Background()

level.Info(log.Logger).Log("msg", "waiting to be active in the ring")
err = c.waitRingActive(ctx)
if err != nil {
// Wait until the ring client has detected this instance in the ACTIVE state.
level.Info(log.Logger).Log("msg", "waiting until compactor is ACTIVE in the ring")
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.cfg.ShardingRing.WaitActiveInstanceTimeout)
defer cancel()
if err := ring.WaitInstanceState(ctxWithTimeout, c.Ring, c.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil {
return err
}
level.Info(log.Logger).Log("msg", "compactor is ACTIVE in the ring")

// In the event of a cluster cold start we may end up in a situation where each new compactor
// instance starts at a slightly different time and thus each one starts with a different state
// of the ring. It's better to just wait for ring stability for a short time.
if c.cfg.ShardingRing.WaitStabilityMinDuration > 0 {
minWaiting := c.cfg.ShardingRing.WaitStabilityMinDuration
maxWaiting := c.cfg.ShardingRing.WaitStabilityMaxDuration

level.Info(log.Logger).Log("msg", "waiting until compactor ring topology is stable", "min_waiting", minWaiting.String(), "max_waiting", maxWaiting.String())
if err := ring.WaitRingStability(ctx, c.Ring, ringOp, minWaiting, maxWaiting); err != nil {
level.Warn(log.Logger).Log("msg", "compactor ring topology is not stable after the max waiting time, proceeding anyway")
} else {
level.Info(log.Logger).Log("msg", "compactor ring topology is stable")
}
}
}

// this will block until one poll cycle is complete
@@ -98,12 +153,8 @@ func (c *Compactor) starting(ctx context.Context) error {
}

func (c *Compactor) running(ctx context.Context) error {
go func() {
level.Info(log.Logger).Log("msg", "waiting for compaction ring to settle", "waitDuration", waitOnStartup)
time.Sleep(waitOnStartup)
level.Info(log.Logger).Log("msg", "enabling compaction")
c.store.EnableCompaction(&c.cfg.Compactor, c, c)
}()
level.Info(log.Logger).Log("msg", "enabling compaction")
c.store.EnableCompaction(&c.cfg.Compactor, c, c)

if c.subservices != nil {
select {
@@ -140,7 +191,7 @@ func (c *Compactor) Owns(hash string) bool {
_, _ = hasher.Write([]byte(hash))
hash32 := hasher.Sum32()

rs, err := c.Ring.Get(hash32, ring.Read, []ring.InstanceDesc{}, nil, nil)
rs, err := c.Ring.Get(hash32, ringOp, []ring.InstanceDesc{}, nil, nil)
if err != nil {
level.Error(log.Logger).Log("msg", "failed to get ring", "err", err)
return false
@@ -151,9 +202,11 @@ func (c *Compactor) Owns(hash string) bool {
return false
}

level.Debug(log.Logger).Log("msg", "checking addresses", "owning_addr", rs.Instances[0].Addr, "this_addr", c.ringLifecycler.Addr)
ringAddr := c.ringLifecycler.GetInstanceAddr()

level.Debug(log.Logger).Log("msg", "checking addresses", "owning_addr", rs.Instances[0].Addr, "this_addr", ringAddr)

return rs.Instances[0].Addr == c.ringLifecycler.Addr
return rs.Instances[0].Addr == ringAddr
}

// Combine implements common.ObjectCombiner
@@ -166,27 +219,41 @@ func (c *Compactor) BlockRetentionForTenant(tenantID string) time.Duration {
return c.overrides.BlockRetention(tenantID)
}

func (c *Compactor) waitRingActive(ctx context.Context) error {
for {
// Check if the ingester is ACTIVE in the ring and our ring client
// has detected it.
if rs, err := c.Ring.GetAllHealthy(ring.Reporting); err == nil {
for _, i := range rs.Instances {
if i.GetAddr() == c.ringLifecycler.Addr && i.GetState() == ring.ACTIVE {
return nil
}
}
}
func (c *Compactor) isSharded() bool {
return c.cfg.ShardingRing.KVStore.Store != ""
}

select {
case <-time.After(time.Second):
// Nothing to do
case <-ctx.Done():
return ctx.Err()
}
// OnRingInstanceRegister is called while the lifecycler is registering the
// instance within the ring and should return the state and set of tokens to
// use for the instance itself.
func (c *Compactor) OnRingInstanceRegister(lifecycler *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
// When we initialize the compactor instance in the ring we want to start from
// a clean situation, so regardless of its previous state we set it to ACTIVE, while
// keeping existing tokens (if any) or the ones loaded from file.
var tokens []uint32
if instanceExists {
tokens = instanceDesc.GetTokens()
}

takenTokens := ringDesc.GetTokens()
newTokens := ring.GenerateTokens(ringNumTokens-len(tokens), takenTokens)

// Tokens sorting will be enforced by the parent caller.
tokens = append(tokens, newTokens...)

return ring.ACTIVE, tokens
}

func (c *Compactor) isSharded() bool {
return c.cfg.ShardingRing.KVStore.Store != ""
// OnRingInstanceTokens is called once the instance tokens are set and are
// stable within the ring (honoring the observe period, if set).
func (c *Compactor) OnRingInstanceTokens(lifecycler *ring.BasicLifecycler, tokens ring.Tokens) {}

// OnRingInstanceStopping is called while the lifecycler is stopping. The lifecycler
// will continue to heartbeat the ring while this function is executing and will proceed
// to unregister the instance from the ring only after this function has returned.
func (c *Compactor) OnRingInstanceStopping(lifecycler *ring.BasicLifecycler) {}

// OnRingInstanceHeartbeat is called while the instance is updating its heartbeat
// in the ring.
func (c *Compactor) OnRingInstanceHeartbeat(lifecycler *ring.BasicLifecycler, ringDesc *ring.Desc, instanceDesc *ring.InstanceDesc) {
}
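
As a side note (not part of the diff), the ownership check in Owns() can be read as the following standalone helper. The fnv-1a hash and the helper name are illustrative; the ring lookup with the ACTIVE-only ringOp mirrors the code above. It needs "hash/fnv" and "github.com/grafana/dskit/ring".

// ownsKey reports whether the instance at selfAddr owns the ring token derived from key.
// Sketch only: hash the key, ask the ring which ACTIVE instance owns that token,
// and compare its address with our own.
func ownsKey(r ring.ReadRing, selfAddr, key string) bool {
	hasher := fnv.New32a()
	_, _ = hasher.Write([]byte(key))

	rs, err := r.Get(hasher.Sum32(), ringOp, nil, nil, nil)
	if err != nil || len(rs.Instances) != 1 {
		return false
	}
	return rs.Instances[0].Addr == selfAddr
}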
18 changes: 18 additions & 0 deletions modules/compactor/config.go
@@ -2,9 +2,11 @@ package compactor

import (
"flag"
"fmt"
"time"

cortex_compactor "github.com/cortexproject/cortex/pkg/compactor"
"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/ring"
"github.com/grafana/tempo/pkg/util"
@@ -37,3 +39,19 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
f.DurationVar(&cfg.Compactor.MaxCompactionRange, util.PrefixConfig(prefix, "compaction.compaction-window"), time.Hour, "Maximum time window across which to compact blocks.")
cfg.OverrideRingKey = ring.CompactorRingKey
}

func toBasicLifecyclerConfig(cfg cortex_compactor.RingConfig, logger log.Logger) (ring.BasicLifecyclerConfig, error) {
instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames, logger)
if err != nil {
return ring.BasicLifecyclerConfig{}, err
}

instancePort := ring.GetInstancePort(cfg.InstancePort, cfg.ListenPort)

return ring.BasicLifecyclerConfig{
ID: cfg.InstanceID,
Addr: fmt.Sprintf("%s:%d", instanceAddr, instancePort),
HeartbeatPeriod: cfg.HeartbeatPeriod,
NumTokens: ringNumTokens,
}, nil
}
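
One timing relationship worth calling out: the HeartbeatPeriod set here controls how often this compactor writes its heartbeat, while the auto-forget window in compactor.go is derived from the ring's heartbeat timeout. A small illustration of the arithmetic, assuming a 1-minute heartbeat timeout (the actual value comes from cfg.ShardingRing.HeartbeatTimeout):

// Illustration only: with ringAutoForgetUnhealthyPeriods = 2 and an assumed
// 1-minute heartbeat timeout, an instance that stops heartbeating is removed
// from the ring after roughly two minutes.
heartbeatTimeout := time.Minute // assumed for the example
forgetPeriod := ringAutoForgetUnhealthyPeriods * heartbeatTimeout
fmt.Println(forgetPeriod) // prints "2m0s"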
