From 47673b3f889f32764d60e9689951a37f86bc3a6f Mon Sep 17 00:00:00 2001
From: Martin Disibio
Date: Wed, 15 Dec 2021 14:52:09 -0500
Subject: [PATCH 1/4] Switch compactor to new BasicLifecycler with AutoForgetDelegate, and other changes

---
 modules/compactor/compactor.go | 170 +++++++++++++++++++++++----------
 modules/compactor/config.go    |  18 ++++
 2 files changed, 137 insertions(+), 51 deletions(-)

diff --git a/modules/compactor/compactor.go b/modules/compactor/compactor.go
index 019515ce6b2..c168aed5330 100644
--- a/modules/compactor/compactor.go
+++ b/modules/compactor/compactor.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/cortexproject/cortex/pkg/util/log"
 	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/kv"
 	"github.com/grafana/dskit/ring"
 	"github.com/grafana/dskit/services"
 	"github.com/pkg/errors"
@@ -19,7 +20,17 @@ import (
 )
 
 const (
-	waitOnStartup = 90 * time.Second
+	// ringAutoForgetUnhealthyPeriods is the number of consecutive timeout periods after which
+	// an unhealthy instance in the ring will be automatically removed.
+	ringAutoForgetUnhealthyPeriods = 2
+
+	// We use a safe default instead of exposing a config option to the user
+	// in order to simplify the config.
+	ringNumTokens = 512
+)
+
+var (
+	ringOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
 )
 
 type Compactor struct {
@@ -30,7 +41,7 @@ type Compactor struct {
 	overrides *overrides.Overrides
 
 	// Ring used for sharding compactions.
-	ringLifecycler *ring.Lifecycler
+	ringLifecycler *ring.BasicLifecycler
 	Ring           *ring.Ring
 
 	subservices        *services.Manager
@@ -45,29 +56,38 @@ func New(cfg Config, store storage.Store, overrides *overrides.Overrides, reg pr
 		overrides: overrides,
 	}
 
-	subservices := []services.Service(nil)
 	if c.isSharded() {
-		lifecyclerCfg := c.cfg.ShardingRing.ToLifecyclerConfig()
-		lifecycler, err := ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", cfg.OverrideRingKey, false, log.Logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
+		reg = prometheus.WrapRegistererWithPrefix("cortex_", reg)
+
+		lifecyclerStore, err := kv.NewClient(
+			cfg.ShardingRing.KVStore,
+			ring.GetCodec(),
+			kv.RegistererWithKVName(reg, ring.CompactorRingKey+"-lifecycler"),
+			log.Logger,
+		)
+		//lifecyclerStore, err := tempo_ring.NewKvClient(cfg.ShardingRing.KVStore, ring.CompactorRingKey+"-lifecycler", reg)
 		if err != nil {
-			return nil, errors.Wrap(err, "unable to initialize compactor ring lifecycler")
+			return nil, err
 		}
-		c.ringLifecycler = lifecycler
-		subservices = append(subservices, c.ringLifecycler)
 
-		ring, err := ring.New(lifecyclerCfg.RingConfig, "compactor", cfg.OverrideRingKey, log.Logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
+		delegate := ring.BasicLifecyclerDelegate(c)
+		delegate = ring.NewLeaveOnStoppingDelegate(delegate, log.Logger)
+		delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.ShardingRing.HeartbeatTimeout, delegate, log.Logger)
+
+		bcfg, err := toBasicLifecyclerConfig(cfg.ShardingRing, log.Logger)
 		if err != nil {
-			return nil, errors.Wrap(err, "unable to initialize compactor ring")
+			return nil, err
 		}
-		c.Ring = ring
-		subservices = append(subservices, c.Ring)
 
-		c.subservices, err = services.NewManager(subservices...)
+		c.ringLifecycler, err = ring.NewBasicLifecycler(bcfg, ring.CompactorRingKey, cfg.OverrideRingKey, lifecyclerStore, delegate, log.Logger, reg)
 		if err != nil {
-			return nil, fmt.Errorf("failed to create subservices %w", err)
+			return nil, errors.Wrap(err, "unable to initialize compactor ring lifecycler")
+		}
+
+		c.Ring, err = ring.New(c.cfg.ShardingRing.ToLifecyclerConfig().RingConfig, ring.CompactorRingKey, cfg.OverrideRingKey, log.Logger, reg)
+		if err != nil {
+			return nil, errors.Wrap(err, "unable to initialize compactor ring")
 		}
-		c.subservicesWatcher = services.NewFailureWatcher()
-		c.subservicesWatcher.WatchManager(c.subservices)
 	}
 
 	c.Service = services.NewBasicService(c.starting, c.running, c.stopping)
@@ -75,20 +95,56 @@ func New(cfg Config, store storage.Store, overrides *overrides.Overrides, reg pr
 	return c, nil
 }
 
-func (c *Compactor) starting(ctx context.Context) error {
-	if c.subservices != nil {
+func (c *Compactor) starting(ctx context.Context) (err error) {
+	// In case this function returns an error we want to unregister the instance
+	// from the ring. We do it by ensuring dependencies are gracefully stopped if they
+	// were already started.
+	defer func() {
+		if err == nil || c.subservices == nil {
+			return
+		}
+
+		if stopErr := services.StopManagerAndAwaitStopped(context.Background(), c.subservices); stopErr != nil {
+			level.Error(log.Logger).Log("msg", "failed to gracefully stop compactor dependencies", "err", stopErr)
+		}
+	}()
+
+	if c.isSharded() {
+		c.subservices, err = services.NewManager(c.ringLifecycler, c.Ring)
+		if err != nil {
+			return fmt.Errorf("failed to create subservices %w", err)
+		}
+		c.subservicesWatcher = services.NewFailureWatcher()
+		c.subservicesWatcher.WatchManager(c.subservices)
+
 		err := services.StartManagerAndAwaitHealthy(ctx, c.subservices)
 		if err != nil {
 			return fmt.Errorf("failed to start subservices %w", err)
 		}
-		ctx := context.Background()
-
-		level.Info(log.Logger).Log("msg", "waiting to be active in the ring")
-		err = c.waitRingActive(ctx)
-		if err != nil {
+		// Wait until the ring client has detected this instance in the ACTIVE state.
+		level.Info(log.Logger).Log("msg", "waiting until compactor is ACTIVE in the ring")
+		ctxWithTimeout, cancel := context.WithTimeout(ctx, c.cfg.ShardingRing.WaitActiveInstanceTimeout)
+		defer cancel()
+		if err := ring.WaitInstanceState(ctxWithTimeout, c.Ring, c.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil {
 			return err
 		}
+		level.Info(log.Logger).Log("msg", "compactor is ACTIVE in the ring")
+
+		// In the event of a cluster cold start we may end up in a situation where each new compactor
+		// instance starts at a slightly different time and thus each one starts with a different state
+		// of the ring. It's better to just wait for ring stability for a short time.
+		if c.cfg.ShardingRing.WaitStabilityMinDuration > 0 {
+			minWaiting := c.cfg.ShardingRing.WaitStabilityMinDuration
+			maxWaiting := c.cfg.ShardingRing.WaitStabilityMaxDuration
+
+			level.Info(log.Logger).Log("msg", "waiting until compactor ring topology is stable", "min_waiting", minWaiting.String(), "max_waiting", maxWaiting.String())
+			if err := ring.WaitRingStability(ctx, c.Ring, ringOp, minWaiting, maxWaiting); err != nil {
+				level.Warn(log.Logger).Log("msg", "compactor ring topology is not stable after the max waiting time, proceeding anyway")
+			} else {
+				level.Info(log.Logger).Log("msg", "compactor ring topology is stable")
+			}
+		}
 	}
 
 	// this will block until one poll cycle is complete
@@ -98,12 +154,8 @@ func (c *Compactor) starting(ctx context.Context) error {
 }
 
 func (c *Compactor) running(ctx context.Context) error {
-	go func() {
-		level.Info(log.Logger).Log("msg", "waiting for compaction ring to settle", "waitDuration", waitOnStartup)
-		time.Sleep(waitOnStartup)
-		level.Info(log.Logger).Log("msg", "enabling compaction")
-		c.store.EnableCompaction(&c.cfg.Compactor, c, c)
-	}()
+	level.Info(log.Logger).Log("msg", "enabling compaction")
+	c.store.EnableCompaction(&c.cfg.Compactor, c, c)
 
 	if c.subservices != nil {
 		select {
@@ -140,7 +192,7 @@ func (c *Compactor) Owns(hash string) bool {
 	_, _ = hasher.Write([]byte(hash))
 	hash32 := hasher.Sum32()
 
-	rs, err := c.Ring.Get(hash32, ring.Read, []ring.InstanceDesc{}, nil, nil)
+	rs, err := c.Ring.Get(hash32, ringOp, []ring.InstanceDesc{}, nil, nil)
 	if err != nil {
 		level.Error(log.Logger).Log("msg", "failed to get ring", "err", err)
 		return false
@@ -151,9 +203,11 @@ func (c *Compactor) Owns(hash string) bool {
 		return false
 	}
 
-	level.Debug(log.Logger).Log("msg", "checking addresses", "owning_addr", rs.Instances[0].Addr, "this_addr", c.ringLifecycler.Addr)
+	ringAddr := c.ringLifecycler.GetInstanceAddr()
+
+	level.Debug(log.Logger).Log("msg", "checking addresses", "owning_addr", rs.Instances[0].Addr, "this_addr", ringAddr)
 
-	return rs.Instances[0].Addr == c.ringLifecycler.Addr
+	return rs.Instances[0].Addr == ringAddr
 }
 
 // Combine implements common.ObjectCombiner
@@ -166,27 +220,41 @@ func (c *Compactor) BlockRetentionForTenant(tenantID string) time.Duration {
 	return c.overrides.BlockRetention(tenantID)
 }
 
-func (c *Compactor) waitRingActive(ctx context.Context) error {
-	for {
-		// Check if the ingester is ACTIVE in the ring and our ring client
-		// has detected it.
-		if rs, err := c.Ring.GetAllHealthy(ring.Reporting); err == nil {
-			for _, i := range rs.Instances {
-				if i.GetAddr() == c.ringLifecycler.Addr && i.GetState() == ring.ACTIVE {
-					return nil
-				}
-			}
-		}
+func (c *Compactor) isSharded() bool {
+	return c.cfg.ShardingRing.KVStore.Store != ""
+}
 
-		select {
-		case <-time.After(time.Second):
-			// Nothing to do
-		case <-ctx.Done():
-			return ctx.Err()
-		}
+// OnRingInstanceRegister is called while the lifecycler is registering the
+// instance within the ring and should return the state and set of tokens to
+// use for the instance itself.
+func (c *Compactor) OnRingInstanceRegister(lifecycler *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
+	// When we initialize the compactor instance in the ring we want to start from
+	// a clean situation, so whatever the state is we set it to ACTIVE, while we keep existing
+	// tokens (if any) or the ones loaded from file.
+	var tokens []uint32
+	if instanceExists {
+		tokens = instanceDesc.GetTokens()
 	}
+
+	takenTokens := ringDesc.GetTokens()
+	newTokens := ring.GenerateTokens(512-len(tokens), takenTokens)
+
+	// Tokens sorting will be enforced by the parent caller.
+	tokens = append(tokens, newTokens...)
+
+	return ring.ACTIVE, tokens
 }
 
-func (c *Compactor) isSharded() bool {
-	return c.cfg.ShardingRing.KVStore.Store != ""
+// OnRingInstanceTokens is called once the instance tokens are set and are
+// stable within the ring (honoring the observe period, if set).
+func (c *Compactor) OnRingInstanceTokens(lifecycler *ring.BasicLifecycler, tokens ring.Tokens) {}
+
+// OnRingInstanceStopping is called while the lifecycler is stopping. The lifecycler
+// will continue to heartbeat the ring while this function is executing and will proceed
+// to unregister the instance from the ring only after this function has returned.
+func (c *Compactor) OnRingInstanceStopping(lifecycler *ring.BasicLifecycler) {}
+
+// OnRingInstanceHeartbeat is called while the instance is updating its heartbeat
+// in the ring.
+func (c *Compactor) OnRingInstanceHeartbeat(lifecycler *ring.BasicLifecycler, ringDesc *ring.Desc, instanceDesc *ring.InstanceDesc) {
 }
diff --git a/modules/compactor/config.go b/modules/compactor/config.go
index 6d19f0ad99e..32d6868f3f3 100644
--- a/modules/compactor/config.go
+++ b/modules/compactor/config.go
@@ -2,9 +2,11 @@ package compactor
 
 import (
 	"flag"
+	"fmt"
 	"time"
 
 	cortex_compactor "github.com/cortexproject/cortex/pkg/compactor"
+	"github.com/go-kit/log"
 	"github.com/grafana/dskit/flagext"
 	"github.com/grafana/dskit/ring"
 	"github.com/grafana/tempo/pkg/util"
@@ -37,3 +39,19 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
 	f.DurationVar(&cfg.Compactor.MaxCompactionRange, util.PrefixConfig(prefix, "compaction.compaction-window"), time.Hour, "Maximum time window across which to compact blocks.")
 	cfg.OverrideRingKey = ring.CompactorRingKey
 }
+
+func toBasicLifecyclerConfig(cfg cortex_compactor.RingConfig, logger log.Logger) (ring.BasicLifecyclerConfig, error) {
+	instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames, logger)
+	if err != nil {
+		return ring.BasicLifecyclerConfig{}, err
+	}
+
+	instancePort := ring.GetInstancePort(cfg.InstancePort, cfg.ListenPort)
+
+	return ring.BasicLifecyclerConfig{
+		ID:              cfg.InstanceID,
+		Addr:            fmt.Sprintf("%s:%d", instanceAddr, instancePort),
+		HeartbeatPeriod: cfg.HeartbeatPeriod,
+		NumTokens:       ringNumTokens,
+	}, nil
+}

From 302a9d96d6ee25ef8f548a371f2ae6f5f2ee23ea Mon Sep 17 00:00:00 2001
From: Martin Disibio
Date: Wed, 15 Dec 2021 15:00:36 -0500
Subject: [PATCH 2/4] cleanup

---
 modules/compactor/compactor.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/modules/compactor/compactor.go b/modules/compactor/compactor.go
index c168aed5330..a0672e03b0f 100644
--- a/modules/compactor/compactor.go
+++ b/modules/compactor/compactor.go
@@ -65,7 +65,6 @@ func New(cfg Config, store storage.Store, overrides *overrides.Overrides, reg pr
 			kv.RegistererWithKVName(reg, ring.CompactorRingKey+"-lifecycler"),
 			log.Logger,
 		)
-		//lifecyclerStore, err := tempo_ring.NewKvClient(cfg.ShardingRing.KVStore, ring.CompactorRingKey+"-lifecycler", reg)
 		if err != nil {
 			return nil, err
 		}

From 20d263070a4a86149c71c41b61efee7905c26395 Mon Sep 17 00:00:00 2001
From: Martin Disibio
Date: Wed, 15 Dec 2021 15:25:25 -0500
Subject: [PATCH 3/4] changelog

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 311d2a27c01..4c4fee459c1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,6 +21,7 @@
 * [ENHANCEMENT] jsonnet: set rollingUpdate.maxSurge to 3 for distributor, frontend and queriers [#1164](https://github.com/grafana/tempo/pull/1164) (@kvrhdn)
 * [ENHANCEMENT] Reduce search data file sizes by optimizing contents [#1165](https://github.com/grafana/tempo/pull/1165) (@mdisibio)
 * [ENHANCEMENT] Add `tempo_ingester_live_traces` metric [#1170](https://github.com/grafana/tempo/pull/1170) (@mdisibio)
+* [ENHANCEMENT] Update compactor ring to automatically forget unhealthy entries [#1178](https://github.com/grafana/tempo/pull/1178) (@mdisibio)
 * [BUGFIX] Add process name to vulture traces to work around display issues [#1127](https://github.com/grafana/tempo/pull/1127) (@mdisibio)
 * [BUGFIX] Fixed issue where compaction sometimes dropped spans. [#1130](https://github.com/grafana/tempo/pull/1130) (@joe-elliott)
 * [BUGFIX] Ensure that the admin client jsonnet has correct S3 bucket property. (@hedss)

From 50c58586d915c1c0bad7201d8e484e26f64dbe6d Mon Sep 17 00:00:00 2001
From: Martin Disibio
Date: Thu, 16 Dec 2021 07:18:24 -0500
Subject: [PATCH 4/4] Use const

---
 modules/compactor/compactor.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/modules/compactor/compactor.go b/modules/compactor/compactor.go
index a0672e03b0f..bcbfb7765b5 100644
--- a/modules/compactor/compactor.go
+++ b/modules/compactor/compactor.go
@@ -236,7 +236,7 @@ func (c *Compactor) OnRingInstanceRegister(lifecycler *ring.BasicLifecycler, rin
 	}
 
 	takenTokens := ringDesc.GetTokens()
-	newTokens := ring.GenerateTokens(512-len(tokens), takenTokens)
+	newTokens := ring.GenerateTokens(ringNumTokens-len(tokens), takenTokens)
 
 	// Tokens sorting will be enforced by the parent caller.
 	tokens = append(tokens, newTokens...)
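
The key behavioural change in this series is the ring.NewAutoForgetDelegate wrapping in New(): an unhealthy compactor is dropped from the ring after ringAutoForgetUnhealthyPeriods consecutive heartbeat timeout periods, rather than lingering until an operator removes it by hand. Below is a minimal, self-contained sketch of that arithmetic. It is not part of the patches above, and the one-minute heartbeat timeout is a hypothetical value standing in for cfg.ShardingRing.HeartbeatTimeout.

    package main

    import (
        "fmt"
        "time"
    )

    // Same constant the patch adds to modules/compactor/compactor.go.
    const ringAutoForgetUnhealthyPeriods = 2

    func main() {
        // Hypothetical heartbeat timeout; the real value comes from the
        // compactor sharding ring config (cfg.ShardingRing.HeartbeatTimeout).
        heartbeatTimeout := time.Minute

        // This is the duration passed to ring.NewAutoForgetDelegate in New():
        // an instance whose last heartbeat is older than this is forgotten.
        forgetPeriod := ringAutoForgetUnhealthyPeriods * heartbeatTimeout

        fmt.Println(forgetPeriod) // prints 2m0s
    }

Note also the delegate wrapping order in New(): the Compactor itself supplies the OnRing* callbacks added at the end of the first patch, NewLeaveOnStoppingDelegate wraps it so the instance moves to LEAVING on shutdown, and NewAutoForgetDelegate wraps that so stale entries are pruned as this instance heartbeats the ring.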