From 7979cfbe076da067ab4057b513f1df7502eb4404 Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Thu, 6 Oct 2022 16:12:38 -0600 Subject: [PATCH] Implement Distributor Rate Store (#7349) The `RateStore` concurrently queries all ingester `GetStreamRate` APIs and stores the results. `RateStore` deduplicates replicas and combines sharded streams so a caller of `RateFor` gets an unsharded view of a stream's rate. --- docs/sources/configuration/_index.md | 21 ++- pkg/distributor/distributor.go | 52 +++--- pkg/distributor/distributor_test.go | 28 +++- pkg/distributor/ratestore.go | 231 +++++++++++++++++++++++++-- pkg/distributor/ratestore_test.go | 209 ++++++++++++++++++++++++ pkg/ingester/client/client.go | 38 +++-- pkg/ingester/ingester.go | 51 ++++-- pkg/loki/loki.go | 1 + 8 files changed, 562 insertions(+), 69 deletions(-) create mode 100644 pkg/distributor/ratestore_test.go diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index 70d28b98fd07..c1096a4330e6 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -314,10 +314,23 @@ ring: # The CLI flags prefix for this block config is: distributor.ring [etcd: ] - # The heartbeat timeout after which ingesters are skipped for - # reading and writing. - # CLI flag: -distributor.ring.heartbeat-timeout - [heartbeat_timeout: | default = 1m] + # The heartbeat timeout after which ingesters are skipped for + # reading and writing. + # CLI flag: -distributor.ring.heartbeat-timeout + [heartbeat_timeout: | default = 1m] + + rate_store: + # The max number of concurrent requests to make to ingester stream apis + # CLI flag: -distributor.rate-store.max-request-parallelism + [max_request_parallelism: | default = 200] + # The interval on which distributors will update current stream rates + # from ingesters + # CLI flag: -distributor.rate-store.stream-rate-update-interval + [stream_rate_update_interval: | default = 1s] + # Timeout for communication between distributors and ingesters when updating + # rates + # CLI flag: -distributor.rate-store.ingester-request-timeout + [ingester_request_timeout: | default = 1s] ``` ## querier diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index f75fdf5a957f..b05df30aa67b 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -58,16 +58,19 @@ type Config struct { // For testing. factory ring_client.PoolFactory `yaml:"-"` + + RateStore RateStoreConfig `yaml:"rate_store"` } // RegisterFlags registers distributor-related flags. func (cfg *Config) RegisterFlags(fs *flag.FlagSet) { cfg.DistributorRing.RegisterFlags(fs) + cfg.RateStore.RegisterFlagsWithPrefix("distributor.rate-store", fs) } // RateStore manages the ingestion rate of streams, populated by data fetched from ingesters. type RateStore interface { - RateFor(stream *logproto.Stream) (int, error) + RateFor(streamHash uint64) int64 } // Distributor coordinates replicates and distribution of log streams. 
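A minimal sketch of how a `RateFor` caller such as `shardCountFor` turns the stored, unsharded rate into a shard count; the `rateFor` stub and the byte constants below are invented for illustration, while the arithmetic mirrors `calculateShards` further down in this diff.

```go
package main

import (
	"fmt"
	"math"
)

// rateFor stands in for rateStore.RateFor(streamHash): the last rate (bytes/s)
// the distributor fetched from the ingesters for this unsharded stream.
func rateFor(streamHash uint64) int64 {
	return 4 << 20 // pretend the stream is ingesting ~4MiB/s
}

// shardsFor adds the size of the incoming push to the stored rate, divides by
// the per-stream desired rate, and rounds up.
func shardsFor(streamHash uint64, pushSize, desiredRate int) int {
	shards := float64(rateFor(streamHash)+int64(pushSize)) / float64(desiredRate)
	if shards <= 1 {
		return 1
	}
	return int(math.Ceil(shards))
}

func main() {
	// ~4MiB/s ingested + a 1MiB push against a 3MiB/s target => 2 shards.
	fmt.Println(shardsFor(0, 1<<20, 3<<20))
}
```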
@@ -118,6 +121,12 @@ func New( } } + internalFactory := func(addr string) (ring_client.PoolClient, error) { + internalCfg := clientCfg + internalCfg.Internal = true + return client.New(internalCfg, addr) + } + validator, err := NewValidator(overrides) if err != nil { return nil, err @@ -150,6 +159,7 @@ func New( if err != nil { return nil, err } + d := Distributor{ cfg: cfg, clientCfg: clientCfg, @@ -188,7 +198,21 @@ func New( d.replicationFactor.Set(float64(ingestersRing.ReplicationFactor())) rfStats.Set(int64(ingestersRing.ReplicationFactor())) - servs = append(servs, d.pool) + rs := NewRateStore( + d.cfg.RateStore, + ingestersRing, + clientpool.NewPool( + clientCfg.PoolConfig, + ingestersRing, + internalFactory, + util_log.Logger, + ), + overrides, + registerer, + ) + d.rateStore = rs + + servs = append(servs, d.pool, rs) d.subservices, err = services.NewManager(servs...) if err != nil { return nil, errors.Wrap(err, "services manager") @@ -197,8 +221,6 @@ func New( d.subservicesWatcher.WatchManager(d.subservices) d.Service = services.NewBasicService(d.starting, d.running, d.stopping) - d.rateStore = &noopRateStore{} - return &d, nil } @@ -393,7 +415,7 @@ func min(x1, x2 int) int { func (d *Distributor) shardStream(stream logproto.Stream, streamSize int, userID string) ([]uint32, []streamTracker) { shardStreamsCfg := d.validator.Limits.ShardStreams(userID) logger := log.With(util_log.WithUserID(userID, util_log.Logger), "stream", stream.Labels) - shardCount := d.shardCountFor(logger, &stream, streamSize, d.rateStore, shardStreamsCfg) + shardCount := d.shardCountFor(logger, &stream, streamSize, shardStreamsCfg) if shardCount <= 1 { return []uint32{util.TokenFor(userID, stream.Labels)}, []streamTracker{{stream: stream}} @@ -442,8 +464,8 @@ func labelTemplate(lbls string) labels.Labels { return streamLabels } -func (d *Distributor) createShard(shardStreamsCfg *shardstreams.Config, stream logproto.Stream, lbls labels.Labels, streamPattern string, totalShards, shardNumber int) (logproto.Stream, bool) { - lowerBound, upperBound, ok := d.boundsFor(stream, totalShards, shardNumber, shardStreamsCfg.LoggingEnabled) +func (d *Distributor) createShard(streamshardCfg *shardstreams.Config, stream logproto.Stream, lbls labels.Labels, streamPattern string, totalShards, shardNumber int) (logproto.Stream, bool) { + lowerBound, upperBound, ok := d.boundsFor(stream, totalShards, shardNumber, streamshardCfg.LoggingEnabled) if !ok { return logproto.Stream{}, false } @@ -580,7 +602,7 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string, // based on the rate stored in the rate store and will store the new evaluated number of shards. // // desiredRate is expected to be given in bytes. 
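+// Illustrative example: a stream already ingesting at 4MB/s that receives a 1MB push
+// against a 3MB desired rate works out to (4+1)/3 ≈ 1.67, which rounds up to 2 shards.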
-func (d *Distributor) shardCountFor(logger log.Logger, stream *logproto.Stream, streamSize int, rateStore RateStore, streamShardcfg *shardstreams.Config) int { +func (d *Distributor) shardCountFor(logger log.Logger, stream *logproto.Stream, streamSize int, streamShardcfg *shardstreams.Config) int { if streamShardcfg.DesiredRate.Val() <= 0 { if streamShardcfg.LoggingEnabled { level.Error(logger).Log("msg", "invalid desired rate", "desired_rate", streamShardcfg.DesiredRate.String()) @@ -588,15 +610,7 @@ func (d *Distributor) shardCountFor(logger log.Logger, stream *logproto.Stream, return 1 } - rate, err := rateStore.RateFor(stream) - if err != nil { - d.streamShardingFailures.WithLabelValues("rate_not_found").Inc() - if streamShardcfg.LoggingEnabled { - level.Error(logger).Log("msg", "couldn't shard stream because rate store returned error", "err", err) - } - return 1 - } - + rate := d.rateStore.RateFor(stream.Hash) shards := calculateShards(rate, streamSize, streamShardcfg.DesiredRate.Val()) if shards > len(stream.Entries) { d.streamShardingFailures.WithLabelValues("too_many_shards").Inc() @@ -613,8 +627,8 @@ func (d *Distributor) shardCountFor(logger log.Logger, stream *logproto.Stream, return shards } -func calculateShards(rate, streamSize, desiredRate int) int { - shards := float64((rate + streamSize)) / float64(desiredRate) +func calculateShards(rate int64, streamSize, desiredRate int) int { + shards := float64(rate+int64(streamSize)) / float64(desiredRate) if shards <= 1 { return 1 } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index e802b2021523..3e87f8d4bebb 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -683,7 +683,7 @@ func TestStreamShard(t *testing.T) { require.NoError(t, err) d := Distributor{ - rateStore: &noopRateStore{}, + rateStore: &fakeRateStore{}, streamShardingFailures: shardingFailureMetric, validator: validator, } @@ -728,7 +728,7 @@ func BenchmarkShardStream(b *testing.B) { distributorBuilder := func(shards int) *Distributor { d := &Distributor{streamShardingFailures: shardingFailureMetric} // streamSize is always zero, so number of shards will be dictated just by the rate returned from store. - d.rateStore = &noopRateStore{rate: desiredRate*shards - 1} + d.rateStore = &fakeRateStore{rate: int64(desiredRate*shards - 1)} return d } @@ -824,7 +824,7 @@ func TestShardCalculation(t *testing.T) { for _, tc := range []struct { name string streamSize int - rate int + rate int64 wantShards int }{ @@ -837,7 +837,7 @@ func TestShardCalculation(t *testing.T) { { name: "enough data to have two shards, stream size (1mb) + ingested rate (4mb) > 3mb", streamSize: 1 * megabyte, - rate: desiredRate + 1, + rate: int64(desiredRate + 1), wantShards: 2, }, { @@ -849,7 +849,7 @@ func TestShardCalculation(t *testing.T) { { name: "a lot of shards, stream size (1mb) + ingested rate (300mb) > 3mb", streamSize: 1 * megabyte, - rate: 300 * megabyte, + rate: int64(300 * megabyte), wantShards: 101, }, } { @@ -874,7 +874,7 @@ func TestShardCountFor(t *testing.T) { for _, tc := range []struct { name string stream *logproto.Stream - rate int + rate int64 desiredRate loki_flagext.ByteSize wantStreamSize int // used for sanity check. 
@@ -958,8 +958,9 @@ func TestShardCountFor(t *testing.T) { d := &Distributor{ streamShardingFailures: shardingFailureMetric, + rateStore: &fakeRateStore{tc.rate}, } - got := d.shardCountFor(util_log.Logger, tc.stream, tc.wantStreamSize, &noopRateStore{tc.rate}, limits.ShardStreams) + got := d.shardCountFor(util_log.Logger, tc.stream, tc.wantStreamSize, limits.ShardStreams) require.Equal(t, tc.wantShards, got) }) } @@ -1194,6 +1195,7 @@ func makeWriteRequest(lines, size int) *logproto.PushRequest { type mockIngester struct { grpc_health_v1.HealthClient logproto.PusherClient + logproto.StreamDataClient failAfter time.Duration succeedAfter time.Duration @@ -1217,6 +1219,18 @@ func (i *mockIngester) Push(ctx context.Context, in *logproto.PushRequest, opts return nil, nil } +func (i *mockIngester) GetStreamRates(ctx context.Context, in *logproto.StreamRatesRequest, opts ...grpc.CallOption) (*logproto.StreamRatesResponse, error) { + return &logproto.StreamRatesResponse{}, nil +} + func (i *mockIngester) Close() error { return nil } + +type fakeRateStore struct { + rate int64 +} + +func (s *fakeRateStore) RateFor(_ uint64) int64 { + return s.rate +} diff --git a/pkg/distributor/ratestore.go b/pkg/distributor/ratestore.go index 81d8ac2cd312..7315c6882d4b 100644 --- a/pkg/distributor/ratestore.go +++ b/pkg/distributor/ratestore.go @@ -1,25 +1,234 @@ package distributor import ( - "fmt" + "context" + "flag" + "sync" + "time" + + "github.com/grafana/loki/pkg/validation" + + "github.com/weaveworks/common/instrument" + + "github.com/grafana/dskit/services" + + "github.com/go-kit/log/level" + + util_log "github.com/grafana/loki/pkg/util/log" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/grafana/dskit/ring" + "github.com/grafana/dskit/ring/client" "github.com/grafana/loki/pkg/logproto" ) -type unshardableStreamErr struct { - labels string - entriesNum int - shardNum int +type poolClientFactory interface { + GetClientFor(addr string) (client.PoolClient, error) +} + +type RateStoreConfig struct { + MaxParallelism int `yaml:"max_request_parallelism"` + StreamRateUpdateInterval time.Duration `yaml:"stream_rate_update_interval"` + IngesterReqTimeout time.Duration `yaml:"ingester_request_timeout"` +} + +func (cfg *RateStoreConfig) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) { + fs.IntVar(&cfg.MaxParallelism, prefix+".max-request-parallelism", 200, "The max number of concurrent requests to make to ingester stream apis") + fs.DurationVar(&cfg.StreamRateUpdateInterval, prefix+".stream-rate-update-interval", time.Second, "The interval on which distributors will update current stream rates from ingesters") + fs.DurationVar(&cfg.IngesterReqTimeout, prefix+".ingester-request-timeout", time.Second, "Timeout for communication between distributors and ingesters when updating rates") +} + +type ingesterClient struct { + addr string + client logproto.StreamDataClient +} + +type overrides interface { + AllByUserID() map[string]*validation.Limits +} + +type rateStore struct { + services.Service + + ring ring.ReadRing + clientPool poolClientFactory + rates map[uint64]int64 + rateLock sync.RWMutex + rateCollectionInterval time.Duration + ingesterTimeout time.Duration + maxParallelism int + rateRefreshFailures *prometheus.CounterVec + refreshDuration *instrument.HistogramCollector + overrides overrides +} + +func NewRateStore(cfg RateStoreConfig, r ring.ReadRing, cf poolClientFactory, o overrides, registerer prometheus.Registerer) 
*rateStore { //nolint + s := &rateStore{ + ring: r, + clientPool: cf, + rateCollectionInterval: cfg.StreamRateUpdateInterval, + maxParallelism: cfg.MaxParallelism, + ingesterTimeout: cfg.IngesterReqTimeout, + overrides: o, + rateRefreshFailures: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Namespace: "loki", + Name: "rate_store_refresh_failures_total", + Help: "The total number of failed attempts to refresh the distributor's view of stream rates", + }, []string{"source"}), + refreshDuration: instrument.NewHistogramCollector( + promauto.With(registerer).NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "loki", + Name: "rate_store_refresh_duration_seconds", + Help: "Time spent refreshing the rate store", + Buckets: prometheus.DefBuckets, + }, instrument.HistogramCollectorBuckets, + ), + ), + } + + s.Service = services. + NewTimerService(s.rateCollectionInterval, s.instrumentedUpdateAllRates, s.instrumentedUpdateAllRates, nil). + WithName("rate store") + + return s +} + +func (s *rateStore) instrumentedUpdateAllRates(ctx context.Context) error { + if !s.anyShardingEnabled() { + return nil + } + + return instrument.CollectedRequest(ctx, "GetAllStreamRates", s.refreshDuration, instrument.ErrorCode, s.updateAllRates) +} + +func (s *rateStore) updateAllRates(ctx context.Context) error { + clients, err := s.getClients() + if err != nil { + level.Error(util_log.Logger).Log("msg", "error getting ingester clients", "err", err) + s.rateRefreshFailures.WithLabelValues("ring").Inc() + return nil // Don't fail the service because we have an error getting the clients once + } + + streamRates := s.getRates(ctx, clients) + rates := s.aggregateByShard(streamRates) + + s.rateLock.Lock() + defer s.rateLock.Unlock() + s.rates = rates + + return nil +} + +func (s *rateStore) anyShardingEnabled() bool { + limits := s.overrides.AllByUserID() + if limits == nil { + return false + } + + for _, l := range limits { + if l.ShardStreams.Enabled { + return true + } + } + + return false } -func (u *unshardableStreamErr) Error() string { - return fmt.Sprintf("couldn't shard stream %s. 
number of shards (%d) is higher than number of entries (%d)", u.labels, u.shardNum, u.entriesNum) +func (s *rateStore) aggregateByShard(streamRates map[uint64]*logproto.StreamRate) map[uint64]int64 { + rates := make(map[uint64]int64) + for _, sr := range streamRates { + if _, ok := rates[sr.StreamHashNoShard]; ok { + rates[sr.StreamHashNoShard] += sr.Rate + continue + } + + rates[sr.StreamHashNoShard] = sr.Rate + } + return rates } -type noopRateStore struct { - rate int +func (s *rateStore) getRates(ctx context.Context, clients []ingesterClient) map[uint64]*logproto.StreamRate { + parallelClients := make(chan ingesterClient, len(clients)) + responses := make(chan *logproto.StreamRatesResponse, len(clients)) + + for i := 0; i < s.maxParallelism; i++ { + go s.getRatesFromIngesters(ctx, parallelClients, responses) + } + + for _, c := range clients { + parallelClients <- c + } + close(parallelClients) + + return ratesPerStream(responses, len(clients)) } -func (n *noopRateStore) RateFor(stream *logproto.Stream) (int, error) { - return n.rate, nil +func (s *rateStore) getRatesFromIngesters(ctx context.Context, clients chan ingesterClient, responses chan *logproto.StreamRatesResponse) { + for c := range clients { + ctx, cancel := context.WithTimeout(ctx, s.ingesterTimeout) + + resp, err := c.client.GetStreamRates(ctx, &logproto.StreamRatesRequest{}) + if err != nil { + level.Error(util_log.Logger).Log("msg", "unable to get stream rates", "err", err) + s.rateRefreshFailures.WithLabelValues(c.addr).Inc() + } + + responses <- resp + cancel() + } +} + +func ratesPerStream(responses chan *logproto.StreamRatesResponse, totalResponses int) map[uint64]*logproto.StreamRate { + streamRates := make(map[uint64]*logproto.StreamRate) + for i := 0; i < totalResponses; i++ { + resp := <-responses + if resp == nil { + continue + } + + for j := 0; j < len(resp.StreamRates); j++ { + rate := resp.StreamRates[j] + + if r, ok := streamRates[rate.StreamHash]; ok { + if r.Rate < rate.Rate { + streamRates[rate.StreamHash] = rate + } + continue + } + + streamRates[rate.StreamHash] = rate + } + } + + return streamRates +} + +func (s *rateStore) getClients() ([]ingesterClient, error) { + ingesters, err := s.ring.GetAllHealthy(ring.Read) + if err != nil { + return nil, err + } + + clients := make([]ingesterClient, 0, len(ingesters.Instances)) + for _, i := range ingesters.Instances { + client, err := s.clientPool.GetClientFor(i.Addr) + if err != nil { + return nil, err + } + + clients = append(clients, ingesterClient{i.Addr, client.(logproto.StreamDataClient)}) + } + + return clients, nil +} + +func (s *rateStore) RateFor(streamHash uint64) int64 { + s.rateLock.RLock() + defer s.rateLock.RUnlock() + + return s.rates[streamHash] } diff --git a/pkg/distributor/ratestore_test.go b/pkg/distributor/ratestore_test.go new file mode 100644 index 000000000000..cb1d354b9d0c --- /dev/null +++ b/pkg/distributor/ratestore_test.go @@ -0,0 +1,209 @@ +package distributor + +import ( + "context" + "testing" + "time" + + "github.com/grafana/loki/pkg/distributor/shardstreams" + "github.com/grafana/loki/pkg/validation" + + "github.com/stretchr/testify/require" + + client2 "github.com/grafana/loki/pkg/ingester/client" + + "google.golang.org/grpc" + + "github.com/grafana/loki/pkg/logproto" + + "github.com/grafana/dskit/ring" + "github.com/grafana/dskit/ring/client" +) + +func TestRateStore(t *testing.T) { + t.Run("it reports rates from all of the ingesters", func(t *testing.T) { + tc := setup(true) + tc.ring.replicationSet = ring.ReplicationSet{ 
+ Instances: []ring.InstanceDesc{ + {Addr: "ingester0"}, + {Addr: "ingester1"}, + {Addr: "ingester2"}, + {Addr: "ingester3"}, + }, + } + + tc.clientPool.clients = map[string]client.PoolClient{ + "ingester0": newRateClient([]*logproto.StreamRate{{ + StreamHash: 0, StreamHashNoShard: 0, Rate: 15}}), + "ingester1": newRateClient([]*logproto.StreamRate{{ + StreamHash: 1, StreamHashNoShard: 1, Rate: 25}}), + "ingester2": newRateClient([]*logproto.StreamRate{{ + StreamHash: 2, StreamHashNoShard: 2, Rate: 35}}), + "ingester3": newRateClient([]*logproto.StreamRate{{ + StreamHash: 3, StreamHashNoShard: 3, Rate: 45}}), + } + + _ = tc.rateStore.StartAsync(context.Background()) + defer tc.rateStore.StopAsync() + + require.Eventually(t, func() bool { // There will be data + return tc.rateStore.RateFor(0) != 0 + }, time.Second, time.Millisecond) + + require.Equal(t, int64(15), tc.rateStore.RateFor(0)) + require.Equal(t, int64(25), tc.rateStore.RateFor(1)) + require.Equal(t, int64(35), tc.rateStore.RateFor(2)) + require.Equal(t, int64(45), tc.rateStore.RateFor(3)) + }) + + t.Run("it reports the highest rate from replicas", func(t *testing.T) { + tc := setup(true) + tc.ring.replicationSet = ring.ReplicationSet{ + Instances: []ring.InstanceDesc{ + {Addr: "ingester0"}, + {Addr: "ingester1"}, + {Addr: "ingester2"}, + }, + } + + tc.clientPool.clients = map[string]client.PoolClient{ + "ingester0": newRateClient([]*logproto.StreamRate{{ + StreamHash: 0, StreamHashNoShard: 0, Rate: 25}}), + "ingester1": newRateClient([]*logproto.StreamRate{{ + StreamHash: 0, StreamHashNoShard: 0, Rate: 35}}), + "ingester2": newRateClient([]*logproto.StreamRate{{ + StreamHash: 0, StreamHashNoShard: 0, Rate: 15}}), + } + + _ = tc.rateStore.StartAsync(context.Background()) + defer tc.rateStore.StopAsync() + + require.Eventually(t, func() bool { // There will be data + return tc.rateStore.RateFor(0) != 0 + }, time.Second, time.Millisecond) + + require.Equal(t, int64(35), tc.rateStore.RateFor(0)) + }) + + t.Run("it aggregates rates over shards", func(t *testing.T) { + tc := setup(true) + tc.ring.replicationSet = ring.ReplicationSet{ + Instances: []ring.InstanceDesc{ + {Addr: "ingester0"}, + }, + } + + tc.clientPool.clients = map[string]client.PoolClient{ + "ingester0": newRateClient([]*logproto.StreamRate{ + {StreamHash: 1, StreamHashNoShard: 0, Rate: 25}, + {StreamHash: 2, StreamHashNoShard: 0, Rate: 35}, + {StreamHash: 3, StreamHashNoShard: 0, Rate: 15}, + }), + } + _ = tc.rateStore.StartAsync(context.Background()) + defer tc.rateStore.StopAsync() + + require.Eventually(t, func() bool { // There will be data + return tc.rateStore.RateFor(0) != 0 + }, time.Second, time.Millisecond) + + require.Equal(t, int64(75), tc.rateStore.RateFor(0)) + }) + + t.Run("it does nothing if no one has enabled sharding", func(t *testing.T) { + tc := setup(false) + tc.ring.replicationSet = ring.ReplicationSet{ + Instances: []ring.InstanceDesc{ + {Addr: "ingester0"}, + }, + } + + tc.clientPool.clients = map[string]client.PoolClient{ + "ingester0": newRateClient([]*logproto.StreamRate{ + {StreamHash: 1, StreamHashNoShard: 0, Rate: 25}, + }), + } + _ = tc.rateStore.StartAsync(context.Background()) + defer tc.rateStore.StopAsync() + + time.Sleep(time.Second) + require.Equal(t, int64(0), tc.rateStore.RateFor(0)) + }) +} + +func newFakeRing() *fakeRing { + return &fakeRing{} +} + +type fakeRing struct { + ring.ReadRing + + replicationSet ring.ReplicationSet + err error +} + +func (r *fakeRing) GetAllHealthy(op ring.Operation) (ring.ReplicationSet, error) { + 
return r.replicationSet, r.err +} + +func newFakeClientPool() *fakeClientPool { + return &fakeClientPool{ + clients: make(map[string]client.PoolClient), + } +} + +type fakeClientPool struct { + clients map[string]client.PoolClient + err error +} + +func (p *fakeClientPool) GetClientFor(addr string) (client.PoolClient, error) { + return p.clients[addr], p.err +} + +func newRateClient(rates []*logproto.StreamRate) client.PoolClient { + return client2.ClosableHealthAndIngesterClient{ + StreamDataClient: &fakeStreamDataClient{resp: &logproto.StreamRatesResponse{StreamRates: rates}}, + } +} + +type fakeStreamDataClient struct { + resp *logproto.StreamRatesResponse + err error +} + +func (c *fakeStreamDataClient) GetStreamRates(ctx context.Context, in *logproto.StreamRatesRequest, opts ...grpc.CallOption) (*logproto.StreamRatesResponse, error) { + return c.resp, c.err +} + +type fakeOverrides struct { + enabled bool +} + +func (c *fakeOverrides) AllByUserID() map[string]*validation.Limits { + return map[string]*validation.Limits{ + "ingester0": { + ShardStreams: &shardstreams.Config{ + Enabled: c.enabled, + }, + }, + } +} + +type testContext struct { + ring *fakeRing + clientPool *fakeClientPool + rateStore *rateStore +} + +func setup(enabled bool) *testContext { + ring := newFakeRing() + cp := newFakeClientPool() + cfg := RateStoreConfig{MaxParallelism: 5, IngesterReqTimeout: time.Second, StreamRateUpdateInterval: 10 * time.Millisecond} + + return &testContext{ + ring: ring, + clientPool: cp, + rateStore: NewRateStore(cfg, ring, cp, &fakeOverrides{enabled}, nil), + } +} diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index b62c4980ab63..bd6a1f78ebab 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -34,6 +34,7 @@ type ClosableHealthAndIngesterClient struct { logproto.PusherClient logproto.QuerierClient logproto.IngesterClient + logproto.StreamDataClient grpc_health_v1.HealthClient io.Closer } @@ -45,6 +46,11 @@ type Config struct { GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` GRPCUnaryClientInterceptors []grpc.UnaryClientInterceptor `yaml:"-"` GRCPStreamClientInterceptors []grpc.StreamClientInterceptor `yaml:"-"` + + // Internal is used to indicate that this client communicates on behalf of + // a machine and not a user. When Internal = true, the client won't attempt + // to inject an userid into the context. + Internal bool `yaml:"-"` } // RegisterFlags registers flags. @@ -73,29 +79,31 @@ func New(cfg Config, addr string) (HealthAndIngesterClient, error) { return nil, err } return ClosableHealthAndIngesterClient{ - PusherClient: logproto.NewPusherClient(conn), - QuerierClient: logproto.NewQuerierClient(conn), - IngesterClient: logproto.NewIngesterClient(conn), - HealthClient: grpc_health_v1.NewHealthClient(conn), - Closer: conn, + PusherClient: logproto.NewPusherClient(conn), + QuerierClient: logproto.NewQuerierClient(conn), + IngesterClient: logproto.NewIngesterClient(conn), + StreamDataClient: logproto.NewStreamDataClient(conn), + HealthClient: grpc_health_v1.NewHealthClient(conn), + Closer: conn, }, nil } func instrumentation(cfg *Config) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) { var unaryInterceptors []grpc.UnaryClientInterceptor unaryInterceptors = append(unaryInterceptors, cfg.GRPCUnaryClientInterceptors...) 
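+	// When cfg.Internal is set, the user-header interceptors are skipped so these
+	// machine-to-machine calls don't try to attach a tenant ID (see the Internal field in Config).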
- unaryInterceptors = append(unaryInterceptors, - otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), - middleware.ClientUserHeaderInterceptor, - middleware.UnaryClientInstrumentInterceptor(ingesterClientRequestDuration), - ) + unaryInterceptors = append(unaryInterceptors, otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())) + if !cfg.Internal { + unaryInterceptors = append(unaryInterceptors, middleware.ClientUserHeaderInterceptor) + } + unaryInterceptors = append(unaryInterceptors, middleware.UnaryClientInstrumentInterceptor(ingesterClientRequestDuration)) + var streamInterceptors []grpc.StreamClientInterceptor streamInterceptors = append(streamInterceptors, cfg.GRCPStreamClientInterceptors...) - streamInterceptors = append(streamInterceptors, - otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()), - middleware.StreamClientUserHeaderInterceptor, - middleware.StreamClientInstrumentInterceptor(ingesterClientRequestDuration), - ) + streamInterceptors = append(streamInterceptors, otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer())) + if !cfg.Internal { + streamInterceptors = append(streamInterceptors, middleware.StreamClientUserHeaderInterceptor) + } + streamInterceptors = append(streamInterceptors, middleware.StreamClientInstrumentInterceptor(ingesterClientRequestDuration)) return unaryInterceptors, streamInterceptors } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 9609d6ad7d99..8440355457f1 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -47,7 +47,8 @@ import ( const ( // RingKey is the key under which we store the ingesters ring in the KVStore. - RingKey = "ring" + RingKey = "ring" + internalInstanceID = "internal" ) // ErrReadOnly is returned when the ingester is shutting down and a push was @@ -192,10 +193,11 @@ type Ingester struct { clientConfig client.Config tenantConfigs *runtime.TenantConfigs - shutdownMtx sync.Mutex // Allows processes to grab a lock and prevent a shutdown - instancesMtx sync.RWMutex - instances map[string]*instance - readonly bool + shutdownMtx sync.Mutex // Allows processes to grab a lock and prevent a shutdown + instancesMtx sync.RWMutex + instances map[string]*instance + internalInstance *instance // used for non-user communication from the distributors + readonly bool lifecycler *ring.Lifecycler lifecyclerWatcher *services.FailureWatcher @@ -623,14 +625,7 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro // GetStreamRates returns a response containing all streams and their current rate // TODO: It might be nice for this to be human readable, eventually: Sort output and return labels, too? 
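+// Rates are served from a shared internal instance (getOrCreateInternalInstance below)
+// rather than a per-tenant one, so distributor callers don't need to supply a tenant ID.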
func (i *Ingester) GetStreamRates(ctx context.Context, req *logproto.StreamRatesRequest) (*logproto.StreamRatesResponse, error) { - instanceID, err := tenant.TenantID(ctx) - if err != nil { - return nil, err - } else if i.readonly { - return nil, ErrReadOnly - } - - instance, err := i.GetOrCreateInstance(instanceID) + instance, err := i.getOrCreateInternalInstance() if err != nil { return &logproto.StreamRatesResponse{}, err } @@ -659,6 +654,25 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { / return inst, nil } +func (i *Ingester) getOrCreateInternalInstance() (*instance, error) { //nolint:revive + if inst, ok := i.getInternalInstance(); ok { + return inst, nil + } + + i.instancesMtx.Lock() + defer i.instancesMtx.Unlock() + + if i.internalInstance == nil { + inst, err := newInstance(&i.cfg, i.periodicConfigs, internalInstanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter) + if err != nil { + return nil, err + } + i.internalInstance = inst + } + + return i.internalInstance, nil +} + // Query the ingests for log streams matching a set of matchers. func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error { // initialize stats collection for ingester queries. @@ -955,6 +969,17 @@ func (i *Ingester) getInstanceByID(id string) (*instance, bool) { return inst, ok } +func (i *Ingester) getInternalInstance() (*instance, bool) { + i.instancesMtx.RLock() + defer i.instancesMtx.RUnlock() + + if i.internalInstance != nil { + return i.internalInstance, true + } + + return nil, false +} + func (i *Ingester) getInstances() []*instance { i.instancesMtx.RLock() defer i.instancesMtx.RUnlock() diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index f3917037d52b..5af8a13e8fa6 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -301,6 +301,7 @@ func (t *Loki) setupAuthMiddleware() { []string{ "/grpc.health.v1.Health/Check", "/logproto.Ingester/TransferChunks", + "/logproto.StreamData/GetStreamRates", "/frontend.Frontend/Process", "/frontend.Frontend/NotifyClientShutdown", "/schedulerpb.SchedulerForFrontend/FrontendLoop",
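To recap the behaviour the commit message promises — replica reports for the same `StreamHash` collapse to the highest rate, and shards sharing a `StreamHashNoShard` are summed into one unsharded total — here is a small self-contained sketch. The `streamRate` struct and the sample numbers are made up; the two passes mirror `ratesPerStream` and `aggregateByShard` in `pkg/distributor/ratestore.go` above.

```go
package main

import "fmt"

// streamRate loosely mirrors logproto.StreamRate, for illustration only.
type streamRate struct {
	streamHash        uint64 // per-shard hash reported by an ingester
	streamHashNoShard uint64 // hash of the stream with the shard label removed
	rate              int64  // bytes/s
}

func main() {
	// Three ingesters reporting; shard 1 is replicated on two of them.
	responses := [][]streamRate{
		{{streamHash: 1, streamHashNoShard: 0, rate: 25}},
		{{streamHash: 1, streamHashNoShard: 0, rate: 35}}, // replica of shard 1
		{{streamHash: 2, streamHashNoShard: 0, rate: 15}}, // a second shard
	}

	// Pass 1: keep the highest rate seen for each shard (replica dedupe).
	perShard := map[uint64]streamRate{}
	for _, resp := range responses {
		for _, sr := range resp {
			if cur, ok := perShard[sr.streamHash]; !ok || cur.rate < sr.rate {
				perShard[sr.streamHash] = sr
			}
		}
	}

	// Pass 2: sum the shards that belong to the same unsharded stream.
	perStream := map[uint64]int64{}
	for _, sr := range perShard {
		perStream[sr.streamHashNoShard] += sr.rate
	}

	// 35 (max of the replicas) + 15 = 50: the unsharded view RateFor exposes.
	fmt.Println(perStream[0])
}
```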