From 8fb7b488bd0a2c495c8b764b99d44ef65a2285c9 Mon Sep 17 00:00:00 2001 From: Paul Rogers <129207811+paul1r@users.noreply.github.com> Date: Mon, 19 Aug 2024 11:01:23 -0400 Subject: [PATCH] chore(deps): update dskit 20240819 (#13924) --- docs/sources/shared/configuration.md | 8 + go.mod | 4 +- go.sum | 11 +- pkg/distributor/instance_count_test.go | 2 +- pkg/ingester/ingester_test.go | 10 ++ pkg/pattern/flush_test.go | 14 +- pkg/pattern/ring_client.go | 4 +- pkg/querier/querier_mock_test.go | 10 ++ pkg/ruler/base/lifecycle_test.go | 2 +- pkg/ruler/base/ruler_test.go | 134 +++++++-------- pkg/util/ring/ring_test.go | 10 ++ .../grafana/dskit/backoff/backoff.go | 11 +- .../grafana/dskit/concurrency/runner.go | 5 +- .../grafana/dskit/crypto/tls/tls.go | 42 +++-- .../grafana/dskit/internal/math/math.go | 9 - .../grafana/dskit/kv/memberlist/broadcast.go | 4 - .../dskit/kv/memberlist/memberlist_client.go | 112 ++++++++----- .../grafana/dskit/kv/memberlist/metrics.go | 32 +++- .../grafana/dskit/ring/basic_lifecycler.go | 6 +- .../grafana/dskit/ring/lifecycler.go | 86 ++++++++-- vendor/github.com/grafana/dskit/ring/model.go | 76 +++++++-- vendor/github.com/grafana/dskit/ring/ring.go | 109 +++++++++--- .../github.com/grafana/dskit/ring/ring.pb.go | 155 ++++++++++++++---- .../github.com/grafana/dskit/ring/ring.proto | 11 ++ .../grafana/dskit/ring/ring_http.go | 46 +++--- .../grafana/dskit/ring/ring_status.gohtml | 6 +- .../grafana/dskit/ring/shard/shard.go | 5 +- .../grafana/dskit/services/basic_service.go | 48 +++++- .../grafana/dskit/services/failure_watcher.go | 53 +++++- .../grafana/dskit/services/manager.go | 56 ++++++- .../grafana/dskit/services/service.go | 5 +- .../grafana/dskit/spanlogger/spanlogger.go | 50 ++++++ .../grafana/dskit/tenant/resolver.go | 17 +- .../grafana/dskit/tracing/tracing.go | 24 ++- .../godeltaprof/internal/pprof/stub.go | 3 - vendor/modules.txt | 7 +- 36 files changed, 899 insertions(+), 288 deletions(-) delete mode 100644 vendor/github.com/grafana/dskit/internal/math/math.go diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 3840252f1df6..42d9d07382e7 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -4181,6 +4181,14 @@ When a memberlist config with atleast 1 join_members is defined, kvstore of type # CLI flag: -memberlist.leave-timeout [leave_timeout: | default = 20s] +# Timeout for broadcasting all remaining locally-generated updates to other +# nodes when shutting down. Only used if there are nodes left in the memberlist +# cluster, and only applies to locally-generated updates, not to broadcast +# messages that are result of incoming gossip updates. 0 = no timeout, wait +# until all locally-generated updates are sent. +# CLI flag: -memberlist.broadcast-timeout-for-local-updates-on-shutdown +[broadcast_timeout_for_local_updates_on_shutdown: | default = 10s] + # How much space to use for keeping received and sent messages in memory for # troubleshooting (two buffers). 0 to disable. # CLI flag: -memberlist.message-history-buffer-bytes diff --git a/go.mod b/go.mod index 78ee5f51b6cb..fc2a4e8005ff 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,7 @@ require ( github.com/gorilla/mux v1.8.1 github.com/gorilla/websocket v1.5.0 github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 - github.com/grafana/dskit v0.0.0-20240626184720-35810fdf1c6d + github.com/grafana/dskit v0.0.0-20240819131358-463219e80ea0 github.com/grafana/go-gelf/v2 v2.0.1 github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc @@ -271,7 +271,7 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.12.5 // indirect github.com/gophercloud/gophercloud v1.13.0 // indirect - github.com/grafana/pyroscope-go/godeltaprof v0.1.6 // indirect + github.com/grafana/pyroscope-go/godeltaprof v0.1.7 // indirect github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect diff --git a/go.sum b/go.sum index 2bf3aadb5a8c..3e4a97d15469 100644 --- a/go.sum +++ b/go.sum @@ -274,9 +274,8 @@ github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb0 github.com/NYTimes/gziphandler v1.0.1/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I= github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= +github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/OneOfOne/xxhash v1.2.6 h1:U68crOE3y3MPttCMQGywZOLrTeF5HHJ3/vDBCJn9/bA= -github.com/OneOfOne/xxhash v1.2.6/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= github.com/OpenDNS/vegadns2client v0.0.0-20180418235048-a3fa4a771d87/go.mod h1:iGLljf5n9GjT6kc0HBvyI1nOKnGQbNB66VzSNbK5iks= github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/purell v1.1.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= @@ -1044,8 +1043,8 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4Vp68H0tp/0iN17DM2ehRo1rLEdOFe/gB8I= github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw= -github.com/grafana/dskit v0.0.0-20240626184720-35810fdf1c6d h1:CD8PWWX+9lYdgeMquSofmLErvCtk7jb+3/W/SH6oo/k= -github.com/grafana/dskit v0.0.0-20240626184720-35810fdf1c6d/go.mod h1:HvSf3uf8Ps2vPpzHeAFyZTdUcbVr+Rxpq1xcx7J/muc= +github.com/grafana/dskit v0.0.0-20240819131358-463219e80ea0 h1:iMShjkEYATnBMbEa2wV4QiK5PU2trw24FOCON3v7+K4= +github.com/grafana/dskit v0.0.0-20240819131358-463219e80ea0/go.mod h1:c4ASJAo1QFmXGydDzNed2o0+Fncx+x4YmQ1r9HfYU3c= github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak= github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY= @@ -1056,8 +1055,8 @@ github.com/grafana/jsonparser v0.0.0-20240425183733-ea80629e1a32 h1:NznuPwItog+r github.com/grafana/jsonparser v0.0.0-20240425183733-ea80629e1a32/go.mod h1:796sq+UcONnSlzA3RtlBZ+b/hrerkZXiEmO8oMjyRwY= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= -github.com/grafana/pyroscope-go/godeltaprof v0.1.6 h1:nEdZ8louGAplSvIJi1HVp7kWvFvdiiYg3COLlTwJiFo= -github.com/grafana/pyroscope-go/godeltaprof v0.1.6/go.mod h1:Tk376Nbldo4Cha9RgiU7ik8WKFkNpfds98aUzS8omLE= +github.com/grafana/pyroscope-go/godeltaprof v0.1.7 h1:C11j63y7gymiW8VugJ9ZW0pWfxTZugdSJyC48olk5KY= +github.com/grafana/pyroscope-go/godeltaprof v0.1.7/go.mod h1:Tk376Nbldo4Cha9RgiU7ik8WKFkNpfds98aUzS8omLE= github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrRoT6yV5+wkrOpcszoIsO4+4ds248= github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc/go.mod h1:+JKpmjMGhpgPL+rXZ5nsZieVzvarn86asRlBg4uNGnk= github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 h1:bjh0PVYSVVFxzINqPFYJmAmJNrWPgnVjuSdYJGHmtFU= diff --git a/pkg/distributor/instance_count_test.go b/pkg/distributor/instance_count_test.go index 7f861a262284..4eca382e8d0b 100644 --- a/pkg/distributor/instance_count_test.go +++ b/pkg/distributor/instance_count_test.go @@ -111,7 +111,7 @@ func TestInstanceCountDelegate_CorrectlyInvokesOtherDelegates(t *testing.T) { require.NoError(t, err) ingesters := ring.NewDesc() - ingesters.AddIngester("ingester-0", "ingester-0:3100", "zone-a", []uint32{1}, ring.ACTIVE, time.Now()) + ingesters.AddIngester("ingester-0", "ingester-0:3100", "zone-a", []uint32{1}, ring.ACTIVE, time.Now(), false, time.Now()) // initial state. require.Equal(t, 0, sentry1["Heartbeat"]) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 222cafd38939..f201da437e4e 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -1609,6 +1609,16 @@ func (r *readRingMock) GetTokenRangesForInstance(instance string) (ring.TokenRan return tr, nil } +// WritableInstancesWithTokensCount returns the number of writable instances in the ring that have tokens. +func (r *readRingMock) WritableInstancesWithTokensCount() int { + return len(r.replicationSet.Instances) +} + +// WritableInstancesWithTokensInZoneCount returns the number of writable instances in the ring that are registered in given zone and have tokens. +func (r *readRingMock) WritableInstancesWithTokensInZoneCount(_ string) int { + return len(r.replicationSet.Instances) +} + func mockReadRingWithOneActiveIngester() *readRingMock { return newReadRingMock([]ring.InstanceDesc{ {Addr: "test", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{1, 2, 3}}, diff --git a/pkg/pattern/flush_test.go b/pkg/pattern/flush_test.go index ea71f6055d8b..be6a8f325333 100644 --- a/pkg/pattern/flush_test.go +++ b/pkg/pattern/flush_test.go @@ -144,7 +144,7 @@ func (f *fakeRingClient) State() services.State { panic("not implemented") } -func (f *fakeRingClient) AddListener(_ services.Listener) { +func (f *fakeRingClient) AddListener(_ services.Listener) func() { panic("not implemented") } @@ -184,6 +184,18 @@ func (f *fakeRing) ZonesCount() int { return args.Int(0) } +// WritableInstancesWithTokensCount returns the number of writable instances in the ring that have tokens. +func (f *fakeRing) WritableInstancesWithTokensCount() int { + args := f.Called() + return args.Int(0) +} + +// WritableInstancesWithTokensInZoneCount returns the number of writable instances in the ring that are registered in given zone and have tokens. +func (f *fakeRing) WritableInstancesWithTokensInZoneCount(zone string) int { + args := f.Called(zone) + return args.Int(0) +} + func (f *fakeRing) Get( key uint32, op ring.Operation, diff --git a/pkg/pattern/ring_client.go b/pkg/pattern/ring_client.go index 72739e0c0849..8c71328e60bc 100644 --- a/pkg/pattern/ring_client.go +++ b/pkg/pattern/ring_client.go @@ -110,8 +110,8 @@ func (r *ringClient) State() services.State { return r.ring.State() } -func (r *ringClient) AddListener(listener services.Listener) { - r.ring.AddListener(listener) +func (r *ringClient) AddListener(listener services.Listener) func() { + return r.ring.AddListener(listener) } func (r *ringClient) GetClientFor(addr string) (ring_client.PoolClient, error) { diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index cff1f67637a1..e8a3b40ebd61 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -496,6 +496,16 @@ func (r *readRingMock) GetTokenRangesForInstance(_ string) (ring.TokenRanges, er return tr, nil } +// WritableInstancesWithTokensCount returns the number of writable instances in the ring that have tokens. +func (r *readRingMock) WritableInstancesWithTokensCount() int { + return len(r.replicationSet.Instances) +} + +// WritableInstancesWithTokensInZoneCount returns the number of writable instances in the ring that are registered in given zone and have tokens. +func (r *readRingMock) WritableInstancesWithTokensInZoneCount(_ string) int { + return len(r.replicationSet.Instances) +} + func mockReadRingWithOneActiveIngester() *readRingMock { return newReadRingMock([]ring.InstanceDesc{ {Addr: "test", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{1, 2, 3}}, diff --git a/pkg/ruler/base/lifecycle_test.go b/pkg/ruler/base/lifecycle_test.go index 2fefc62bf6cb..417810d1dcf2 100644 --- a/pkg/ruler/base/lifecycle_test.go +++ b/pkg/ruler/base/lifecycle_test.go @@ -79,7 +79,7 @@ func TestRuler_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) { require.NoError(t, ringStore.CAS(ctx, ringKey, func(in interface{}) (interface{}, bool, error) { ringDesc := ring.GetOrCreateRingDesc(in) - instance := ringDesc.AddIngester(unhealthyInstanceID, "1.1.1.1", "", generateSortedTokens(config.Ring.NumTokens), ring.ACTIVE, time.Now()) + instance := ringDesc.AddIngester(unhealthyInstanceID, "1.1.1.1", "", generateSortedTokens(config.Ring.NumTokens), ring.ACTIVE, time.Now(), false, time.Now()) instance.Timestamp = time.Now().Add(-(ringAutoForgetUnhealthyPeriods + 1) * heartbeatTimeout).Unix() ringDesc.Ingesters[unhealthyInstanceID] = instance diff --git a/pkg/ruler/base/ruler_test.go b/pkg/ruler/base/ruler_test.go index 95d924c37409..c80ad29cb1ad 100644 --- a/pkg/ruler/base/ruler_test.go +++ b/pkg/ruler/base/ruler_test.go @@ -562,7 +562,7 @@ func TestGetRules(t *testing.T) { d = ring.NewDesc() } for rID, tokens := range allTokensByRuler { - d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), "", tokens, ring.ACTIVE, time.Now()) + d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), "", tokens, ring.ACTIVE, time.Now(), false, time.Now()) } return d, true, nil }) @@ -744,7 +744,7 @@ func TestSharding(t *testing.T) { sharding: true, shardingStrategy: util.ShardingStrategyDefault, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ruler1: allRules}, }, @@ -754,7 +754,7 @@ func TestSharding(t *testing.T) { shardingStrategy: util.ShardingStrategyDefault, enabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ruler1: map[string]rulespb.RuleGroupList{ user1: {user1Group1, user1Group2}, @@ -766,7 +766,7 @@ func TestSharding(t *testing.T) { shardingStrategy: util.ShardingStrategyDefault, disabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ruler1: map[string]rulespb.RuleGroupList{ user2: {user2Group1}, @@ -778,8 +778,8 @@ func TestSharding(t *testing.T) { sharding: true, shardingStrategy: util.ShardingStrategyDefault, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -800,8 +800,8 @@ func TestSharding(t *testing.T) { shardingStrategy: util.ShardingStrategyDefault, enabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -820,8 +820,8 @@ func TestSharding(t *testing.T) { shardingStrategy: util.ShardingStrategyDefault, disabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -840,7 +840,7 @@ func TestSharding(t *testing.T) { shardingStrategy: util.ShardingStrategyDefault, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) desc.Ingesters[ruler2] = ring.InstanceDesc{ Addr: ruler2Addr, Timestamp: time.Now().Add(-time.Hour).Unix(), @@ -864,8 +864,8 @@ func TestSharding(t *testing.T) { shardingStrategy: util.ShardingStrategyDefault, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.LEAVING, time.Now()) - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.LEAVING, time.Now(), false, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -880,8 +880,8 @@ func TestSharding(t *testing.T) { shardingStrategy: util.ShardingStrategyDefault, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.JOINING, time.Now()) - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.JOINING, time.Now(), false, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -896,7 +896,7 @@ func TestSharding(t *testing.T) { shardingStrategy: util.ShardingStrategyShuffle, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{0}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{0}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -910,8 +910,8 @@ func TestSharding(t *testing.T) { shuffleShardSize: 1, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group1Token + 1, user1Group2Token + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group1Token + 1, user1Group2Token + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -928,8 +928,8 @@ func TestSharding(t *testing.T) { setupRing: func(desc *ring.Desc) { // Exact same tokens setup as previous test. - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group1Token + 1, user1Group2Token + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group1Token + 1, user1Group2Token + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -944,8 +944,8 @@ func TestSharding(t *testing.T) { shuffleShardSize: 1, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -964,9 +964,9 @@ func TestSharding(t *testing.T) { shuffleShardSize: 2, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -988,9 +988,9 @@ func TestSharding(t *testing.T) { shuffleShardSize: 2, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 1) + 1, user1Group1Token + 1, user1Group2Token + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, userToken(user3, 1) + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 1) + 1, user1Group1Token + 1, user1Group2Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, userToken(user3, 1) + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -1012,9 +1012,9 @@ func TestSharding(t *testing.T) { enabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -1035,9 +1035,9 @@ func TestSharding(t *testing.T) { disabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -1054,7 +1054,7 @@ func TestSharding(t *testing.T) { sharding: true, shardingAlgo: util.ShardingAlgoByRule, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ruler1: allRulesSharded}, }, @@ -1064,7 +1064,7 @@ func TestSharding(t *testing.T) { shardingAlgo: util.ShardingAlgoByRule, enabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ruler1: map[string]rulespb.RuleGroupList{ user1: { @@ -1080,7 +1080,7 @@ func TestSharding(t *testing.T) { shardingAlgo: util.ShardingAlgoByRule, disabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ruler1: map[string]rulespb.RuleGroupList{ user2: { @@ -1098,8 +1098,8 @@ func TestSharding(t *testing.T) { sharding: true, shardingAlgo: util.ShardingAlgoByRule, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -1127,8 +1127,8 @@ func TestSharding(t *testing.T) { shardingAlgo: util.ShardingAlgoByRule, enabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -1152,8 +1152,8 @@ func TestSharding(t *testing.T) { shardingAlgo: util.ShardingAlgoByRule, disabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -1180,7 +1180,7 @@ func TestSharding(t *testing.T) { shardingAlgo: util.ShardingAlgoByRule, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) desc.Ingesters[ruler2] = ring.InstanceDesc{ Addr: ruler2Addr, Timestamp: time.Now().Add(-time.Hour).Unix(), @@ -1209,8 +1209,8 @@ func TestSharding(t *testing.T) { shardingAlgo: util.ShardingAlgoByRule, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.LEAVING, time.Now()) - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.LEAVING, time.Now(), false, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -1225,8 +1225,8 @@ func TestSharding(t *testing.T) { shardingAlgo: util.ShardingAlgoByRule, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.JOINING, time.Now()) - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.JOINING, time.Now(), false, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -1242,7 +1242,7 @@ func TestSharding(t *testing.T) { shardingAlgo: util.ShardingAlgoByRule, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{0}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{0}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -1257,9 +1257,9 @@ func TestSharding(t *testing.T) { shuffleShardSize: 1, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now(), false, time.Now()) // immaterial what tokens this ruler has, it won't be assigned any rules - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -1277,9 +1277,9 @@ func TestSharding(t *testing.T) { setupRing: func(desc *ring.Desc) { // Exact same tokens setup as previous test. - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now(), false, time.Now()) // this ruler has all the rule tokens, so it gets all the rules - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user1Group1Rule2Token + 1, user1Group2Rule1Token + 1, user2Group1Rule1Token + 1, user2Group1Rule2Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user1Group1Rule2Token + 1, user1Group2Rule1Token + 1, user2Group1Rule1Token + 1, user2Group1Rule2Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -1295,8 +1295,8 @@ func TestSharding(t *testing.T) { shuffleShardSize: 1, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -1327,9 +1327,9 @@ func TestSharding(t *testing.T) { shuffleShardSize: 2, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Rule1Token + 1, user1Group2Rule1Token + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group1Rule2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Rule1Token + 1, user2Group1Rule2Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Rule1Token + 1, user1Group2Rule1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group1Rule2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Rule1Token + 1, user2Group1Rule2Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -1363,9 +1363,9 @@ func TestSharding(t *testing.T) { shuffleShardSize: 2, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 1) + 1, user1Group1Rule1Token + 1, user1Group1Rule2Token + 1, user1Group2Rule1Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule2Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 1) + 1, user1Group1Rule1Token + 1, user1Group1Rule2Token + 1, user1Group2Rule1Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule2Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -1400,9 +1400,9 @@ func TestSharding(t *testing.T) { enabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 1) + 1, user1Group1Rule1Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, userToken(user3, 1) + 1, user1Group2Rule1Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule2Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 1) + 1, user1Group1Rule1Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, userToken(user3, 1) + 1, user1Group2Rule1Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule2Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ @@ -1429,9 +1429,9 @@ func TestSharding(t *testing.T) { disabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { - desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 1) + 1, user1Group1Rule1Token + 1, user1Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, userToken(user3, 1) + 1, user1Group2Rule1Token + 1}), ring.ACTIVE, time.Now()) - desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule2Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 1) + 1, user1Group1Rule1Token + 1, user1Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, userToken(user3, 1) + 1, user1Group2Rule1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) + desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule2Token + 1}), ring.ACTIVE, time.Now(), false, time.Now()) }, expectedRules: expectedRulesMap{ diff --git a/pkg/util/ring/ring_test.go b/pkg/util/ring/ring_test.go index fa285c0cb4ae..2f89ccbefe89 100644 --- a/pkg/util/ring/ring_test.go +++ b/pkg/util/ring/ring_test.go @@ -133,6 +133,16 @@ func (r *readRingMock) ZonesCount() int { return len(uniqueZone) } +// WritableInstancesWithTokensCount returns the number of writable instances in the ring that have tokens. +func (r *readRingMock) WritableInstancesWithTokensCount() int { + return len(r.replicationSet.Instances) +} + +// WritableInstancesWithTokensInZoneCount returns the number of writable instances in the ring that are registered in given zone and have tokens. +func (r *readRingMock) WritableInstancesWithTokensInZoneCount(_ string) int { + return len(r.replicationSet.Instances) +} + type readLifecyclerMock struct { mock.Mock addr string diff --git a/vendor/github.com/grafana/dskit/backoff/backoff.go b/vendor/github.com/grafana/dskit/backoff/backoff.go index 7ce556472848..419af80e1ad5 100644 --- a/vendor/github.com/grafana/dskit/backoff/backoff.go +++ b/vendor/github.com/grafana/dskit/backoff/backoff.go @@ -54,7 +54,7 @@ func (b *Backoff) Ongoing() bool { return b.ctx.Err() == nil && (b.cfg.MaxRetries == 0 || b.numRetries < b.cfg.MaxRetries) } -// Err returns the reason for terminating the backoff, or nil if it didn't terminate +// Err returns the reason for terminating the backoff, or nil if it didn't terminate. func (b *Backoff) Err() error { if b.ctx.Err() != nil { return b.ctx.Err() @@ -65,6 +65,15 @@ func (b *Backoff) Err() error { return nil } +// ErrCause is like Err() but returns the context cause if backoff is terminated because the +// context has been canceled. +func (b *Backoff) ErrCause() error { + if b.ctx.Err() != nil { + return context.Cause(b.ctx) + } + return b.Err() +} + // NumRetries returns the number of retries so far func (b *Backoff) NumRetries() int { return b.numRetries diff --git a/vendor/github.com/grafana/dskit/concurrency/runner.go b/vendor/github.com/grafana/dskit/concurrency/runner.go index fcc892997149..f3ee57c857f7 100644 --- a/vendor/github.com/grafana/dskit/concurrency/runner.go +++ b/vendor/github.com/grafana/dskit/concurrency/runner.go @@ -7,7 +7,6 @@ import ( "go.uber.org/atomic" "golang.org/x/sync/errgroup" - "github.com/grafana/dskit/internal/math" "github.com/grafana/dskit/multierror" ) @@ -31,7 +30,7 @@ func ForEachUser(ctx context.Context, userIDs []string, concurrency int, userFun errsMx := sync.Mutex{} wg := sync.WaitGroup{} - for ix := 0; ix < math.Min(concurrency, len(userIDs)); ix++ { + for ix := 0; ix < min(concurrency, len(userIDs)); ix++ { wg.Add(1) go func() { defer wg.Done() @@ -108,7 +107,7 @@ func ForEachJob(ctx context.Context, jobs int, concurrency int, jobFunc func(ctx // Start workers to process jobs. g, ctx := errgroup.WithContext(ctx) - for ix := 0; ix < math.Min(concurrency, jobs); ix++ { + for ix := 0; ix < min(concurrency, jobs); ix++ { g.Go(func() error { for ctx.Err() == nil { idx := int(indexes.Inc()) diff --git a/vendor/github.com/grafana/dskit/crypto/tls/tls.go b/vendor/github.com/grafana/dskit/crypto/tls/tls.go index 7ed818f399a6..437ab722f7fd 100644 --- a/vendor/github.com/grafana/dskit/crypto/tls/tls.go +++ b/vendor/github.com/grafana/dskit/crypto/tls/tls.go @@ -109,6 +109,24 @@ func (cfg *ClientConfig) GetTLSConfig() (*tls.Config, error) { config.RootCAs = caCertPool } + loadCert := func() (tls.Certificate, error) { + cert, err := reader.ReadSecret(cfg.CertPath) + if err != nil { + return tls.Certificate{}, errors.Wrapf(err, "error loading client cert: %s", cfg.CertPath) + } + key, err := reader.ReadSecret(cfg.KeyPath) + if err != nil { + return tls.Certificate{}, errors.Wrapf(err, "error loading client key: %s", cfg.KeyPath) + } + + clientCert, err := tls.X509KeyPair(cert, key) + if err != nil { + return tls.Certificate{}, errors.Wrapf(err, "failed to load TLS certificate %s,%s", cfg.CertPath, cfg.KeyPath) + } + return clientCert, nil + + } + // Read Client Certificate if cfg.CertPath != "" || cfg.KeyPath != "" { if cfg.CertPath == "" { @@ -117,21 +135,23 @@ func (cfg *ClientConfig) GetTLSConfig() (*tls.Config, error) { if cfg.KeyPath == "" { return nil, errKeyMissing } - - cert, err := reader.ReadSecret(cfg.CertPath) + // Confirm that certificate and key paths are valid. + cert, err := loadCert() if err != nil { - return nil, errors.Wrapf(err, "error loading client cert: %s", cfg.CertPath) - } - key, err := reader.ReadSecret(cfg.KeyPath) - if err != nil { - return nil, errors.Wrapf(err, "error loading client key: %s", cfg.KeyPath) + return nil, err } - clientCert, err := tls.X509KeyPair(cert, key) - if err != nil { - return nil, errors.Wrapf(err, "failed to load TLS certificate %s,%s", cfg.CertPath, cfg.KeyPath) + config.GetClientCertificate = func(_ *tls.CertificateRequestInfo) (*tls.Certificate, error) { + c, err := loadCert() + if err != nil { + return nil, err + } + return &c, err } - config.Certificates = []tls.Certificate{clientCert} + // Allow fallback for callers using this config also for server purposes (i.e., kv/memberlist). + // Clients will prefer GetClientCertificate, but servers can use Certificates. + config.Certificates = []tls.Certificate{cert} + } if cfg.MinVersion != "" { diff --git a/vendor/github.com/grafana/dskit/internal/math/math.go b/vendor/github.com/grafana/dskit/internal/math/math.go deleted file mode 100644 index 9d6422e50e35..000000000000 --- a/vendor/github.com/grafana/dskit/internal/math/math.go +++ /dev/null @@ -1,9 +0,0 @@ -package math - -// Min returns the minimum of two ints. -func Min(a, b int) int { - if a < b { - return a - } - return b -} diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/broadcast.go b/vendor/github.com/grafana/dskit/kv/memberlist/broadcast.go index 6657b73a51de..aefaa2f65e10 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/broadcast.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/broadcast.go @@ -1,10 +1,7 @@ package memberlist import ( - "fmt" - "github.com/go-kit/log" - "github.com/go-kit/log/level" "github.com/hashicorp/memberlist" ) @@ -45,7 +42,6 @@ func (r ringBroadcast) Invalidates(old memberlist.Broadcast) bool { // otherwise, we may be invalidating some older messages, which however covered different // ingesters if r.version >= oldb.version { - level.Debug(r.logger).Log("msg", "Invalidating forwarded broadcast", "key", r.key, "version", r.version, "oldVersion", oldb.version, "content", fmt.Sprintf("%v", r.content), "oldContent", fmt.Sprintf("%v", oldb.content)) return true } } diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go index a1b659d4097e..a7eefe92fc22 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go @@ -157,7 +157,8 @@ type KVConfig struct { LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout" category:"advanced"` // Timeout used when leaving the memberlist cluster. - LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"` + LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"` + BroadcastTimeoutForLocalUpdatesOnShutdown time.Duration `yaml:"broadcast_timeout_for_local_updates_on_shutdown" category:"advanced"` // How much space to use to keep received and sent messages in memory (for troubleshooting). MessageHistoryBufferBytes int `yaml:"message_history_buffer_bytes" category:"advanced"` @@ -198,6 +199,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { f.IntVar(&cfg.AdvertisePort, prefix+"memberlist.advertise-port", mlDefaults.AdvertisePort, "Gossip port to advertise to other members in the cluster. Used for NAT traversal.") f.StringVar(&cfg.ClusterLabel, prefix+"memberlist.cluster-label", mlDefaults.Label, "The cluster label is an optional string to include in outbound packets and gossip streams. Other members in the memberlist cluster will discard any message whose label doesn't match the configured one, unless the 'cluster-label-verification-disabled' configuration option is set to true.") f.BoolVar(&cfg.ClusterLabelVerificationDisabled, prefix+"memberlist.cluster-label-verification-disabled", mlDefaults.SkipInboundLabelCheck, "When true, memberlist doesn't verify that inbound packets and gossip streams have the cluster label matching the configured one. This verification should be disabled while rolling out the change to the configured cluster label in a live memberlist cluster.") + f.DurationVar(&cfg.BroadcastTimeoutForLocalUpdatesOnShutdown, prefix+"memberlist.broadcast-timeout-for-local-updates-on-shutdown", 10*time.Second, "Timeout for broadcasting all remaining locally-generated updates to other nodes when shutting down. Only used if there are nodes left in the memberlist cluster, and only applies to locally-generated updates, not to broadcast messages that are result of incoming gossip updates. 0 = no timeout, wait until all locally-generated updates are sent.") cfg.TCPTransport.RegisterFlagsWithPrefix(f, prefix) } @@ -231,10 +233,11 @@ type KV struct { // dns discovery provider provider DNSProvider - // Protects access to memberlist and broadcasts fields. - delegateReady atomic.Bool - memberlist *memberlist.Memberlist - broadcasts *memberlist.TransmitLimitedQueue + // Protects access to memberlist and broadcast queues. + delegateReady atomic.Bool + memberlist *memberlist.Memberlist + localBroadcasts *memberlist.TransmitLimitedQueue // queue for messages generated locally + gossipBroadcasts *memberlist.TransmitLimitedQueue // queue for messages that we forward from other nodes // KV Store. storeMu sync.Mutex @@ -273,7 +276,8 @@ type KV struct { numberOfPushes prometheus.Counter totalSizeOfPulls prometheus.Counter totalSizeOfPushes prometheus.Counter - numberOfBroadcastMessagesInQueue prometheus.GaugeFunc + numberOfGossipMessagesInQueue prometheus.GaugeFunc + numberOfLocalMessagesInQueue prometheus.GaugeFunc totalSizeOfBroadcastMessagesInQueue prometheus.Gauge numberOfBroadcastMessagesDropped prometheus.Counter casAttempts prometheus.Counter @@ -456,7 +460,11 @@ func (m *KV) starting(ctx context.Context) error { } // Finish delegate initialization. m.memberlist = list - m.broadcasts = &memberlist.TransmitLimitedQueue{ + m.localBroadcasts = &memberlist.TransmitLimitedQueue{ + NumNodes: list.NumMembers, + RetransmitMult: mlCfg.RetransmitMult, + } + m.gossipBroadcasts = &memberlist.TransmitLimitedQueue{ NumNodes: list.NumMembers, RetransmitMult: mlCfg.RetransmitMult, } @@ -719,20 +727,24 @@ func (m *KV) discoverMembers(ctx context.Context, members []string) []string { func (m *KV) stopping(_ error) error { level.Info(m.logger).Log("msg", "leaving memberlist cluster") - // Wait until broadcast queue is empty, but don't wait for too long. + // Wait until queue with locally-generated messages is empty, but don't wait for too long. // Also don't wait if there is just one node left. - // Problem is that broadcast queue is also filled up by state changes received from other nodes, - // so it may never be empty in a busy cluster. However, we generally only care about messages - // generated on this node via CAS, and those are disabled now (via casBroadcastsEnabled), and should be able - // to get out in this timeout. + // Note: Once we enter Stopping state, we don't queue more locally-generated messages. - waitTimeout := time.Now().Add(10 * time.Second) - for m.broadcasts.NumQueued() > 0 && m.memberlist.NumMembers() > 1 && time.Now().Before(waitTimeout) { + deadline := time.Now().Add(m.cfg.BroadcastTimeoutForLocalUpdatesOnShutdown) + + msgs := m.localBroadcasts.NumQueued() + nodes := m.memberlist.NumMembers() + for msgs > 0 && nodes > 1 && (m.cfg.BroadcastTimeoutForLocalUpdatesOnShutdown <= 0 || time.Now().Before(deadline)) { + level.Info(m.logger).Log("msg", "waiting for locally-generated broadcast messages to be sent out", "count", msgs, "nodes", nodes) time.Sleep(250 * time.Millisecond) + + msgs = m.localBroadcasts.NumQueued() + nodes = m.memberlist.NumMembers() } - if cnt := m.broadcasts.NumQueued(); cnt > 0 { - level.Warn(m.logger).Log("msg", "broadcast messages left in queue", "count", cnt, "nodes", m.memberlist.NumMembers()) + if msgs > 0 { + level.Warn(m.logger).Log("msg", "locally-generated broadcast messages left the queue", "count", msgs, "nodes", nodes) } err := m.memberlist.Leave(m.cfg.LeaveTimeout) @@ -972,11 +984,7 @@ outer: m.casSuccesses.Inc() m.notifyWatchers(key) - if m.State() == services.Running { - m.broadcastNewValue(key, change, newver, codec) - } else { - level.Warn(m.logger).Log("msg", "skipped broadcasting CAS update because memberlist KV is shutting down", "key", key) - } + m.broadcastNewValue(key, change, newver, codec, true) } return nil @@ -1034,7 +1042,12 @@ func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{}) return change, newver, retry, nil } -func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec) { +func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec, locallyGenerated bool) { + if locallyGenerated && m.State() != services.Running { + level.Warn(m.logger).Log("msg", "skipped broadcasting of locally-generated update because memberlist KV is shutting down", "key", key) + return + } + data, err := codec.Encode(change) if err != nil { level.Error(m.logger).Log("msg", "failed to encode change", "key", key, "version", version, "err", err) @@ -1058,7 +1071,25 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec Changes: change.MergeContent(), }) - m.queueBroadcast(key, change.MergeContent(), version, pairData) + l := len(pairData) + b := ringBroadcast{ + key: key, + content: change.MergeContent(), + version: version, + msg: pairData, + finished: func(ringBroadcast) { + m.totalSizeOfBroadcastMessagesInQueue.Sub(float64(l)) + }, + logger: m.logger, + } + + m.totalSizeOfBroadcastMessagesInQueue.Add(float64(l)) + + if locallyGenerated { + m.localBroadcasts.QueueBroadcast(b) + } else { + m.gossipBroadcasts.QueueBroadcast(b) + } } // NodeMeta is method from Memberlist Delegate interface @@ -1153,7 +1184,7 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { m.notifyWatchers(key) // Don't resend original message, but only changes. - m.broadcastNewValue(key, mod, version, update.codec) + m.broadcastNewValue(key, mod, version, update.codec, false) } case <-m.shutdown: @@ -1163,24 +1194,6 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { } } -func (m *KV) queueBroadcast(key string, content []string, version uint, message []byte) { - l := len(message) - - b := ringBroadcast{ - key: key, - content: content, - version: version, - msg: message, - finished: func(ringBroadcast) { - m.totalSizeOfBroadcastMessagesInQueue.Sub(float64(l)) - }, - logger: m.logger, - } - - m.totalSizeOfBroadcastMessagesInQueue.Add(float64(l)) - m.broadcasts.QueueBroadcast(b) -} - // GetBroadcasts is method from Memberlist Delegate interface // It returns all pending broadcasts (within the size limit) func (m *KV) GetBroadcasts(overhead, limit int) [][]byte { @@ -1188,7 +1201,18 @@ func (m *KV) GetBroadcasts(overhead, limit int) [][]byte { return nil } - return m.broadcasts.GetBroadcasts(overhead, limit) + // Prioritize locally-generated messages + msgs := m.localBroadcasts.GetBroadcasts(overhead, limit) + + // Decrease limit for each message we got from locally-generated broadcasts. + for _, m := range msgs { + limit -= overhead + len(m) + } + + if limit > 0 { + msgs = append(msgs, m.gossipBroadcasts.GetBroadcasts(overhead, limit)...) + } + return msgs } // LocalState is method from Memberlist Delegate interface @@ -1335,7 +1359,7 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) { level.Error(m.logger).Log("msg", "failed to store received value", "key", kvPair.Key, "err", err) } else if newver > 0 { m.notifyWatchers(kvPair.Key) - m.broadcastNewValue(kvPair.Key, change, newver, codec) + m.broadcastNewValue(kvPair.Key, change, newver, codec, false) } } diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/metrics.go b/vendor/github.com/grafana/dskit/kv/memberlist/metrics.go index 75a6b2324761..0f09c5d71fb6 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/metrics.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/metrics.go @@ -71,15 +71,33 @@ func (m *KV) createAndRegisterMetrics() { Help: "Total size of pulled state", }) - m.numberOfBroadcastMessagesInQueue = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{ - Namespace: m.cfg.MetricsNamespace, - Subsystem: subsystem, - Name: "messages_in_broadcast_queue", - Help: "Number of user messages in the broadcast queue", + const queueMetricName = "messages_in_broadcast_queue" + const queueMetricHelp = "Number of user messages in the broadcast queue" + + m.numberOfGossipMessagesInQueue = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: m.cfg.MetricsNamespace, + Subsystem: subsystem, + Name: queueMetricName, + Help: queueMetricHelp, + ConstLabels: map[string]string{"queue": "gossip"}, + }, func() float64 { + // Queues are not set before Starting state + if m.State() == services.Running || m.State() == services.Stopping { + return float64(m.gossipBroadcasts.NumQueued()) + } + return 0 + }) + + m.numberOfLocalMessagesInQueue = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: m.cfg.MetricsNamespace, + Subsystem: subsystem, + Name: queueMetricName, + Help: queueMetricHelp, + ConstLabels: map[string]string{"queue": "local"}, }, func() float64 { - // m.broadcasts is not set before Starting state + // Queues are not set before Starting state if m.State() == services.Running || m.State() == services.Stopping { - return float64(m.broadcasts.NumQueued()) + return float64(m.localBroadcasts.NumQueued()) } return 0 }) diff --git a/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go b/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go index 0b72ef171171..1675cafac92d 100644 --- a/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go @@ -73,6 +73,8 @@ Rather, it's the delegate's responsibility to call [BasicLifecycler.ChangeState] - The lifecycler will then periodically, based on the [ring.BasicLifecyclerConfig.TokensObservePeriod], attempt to verify that its tokens have been added to the ring, after which it will call [ring.BasicLifecyclerDelegate.OnRingInstanceTokens]. - The lifecycler will update they key/value store with heartbeats and state changes based on the [ring.BasicLifecyclerConfig.HeartbeatPeriod], calling [ring.BasicLifecyclerDelegate.OnRingInstanceHeartbeat] each time. - When the BasicLifecycler is stopped, it will call [ring.BasicLifecyclerDelegate.OnRingInstanceStopping]. + +BasicLifecycler does not support read only instances for now. */ type BasicLifecycler struct { *services.BasicService @@ -316,7 +318,7 @@ func (l *BasicLifecycler) registerInstance(ctx context.Context) error { // Always overwrite the instance in the ring (even if already exists) because some properties // may have changed (stated, tokens, zone, address) and even if they didn't the heartbeat at // least did. - instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, tokens, state, registeredAt) + instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, tokens, state, registeredAt, false, time.Time{}) return ringDesc, true, nil }) @@ -443,7 +445,7 @@ func (l *BasicLifecycler) updateInstance(ctx context.Context, update func(*Desc, // a resharding of tenants among instances: to guarantee query correctness we need to update the // registration timestamp to current time. registeredAt := time.Now() - instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, l.GetTokens(), l.GetState(), registeredAt) + instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, l.GetTokens(), l.GetState(), registeredAt, false, time.Time{}) } prevTimestamp := instanceDesc.Timestamp diff --git a/vendor/github.com/grafana/dskit/ring/lifecycler.go b/vendor/github.com/grafana/dskit/ring/lifecycler.go index 7c54eabdd873..1ff80d99ac80 100644 --- a/vendor/github.com/grafana/dskit/ring/lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/lifecycler.go @@ -147,10 +147,12 @@ type Lifecycler struct { // We need to remember the ingester state, tokens and registered timestamp just in case the KV store // goes away and comes back empty. The state changes during lifecycle of instance. - stateMtx sync.RWMutex - state InstanceState - tokens Tokens - registeredAt time.Time + stateMtx sync.RWMutex + state InstanceState + tokens Tokens + registeredAt time.Time + readOnly bool + readOnlyLastUpdated time.Time // Controls the ready-reporting readyLock sync.Mutex @@ -161,6 +163,7 @@ type Lifecycler struct { countersLock sync.RWMutex healthyInstancesCount int instancesCount int + readOnlyInstancesCount int healthyInstancesInZoneCount int instancesInZoneCount int zonesCount int @@ -349,6 +352,26 @@ func (i *Lifecycler) ChangeState(ctx context.Context, state InstanceState) error return <-errCh } +func (i *Lifecycler) ChangeReadOnlyState(ctx context.Context, readOnly bool) error { + errCh := make(chan error) + fn := func() { + prevReadOnly, _ := i.GetReadOnlyState() + if prevReadOnly == readOnly { + errCh <- nil + return + } + + level.Info(i.logger).Log("msg", "changing read-only state of instance in the ring", "readOnly", readOnly, "ring", i.RingName) + i.setReadOnlyState(readOnly, time.Now()) + errCh <- i.updateConsul(ctx) + } + + if err := i.sendToLifecyclerLoop(fn); err != nil { + return err + } + return <-errCh +} + func (i *Lifecycler) getTokens() Tokens { i.stateMtx.RLock() defer i.stateMtx.RUnlock() @@ -379,6 +402,21 @@ func (i *Lifecycler) setRegisteredAt(registeredAt time.Time) { i.registeredAt = registeredAt } +// GetReadOnlyState returns the read-only state of this instance -- whether instance is read-only, and when what the last +// update of read-only state (possibly zero). +func (i *Lifecycler) GetReadOnlyState() (bool, time.Time) { + i.stateMtx.RLock() + defer i.stateMtx.RUnlock() + return i.readOnly, i.readOnlyLastUpdated +} + +func (i *Lifecycler) setReadOnlyState(readOnly bool, readOnlyLastUpdated time.Time) { + i.stateMtx.Lock() + defer i.stateMtx.Unlock() + i.readOnly = readOnly + i.readOnlyLastUpdated = readOnlyLastUpdated +} + // ClaimTokensFor takes all the tokens for the supplied ingester and assigns them to this ingester. // // For this method to work correctly (especially when using gossiping), source ingester (specified by @@ -442,6 +480,14 @@ func (i *Lifecycler) InstancesCount() int { return i.instancesCount } +// ReadOnlyInstancesCount returns the total number of instances in the ring that are read only, updated during the last heartbeat period. +func (i *Lifecycler) ReadOnlyInstancesCount() int { + i.countersLock.RLock() + defer i.countersLock.RUnlock() + + return i.readOnlyInstancesCount +} + // HealthyInstancesInZoneCount returns the number of healthy instances in the ring that are registered in // this lifecycler's zone, updated during the last heartbeat period. func (i *Lifecycler) HealthyInstancesInZoneCount() int { @@ -629,10 +675,11 @@ func (i *Lifecycler) initRing(ctx context.Context) error { instanceDesc, ok := ringDesc.Ingesters[i.ID] if !ok { - // The instance doesn't exist in the ring, so it's safe to set the registered timestamp - // as of now. - registeredAt := time.Now() - i.setRegisteredAt(registeredAt) + now := time.Now() + // The instance doesn't exist in the ring, so it's safe to set the registered timestamp as of now. + i.setRegisteredAt(now) + // Clear read-only state, and set last update time to "now". + i.setReadOnlyState(false, now) // We use the tokens from the file only if it does not exist in the ring yet. if len(tokensFromFile) > 0 { @@ -640,14 +687,16 @@ func (i *Lifecycler) initRing(ctx context.Context) error { if len(tokensFromFile) >= i.cfg.NumTokens { i.setState(ACTIVE) } - ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState(), registeredAt) + ro, rots := i.GetReadOnlyState() + ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState(), i.getRegisteredAt(), ro, rots) i.setTokens(tokensFromFile) return ringDesc, true, nil } // Either we are a new ingester, or consul must have restarted level.Info(i.logger).Log("msg", "instance not found in ring, adding with no tokens", "ring", i.RingName) - ringDesc.AddIngester(i.ID, i.Addr, i.Zone, []uint32{}, i.GetState(), registeredAt) + ro, rots := i.GetReadOnlyState() + ringDesc.AddIngester(i.ID, i.Addr, i.Zone, []uint32{}, i.GetState(), i.getRegisteredAt(), ro, rots) return ringDesc, true, nil } @@ -655,6 +704,9 @@ func (i *Lifecycler) initRing(ctx context.Context) error { // but we need to update the local state accordingly. i.setRegisteredAt(instanceDesc.GetRegisteredAt()) + // Set lifecycler read-only state from ring entry. We will not modify ring entry's read-only state. + i.setReadOnlyState(instanceDesc.GetReadOnlyState()) + // If the ingester is in the JOINING state this means it crashed due to // a failed token transfer or some other reason during startup. We want // to set it back to PENDING in order to start the lifecycle from the @@ -747,7 +799,8 @@ func (i *Lifecycler) verifyTokens(ctx context.Context) bool { ringTokens = append(ringTokens, newTokens...) sort.Sort(ringTokens) - ringDesc.AddIngester(i.ID, i.Addr, i.Zone, ringTokens, i.GetState(), i.getRegisteredAt()) + ro, rots := i.GetReadOnlyState() + ringDesc.AddIngester(i.ID, i.Addr, i.Zone, ringTokens, i.GetState(), i.getRegisteredAt(), ro, rots) i.setTokens(ringTokens) @@ -855,7 +908,8 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er sort.Sort(myTokens) i.setTokens(myTokens) - ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt()) + ro, rots := i.GetReadOnlyState() + ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt(), ro, rots) return ringDesc, true, nil }) @@ -889,7 +943,8 @@ func (i *Lifecycler) updateConsul(ctx context.Context) error { tokens = instanceDesc.Tokens } - ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokens, i.GetState(), i.getRegisteredAt()) + ro, rots := i.GetReadOnlyState() + ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokens, i.GetState(), i.getRegisteredAt(), ro, rots) return ringDesc, true, nil }) @@ -922,6 +977,7 @@ func (i *Lifecycler) changeState(ctx context.Context, state InstanceState) error func (i *Lifecycler) updateCounters(ringDesc *Desc) { healthyInstancesCount := 0 instancesCount := 0 + readOnlyInstancesCount := 0 zones := map[string]int{} healthyInstancesInZone := map[string]int{} @@ -931,6 +987,9 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) { for _, ingester := range ringDesc.Ingesters { zones[ingester.Zone]++ instancesCount++ + if ingester.ReadOnly { + readOnlyInstancesCount++ + } // Count the number of healthy instances for Write operation. if ingester.IsHealthy(Write, i.cfg.RingConfig.HeartbeatTimeout, now) { @@ -944,6 +1003,7 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) { i.countersLock.Lock() i.healthyInstancesCount = healthyInstancesCount i.instancesCount = instancesCount + i.readOnlyInstancesCount = readOnlyInstancesCount i.healthyInstancesInZoneCount = healthyInstancesInZone[i.cfg.Zone] i.instancesInZoneCount = zones[i.cfg.Zone] i.zonesCount = len(zones) diff --git a/vendor/github.com/grafana/dskit/ring/model.go b/vendor/github.com/grafana/dskit/ring/model.go index 334b027d0f8b..fb3095172b55 100644 --- a/vendor/github.com/grafana/dskit/ring/model.go +++ b/vendor/github.com/grafana/dskit/ring/model.go @@ -45,26 +45,30 @@ func NewDesc() *Desc { } } +func timeToUnixSecons(t time.Time) int64 { + if t.IsZero() { + return 0 + } + return t.Unix() +} + // AddIngester adds the given ingester to the ring. Ingester will only use supplied tokens, // any other tokens are removed. -func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state InstanceState, registeredAt time.Time) InstanceDesc { +func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state InstanceState, registeredAt time.Time, readOnly bool, readOnlyUpdated time.Time) InstanceDesc { if d.Ingesters == nil { d.Ingesters = map[string]InstanceDesc{} } - registeredTimestamp := int64(0) - if !registeredAt.IsZero() { - registeredTimestamp = registeredAt.Unix() - } - ingester := InstanceDesc{ - Id: id, - Addr: addr, - Timestamp: time.Now().Unix(), - RegisteredTimestamp: registeredTimestamp, - State: state, - Tokens: tokens, - Zone: zone, + Id: id, + Addr: addr, + Timestamp: time.Now().Unix(), + State: state, + Tokens: tokens, + Zone: zone, + RegisteredTimestamp: timeToUnixSecons(registeredAt), + ReadOnly: readOnly, + ReadOnlyUpdatedTimestamp: timeToUnixSecons(readOnlyUpdated), } d.Ingesters[id] = ingester @@ -142,6 +146,20 @@ func (i *InstanceDesc) GetRegisteredAt() time.Time { return time.Unix(i.RegisteredTimestamp, 0) } +// GetReadOnlyState returns the read-only state and timestamp of last read-only state update. +func (i *InstanceDesc) GetReadOnlyState() (bool, time.Time) { + if i == nil { + return false, time.Time{} + } + + ts := time.Time{} + if i.ReadOnlyUpdatedTimestamp > 0 { + ts = time.Unix(i.ReadOnlyUpdatedTimestamp, 0) + } + + return i.ReadOnly, ts +} + func (i *InstanceDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration, now time.Time) bool { healthy := op.IsInstanceInStateHealthy(i.State) @@ -552,6 +570,30 @@ func (d *Desc) instancesWithTokensCountPerZone() map[string]int { return instancesCountPerZone } +func (d *Desc) writableInstancesWithTokensCount() int { + writableInstancesWithTokensCount := 0 + if d != nil { + for _, ingester := range d.Ingesters { + if len(ingester.Tokens) > 0 && !ingester.ReadOnly { + writableInstancesWithTokensCount++ + } + } + } + return writableInstancesWithTokensCount +} + +func (d *Desc) writableInstancesWithTokensCountPerZone() map[string]int { + instancesCountPerZone := map[string]int{} + if d != nil { + for _, ingester := range d.Ingesters { + if len(ingester.Tokens) > 0 && !ingester.ReadOnly { + instancesCountPerZone[ingester.Zone]++ + } + } + } + return instancesCountPerZone +} + type CompareResult int // CompareResult responses @@ -600,6 +642,14 @@ func (d *Desc) RingCompare(o *Desc) CompareResult { return Different } + if ing.ReadOnly != oing.ReadOnly { + return Different + } + + if ing.ReadOnlyUpdatedTimestamp != oing.ReadOnlyUpdatedTimestamp { + return Different + } + if len(ing.Tokens) != len(oing.Tokens) { return Different } diff --git a/vendor/github.com/grafana/dskit/ring/ring.go b/vendor/github.com/grafana/dskit/ring/ring.go index e1c1f6a5159d..bb7e29c28a41 100644 --- a/vendor/github.com/grafana/dskit/ring/ring.go +++ b/vendor/github.com/grafana/dskit/ring/ring.go @@ -20,7 +20,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/grafana/dskit/flagext" - dsmath "github.com/grafana/dskit/internal/math" "github.com/grafana/dskit/internal/slices" "github.com/grafana/dskit/kv" shardUtil "github.com/grafana/dskit/ring/shard" @@ -36,6 +35,7 @@ const ( ) // ReadRing represents the read interface to the ring. +// Support for read-only instances requires use of ShuffleShard or ShuffleShardWithLookback prior to getting a ReplicationSet. type ReadRing interface { // Get returns n (or more) instances which form the replicas for the given key. // bufDescs, bufHosts and bufZones are slices to be overwritten for the return value @@ -88,6 +88,12 @@ type ReadRing interface { // InstancesWithTokensInZoneCount returns the number of instances in the ring that are registered in given zone and have tokens. InstancesWithTokensInZoneCount(zone string) int + // WritableInstancesWithTokensCount returns the number of writable instances in the ring that have tokens. + WritableInstancesWithTokensCount() int + + // WritableInstancesWithTokensInZoneCount returns the number of writable instances in the ring that are registered in given zone and have tokens. + WritableInstancesWithTokensInZoneCount(zone string) int + // ZonesCount returns the number of zones for which there's at least 1 instance registered in the ring. ZonesCount() int } @@ -167,6 +173,7 @@ type instanceInfo struct { } // Ring is a Service that maintains an in-memory copy of a ring and watches for changes. +// Support for read-only instances requires use of ShuffleShard or ShuffleShardWithLookback prior to getting a ReplicationSet. type Ring struct { services.Service @@ -205,6 +212,12 @@ type Ring struct { // Nubmber of registered instances with tokens per zone. instancesWithTokensCountPerZone map[string]int + // Number of registered instances are writable and have tokens. + writableInstancesWithTokensCount int + + // Nubmber of registered instances with tokens per zone that are writable. + writableInstancesWithTokensCountPerZone map[string]int + // Cache of shuffle-sharded subrings per identifier. Invalidated when topology changes. // If set to nil, no caching is done (used by tests, and subrings). shuffledSubringCache map[subringCacheKey]*Ring @@ -357,6 +370,8 @@ func (r *Ring) updateRingState(ringDesc *Desc) { instancesWithTokensCount := ringDesc.instancesWithTokensCount() instancesCountPerZone := ringDesc.instancesCountPerZone() instancesWithTokensCountPerZone := ringDesc.instancesWithTokensCountPerZone() + writableInstancesWithTokensCount := ringDesc.writableInstancesWithTokensCount() + writableInstancesWithTokensCountPerZone := ringDesc.writableInstancesWithTokensCountPerZone() r.mtx.Lock() defer r.mtx.Unlock() @@ -368,6 +383,8 @@ func (r *Ring) updateRingState(ringDesc *Desc) { r.instancesWithTokensCount = instancesWithTokensCount r.instancesCountPerZone = instancesCountPerZone r.instancesWithTokensCountPerZone = instancesWithTokensCountPerZone + r.writableInstancesWithTokensCount = writableInstancesWithTokensCount + r.writableInstancesWithTokensCountPerZone = writableInstancesWithTokensCountPerZone r.oldestRegisteredTimestamp = oldestRegisteredTimestamp r.lastTopologyChange = now @@ -423,7 +440,7 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance distinctHosts = bufHosts[:0] distinctZones = bufZones[:0] ) - for i := start; len(distinctHosts) < dsmath.Min(maxInstances, n) && len(distinctZones) < maxZones && iterations < len(r.ringTokens); i++ { + for i := start; len(distinctHosts) < min(maxInstances, n) && len(distinctZones) < maxZones && iterations < len(r.ringTokens); i++ { iterations++ // Wrap i around in the ring. i %= len(r.ringTokens) @@ -528,7 +545,7 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro // Given data is replicated to RF different zones, we can tolerate a number of // RF/2 failing zones. However, we need to protect from the case the ring currently // contains instances in a number of zones < RF. - numReplicatedZones := dsmath.Min(len(r.ringZones), r.cfg.ReplicationFactor) + numReplicatedZones := min(len(r.ringZones), r.cfg.ReplicationFactor) minSuccessZones := (numReplicatedZones / 2) + 1 maxUnavailableZones = minSuccessZones - 1 @@ -677,10 +694,14 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) { // // - Shuffling: probabilistically, for a large enough cluster each identifier gets a different // set of instances, with a reduced number of overlapping instances between two identifiers. +// +// Subring returned by this method does not contain instances that have read-only field set. func (r *Ring) ShuffleShard(identifier string, size int) ReadRing { - // Nothing to do if the shard size is not smaller then the actual ring. - if size <= 0 || r.InstancesCount() <= size { - return r + // Use all possible instances if shuffle sharding is disabled. We don't set size to r.InstancesCount(), because + // that could lead to not all instances being returned when ring zones are unbalanced. + // Reason for not returning entire ring directly is that we need to filter out read-only instances. + if size <= 0 { + size = math.MaxInt } if cached := r.getCachedShuffledSubring(identifier, size); cached != nil { @@ -705,7 +726,7 @@ func (r *Ring) ShuffleShard(identifier string, size int) ReadRing { // This function supports caching, but the cache will only be effective if successive calls for the // same identifier are with the same lookbackPeriod and increasing values of now. func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing { - // Nothing to do if the shard size is not smaller then the actual ring. + // Nothing to do if the shard size is not smaller than the actual ring. if size <= 0 || r.InstancesCount() <= size { return r } @@ -750,7 +771,7 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur actualZones = []string{""} } - shard := make(map[string]InstanceDesc, size) + shard := make(map[string]InstanceDesc, min(len(r.ringDesc.Ingesters), size)) // We need to iterate zones always in the same order to guarantee stability. for _, zone := range actualZones { @@ -797,6 +818,13 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur instanceID := info.InstanceID instance := r.ringDesc.Ingesters[instanceID] + + // The lookbackPeriod is 0 when this function is called by ShuffleShard(). In this case, we want read only instances excluded. + if lookbackPeriod == 0 && instance.ReadOnly { + continue + } + + // Include instance in the subring. shard[instanceID] = instance // If the lookback is enabled and this instance has been registered within the lookback period @@ -805,6 +833,15 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur continue } + // If the lookback is enabled, and this instance is read only or has switched its read-only state + // within the lookback period, then we should include it in the subring, but continue selecting more instances. + // + // * If instance switched to read-only state within the lookback period, then next instance is currently receiving data that previously belonged to this instance. + // * If instance switched to read-write state (read-only=false) within the lookback period, then there was another instance that received data that now belongs back to this instance. + if lookbackPeriod > 0 && (instance.ReadOnly || instance.ReadOnlyUpdatedTimestamp >= lookbackUntil) { + continue + } + found = true break } @@ -824,15 +861,17 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur shardTokens := mergeTokenGroups(shardTokensByZone) return &Ring{ - cfg: r.cfg, - strategy: r.strategy, - ringDesc: shardDesc, - ringTokens: shardTokens, - ringTokensByZone: shardTokensByZone, - ringZones: getZones(shardTokensByZone), - instancesWithTokensCount: shardDesc.instancesWithTokensCount(), - instancesCountPerZone: shardDesc.instancesCountPerZone(), - instancesWithTokensCountPerZone: shardDesc.instancesWithTokensCountPerZone(), + cfg: r.cfg, + strategy: r.strategy, + ringDesc: shardDesc, + ringTokens: shardTokens, + ringTokensByZone: shardTokensByZone, + ringZones: getZones(shardTokensByZone), + instancesWithTokensCount: shardDesc.instancesWithTokensCount(), + instancesCountPerZone: shardDesc.instancesCountPerZone(), + instancesWithTokensCountPerZone: shardDesc.instancesWithTokensCountPerZone(), + writableInstancesWithTokensCount: shardDesc.writableInstancesWithTokensCount(), + writableInstancesWithTokensCountPerZone: shardDesc.writableInstancesWithTokensCountPerZone(), oldestRegisteredTimestamp: shardDesc.getOldestRegisteredTimestamp(), @@ -1036,11 +1075,12 @@ func (r *Ring) setCachedShuffledSubringWithLookback(identifier string, size int, validForLookbackWindowsStartingBefore := int64(math.MaxInt64) for _, instance := range subring.ringDesc.Ingesters { - registeredDuringLookbackWindow := instance.RegisteredTimestamp >= lookbackWindowStart - - if registeredDuringLookbackWindow && instance.RegisteredTimestamp < validForLookbackWindowsStartingBefore { + if instance.RegisteredTimestamp >= lookbackWindowStart && instance.RegisteredTimestamp < validForLookbackWindowsStartingBefore { validForLookbackWindowsStartingBefore = instance.RegisteredTimestamp } + if instance.ReadOnlyUpdatedTimestamp >= lookbackWindowStart && instance.ReadOnlyUpdatedTimestamp < validForLookbackWindowsStartingBefore { + validForLookbackWindowsStartingBefore = instance.ReadOnlyUpdatedTimestamp + } } r.mtx.Lock() @@ -1141,6 +1181,22 @@ func (r *Ring) InstancesWithTokensInZoneCount(zone string) int { return r.instancesWithTokensCountPerZone[zone] } +// WritableInstancesWithTokensCount returns the number of writable instances in the ring that have tokens. +func (r *Ring) WritableInstancesWithTokensCount() int { + r.mtx.RLock() + defer r.mtx.RUnlock() + + return r.writableInstancesWithTokensCount +} + +// WritableInstancesWithTokensInZoneCount returns the number of writable instances in the ring that are registered in given zone and have tokens. +func (r *Ring) WritableInstancesWithTokensInZoneCount(zone string) int { + r.mtx.RLock() + defer r.mtx.RUnlock() + + return r.writableInstancesWithTokensCountPerZone[zone] +} + func (r *Ring) ZonesCount() int { r.mtx.RLock() defer r.mtx.RUnlock() @@ -1148,6 +1204,19 @@ func (r *Ring) ZonesCount() int { return len(r.ringZones) } +// readOnlyInstanceCount returns the number of read only instances in the ring. +func (r *Ring) readOnlyInstanceCount() int { + r.mtx.RLock() + c := 0 + for _, i := range r.ringDesc.Ingesters { + if i.ReadOnly { + c++ + } + } + r.mtx.RUnlock() + return c +} + // Operation describes which instances can be included in the replica set, based on their state. // // Implemented as bitmap, with upper 16-bits used for encoding extendReplicaSet, and lower 16-bits used for encoding healthy states. diff --git a/vendor/github.com/grafana/dskit/ring/ring.pb.go b/vendor/github.com/grafana/dskit/ring/ring.pb.go index fb9d76c480a5..f976b7e994d8 100644 --- a/vendor/github.com/grafana/dskit/ring/ring.pb.go +++ b/vendor/github.com/grafana/dskit/ring/ring.pb.go @@ -128,6 +128,15 @@ type InstanceDesc struct { RegisteredTimestamp int64 `protobuf:"varint,8,opt,name=registered_timestamp,json=registeredTimestamp,proto3" json:"registered_timestamp,omitempty"` // ID of the instance. This value is the same as the key in the ingesters map in Desc. Id string `protobuf:"bytes,9,opt,name=id,proto3" json:"id,omitempty"` + // Unix timestamp (with seconds precision) of when the read_only flag was updated. This + // is used to find other instances that could have possibly owned a specific token in + // the past on the write path, due to *this* instance being read-only. This value should + // only increase. + ReadOnlyUpdatedTimestamp int64 `protobuf:"varint,10,opt,name=read_only_updated_timestamp,json=readOnlyUpdatedTimestamp,proto3" json:"read_only_updated_timestamp,omitempty"` + // Indicates whether this instance is read only. + // Read-only instances go through standard state changes, and special handling is applied to them + // during shuffle shards. + ReadOnly bool `protobuf:"varint,11,opt,name=read_only,json=readOnly,proto3" json:"read_only,omitempty"` } func (m *InstanceDesc) Reset() { *m = InstanceDesc{} } @@ -211,6 +220,20 @@ func (m *InstanceDesc) GetId() string { return "" } +func (m *InstanceDesc) GetReadOnlyUpdatedTimestamp() int64 { + if m != nil { + return m.ReadOnlyUpdatedTimestamp + } + return 0 +} + +func (m *InstanceDesc) GetReadOnly() bool { + if m != nil { + return m.ReadOnly + } + return false +} + func init() { proto.RegisterEnum("ring.InstanceState", InstanceState_name, InstanceState_value) proto.RegisterType((*Desc)(nil), "ring.Desc") @@ -221,35 +244,37 @@ func init() { func init() { proto.RegisterFile("ring.proto", fileDescriptor_26381ed67e202a6e) } var fileDescriptor_26381ed67e202a6e = []byte{ - // 435 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x92, 0x41, 0x8b, 0xd3, 0x40, - 0x1c, 0xc5, 0xe7, 0x9f, 0x4c, 0xb3, 0xe9, 0xbf, 0x6e, 0x09, 0xb3, 0x22, 0x71, 0x91, 0x31, 0xec, - 0x29, 0x0a, 0x76, 0xb1, 0x7a, 0x10, 0xc1, 0xc3, 0xae, 0x1b, 0x25, 0xa5, 0xd4, 0x25, 0x96, 0xbd, - 0x4a, 0xda, 0x8c, 0x31, 0xac, 0x4d, 0x96, 0x64, 0x2a, 0xac, 0x27, 0x3f, 0x82, 0x5f, 0xc0, 0xbb, - 0x1f, 0x65, 0x8f, 0x3d, 0xee, 0x49, 0x6c, 0x0a, 0xe2, 0x71, 0x3f, 0x82, 0xcc, 0xa4, 0x5a, 0x7b, - 0x7b, 0xbf, 0xbc, 0x37, 0xef, 0x4d, 0x60, 0x10, 0xcb, 0x2c, 0x4f, 0x7b, 0x17, 0x65, 0x21, 0x0b, - 0x46, 0x95, 0xde, 0x7f, 0x94, 0x66, 0xf2, 0xc3, 0x7c, 0xd2, 0x9b, 0x16, 0xb3, 0xc3, 0xb4, 0x48, - 0x8b, 0x43, 0x6d, 0x4e, 0xe6, 0xef, 0x35, 0x69, 0xd0, 0xaa, 0x39, 0x74, 0xf0, 0x0d, 0x90, 0x9e, - 0x88, 0x6a, 0xca, 0x5e, 0x60, 0x3b, 0xcb, 0x53, 0x51, 0x49, 0x51, 0x56, 0x2e, 0x78, 0xa6, 0xdf, - 0xe9, 0xdf, 0xed, 0xe9, 0x76, 0x65, 0xf7, 0xc2, 0xbf, 0x5e, 0x90, 0xcb, 0xf2, 0xf2, 0x98, 0x5e, - 0xfd, 0xb8, 0x4f, 0xa2, 0xcd, 0x89, 0xfd, 0x53, 0xec, 0x6e, 0x47, 0x98, 0x83, 0xe6, 0xb9, 0xb8, - 0x74, 0xc1, 0x03, 0xbf, 0x1d, 0x29, 0xc9, 0x7c, 0x6c, 0x7d, 0x8a, 0x3f, 0xce, 0x85, 0x6b, 0x78, - 0xe0, 0x77, 0xfa, 0xac, 0xa9, 0x0f, 0xf3, 0x4a, 0xc6, 0xf9, 0x54, 0xa8, 0x99, 0xa8, 0x09, 0x3c, - 0x37, 0x9e, 0xc1, 0x80, 0xda, 0x86, 0x63, 0x1e, 0xfc, 0x02, 0xbc, 0xf5, 0x7f, 0x82, 0x31, 0xa4, - 0x71, 0x92, 0x94, 0xeb, 0x5e, 0xad, 0xd9, 0x3d, 0x6c, 0xcb, 0x6c, 0x26, 0x2a, 0x19, 0xcf, 0x2e, - 0x74, 0xb9, 0x19, 0x6d, 0x3e, 0xb0, 0x07, 0xd8, 0xaa, 0x64, 0x2c, 0x85, 0x6b, 0x7a, 0xe0, 0x77, - 0xfb, 0x7b, 0xdb, 0xb3, 0x6f, 0x95, 0x15, 0x35, 0x09, 0x76, 0x07, 0x2d, 0x59, 0x9c, 0x8b, 0xbc, - 0x72, 0x2d, 0xcf, 0xf4, 0x77, 0xa3, 0x35, 0xa9, 0xd1, 0xcf, 0x45, 0x2e, 0xdc, 0x9d, 0x66, 0x54, - 0x69, 0xf6, 0x18, 0x6f, 0x97, 0x22, 0xcd, 0xd4, 0x1f, 0x8b, 0xe4, 0xdd, 0x66, 0xdf, 0xd6, 0xfb, - 0x7b, 0x1b, 0x6f, 0xfc, 0xef, 0x26, 0x5d, 0x34, 0xb2, 0xc4, 0x6d, 0xeb, 0x12, 0x23, 0x4b, 0x06, - 0xd4, 0xa6, 0x4e, 0x6b, 0x40, 0xed, 0x96, 0x63, 0x3d, 0x1c, 0xe2, 0xee, 0xd6, 0x95, 0x18, 0xa2, - 0x75, 0xf4, 0x72, 0x1c, 0x9e, 0x05, 0x0e, 0x61, 0x1d, 0xdc, 0x19, 0x06, 0x47, 0x67, 0xe1, 0xe8, - 0xb5, 0x03, 0x0a, 0x4e, 0x83, 0xd1, 0x89, 0x02, 0x43, 0xc1, 0xe0, 0x4d, 0x38, 0x52, 0x60, 0x32, - 0x1b, 0xe9, 0x30, 0x78, 0x35, 0x76, 0xe8, 0xf1, 0xd3, 0xc5, 0x92, 0x93, 0xeb, 0x25, 0x27, 0x37, - 0x4b, 0x0e, 0x5f, 0x6a, 0x0e, 0xdf, 0x6b, 0x0e, 0x57, 0x35, 0x87, 0x45, 0xcd, 0xe1, 0x67, 0xcd, - 0xe1, 0x77, 0xcd, 0xc9, 0x4d, 0xcd, 0xe1, 0xeb, 0x8a, 0x93, 0xc5, 0x8a, 0x93, 0xeb, 0x15, 0x27, - 0x13, 0x4b, 0xbf, 0x89, 0x27, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0xad, 0x7a, 0xa4, 0x89, 0x56, - 0x02, 0x00, 0x00, + // 478 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x92, 0x31, 0x6f, 0xd3, 0x40, + 0x1c, 0xc5, 0x7d, 0xf6, 0xc5, 0xb5, 0xff, 0xa1, 0x91, 0x75, 0x45, 0xc8, 0xb4, 0xe8, 0xb0, 0x3a, + 0x19, 0x24, 0x52, 0x11, 0x18, 0x10, 0x52, 0x87, 0x96, 0x1a, 0xe4, 0x28, 0x4a, 0x2b, 0x13, 0xba, + 0x46, 0x4e, 0x7c, 0x18, 0xab, 0x89, 0x1d, 0xd9, 0x17, 0xa4, 0x30, 0xf1, 0x11, 0xf8, 0x02, 0xec, + 0x7c, 0x0e, 0xa6, 0x8e, 0x19, 0x3b, 0x21, 0xe2, 0x2c, 0x8c, 0xfd, 0x08, 0xe8, 0xce, 0x6d, 0xdd, + 0x6c, 0xef, 0xe5, 0xbd, 0xff, 0xef, 0xe5, 0x24, 0x03, 0xe4, 0x49, 0x1a, 0xb7, 0x67, 0x79, 0xc6, + 0x33, 0x82, 0x85, 0xde, 0x7d, 0x11, 0x27, 0xfc, 0xcb, 0x7c, 0xd4, 0x1e, 0x67, 0xd3, 0x83, 0x38, + 0x8b, 0xb3, 0x03, 0x19, 0x8e, 0xe6, 0x9f, 0xa5, 0x93, 0x46, 0xaa, 0xea, 0x68, 0xff, 0x27, 0x02, + 0x7c, 0xc2, 0x8a, 0x31, 0x39, 0x04, 0x33, 0x49, 0x63, 0x56, 0x70, 0x96, 0x17, 0x36, 0x72, 0x34, + 0xb7, 0xd9, 0x79, 0xdc, 0x96, 0x74, 0x11, 0xb7, 0xfd, 0xdb, 0xcc, 0x4b, 0x79, 0xbe, 0x38, 0xc6, + 0x97, 0x7f, 0x9e, 0x2a, 0x41, 0x7d, 0xb1, 0x7b, 0x06, 0xad, 0xcd, 0x0a, 0xb1, 0x40, 0xbb, 0x60, + 0x0b, 0x1b, 0x39, 0xc8, 0x35, 0x03, 0x21, 0x89, 0x0b, 0x8d, 0xaf, 0xe1, 0x64, 0xce, 0x6c, 0xd5, + 0x41, 0x6e, 0xb3, 0x43, 0x2a, 0xbc, 0x9f, 0x16, 0x3c, 0x4c, 0xc7, 0x4c, 0xcc, 0x04, 0x55, 0xe1, + 0xad, 0xfa, 0x06, 0x75, 0xb1, 0xa1, 0x5a, 0xda, 0xfe, 0x6f, 0x15, 0x1e, 0xdc, 0x6f, 0x10, 0x02, + 0x38, 0x8c, 0xa2, 0xfc, 0x86, 0x2b, 0x35, 0x79, 0x02, 0x26, 0x4f, 0xa6, 0xac, 0xe0, 0xe1, 0x74, + 0x26, 0xe1, 0x5a, 0x50, 0xff, 0x40, 0x9e, 0x41, 0xa3, 0xe0, 0x21, 0x67, 0xb6, 0xe6, 0x20, 0xb7, + 0xd5, 0xd9, 0xd9, 0x9c, 0xfd, 0x28, 0xa2, 0xa0, 0x6a, 0x90, 0x47, 0xa0, 0xf3, 0xec, 0x82, 0xa5, + 0x85, 0xad, 0x3b, 0x9a, 0xbb, 0x1d, 0xdc, 0x38, 0x31, 0xfa, 0x2d, 0x4b, 0x99, 0xbd, 0x55, 0x8d, + 0x0a, 0x4d, 0x5e, 0xc2, 0xc3, 0x9c, 0xc5, 0x89, 0x78, 0x31, 0x8b, 0x86, 0xf5, 0xbe, 0x21, 0xf7, + 0x77, 0xea, 0x6c, 0x70, 0xf7, 0x4f, 0x5a, 0xa0, 0x26, 0x91, 0x6d, 0x4a, 0x88, 0x9a, 0x44, 0xe4, + 0x10, 0xf6, 0x72, 0x16, 0x46, 0xc3, 0x2c, 0x9d, 0x2c, 0x86, 0xf3, 0x59, 0x14, 0xf2, 0x0d, 0x12, + 0x48, 0x92, 0x2d, 0x2a, 0xa7, 0xe9, 0x64, 0xf1, 0xa9, 0x2a, 0xd4, 0xb8, 0x3d, 0x30, 0xef, 0xce, + 0xed, 0xa6, 0x83, 0x5c, 0x23, 0x30, 0x6e, 0xcb, 0x5d, 0x6c, 0x60, 0xab, 0xd1, 0xc5, 0x46, 0xc3, + 0xd2, 0x9f, 0xf7, 0x60, 0x7b, 0xe3, 0xb9, 0x04, 0x40, 0x3f, 0x7a, 0x37, 0xf0, 0xcf, 0x3d, 0x4b, + 0x21, 0x4d, 0xd8, 0xea, 0x79, 0x47, 0xe7, 0x7e, 0xff, 0x83, 0x85, 0x84, 0x39, 0xf3, 0xfa, 0x27, + 0xc2, 0xa8, 0xc2, 0x74, 0x4f, 0xfd, 0xbe, 0x30, 0x1a, 0x31, 0x00, 0xf7, 0xbc, 0xf7, 0x03, 0x0b, + 0x1f, 0xbf, 0x5e, 0xae, 0xa8, 0x72, 0xb5, 0xa2, 0xca, 0xf5, 0x8a, 0xa2, 0xef, 0x25, 0x45, 0xbf, + 0x4a, 0x8a, 0x2e, 0x4b, 0x8a, 0x96, 0x25, 0x45, 0x7f, 0x4b, 0x8a, 0xfe, 0x95, 0x54, 0xb9, 0x2e, + 0x29, 0xfa, 0xb1, 0xa6, 0xca, 0x72, 0x4d, 0x95, 0xab, 0x35, 0x55, 0x46, 0xba, 0xfc, 0xde, 0x5e, + 0xfd, 0x0f, 0x00, 0x00, 0xff, 0xff, 0x6a, 0x5b, 0x75, 0x81, 0xb2, 0x02, 0x00, 0x00, } func (x InstanceState) String() string { @@ -335,6 +360,12 @@ func (this *InstanceDesc) Equal(that interface{}) bool { if this.Id != that1.Id { return false } + if this.ReadOnlyUpdatedTimestamp != that1.ReadOnlyUpdatedTimestamp { + return false + } + if this.ReadOnly != that1.ReadOnly { + return false + } return true } func (this *Desc) GoString() string { @@ -363,7 +394,7 @@ func (this *InstanceDesc) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 11) + s := make([]string, 0, 13) s = append(s, "&ring.InstanceDesc{") s = append(s, "Addr: "+fmt.Sprintf("%#v", this.Addr)+",\n") s = append(s, "Timestamp: "+fmt.Sprintf("%#v", this.Timestamp)+",\n") @@ -372,6 +403,8 @@ func (this *InstanceDesc) GoString() string { s = append(s, "Zone: "+fmt.Sprintf("%#v", this.Zone)+",\n") s = append(s, "RegisteredTimestamp: "+fmt.Sprintf("%#v", this.RegisteredTimestamp)+",\n") s = append(s, "Id: "+fmt.Sprintf("%#v", this.Id)+",\n") + s = append(s, "ReadOnlyUpdatedTimestamp: "+fmt.Sprintf("%#v", this.ReadOnlyUpdatedTimestamp)+",\n") + s = append(s, "ReadOnly: "+fmt.Sprintf("%#v", this.ReadOnly)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -450,6 +483,21 @@ func (m *InstanceDesc) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.ReadOnly { + i-- + if m.ReadOnly { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x58 + } + if m.ReadOnlyUpdatedTimestamp != 0 { + i = encodeVarintRing(dAtA, i, uint64(m.ReadOnlyUpdatedTimestamp)) + i-- + dAtA[i] = 0x50 + } if len(m.Id) > 0 { i -= len(m.Id) copy(dAtA[i:], m.Id) @@ -570,6 +618,12 @@ func (m *InstanceDesc) Size() (n int) { if l > 0 { n += 1 + l + sovRing(uint64(l)) } + if m.ReadOnlyUpdatedTimestamp != 0 { + n += 1 + sovRing(uint64(m.ReadOnlyUpdatedTimestamp)) + } + if m.ReadOnly { + n += 2 + } return n } @@ -611,6 +665,8 @@ func (this *InstanceDesc) String() string { `Zone:` + fmt.Sprintf("%v", this.Zone) + `,`, `RegisteredTimestamp:` + fmt.Sprintf("%v", this.RegisteredTimestamp) + `,`, `Id:` + fmt.Sprintf("%v", this.Id) + `,`, + `ReadOnlyUpdatedTimestamp:` + fmt.Sprintf("%v", this.ReadOnlyUpdatedTimestamp) + `,`, + `ReadOnly:` + fmt.Sprintf("%v", this.ReadOnly) + `,`, `}`, }, "") return s @@ -1063,6 +1119,45 @@ func (m *InstanceDesc) Unmarshal(dAtA []byte) error { } m.Id = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 10: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ReadOnlyUpdatedTimestamp", wireType) + } + m.ReadOnlyUpdatedTimestamp = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRing + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ReadOnlyUpdatedTimestamp |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 11: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ReadOnly", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRing + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.ReadOnly = bool(v != 0) default: iNdEx = preIndex skippy, err := skipRing(dAtA[iNdEx:]) diff --git a/vendor/github.com/grafana/dskit/ring/ring.proto b/vendor/github.com/grafana/dskit/ring/ring.proto index 08bf885096cb..7795e8493fc3 100644 --- a/vendor/github.com/grafana/dskit/ring/ring.proto +++ b/vendor/github.com/grafana/dskit/ring/ring.proto @@ -44,6 +44,17 @@ message InstanceDesc { // ID of the instance. This value is the same as the key in the ingesters map in Desc. string id = 9; + + // Unix timestamp (with seconds precision) of when the read_only flag was updated. This + // is used to find other instances that could have possibly owned a specific token in + // the past on the write path, due to *this* instance being read-only. This value should + // only increase. + int64 read_only_updated_timestamp = 10; + + // Indicates whether this instance is read only. + // Read-only instances go through standard state changes, and special handling is applied to them + // during shuffle shards. + bool read_only = 11; } enum InstanceState { diff --git a/vendor/github.com/grafana/dskit/ring/ring_http.go b/vendor/github.com/grafana/dskit/ring/ring_http.go index 7300430ddac1..67249e2b4967 100644 --- a/vendor/github.com/grafana/dskit/ring/ring_http.go +++ b/vendor/github.com/grafana/dskit/ring/ring_http.go @@ -24,9 +24,9 @@ var defaultPageTemplate = template.Must(template.New("webpage").Funcs(template.F if t.IsZero() { return "" } - return t.Format(time.RFC3339Nano) + return t.Format(time.RFC3339) }, - "durationSince": func(t time.Time) string { return time.Since(t).Truncate(time.Millisecond).String() }, + "durationSince": func(t time.Time) string { return time.Since(t).Truncate(time.Second).String() }, }).Parse(defaultPageContent)) type httpResponse struct { @@ -36,15 +36,17 @@ type httpResponse struct { } type ingesterDesc struct { - ID string `json:"id"` - State string `json:"state"` - Address string `json:"address"` - HeartbeatTimestamp time.Time `json:"timestamp"` - RegisteredTimestamp time.Time `json:"registered_timestamp"` - Zone string `json:"zone"` - Tokens []uint32 `json:"tokens"` - NumTokens int `json:"-"` - Ownership float64 `json:"-"` + ID string `json:"id"` + State string `json:"state"` + Address string `json:"address"` + HeartbeatTimestamp time.Time `json:"timestamp"` + RegisteredTimestamp time.Time `json:"registered_timestamp"` + ReadOnly bool `json:"read_only"` + ReadOnlyUpdatedTimestamp time.Time `json:"read_only_updated_timestamp"` + Zone string `json:"zone"` + Tokens []uint32 `json:"tokens"` + NumTokens int `json:"-"` + Ownership float64 `json:"-"` } type ringAccess interface { @@ -110,16 +112,20 @@ func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) { state = "UNHEALTHY" } + ro, rots := ing.GetReadOnlyState() + ingesters = append(ingesters, ingesterDesc{ - ID: id, - State: state, - Address: ing.Addr, - HeartbeatTimestamp: time.Unix(ing.Timestamp, 0).UTC(), - RegisteredTimestamp: ing.GetRegisteredAt().UTC(), - Tokens: ing.Tokens, - Zone: ing.Zone, - NumTokens: len(ing.Tokens), - Ownership: (float64(ownedTokens[id]) / float64(math.MaxUint32)) * 100, + ID: id, + State: state, + Address: ing.Addr, + HeartbeatTimestamp: time.Unix(ing.Timestamp, 0).UTC(), + RegisteredTimestamp: ing.GetRegisteredAt().UTC(), + ReadOnly: ro, + ReadOnlyUpdatedTimestamp: rots.UTC(), + Tokens: ing.Tokens, + Zone: ing.Zone, + NumTokens: len(ing.Tokens), + Ownership: (float64(ownedTokens[id]) / float64(math.MaxUint32)) * 100, }) } diff --git a/vendor/github.com/grafana/dskit/ring/ring_status.gohtml b/vendor/github.com/grafana/dskit/ring/ring_status.gohtml index 80e5b6a8f769..5270b457c625 100644 --- a/vendor/github.com/grafana/dskit/ring/ring_status.gohtml +++ b/vendor/github.com/grafana/dskit/ring/ring_status.gohtml @@ -18,6 +18,8 @@ State Address Registered At + Read-Only + Read-Only Updated Last Heartbeat Tokens Ownership @@ -36,6 +38,8 @@ {{ .State }} {{ .Address }} {{ .RegisteredTimestamp | timeOrEmptyString }} + {{ .ReadOnly }} + {{ .ReadOnlyUpdatedTimestamp | timeOrEmptyString }} {{ .HeartbeatTimestamp | durationSince }} ago ({{ .HeartbeatTimestamp.Format "15:04:05.999" }}) {{ .NumTokens }} {{ .Ownership | humanFloat }}% @@ -66,4 +70,4 @@ {{ end }} - \ No newline at end of file + diff --git a/vendor/github.com/grafana/dskit/ring/shard/shard.go b/vendor/github.com/grafana/dskit/ring/shard/shard.go index 1d70eb6283b6..bbc966707313 100644 --- a/vendor/github.com/grafana/dskit/ring/shard/shard.go +++ b/vendor/github.com/grafana/dskit/ring/shard/shard.go @@ -30,6 +30,9 @@ func ShuffleShardSeed(identifier, zone string) int64 { // zone when zone-aware replication is enabled. The algorithm expects the shard size to be divisible // by the number of zones, in order to have nodes balanced across zones. If it's not, we do round up. func ShuffleShardExpectedInstancesPerZone(shardSize, numZones int) int { + if shardSize == math.MaxInt { + return math.MaxInt + } return int(math.Ceil(float64(shardSize) / float64(numZones))) } @@ -41,5 +44,5 @@ func ShuffleShardExpectedInstances(shardSize, numZones int) int { // yoloBuf will return an unsafe pointer to a string, as the name yolo.yoloBuf implies use at your own risk. func yoloBuf(s string) []byte { - return *((*[]byte)(unsafe.Pointer(&s))) + return unsafe.Slice(unsafe.StringData(s), len(s)) } diff --git a/vendor/github.com/grafana/dskit/services/basic_service.go b/vendor/github.com/grafana/dskit/services/basic_service.go index 185a4a10a23b..6ef464e5c23f 100644 --- a/vendor/github.com/grafana/dskit/services/basic_service.go +++ b/vendor/github.com/grafana/dskit/services/basic_service.go @@ -3,7 +3,10 @@ package services import ( "context" "fmt" + "slices" "sync" + + "go.uber.org/atomic" ) // StartingFn is called when service enters Starting state. If StartingFn returns @@ -325,26 +328,59 @@ func (b *BasicService) State() State { } // AddListener is part of Service interface. -func (b *BasicService) AddListener(listener Listener) { +func (b *BasicService) AddListener(listener Listener) func() { b.stateMu.Lock() defer b.stateMu.Unlock() if b.state == Terminated || b.state == Failed { // no more state transitions will be done, and channel wouldn't get closed - return + return func() {} } // There are max 4 state transitions. We use buffer to avoid blocking the sender, // which holds service lock. - ch := make(chan func(l Listener), 4) - b.listeners = append(b.listeners, ch) + listenerCh := make(chan func(l Listener), 4) + b.listeners = append(b.listeners, listenerCh) + + stop := make(chan struct{}) + stopClosed := atomic.NewBool(false) + + wg := sync.WaitGroup{} + wg.Add(1) // each listener has its own goroutine, processing events. go func() { - for lfn := range ch { - lfn(listener) + defer wg.Done() + for { + select { + // Process events from service. + case lfn, ok := <-listenerCh: + if !ok { + return + } + lfn(listener) + + case <-stop: + return + } } }() + + return func() { + if stopClosed.CompareAndSwap(false, true) { + // Tell listener goroutine to stop. + close(stop) + } + + // Remove channel for notifications from service's list of listeners. + b.stateMu.Lock() + b.listeners = slices.DeleteFunc(b.listeners, func(c chan func(l Listener)) bool { + return listenerCh == c + }) + b.stateMu.Unlock() + + wg.Wait() + } } // lock must be held here. Read lock would be good enough, but since diff --git a/vendor/github.com/grafana/dskit/services/failure_watcher.go b/vendor/github.com/grafana/dskit/services/failure_watcher.go index 657656f50d47..25c59d24b25e 100644 --- a/vendor/github.com/grafana/dskit/services/failure_watcher.go +++ b/vendor/github.com/grafana/dskit/services/failure_watcher.go @@ -1,16 +1,22 @@ package services import ( + "sync" + "github.com/pkg/errors" ) var ( errFailureWatcherNotInitialized = errors.New("FailureWatcher has not been initialized") + errFailureWatcherClosed = errors.New("FailureWatcher has been stopped") ) // FailureWatcher waits for service failures, and passed them to the channel. type FailureWatcher struct { - ch chan error + mu sync.Mutex + ch chan error + closed bool + unregisterListeners []func() } func NewFailureWatcher() *FailureWatcher { @@ -35,9 +41,17 @@ func (w *FailureWatcher) WatchService(service Service) { panic(errFailureWatcherNotInitialized) } - service.AddListener(NewListener(nil, nil, nil, nil, func(_ State, failure error) { + w.mu.Lock() + defer w.mu.Unlock() + + if w.closed { + panic(errFailureWatcherClosed) + } + + stop := service.AddListener(NewListener(nil, nil, nil, nil, func(_ State, failure error) { w.ch <- errors.Wrapf(failure, "service %s failed", DescribeService(service)) })) + w.unregisterListeners = append(w.unregisterListeners, stop) } func (w *FailureWatcher) WatchManager(manager *Manager) { @@ -47,7 +61,40 @@ func (w *FailureWatcher) WatchManager(manager *Manager) { panic(errFailureWatcherNotInitialized) } - manager.AddListener(NewManagerListener(nil, nil, func(service Service) { + w.mu.Lock() + defer w.mu.Unlock() + + if w.closed { + panic(errFailureWatcherClosed) + } + + stop := manager.AddListener(NewManagerListener(nil, nil, func(service Service) { w.ch <- errors.Wrapf(service.FailureCase(), "service %s failed", DescribeService(service)) })) + w.unregisterListeners = append(w.unregisterListeners, stop) +} + +// Close stops this failure watcher and closes channel returned by Chan() method. After closing failure watcher, +// it cannot be used to watch additional services or managers. +// Repeated calls to Close() do nothing. +func (w *FailureWatcher) Close() { + // Graceful handle the case FailureWatcher has not been initialized, + // to simplify the code in the components using it. + if w == nil { + return + } + + w.mu.Lock() + defer w.mu.Unlock() + + if w.closed { + return + } + for _, stop := range w.unregisterListeners { + stop() + } + + // All listeners are now stopped, and can't receive more notifications. We can close the channel. + close(w.ch) + w.closed = true } diff --git a/vendor/github.com/grafana/dskit/services/manager.go b/vendor/github.com/grafana/dskit/services/manager.go index da5ebf7331ba..7f02c0dc9c97 100644 --- a/vendor/github.com/grafana/dskit/services/manager.go +++ b/vendor/github.com/grafana/dskit/services/manager.go @@ -4,7 +4,10 @@ import ( "context" "errors" "fmt" + "slices" "sync" + + "go.uber.org/atomic" ) type managerState int @@ -31,6 +34,11 @@ type ManagerListener interface { // Manager can start them, and observe their state as a group. // Once all services are running, Manager is said to be Healthy. It is possible for manager to never reach the Healthy state, if some services fail to start. // When all services are stopped (Terminated or Failed), manager is Stopped. +// +// Note: Manager's state is defined by state of services. Services can be started outside of Manager and if all become Running, Manager will be Healthy as well. +// +// Note: Creating a manager immediately installs listeners to all services (to compute manager's state), which may start goroutines. +// To avoid leaking goroutines, make sure to eventually stop all services or the manager (which stops services), even if manager wasn't explicitly started. type Manager struct { services []Service @@ -226,25 +234,61 @@ func (m *Manager) serviceStateChanged(s Service, from State, to State) { // Specifically, a given listener will have its callbacks invoked in the same order as the underlying service enters those states. // Additionally, at most one of the listener's callbacks will execute at once. // However, multiple listeners' callbacks may execute concurrently, and listeners may execute in an order different from the one in which they were registered. -func (m *Manager) AddListener(listener ManagerListener) { +// +// Returned function can be used to stop the listener and free resources used by it (e.g. goroutine). +func (m *Manager) AddListener(listener ManagerListener) func() { m.mu.Lock() defer m.mu.Unlock() if m.state == stopped { // no need to register listener, as no more events will be sent - return + return func() {} } // max number of events is: failed notification for each service + healthy + stopped. // we use buffer to avoid blocking the sender, which holds the manager's lock. - ch := make(chan func(l ManagerListener), len(m.services)+2) - m.listeners = append(m.listeners, ch) + listenerCh := make(chan func(l ManagerListener), len(m.services)+2) + m.listeners = append(m.listeners, listenerCh) + + stop := make(chan struct{}) + stopClosed := atomic.NewBool(false) + + wg := sync.WaitGroup{} + wg.Add(1) + // each listener has its own goroutine, processing events. go func() { - for fn := range ch { - fn(listener) + defer wg.Done() + for { + select { + // Process events from service. + case fn, ok := <-listenerCh: + if !ok { + return + } + fn(listener) + + case <-stop: + return + } } }() + + return func() { + if stopClosed.CompareAndSwap(false, true) { + // Tell listener goroutine to stop. + close(stop) + } + + // Remove channel for notifications from manager's list of listeners. + m.mu.Lock() + m.listeners = slices.DeleteFunc(m.listeners, func(c chan func(listener ManagerListener)) bool { + return listenerCh == c + }) + m.mu.Unlock() + + wg.Wait() + } } // called with lock diff --git a/vendor/github.com/grafana/dskit/services/service.go b/vendor/github.com/grafana/dskit/services/service.go index e3de5a09cf6e..a4c0e9348776 100644 --- a/vendor/github.com/grafana/dskit/services/service.go +++ b/vendor/github.com/grafana/dskit/services/service.go @@ -91,7 +91,10 @@ type Service interface { // as the service enters those states. Additionally, at most one of the listener's callbacks will execute // at once. However, multiple listeners' callbacks may execute concurrently, and listeners may execute // in an order different from the one in which they were registered. - AddListener(listener Listener) + // + // Returned function can be used to stop the listener from receiving additional events from the service, + // and release resources used by the listener (e.g. goroutine, if it was started by adding listener). + AddListener(listener Listener) func() } // NamedService extends Service with a name. diff --git a/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go b/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go index 70c86d16d85d..8daad995c95c 100644 --- a/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go +++ b/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go @@ -2,6 +2,8 @@ package spanlogger import ( "context" + "runtime" + "strings" "go.uber.org/atomic" // Really just need sync/atomic but there is a lint rule preventing it. @@ -160,6 +162,10 @@ func (s *SpanLogger) getLogger() log.Logger { if ok { logger = log.With(logger, "trace_id", traceID) } + + // Replace the default valuer for the 'caller' attribute with one that gets the caller of the methods in this file. + logger = log.With(logger, "caller", spanLoggerAwareCaller()) + // If the value has been set by another goroutine, fetch that other value and discard the one we made. if !s.logger.CompareAndSwap(nil, &logger) { pLogger := s.logger.Load() @@ -181,3 +187,47 @@ func (s *SpanLogger) SetSpanAndLogTag(key string, value interface{}) { wrappedLogger := log.With(logger, key, value) s.logger.Store(&wrappedLogger) } + +// spanLoggerAwareCaller is like log.Caller, but ensures that the caller information is +// that of the caller to SpanLogger, not SpanLogger itself. +func spanLoggerAwareCaller() log.Valuer { + valuer := atomic.NewPointer[log.Valuer](nil) + + return func() interface{} { + // If we've already determined the correct stack depth, use it. + existingValuer := valuer.Load() + if existingValuer != nil { + return (*existingValuer)() + } + + // We haven't been called before, determine the correct stack depth to + // skip the configured logger's internals and the SpanLogger's internals too. + // + // Note that we can't do this in spanLoggerAwareCaller() directly because we + // need to do this when invoked by the configured logger - otherwise we cannot + // measure the stack depth of the logger's internals. + + stackDepth := 3 // log.DefaultCaller uses a stack depth of 3, so start searching for the correct stack depth there. + + for { + _, file, _, ok := runtime.Caller(stackDepth) + if !ok { + // We've run out of possible stack frames. Give up. + valuer.Store(&unknownCaller) + return unknownCaller() + } + + if strings.HasSuffix(file, "spanlogger/spanlogger.go") { + stackValuer := log.Caller(stackDepth + 2) // Add one to skip the stack frame for the SpanLogger method, and another to skip the stack frame for the valuer which we'll invoke below. + valuer.Store(&stackValuer) + return stackValuer() + } + + stackDepth++ + } + } +} + +var unknownCaller log.Valuer = func() interface{} { + return "" +} diff --git a/vendor/github.com/grafana/dskit/tenant/resolver.go b/vendor/github.com/grafana/dskit/tenant/resolver.go index 35e95b1c8318..9a01d6322c9f 100644 --- a/vendor/github.com/grafana/dskit/tenant/resolver.go +++ b/vendor/github.com/grafana/dskit/tenant/resolver.go @@ -16,7 +16,18 @@ import ( // //nolint:revive func TenantID(ctx context.Context) (string, error) { - orgIDs, err := TenantIDs(ctx) + //lint:ignore faillint wrapper around upstream method + orgID, err := user.ExtractOrgID(ctx) + if err != nil { + return "", err + } + if !strings.Contains(orgID, tenantIDsSeparator) { + if err := ValidTenantID(orgID); err != nil { + return "", err + } + return orgID, nil + } + orgIDs, err := tenantIDsFromString(orgID) if err != nil { return "", err } @@ -42,6 +53,10 @@ func TenantIDs(ctx context.Context) ([]string, error) { return nil, err } + return tenantIDsFromString(orgID) +} + +func tenantIDsFromString(orgID string) ([]string, error) { orgIDs := strings.Split(orgID, tenantIDsSeparator) for _, id := range orgIDs { if err := ValidTenantID(id); err != nil { diff --git a/vendor/github.com/grafana/dskit/tracing/tracing.go b/vendor/github.com/grafana/dskit/tracing/tracing.go index 66b3a3cef4cc..1882a081dfd4 100644 --- a/vendor/github.com/grafana/dskit/tracing/tracing.go +++ b/vendor/github.com/grafana/dskit/tracing/tracing.go @@ -55,16 +55,30 @@ func NewFromEnv(serviceName string, options ...jaegercfg.Option) (io.Closer, err // ExtractTraceID extracts the trace id, if any from the context. func ExtractTraceID(ctx context.Context) (string, bool) { + if tid, _, ok := extractJaegerContext(ctx); ok { + return tid.String(), true + } + return "", false +} + +// ExtractTraceSpanID extracts the trace id, span id if any from the context. +func ExtractTraceSpanID(ctx context.Context) (string, string, bool) { + if tid, sid, ok := extractJaegerContext(ctx); ok { + return tid.String(), sid.String(), true + } + return "", "", false +} + +func extractJaegerContext(ctx context.Context) (tid jaeger.TraceID, sid jaeger.SpanID, success bool) { sp := opentracing.SpanFromContext(ctx) if sp == nil { - return "", false + return } - sctx, ok := sp.Context().(jaeger.SpanContext) + jsp, ok := sp.Context().(jaeger.SpanContext) if !ok { - return "", false + return } - - return sctx.TraceID().String(), true + return jsp.TraceID(), jsp.SpanID(), true } // ExtractSampledTraceID works like ExtractTraceID but the returned bool is only diff --git a/vendor/github.com/grafana/pyroscope-go/godeltaprof/internal/pprof/stub.go b/vendor/github.com/grafana/pyroscope-go/godeltaprof/internal/pprof/stub.go index c617015ecd3c..41dee8b7c5da 100644 --- a/vendor/github.com/grafana/pyroscope-go/godeltaprof/internal/pprof/stub.go +++ b/vendor/github.com/grafana/pyroscope-go/godeltaprof/internal/pprof/stub.go @@ -1,6 +1,3 @@ -//go:build go1.16 && !go1.23 -// +build go1.16,!go1.23 - package pprof // unsafe is required for go:linkname diff --git a/vendor/modules.txt b/vendor/modules.txt index 34745b9f1361..8e3c23db1be0 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -971,8 +971,8 @@ github.com/gorilla/websocket # github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 ## explicit; go 1.17 github.com/grafana/cloudflare-go -# github.com/grafana/dskit v0.0.0-20240626184720-35810fdf1c6d -## explicit; go 1.20 +# github.com/grafana/dskit v0.0.0-20240819131358-463219e80ea0 +## explicit; go 1.21 github.com/grafana/dskit/aws github.com/grafana/dskit/backoff github.com/grafana/dskit/cancellation @@ -988,7 +988,6 @@ github.com/grafana/dskit/grpcutil github.com/grafana/dskit/httpgrpc github.com/grafana/dskit/httpgrpc/server github.com/grafana/dskit/instrument -github.com/grafana/dskit/internal/math github.com/grafana/dskit/internal/slices github.com/grafana/dskit/kv github.com/grafana/dskit/kv/codec @@ -1029,7 +1028,7 @@ github.com/grafana/jsonparser # github.com/grafana/loki/pkg/push v0.0.0-20231124142027-e52380921608 => ./pkg/push ## explicit; go 1.19 github.com/grafana/loki/pkg/push -# github.com/grafana/pyroscope-go/godeltaprof v0.1.6 +# github.com/grafana/pyroscope-go/godeltaprof v0.1.7 ## explicit; go 1.16 github.com/grafana/pyroscope-go/godeltaprof github.com/grafana/pyroscope-go/godeltaprof/http/pprof