ring.Lifecycler: Handle when previous ring state is leaving and the number of tokens has changed #79

Merged Jun 7, 2023 · 25 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -133,3 +133,4 @@
* [BUGFIX] Ring status page: fixed the owned tokens percentage value displayed. #282
* [BUGFIX] Ring: prevent iterating the whole ring when using `ExcludedZones`. #285
* [BUGFIX] grpcclient: fix missing `.` in flag name for initial connection window size flag. #314
* [BUGFIX] ring.Lifecycler: Handle when previous ring state is leaving and the number of tokens has changed. #79
42 changes: 31 additions & 11 deletions ring/lifecycler.go
@@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"math/rand"
"net"
"net/http"
"os"
@@ -616,23 +617,42 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
return ringDesc, true, nil
}

// If the ingester failed to clean up its ring entry (or unregister_on_shutdown=false),
// it can leave its state behind as LEAVING.
// Move it into ACTIVE to ensure the ingester joins the ring.
if instanceDesc.State == LEAVING && len(instanceDesc.Tokens) == i.cfg.NumTokens {
tokens := Tokens(instanceDesc.Tokens)
level.Info(i.logger).Log("msg", "existing instance found in ring", "state", instanceDesc.State, "tokens",
len(tokens), "ring", i.RingName)

// If the ingester fails to clean up its ring entry, or unregister_on_shutdown=false, it can leave behind its
// ring state as LEAVING. Make sure to switch to the ACTIVE state.
if instanceDesc.State == LEAVING {
delta := i.cfg.NumTokens - len(tokens)
if delta > 0 {
// We need more tokens
level.Info(i.logger).Log("msg", "existing instance has too few tokens, adding difference",
"current_tokens", len(tokens), "desired_tokens", i.cfg.NumTokens)
newTokens := GenerateTokens(delta, ringDesc.GetTokens())
tokens = append(tokens, newTokens...)
sort.Sort(tokens)
} else if delta < 0 {
// We have too many tokens
level.Info(i.logger).Log("msg", "existing instance has too many tokens, removing difference",
"current_tokens", len(tokens), "desired_tokens", i.cfg.NumTokens)
// Make sure we don't pick the N smallest tokens, since that would increase the chance of the instance receiving only smaller hashes.
rand.Shuffle(len(tokens), tokens.Swap)
tokens = tokens[0:i.cfg.NumTokens]
sort.Sort(tokens)
}

instanceDesc.State = ACTIVE
instanceDesc.Tokens = tokens
}

// We're taking over this entry, update instanceDesc with our values
instanceDesc.Addr = i.Addr
instanceDesc.Zone = i.Zone

// We exist in the ring, so assume the ring is right and copy out tokens & state out of there.
// Set the local state based on the updated instance.
i.setState(instanceDesc.State)
tokens, _ := ringDesc.TokensFor(i.ID)
i.setTokens(tokens)

level.Info(i.logger).Log("msg", "existing entry found in ring", "state", i.GetState(), "tokens", len(tokens), "ring", i.RingName)
// We're taking over this entry, update instanceDesc with our values
instanceDesc.Addr = i.Addr
instanceDesc.Zone = i.Zone

// Update the ring if the instance has been changed. We don't want to rely on the heartbeat update, as the
// heartbeat period can be configured to a long interval, and until then the lifecycler would not report this
// instance as ready in CheckReady.
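For readers skimming the diff above, here is a minimal, self-contained sketch of the token-reconciliation strategy that `initRing` now applies to a LEAVING entry: grow by generating the missing tokens, or shrink by dropping a random subset. The helper names (`reconcileTokens`, `generateTokens`) are hypothetical stand-ins for dskit's `Tokens` type and `GenerateTokens`; the real code operates on the ring descriptor directly.

```go
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// generateTokens is a hypothetical stand-in for dskit's GenerateTokens: it
// draws n random uint32 tokens that don't collide with already-taken ones.
func generateTokens(n int, taken map[uint32]struct{}) []uint32 {
	out := make([]uint32, 0, n)
	for len(out) < n {
		t := rand.Uint32()
		if _, ok := taken[t]; !ok {
			taken[t] = struct{}{}
			out = append(out, t)
		}
	}
	return out
}

// reconcileTokens mirrors the delta logic in the diff above: add newly
// generated tokens when we have too few, or drop a random subset when we
// have too many, returning a sorted slice either way.
func reconcileTokens(owned []uint32, desired int, taken map[uint32]struct{}) []uint32 {
	tokens := append([]uint32(nil), owned...)
	if delta := desired - len(tokens); delta > 0 {
		// Too few tokens: generate the difference, avoiding collisions.
		tokens = append(tokens, generateTokens(delta, taken)...)
	} else if delta < 0 {
		// Too many tokens: shuffle before truncating so we don't keep only
		// the smallest tokens, which would bias the instance toward the low
		// end of the hash space.
		rand.Shuffle(len(tokens), func(i, j int) { tokens[i], tokens[j] = tokens[j], tokens[i] })
		tokens = tokens[:desired]
	}
	sort.Slice(tokens, func(i, j int) bool { return tokens[i] < tokens[j] })
	return tokens
}

func main() {
	taken := map[uint32]struct{}{}
	owned := generateTokens(4, taken)
	fmt.Println(len(reconcileTokens(owned, 6, taken))) // grow: prints 6
	fmt.Println(len(reconcileTokens(owned, 2, taken))) // shrink: prints 2
}
```

The shuffle-then-truncate step is the same design choice called out in the comment above; the actual change uses `rand.Shuffle` with `tokens.Swap` on the `Tokens` type.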
158 changes: 158 additions & 0 deletions ring/lifecycler_test.go
@@ -457,6 +457,164 @@ func TestLifecycler_HeartbeatAfterBackendReset(t *testing.T) {
assert.Equal(t, prevTokens, Tokens(desc.GetTokens()))
}

// Test Lifecycler when increasing tokens and instance is already in the ring in leaving state.
func TestLifecycler_IncreasingTokensLeavingInstanceInTheRing(t *testing.T) {
ctx := context.Background()

ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

const numTokens = 128

var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore
r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, r))
t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, r))
})

lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
// Make sure changes are applied instantly
lifecyclerConfig.HeartbeatPeriod = 0
lifecyclerConfig.NumTokens = numTokens

// Simulate an ingester with 64 tokens that left the ring in LEAVING state
origTokens := GenerateTokens(64, nil)
err = r.KVClient.CAS(ctx, ringKey, func(in interface{}) (out interface{}, retry bool, err error) {
ringDesc := NewDesc()
addr, err := GetInstanceAddr(lifecyclerConfig.Addr, lifecyclerConfig.InfNames, nil, lifecyclerConfig.EnableInet6)
if err != nil {
return nil, false, err
}

ringDesc.AddIngester("ing1", addr, lifecyclerConfig.Zone, origTokens, LEAVING, time.Now())
return ringDesc, false, nil
})
require.NoError(t, err)

// Start ingester with increased number of tokens
l, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, l))
t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, l))
})

// Verify ingester joined, is active, and has 128 tokens
var ingDesc InstanceDesc
test.Poll(t, time.Second, true, func() interface{} {
d, err := r.KVClient.Get(ctx, ringKey)
require.NoError(t, err)

desc, ok := d.(*Desc)
require.True(t, ok)
ingDesc = desc.Ingesters["ing1"]
t.Log(fmt.Sprintf("Polling for new ingester to have become active with %d tokens", numTokens),
"state", ingDesc.State, "tokens", len(ingDesc.Tokens))
return ingDesc.State == ACTIVE && len(ingDesc.Tokens) == numTokens
})

origSeen := 0
for _, ot := range origTokens {
for _, tok := range ingDesc.Tokens {
if tok == ot {
origSeen++
break
}
}
}
assert.Equal(t, len(origTokens), origSeen, "original tokens should be kept")

assert.True(t, sort.SliceIsSorted(ingDesc.Tokens, func(i, j int) bool {
return ingDesc.Tokens[i] < ingDesc.Tokens[j]
}), "tokens should be sorted")
}

// Test Lifecycler when decreasing tokens and instance is already in the ring in leaving state.
func TestLifecycler_DecreasingTokensLeavingInstanceInTheRing(t *testing.T) {
ctx := context.Background()

ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

const numTokens = 64

var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore
r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, r))
t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, r))
})

lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
// Make sure changes are applied instantly
lifecyclerConfig.HeartbeatPeriod = 0
lifecyclerConfig.NumTokens = numTokens

// Simulate an ingester with 128 tokens that left the ring in LEAVING state
origTokens := GenerateTokens(128, nil)
err = r.KVClient.CAS(ctx, ringKey, func(in interface{}) (out interface{}, retry bool, err error) {
ringDesc := NewDesc()
addr, err := GetInstanceAddr(lifecyclerConfig.Addr, lifecyclerConfig.InfNames, nil, lifecyclerConfig.EnableInet6)
if err != nil {
return nil, false, err
}

ringDesc.AddIngester("ing1", addr, lifecyclerConfig.Zone, origTokens, LEAVING, time.Now())
return ringDesc, false, nil
})
require.NoError(t, err)

// Start ingester with decreased number of tokens
l, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, l))
t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, l))
})

// Verify ingester joined, is active, and has 64 tokens
var ingDesc InstanceDesc
test.Poll(t, time.Second, true, func() interface{} {
d, err := r.KVClient.Get(ctx, ringKey)
require.NoError(t, err)

desc, ok := d.(*Desc)
require.True(t, ok)
ingDesc = desc.Ingesters["ing1"]
t.Log(fmt.Sprintf("Polling for new ingester to have become active with %d tokens", numTokens),
"state", ingDesc.State, "tokens", len(ingDesc.Tokens))
return ingDesc.State == ACTIVE && len(ingDesc.Tokens) == numTokens
})

seen := map[uint32]struct{}{}
for _, tok := range ingDesc.Tokens {
// Guard against potential bug in token shuffling
_, exists := seen[tok]
require.False(t, exists, "tokens are not unique")
seen[tok] = struct{}{}

found := false
for _, ot := range origTokens {
if tok == ot {
found = true
break
}
}
require.True(t, found, "old tokens were not re-used")
}

assert.True(t, sort.SliceIsSorted(ingDesc.Tokens, func(i, j int) bool {
return ingDesc.Tokens[i] < ingDesc.Tokens[j]
}), "tokens should be sorted")
}

type MockClient struct {
ListFunc func(ctx context.Context, prefix string) ([]string, error)
GetFunc func(ctx context.Context, key string) (interface{}, error)