Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allowing renenerate ingester tokens #6063

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* [ENHANCEMENT] Upgrade Alpine to 3.19. #6014
* [ENHANCEMENT] Upgrade go to 1.21.11 #6014
* [ENHANCEMENT] Ingester: Add a new experimental `-ingester.labels-string-interning-enabled` flag to enable string interning for metrics labels. #6057
* [ENHANCEMENT] Ingester: Add link to renew 10% of the ingesters tokens in the admin page. #6063
* [BUGFIX] Configsdb: Fix endline issue in db password. #5920
* [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952
* [BUGFIX] Querier: Enforce max query length check for `/api/v1/series` API even though `ignoreMaxQueryLength` is set to true. #6018
Expand Down
3 changes: 3 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ type Ingester interface {
client.IngesterServer
FlushHandler(http.ResponseWriter, *http.Request)
ShutdownHandler(http.ResponseWriter, *http.Request)
RenewTokenHandler(http.ResponseWriter, *http.Request)
Push(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
}

Expand All @@ -292,8 +293,10 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {

a.indexPage.AddLink(SectionDangerous, "/ingester/flush", "Trigger a Flush of data from Ingester to storage")
a.indexPage.AddLink(SectionDangerous, "/ingester/shutdown", "Trigger Ingester Shutdown (Dangerous)")
a.indexPage.AddLink(SectionDangerous, "/ingester/renewTokens", "Renew Ingester Tokens (10%)")
a.RegisterRoute("/ingester/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.

// Legacy Routes
Expand Down
5 changes: 5 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,11 @@ func (i *Ingester) updateActiveSeries(ctx context.Context) {
}
}

func (i *Ingester) RenewTokenHandler(w http.ResponseWriter, r *http.Request) {
i.lifecycler.RenewTokens(0.1, r.Context())
w.WriteHeader(http.StatusNoContent)
}

// ShutdownHandler triggers the following set of operations in order:
// - Change the state of ring to stop accepting writes.
// - Flush all the chunks.
Expand Down
46 changes: 46 additions & 0 deletions pkg/ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,52 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
return err
}

func (i *Lifecycler) RenewTokens(ratio float64, ctx context.Context) {
if ratio > 1 {
ratio = 1
}
err := i.KVStore.CAS(ctx, i.RingKey, func(in interface{}) (out interface{}, retry bool, err error) {
if in == nil {
return in, false, nil
}

ringDesc := in.(*Desc)
_, ok := ringDesc.Ingesters[i.ID]

if !ok {
return in, false, nil
}

tokensToBeRenewed := int(float64(i.cfg.NumTokens) * ratio)
ringTokens, _ := ringDesc.TokensFor(i.ID)

// Removing random tokens
for i := 0; i < tokensToBeRenewed; i++ {
if len(ringTokens) == 0 {
break
}
index := mathrand.Int() % len(ringTokens)
ringTokens = append(ringTokens[:index], ringTokens[index+1:]...)
}

needTokens := i.cfg.NumTokens - len(ringTokens)
level.Info(i.logger).Log("msg", "renewing new tokens", "count", needTokens, "ring", i.RingName)
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, ringTokens, i.GetState(), i.getRegisteredAt())
newTokens := i.tg.GenerateTokens(ringDesc, i.ID, i.Zone, needTokens, true)

ringTokens = append(ringTokens, newTokens...)
sort.Sort(ringTokens)

ringDesc.AddIngester(i.ID, i.Addr, i.Zone, ringTokens, i.GetState(), i.getRegisteredAt())
i.setTokens(ringTokens)
return ringDesc, true, nil
})

if err != nil {
level.Error(i.logger).Log("msg", "failed to regenerate tokens", "ring", i.RingName, "err", err)
}
}

// Verifies that tokens that this ingester has registered to the ring still belong to it.
// Gossiping ring may change the ownership of tokens in case of conflicts.
// If ingester doesn't own its tokens anymore, this method generates new tokens and puts them to the ring.
Expand Down
44 changes: 44 additions & 0 deletions pkg/ring/lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"

"github.com/cortexproject/cortex/pkg/ring/kv/consul"
"github.com/cortexproject/cortex/pkg/util/flagext"
Expand Down Expand Up @@ -75,6 +76,49 @@ func TestLifecycler_JoinShouldNotBlock(t *testing.T) {
}
}

func TestLifecycler_RenewTokens(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore

ctx := context.Background()
lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
lifecyclerConfig.HeartbeatPeriod = 100 * time.Millisecond
lifecyclerConfig.NumTokens = 512

l1, err := NewLifecycler(lifecyclerConfig, &nopFlushTransferer{}, "ingester", ringKey, true, true, log.NewNopLogger(), nil)
require.NoError(t, err)

require.NoError(t, services.StartAndAwaitRunning(ctx, l1))
defer services.StopAndAwaitTerminated(ctx, l1) // nolint:errcheck

waitRingInstance(t, 3*time.Second, l1, func(instance InstanceDesc) error {
if instance.State != ACTIVE {
return errors.New("should be active")
}
return nil
})

originalTokens := l1.getTokens()
require.Len(t, originalTokens, 512)
require.IsIncreasing(t, originalTokens)
l1.RenewTokens(0.1, ctx)
newTokens := l1.getTokens()
require.Len(t, newTokens, 512)
require.IsIncreasing(t, newTokens)
diff := 0
for i := 0; i < len(originalTokens); i++ {
if !slices.Contains(originalTokens, newTokens[i]) {
diff++
}
}

require.Equal(t, 51, diff)
}

func TestLifecycler_DefferedJoin(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
Expand Down
Loading