From 3884aa525c3fd5e2eb2bb09dc3bb523dd2cefe2c Mon Sep 17 00:00:00 2001 From: JordanRushing Date: Thu, 16 May 2024 15:53:54 -0500 Subject: [PATCH 1/2] Add healthyInstancesInZoneCount to Lifecycler; update tests Signed-off-by: JordanRushing --- ring/lifecycler.go | 23 +++-- ring/lifecycler_test.go | 183 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 195 insertions(+), 11 deletions(-) diff --git a/ring/lifecycler.go b/ring/lifecycler.go index 4f51b46a5..848d45b60 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -158,11 +158,12 @@ type Lifecycler struct { readySince time.Time // Keeps stats updated at every heartbeat period - countersLock sync.RWMutex - healthyInstancesCount int - instancesCount int - instancesInZoneCount int - zonesCount int + countersLock sync.RWMutex + healthyInstancesCount int + instancesCount int + healthyInstancesInZoneCount int + instancesInZoneCount int + zonesCount int tokenGenerator TokenGenerator // The maximum time allowed to wait on the CanJoin() condition. @@ -441,6 +442,15 @@ func (i *Lifecycler) InstancesCount() int { return i.instancesCount } +// HealthyInstancesInZoneCount returns the number of healthy instances in the ring that are registered in +// this lifecycler's zone, updated during the last heartbeat period. +func (i *Lifecycler) HealthyInstancesInZoneCount() int { + i.countersLock.RLock() + defer i.countersLock.RUnlock() + + return i.healthyInstancesInZoneCount +} + // InstancesInZoneCount returns the number of instances in the ring that are registered in // this lifecycler's zone, updated during the last heartbeat period. func (i *Lifecycler) InstancesInZoneCount() int { @@ -913,6 +923,7 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) { healthyInstancesCount := 0 instancesCount := 0 zones := map[string]int{} + healthyZones := map[string]int{} if ringDesc != nil { now := time.Now() @@ -924,6 +935,7 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) { // Count the number of healthy instances for Write operation. if ingester.IsHealthy(Write, i.cfg.RingConfig.HeartbeatTimeout, now) { healthyInstancesCount++ + healthyZones[ingester.Zone]++ } } } @@ -932,6 +944,7 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) { i.countersLock.Lock() i.healthyInstancesCount = healthyInstancesCount i.instancesCount = instancesCount + i.healthyInstancesInZoneCount = healthyZones[i.cfg.Zone] i.instancesInZoneCount = zones[i.cfg.Zone] i.zonesCount = len(zones) i.countersLock.Unlock() diff --git a/ring/lifecycler_test.go b/ring/lifecycler_test.go index e4ce79911..e3426ad06 100644 --- a/ring/lifecycler_test.go +++ b/ring/lifecycler_test.go @@ -160,6 +160,157 @@ func TestLifecycler_HealthyInstancesCount(t *testing.T) { }) } +func TestLifecycler_HealthyInstancesInZoneCount(t *testing.T) { + ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + var ringConfig Config + flagext.DefaultValues(&ringConfig) + ringConfig.KVStore.Mock = ringStore + + ctx := context.Background() + + // Add the first ingester to the ring + lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "ing1") + lifecyclerConfig1.HeartbeatPeriod = 100 * time.Millisecond + lifecyclerConfig1.JoinAfter = 100 * time.Millisecond + lifecyclerConfig1.Zone = "zone-a" + + lifecycler1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil) + require.NoError(t, err) + assert.Equal(t, 0, lifecycler1.HealthyInstancesInZoneCount()) + + require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler1)) + defer services.StopAndAwaitTerminated(ctx, lifecycler1) // nolint:errcheck + + // Assert the first ingester joined the ring + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + return lifecycler1.HealthyInstancesInZoneCount() == 1 + }) + + // Add the second ingester to the ring in the same zone + lifecyclerConfig2 := testLifecyclerConfig(ringConfig, "ing2") + lifecyclerConfig2.HeartbeatPeriod = 100 * time.Millisecond + lifecyclerConfig2.JoinAfter = 100 * time.Millisecond + lifecyclerConfig2.Zone = "zone-a" + + lifecycler2, err := NewLifecycler(lifecyclerConfig2, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil) + require.NoError(t, err) + assert.Equal(t, 0, lifecycler2.HealthyInstancesInZoneCount()) + + require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler2)) + defer services.StopAndAwaitTerminated(ctx, lifecycler2) // nolint:errcheck + + // Assert the second ingester joined the ring + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + return lifecycler2.HealthyInstancesInZoneCount() == 2 + }) + + // Assert the first ingester count is updated + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + return lifecycler1.HealthyInstancesInZoneCount() == 2 + }) + + // Add the third ingester to the ring in a different zone + lifecyclerConfig3 := testLifecyclerConfig(ringConfig, "ing3") + lifecyclerConfig3.HeartbeatPeriod = 100 * time.Millisecond + lifecyclerConfig3.JoinAfter = 100 * time.Millisecond + lifecyclerConfig3.Zone = "zone-b" + + lifecycler3, err := NewLifecycler(lifecyclerConfig3, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil) + require.NoError(t, err) + assert.Equal(t, 0, lifecycler3.HealthyInstancesInZoneCount()) + + require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler3)) + defer services.StopAndAwaitTerminated(ctx, lifecycler3) // nolint:errcheck + + // Assert the third ingester joined the ring + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + return lifecycler3.HealthyInstancesInZoneCount() == 1 + }) + + // Assert the first ingester count is correct + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + return lifecycler1.HealthyInstancesInZoneCount() == 2 + }) + + // Assert the second ingester count is correct + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + return lifecycler2.HealthyInstancesInZoneCount() == 2 + }) + + // Add the fourth ingester to the ring in the same zone as the third ingester + lifecyclerConfig4 := testLifecyclerConfig(ringConfig, "ing4") + lifecyclerConfig4.HeartbeatPeriod = 100 * time.Millisecond + lifecyclerConfig4.JoinAfter = 100 * time.Millisecond + lifecyclerConfig4.Zone = "zone-b" + + lifecycler4, err := NewLifecycler(lifecyclerConfig4, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil) + require.NoError(t, err) + assert.Equal(t, 0, lifecycler4.HealthyInstancesInZoneCount()) + + require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler4)) + defer services.StopAndAwaitTerminated(ctx, lifecycler4) // nolint:errcheck + + // Assert the fourth ingester joined the ring + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + return lifecycler4.HealthyInstancesInZoneCount() == 2 + }) + + // Assert the first ingester count is correct + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + return lifecycler1.HealthyInstancesInZoneCount() == 2 + }) + + // Assert the second ingester count is correct + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + return lifecycler2.HealthyInstancesInZoneCount() == 2 + }) + + // Assert the third ingester count is correct + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + return lifecycler3.HealthyInstancesInZoneCount() == 2 + }) + + // Create another lifecycler for zone-c + lifecyclerConfig5 := testLifecyclerConfig(ringConfig, "ing5") + lifecyclerConfig5.HeartbeatPeriod = 100 * time.Millisecond + lifecyclerConfig5.JoinAfter = 100 * time.Millisecond + lifecyclerConfig5.Zone = "zone-c" + + lifecycler5, err := NewLifecycler(lifecyclerConfig5, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil) + require.NoError(t, err) + assert.Equal(t, 0, lifecycler5.HealthyInstancesInZoneCount()) + + require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler5)) + defer services.StopAndAwaitTerminated(ctx, lifecycler5) // nolint:errcheck + + // Assert the fifth ingester joined the ring + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + return lifecycler5.HealthyInstancesInZoneCount() == 1 + }) + + // Assert the first ingester count is correct + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + return lifecycler1.HealthyInstancesInZoneCount() == 2 + }) + + // Assert the second ingester count is correct + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + return lifecycler2.HealthyInstancesInZoneCount() == 2 + }) + + // Assert the third ingester count is correct + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + return lifecycler3.HealthyInstancesInZoneCount() == 2 + }) + + // Assert the fourth ingester count is correct + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + return lifecycler4.HealthyInstancesInZoneCount() == 2 + }) +} + func TestLifecycler_InstancesInZoneCount(t *testing.T) { ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) @@ -169,12 +320,13 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) { ringConfig.KVStore.Mock = ringStore instances := []struct { - zone string - healthy bool - expectedInstancesInZoneCount int - expectedInstancesCount int - expectedHealthyInstancesCount int - expectedZonesCount int + zone string + healthy bool + expectedInstancesInZoneCount int + expectedInstancesCount int + expectedHealthyInstancesCount int + expectedZonesCount int + expectedHealthyInstancesInZoneCount int }{ { zone: "zone-a", @@ -187,6 +339,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) { expectedHealthyInstancesCount: 1, // after adding a healthy instance in zone-a, expectedZonesCount is 1 expectedZonesCount: 1, + // after adding a healthy instance in zone-a, expectedHealthyInstancesInZoneCount is 1 + expectedHealthyInstancesInZoneCount: 1, }, { zone: "zone-a", @@ -199,6 +353,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) { expectedHealthyInstancesCount: 1, // zone-a was already added, so expectedZonesCount remains 1 expectedZonesCount: 1, + // after adding an unhealthy instance in zone-a, expectedHealthyInstancesInZoneCount remains 1 + expectedHealthyInstancesInZoneCount: 1, }, { zone: "zone-a", @@ -211,6 +367,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) { expectedHealthyInstancesCount: 2, // zone-a was already added, so expectedZonesCount remains 1 expectedZonesCount: 1, + // after adding a healthy instance in zone-a, expectedHealthyInstancesInZoneCount becomes 2 + expectedHealthyInstancesInZoneCount: 2, }, { zone: "zone-b", @@ -223,6 +381,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) { expectedHealthyInstancesCount: 3, // after adding a healthy instance in zone-b, expectedZonesCount becomes 2 expectedZonesCount: 2, + // after adding a healthy instance in zone-b, expectedHealthyInstancesInZoneCount becomes 1 + expectedHealthyInstancesInZoneCount: 1, }, { zone: "zone-c", @@ -235,6 +395,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) { expectedHealthyInstancesCount: 3, // after adding an unhealthy instance in zone-c, expectedZonesCount becomes 3 expectedZonesCount: 3, + // after adding an unhealthy instance in zone-c, expectedHealthyInstancesInZoneCount is 0 + expectedHealthyInstancesInZoneCount: 0, }, { zone: "zone-c", @@ -247,6 +409,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) { expectedHealthyInstancesCount: 4, // zone-c was already added, so expectedZonesCount remains 3 expectedZonesCount: 3, + // after adding a healthy instance in zone-c, expectedHealthyInstancesInZoneCount is 1 + expectedHealthyInstancesInZoneCount: 1, }, { zone: "zone-b", @@ -259,6 +423,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) { expectedHealthyInstancesCount: 5, // zone-b was already added, so expectedZonesCount remains 3 expectedZonesCount: 3, + // after adding a healthy instance in zone-b, expectedHealthyInstancesInZoneCount becomes 2 + expectedHealthyInstancesInZoneCount: 2, }, } @@ -292,10 +458,15 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) { return lifecycler.HealthyInstancesCount() }) + test.Poll(t, time.Duration(joinWaitMs)*time.Millisecond, instance.expectedHealthyInstancesInZoneCount, func() interface{} { + return lifecycler.HealthyInstancesInZoneCount() + }) + require.Equal(t, instance.expectedInstancesInZoneCount, lifecycler.InstancesInZoneCount()) require.Equal(t, instance.expectedInstancesCount, lifecycler.InstancesCount()) require.Equal(t, instance.expectedHealthyInstancesCount, lifecycler.HealthyInstancesCount()) require.Equal(t, instance.expectedZonesCount, lifecycler.ZonesCount()) + require.Equal(t, instance.expectedHealthyInstancesInZoneCount, lifecycler.HealthyInstancesInZoneCount()) } } From 51a6c2a4961bdd35166ba25af494016e0fad473b Mon Sep 17 00:00:00 2001 From: JordanRushing Date: Wed, 26 Jun 2024 13:16:57 -0500 Subject: [PATCH 2/2] Improve healthyInstancesInZone naming; simplify test Signed-off-by: JordanRushing --- ring/lifecycler.go | 6 +-- ring/lifecycler_test.go | 83 +++-------------------------------------- 2 files changed, 9 insertions(+), 80 deletions(-) diff --git a/ring/lifecycler.go b/ring/lifecycler.go index 848d45b60..7c54eabdd 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -923,7 +923,7 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) { healthyInstancesCount := 0 instancesCount := 0 zones := map[string]int{} - healthyZones := map[string]int{} + healthyInstancesInZone := map[string]int{} if ringDesc != nil { now := time.Now() @@ -935,7 +935,7 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) { // Count the number of healthy instances for Write operation. if ingester.IsHealthy(Write, i.cfg.RingConfig.HeartbeatTimeout, now) { healthyInstancesCount++ - healthyZones[ingester.Zone]++ + healthyInstancesInZone[ingester.Zone]++ } } } @@ -944,7 +944,7 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) { i.countersLock.Lock() i.healthyInstancesCount = healthyInstancesCount i.instancesCount = instancesCount - i.healthyInstancesInZoneCount = healthyZones[i.cfg.Zone] + i.healthyInstancesInZoneCount = healthyInstancesInZone[i.cfg.Zone] i.instancesInZoneCount = zones[i.cfg.Zone] i.zonesCount = len(zones) i.countersLock.Unlock() diff --git a/ring/lifecycler_test.go b/ring/lifecycler_test.go index 54776851d..5502ce548 100644 --- a/ring/lifecycler_test.go +++ b/ring/lifecycler_test.go @@ -184,7 +184,7 @@ func TestLifecycler_HealthyInstancesInZoneCount(t *testing.T) { defer services.StopAndAwaitTerminated(ctx, lifecycler1) // nolint:errcheck // Assert the first ingester joined the ring - test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + test.Poll(t, time.Second, true, func() interface{} { return lifecycler1.HealthyInstancesInZoneCount() == 1 }) @@ -202,12 +202,12 @@ func TestLifecycler_HealthyInstancesInZoneCount(t *testing.T) { defer services.StopAndAwaitTerminated(ctx, lifecycler2) // nolint:errcheck // Assert the second ingester joined the ring - test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + test.Poll(t, time.Second, true, func() interface{} { return lifecycler2.HealthyInstancesInZoneCount() == 2 }) // Assert the first ingester count is updated - test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + test.Poll(t, time.Second, true, func() interface{} { return lifecycler1.HealthyInstancesInZoneCount() == 2 }) @@ -225,90 +225,19 @@ func TestLifecycler_HealthyInstancesInZoneCount(t *testing.T) { defer services.StopAndAwaitTerminated(ctx, lifecycler3) // nolint:errcheck // Assert the third ingester joined the ring - test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + test.Poll(t, time.Second, true, func() interface{} { return lifecycler3.HealthyInstancesInZoneCount() == 1 }) // Assert the first ingester count is correct - test.Poll(t, 1000*time.Millisecond, true, func() interface{} { - return lifecycler1.HealthyInstancesInZoneCount() == 2 - }) - - // Assert the second ingester count is correct - test.Poll(t, 1000*time.Millisecond, true, func() interface{} { - return lifecycler2.HealthyInstancesInZoneCount() == 2 - }) - - // Add the fourth ingester to the ring in the same zone as the third ingester - lifecyclerConfig4 := testLifecyclerConfig(ringConfig, "ing4") - lifecyclerConfig4.HeartbeatPeriod = 100 * time.Millisecond - lifecyclerConfig4.JoinAfter = 100 * time.Millisecond - lifecyclerConfig4.Zone = "zone-b" - - lifecycler4, err := NewLifecycler(lifecyclerConfig4, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil) - require.NoError(t, err) - assert.Equal(t, 0, lifecycler4.HealthyInstancesInZoneCount()) - - require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler4)) - defer services.StopAndAwaitTerminated(ctx, lifecycler4) // nolint:errcheck - - // Assert the fourth ingester joined the ring - test.Poll(t, 1000*time.Millisecond, true, func() interface{} { - return lifecycler4.HealthyInstancesInZoneCount() == 2 - }) - - // Assert the first ingester count is correct - test.Poll(t, 1000*time.Millisecond, true, func() interface{} { - return lifecycler1.HealthyInstancesInZoneCount() == 2 - }) - - // Assert the second ingester count is correct - test.Poll(t, 1000*time.Millisecond, true, func() interface{} { - return lifecycler2.HealthyInstancesInZoneCount() == 2 - }) - - // Assert the third ingester count is correct - test.Poll(t, 1000*time.Millisecond, true, func() interface{} { - return lifecycler3.HealthyInstancesInZoneCount() == 2 - }) - - // Create another lifecycler for zone-c - lifecyclerConfig5 := testLifecyclerConfig(ringConfig, "ing5") - lifecyclerConfig5.HeartbeatPeriod = 100 * time.Millisecond - lifecyclerConfig5.JoinAfter = 100 * time.Millisecond - lifecyclerConfig5.Zone = "zone-c" - - lifecycler5, err := NewLifecycler(lifecyclerConfig5, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil) - require.NoError(t, err) - assert.Equal(t, 0, lifecycler5.HealthyInstancesInZoneCount()) - - require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler5)) - defer services.StopAndAwaitTerminated(ctx, lifecycler5) // nolint:errcheck - - // Assert the fifth ingester joined the ring - test.Poll(t, 1000*time.Millisecond, true, func() interface{} { - return lifecycler5.HealthyInstancesInZoneCount() == 1 - }) - - // Assert the first ingester count is correct - test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + test.Poll(t, time.Second, true, func() interface{} { return lifecycler1.HealthyInstancesInZoneCount() == 2 }) // Assert the second ingester count is correct - test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + test.Poll(t, time.Second, true, func() interface{} { return lifecycler2.HealthyInstancesInZoneCount() == 2 }) - - // Assert the third ingester count is correct - test.Poll(t, 1000*time.Millisecond, true, func() interface{} { - return lifecycler3.HealthyInstancesInZoneCount() == 2 - }) - - // Assert the fourth ingester count is correct - test.Poll(t, 1000*time.Millisecond, true, func() interface{} { - return lifecycler4.HealthyInstancesInZoneCount() == 2 - }) } func TestLifecycler_InstancesInZoneCount(t *testing.T) {