Implement Distributor Rate Store (#7349)
The `RateStore` concurrently queries all ingester `GetStreamRates` APIs
and stores the results. `RateStore` deduplicates replicas and combines
sharded streams so a caller of `RateFor` gets an unsharded view of a
stream's rate.
MasslessParticle authored Oct 6, 2022
1 parent a7e7a8e commit 7979cfb
Showing 8 changed files with 562 additions and 69 deletions.
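
The aggregation described in the commit message (deduplicating replica responses and re-combining sharded streams) can be sketched roughly as follows. This is an illustrative outline only, with hypothetical `streamRate` and `aggregateRates` names and one plausible dedup strategy; the actual implementation lives in the rate store files added by this commit.

```go
package sketch

// streamRate is a hypothetical stand-in for the per-stream sample an
// ingester reports: the hash of the (possibly sharded) stream, the hash of
// the same stream with the shard label stripped, and its rate in bytes/sec.
type streamRate struct {
	streamHash        uint64
	streamHashNoShard uint64
	rate              int64
}

// aggregateRates deduplicates replicas by keeping the highest rate reported
// for each sharded stream, then sums the shards that share an unsharded
// hash, so a lookup by unsharded hash returns one combined rate.
func aggregateRates(samples []streamRate) map[uint64]int64 {
	maxPerShard := make(map[uint64]streamRate)
	for _, s := range samples {
		if cur, ok := maxPerShard[s.streamHash]; !ok || s.rate > cur.rate {
			maxPerShard[s.streamHash] = s
		}
	}
	combined := make(map[uint64]int64)
	for _, s := range maxPerShard {
		combined[s.streamHashNoShard] += s.rate
	}
	return combined
}
```

In this sketch, keeping the maximum per sharded stream avoids double-counting replicas, while summing by the unsharded hash is what lets `RateFor` return a single rate for a stream no matter how many shards it is currently split into.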
21 changes: 17 additions & 4 deletions docs/sources/configuration/_index.md
@@ -314,10 +314,23 @@ ring:
# The CLI flags prefix for this block config is: distributor.ring
[etcd: <etcd_config>]
# The heartbeat timeout after which ingesters are skipped for
# reading and writing.
# CLI flag: -distributor.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]
# The heartbeat timeout after which ingesters are skipped for
# reading and writing.
# CLI flag: -distributor.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]
rate_store:
# The max number of concurrent requests to make to ingester stream apis
# CLI flag: -distributor.rate-store.max-request-parallelism
[max_request_parallelism: <int> | default = 200]
# The interval on which distributors will update current stream rates
# from ingesters
# CLI flag: -distributor.rate-store.stream-rate-update-interval
[stream_rate_update_interval: <duration> | default = 1s]
# Timeout for communication between distributors and ingesters when updating
# rates
# CLI flag: -distributor.rate-store.ingester-request-timeout
[ingester_request_timeout: <duration> | default = 1s]
```

## querier
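The three settings documented above are registered under the `distributor.rate-store` flag prefix (see the `RegisterFlagsWithPrefix` call in `pkg/distributor/distributor.go` below). As a rough sketch, a config struct wired up that way could look like the following; the field names are assumptions for illustration, not the commit's actual `RateStoreConfig` definition, but the YAML keys, flag names, and defaults mirror the docs.

```go
package sketch

import (
	"flag"
	"time"
)

// RateStoreConfig here is illustrative; only the YAML keys, flag names, and
// defaults are taken from the documentation above.
type RateStoreConfig struct {
	MaxParallelism           int           `yaml:"max_request_parallelism"`
	StreamRateUpdateInterval time.Duration `yaml:"stream_rate_update_interval"`
	IngesterReqTimeout       time.Duration `yaml:"ingester_request_timeout"`
}

// RegisterFlagsWithPrefix registers the flags under the given prefix,
// e.g. "distributor.rate-store".
func (cfg *RateStoreConfig) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) {
	fs.IntVar(&cfg.MaxParallelism, prefix+".max-request-parallelism", 200,
		"The max number of concurrent requests to make to ingester stream apis")
	fs.DurationVar(&cfg.StreamRateUpdateInterval, prefix+".stream-rate-update-interval", time.Second,
		"The interval on which distributors will update current stream rates from ingesters")
	fs.DurationVar(&cfg.IngesterReqTimeout, prefix+".ingester-request-timeout", time.Second,
		"Timeout for communication between distributors and ingesters when updating rates")
}
```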
52 changes: 33 additions & 19 deletions pkg/distributor/distributor.go
@@ -58,16 +58,19 @@ type Config struct {

// For testing.
factory ring_client.PoolFactory `yaml:"-"`

RateStore RateStoreConfig `yaml:"rate_store"`
}

// RegisterFlags registers distributor-related flags.
func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
cfg.DistributorRing.RegisterFlags(fs)
cfg.RateStore.RegisterFlagsWithPrefix("distributor.rate-store", fs)
}

// RateStore manages the ingestion rate of streams, populated by data fetched from ingesters.
type RateStore interface {
RateFor(stream *logproto.Stream) (int, error)
RateFor(streamHash uint64) int64
}

// Distributor coordinates replication and distribution of log streams.
@@ -118,6 +121,12 @@ func New(
}
}

internalFactory := func(addr string) (ring_client.PoolClient, error) {
internalCfg := clientCfg
internalCfg.Internal = true
return client.New(internalCfg, addr)
}

validator, err := NewValidator(overrides)
if err != nil {
return nil, err
@@ -150,6 +159,7 @@ func New(
if err != nil {
return nil, err
}

d := Distributor{
cfg: cfg,
clientCfg: clientCfg,
@@ -188,7 +198,21 @@ func New(
d.replicationFactor.Set(float64(ingestersRing.ReplicationFactor()))
rfStats.Set(int64(ingestersRing.ReplicationFactor()))

servs = append(servs, d.pool)
rs := NewRateStore(
d.cfg.RateStore,
ingestersRing,
clientpool.NewPool(
clientCfg.PoolConfig,
ingestersRing,
internalFactory,
util_log.Logger,
),
overrides,
registerer,
)
d.rateStore = rs

servs = append(servs, d.pool, rs)
d.subservices, err = services.NewManager(servs...)
if err != nil {
return nil, errors.Wrap(err, "services manager")
@@ -197,8 +221,6 @@ func New(
d.subservicesWatcher.WatchManager(d.subservices)
d.Service = services.NewBasicService(d.starting, d.running, d.stopping)

d.rateStore = &noopRateStore{}

return &d, nil
}

@@ -393,7 +415,7 @@ func min(x1, x2 int) int {
func (d *Distributor) shardStream(stream logproto.Stream, streamSize int, userID string) ([]uint32, []streamTracker) {
shardStreamsCfg := d.validator.Limits.ShardStreams(userID)
logger := log.With(util_log.WithUserID(userID, util_log.Logger), "stream", stream.Labels)
shardCount := d.shardCountFor(logger, &stream, streamSize, d.rateStore, shardStreamsCfg)
shardCount := d.shardCountFor(logger, &stream, streamSize, shardStreamsCfg)

if shardCount <= 1 {
return []uint32{util.TokenFor(userID, stream.Labels)}, []streamTracker{{stream: stream}}
@@ -442,8 +464,8 @@ func labelTemplate(lbls string) labels.Labels {
return streamLabels
}

func (d *Distributor) createShard(shardStreamsCfg *shardstreams.Config, stream logproto.Stream, lbls labels.Labels, streamPattern string, totalShards, shardNumber int) (logproto.Stream, bool) {
lowerBound, upperBound, ok := d.boundsFor(stream, totalShards, shardNumber, shardStreamsCfg.LoggingEnabled)
func (d *Distributor) createShard(streamshardCfg *shardstreams.Config, stream logproto.Stream, lbls labels.Labels, streamPattern string, totalShards, shardNumber int) (logproto.Stream, bool) {
lowerBound, upperBound, ok := d.boundsFor(stream, totalShards, shardNumber, streamshardCfg.LoggingEnabled)
if !ok {
return logproto.Stream{}, false
}
@@ -580,23 +602,15 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string,
// based on the rate stored in the rate store and will store the new evaluated number of shards.
//
// desiredRate is expected to be given in bytes.
func (d *Distributor) shardCountFor(logger log.Logger, stream *logproto.Stream, streamSize int, rateStore RateStore, streamShardcfg *shardstreams.Config) int {
func (d *Distributor) shardCountFor(logger log.Logger, stream *logproto.Stream, streamSize int, streamShardcfg *shardstreams.Config) int {
if streamShardcfg.DesiredRate.Val() <= 0 {
if streamShardcfg.LoggingEnabled {
level.Error(logger).Log("msg", "invalid desired rate", "desired_rate", streamShardcfg.DesiredRate.String())
}
return 1
}

rate, err := rateStore.RateFor(stream)
if err != nil {
d.streamShardingFailures.WithLabelValues("rate_not_found").Inc()
if streamShardcfg.LoggingEnabled {
level.Error(logger).Log("msg", "couldn't shard stream because rate store returned error", "err", err)
}
return 1
}

rate := d.rateStore.RateFor(stream.Hash)
shards := calculateShards(rate, streamSize, streamShardcfg.DesiredRate.Val())
if shards > len(stream.Entries) {
d.streamShardingFailures.WithLabelValues("too_many_shards").Inc()
@@ -613,8 +627,8 @@ func (d *Distributor) shardCountFor(logger log.Logger, stream *logproto.Stream,
return shards
}

func calculateShards(rate, streamSize, desiredRate int) int {
shards := float64((rate + streamSize)) / float64(desiredRate)
func calculateShards(rate int64, streamSize, desiredRate int) int {
shards := float64(rate+int64(streamSize)) / float64(desiredRate)
if shards <= 1 {
return 1
}
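
Only the first lines of the reworked `calculateShards` are visible in this hunk. The rounding step in the sketch below is assumed from the test expectations in `pkg/distributor/distributor_test.go` further down (for example, a 1MB push plus a 300MB stored rate against a 3MB desired rate is expected to yield 101 shards), so treat it as a reading of the change rather than the definitive implementation.

```go
package main

import (
	"fmt"
	"math"
)

// calculateShards adds the stored rate for the stream to the size of the
// incoming push and divides by the desired per-shard rate. Rounding up is
// assumed here from the test cases; the remainder of the function is not
// shown in the diff above.
func calculateShards(rate int64, streamSize, desiredRate int) int {
	shards := float64(rate+int64(streamSize)) / float64(desiredRate)
	if shards <= 1 {
		return 1
	}
	return int(math.Ceil(shards))
}

func main() {
	const megabyte = 1 << 20
	// 1MB push + 4MB/s stored rate vs. a 3MB desired rate: ~1.67, so 2 shards.
	fmt.Println(calculateShards(4*megabyte, 1*megabyte, 3*megabyte))
	// 1MB push + 300MB/s stored rate vs. a 3MB desired rate: ~100.3, so 101 shards.
	fmt.Println(calculateShards(300*megabyte, 1*megabyte, 3*megabyte))
}
```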
28 changes: 21 additions & 7 deletions pkg/distributor/distributor_test.go
@@ -683,7 +683,7 @@ func TestStreamShard(t *testing.T) {
require.NoError(t, err)

d := Distributor{
rateStore: &noopRateStore{},
rateStore: &fakeRateStore{},
streamShardingFailures: shardingFailureMetric,
validator: validator,
}
@@ -728,7 +728,7 @@ func BenchmarkShardStream(b *testing.B) {
distributorBuilder := func(shards int) *Distributor {
d := &Distributor{streamShardingFailures: shardingFailureMetric}
// streamSize is always zero, so number of shards will be dictated just by the rate returned from store.
d.rateStore = &noopRateStore{rate: desiredRate*shards - 1}
d.rateStore = &fakeRateStore{rate: int64(desiredRate*shards - 1)}
return d
}

@@ -824,7 +824,7 @@ func TestShardCalculation(t *testing.T) {
for _, tc := range []struct {
name string
streamSize int
rate int
rate int64

wantShards int
}{
@@ -837,7 +837,7 @@
{
name: "enough data to have two shards, stream size (1mb) + ingested rate (4mb) > 3mb",
streamSize: 1 * megabyte,
rate: desiredRate + 1,
rate: int64(desiredRate + 1),
wantShards: 2,
},
{
@@ -849,7 +849,7 @@
{
name: "a lot of shards, stream size (1mb) + ingested rate (300mb) > 3mb",
streamSize: 1 * megabyte,
rate: 300 * megabyte,
rate: int64(300 * megabyte),
wantShards: 101,
},
} {
@@ -874,7 +874,7 @@ func TestShardCountFor(t *testing.T) {
for _, tc := range []struct {
name string
stream *logproto.Stream
rate int
rate int64
desiredRate loki_flagext.ByteSize

wantStreamSize int // used for sanity check.
@@ -958,8 +958,9 @@

d := &Distributor{
streamShardingFailures: shardingFailureMetric,
rateStore: &fakeRateStore{tc.rate},
}
got := d.shardCountFor(util_log.Logger, tc.stream, tc.wantStreamSize, &noopRateStore{tc.rate}, limits.ShardStreams)
got := d.shardCountFor(util_log.Logger, tc.stream, tc.wantStreamSize, limits.ShardStreams)
require.Equal(t, tc.wantShards, got)
})
}
@@ -1194,6 +1195,7 @@ func makeWriteRequest(lines, size int) *logproto.PushRequest {
type mockIngester struct {
grpc_health_v1.HealthClient
logproto.PusherClient
logproto.StreamDataClient

failAfter time.Duration
succeedAfter time.Duration
@@ -1217,6 +1219,18 @@ func (i *mockIngester) Push(ctx context.Context, in *logproto.PushRequest, opts
return nil, nil
}

func (i *mockIngester) GetStreamRates(ctx context.Context, in *logproto.StreamRatesRequest, opts ...grpc.CallOption) (*logproto.StreamRatesResponse, error) {
return &logproto.StreamRatesResponse{}, nil
}

func (i *mockIngester) Close() error {
return nil
}

type fakeRateStore struct {
rate int64
}

func (s *fakeRateStore) RateFor(_ uint64) int64 {
return s.rate
}