diff --git a/CHANGELOG.md b/CHANGELOG.md index 71cd8163b4..d3bf00f4fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ * [ENHANCEMENT] Upgrade Alpine to 3.19. #6014 * [ENHANCEMENT] Upgrade go to 1.21.11 #6014 * [ENHANCEMENT] Ingester: Add a new experimental `-ingester.labels-string-interning-enabled` flag to enable string interning for metrics labels. #6057 +* [ENHANCEMENT] Ingester: Add link to renew 10% of the ingesters tokens in the admin page. #6063 * [BUGFIX] Configsdb: Fix endline issue in db password. #5920 * [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952 * [BUGFIX] Querier: Enforce max query length check for `/api/v1/series` API even though `ignoreMaxQueryLength` is set to true. #6018 diff --git a/pkg/api/api.go b/pkg/api/api.go index 03e6c77801..97c7c44095 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -283,6 +283,7 @@ type Ingester interface { client.IngesterServer FlushHandler(http.ResponseWriter, *http.Request) ShutdownHandler(http.ResponseWriter, *http.Request) + RenewTokenHandler(http.ResponseWriter, *http.Request) Push(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) } @@ -292,8 +293,10 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) { a.indexPage.AddLink(SectionDangerous, "/ingester/flush", "Trigger a Flush of data from Ingester to storage") a.indexPage.AddLink(SectionDangerous, "/ingester/shutdown", "Trigger Ingester Shutdown (Dangerous)") + a.indexPage.AddLink(SectionDangerous, "/ingester/renewTokens", "Renew Ingester Tokens (10%)") a.RegisterRoute("/ingester/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST") a.RegisterRoute("/ingester/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST") + a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST") a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. // Legacy Routes diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 3006dd7969..350c9bee85 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -971,6 +971,11 @@ func (i *Ingester) updateActiveSeries(ctx context.Context) { } } +func (i *Ingester) RenewTokenHandler(w http.ResponseWriter, r *http.Request) { + i.lifecycler.RenewTokens(0.1, r.Context()) + w.WriteHeader(http.StatusNoContent) +} + // ShutdownHandler triggers the following set of operations in order: // - Change the state of ring to stop accepting writes. // - Flush all the chunks. diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index fbd3159ca9..d4f1e5735b 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -706,6 +706,52 @@ func (i *Lifecycler) initRing(ctx context.Context) error { return err } +func (i *Lifecycler) RenewTokens(ratio float64, ctx context.Context) { + if ratio > 1 { + ratio = 1 + } + err := i.KVStore.CAS(ctx, i.RingKey, func(in interface{}) (out interface{}, retry bool, err error) { + if in == nil { + return in, false, nil + } + + ringDesc := in.(*Desc) + _, ok := ringDesc.Ingesters[i.ID] + + if !ok { + return in, false, nil + } + + tokensToBeRenewed := int(float64(i.cfg.NumTokens) * ratio) + ringTokens, _ := ringDesc.TokensFor(i.ID) + + // Removing random tokens + for i := 0; i < tokensToBeRenewed; i++ { + if len(ringTokens) == 0 { + break + } + index := mathrand.Int() % len(ringTokens) + ringTokens = append(ringTokens[:index], ringTokens[index+1:]...) + } + + needTokens := i.cfg.NumTokens - len(ringTokens) + level.Info(i.logger).Log("msg", "renewing new tokens", "count", needTokens, "ring", i.RingName) + ringDesc.AddIngester(i.ID, i.Addr, i.Zone, ringTokens, i.GetState(), i.getRegisteredAt()) + newTokens := i.tg.GenerateTokens(ringDesc, i.ID, i.Zone, needTokens, true) + + ringTokens = append(ringTokens, newTokens...) + sort.Sort(ringTokens) + + ringDesc.AddIngester(i.ID, i.Addr, i.Zone, ringTokens, i.GetState(), i.getRegisteredAt()) + i.setTokens(ringTokens) + return ringDesc, true, nil + }) + + if err != nil { + level.Error(i.logger).Log("msg", "failed to regenerate tokens", "ring", i.RingName, "err", err) + } +} + // Verifies that tokens that this ingester has registered to the ring still belong to it. // Gossiping ring may change the ownership of tokens in case of conflicts. // If ingester doesn't own its tokens anymore, this method generates new tokens and puts them to the ring. diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go index 51d07028b7..0b8a6402db 100644 --- a/pkg/ring/lifecycler_test.go +++ b/pkg/ring/lifecycler_test.go @@ -11,6 +11,7 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" "github.com/cortexproject/cortex/pkg/ring/kv/consul" "github.com/cortexproject/cortex/pkg/util/flagext" @@ -75,6 +76,49 @@ func TestLifecycler_JoinShouldNotBlock(t *testing.T) { } } +func TestLifecycler_RenewTokens(t *testing.T) { + ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + var ringConfig Config + flagext.DefaultValues(&ringConfig) + ringConfig.KVStore.Mock = ringStore + + ctx := context.Background() + lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1") + lifecyclerConfig.HeartbeatPeriod = 100 * time.Millisecond + lifecyclerConfig.NumTokens = 512 + + l1, err := NewLifecycler(lifecyclerConfig, &nopFlushTransferer{}, "ingester", ringKey, true, true, log.NewNopLogger(), nil) + require.NoError(t, err) + + require.NoError(t, services.StartAndAwaitRunning(ctx, l1)) + defer services.StopAndAwaitTerminated(ctx, l1) // nolint:errcheck + + waitRingInstance(t, 3*time.Second, l1, func(instance InstanceDesc) error { + if instance.State != ACTIVE { + return errors.New("should be active") + } + return nil + }) + + originalTokens := l1.getTokens() + require.Len(t, originalTokens, 512) + require.IsIncreasing(t, originalTokens) + l1.RenewTokens(0.1, ctx) + newTokens := l1.getTokens() + require.Len(t, newTokens, 512) + require.IsIncreasing(t, newTokens) + diff := 0 + for i := 0; i < len(originalTokens); i++ { + if !slices.Contains(originalTokens, newTokens[i]) { + diff++ + } + } + + require.Equal(t, 51, diff) +} + func TestLifecycler_DefferedJoin(t *testing.T) { ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil) t.Cleanup(func() { assert.NoError(t, closer.Close()) })