Skip to content

Commit

Permalink
Add zone aware replication support
Browse files Browse the repository at this point in the history
  • Loading branch information
taisho6339 committed Nov 3, 2021
1 parent 5c99962 commit 0368f74
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
9 changes: 9 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,11 @@ lifecycler:
# CLI flag: -distributor.replication-factor
[replication_factor: <int> | default = 3]
# True to enable the zone-awareness and replicate ingested samples across
# different availability zones.
# CLI flag: -distributor.zone-awareness-enabled
[zone_awareness_enabled: <boolean> | default = false]
# The number of tokens the lifecycler will generate and put into the ring if
# it joined without transferring tokens from another lifecycler.
# CLI flag: -ingester.num-tokens
Expand Down Expand Up @@ -1035,6 +1040,10 @@ lifecycler:
# CLI flag: -ingester.final-sleep
[final_sleep: <duration> | default = 0s]
# The availability zone where this instance is running.
# CLI flag: -ingester.availability-zone
[availability_zone: <string> | default = ""]
# Number of times to try and transfer chunks when leaving before
# falling back to flushing to the store. Zero = no transfers are done.
# CLI flag: -ingester.max-transfer-retries
Expand Down
10 changes: 6 additions & 4 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,15 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize)
}

const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
var descs [maxExpectedReplicationSet]ring.InstanceDesc

var (
bufDescs [ring.GetBufferSize]ring.InstanceDesc
bufHosts [ring.GetBufferSize]string
bufZones [ring.GetBufferSize]string
)
samplesByIngester := map[string][]*streamTracker{}
ingesterDescs := map[string]ring.InstanceDesc{}
for i, key := range keys {
replicationSet, err := d.ingestersRing.Get(key, ring.Write, descs[:0], nil, nil)
replicationSet, err := d.ingestersRing.Get(key, ring.Write, bufDescs[:0], bufHosts[:0], bufZones[:0])
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 0368f74

Please sign in to comment.