Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add zone aware replication support #4626

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,11 @@ lifecycler:
# CLI flag: -distributor.replication-factor
[replication_factor: <int> | default = 3]

# Set to true to enable zone-awareness and replicate ingested samples across
# different availability zones.
# CLI flag: -distributor.zone-awareness-enabled
[zone_awareness_enabled: <boolean> | default = false]

# The number of tokens the lifecycler will generate and put into the ring if
# it joins without transferring tokens from another lifecycler.
# CLI flag: -ingester.num-tokens
Expand Down Expand Up @@ -1035,6 +1040,10 @@ lifecycler:
# CLI flag: -ingester.final-sleep
[final_sleep: <duration> | default = 0s]

# The availability zone where this instance is running.
# CLI flag: -ingester.availability-zone
[availability_zone: <string> | default = ""]

# Number of times to try to transfer chunks when leaving before
# falling back to flushing to the store. Zero = no transfers are done.
# CLI flag: -ingester.max-transfer-retries
Expand Down
10 changes: 6 additions & 4 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,15 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize)
}

const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
var descs [maxExpectedReplicationSet]ring.InstanceDesc

var (
bufDescs [ring.GetBufferSize]ring.InstanceDesc
bufHosts [ring.GetBufferSize]string
bufZones [ring.GetBufferSize]string
)
samplesByIngester := map[string][]*streamTracker{}
ingesterDescs := map[string]ring.InstanceDesc{}
for i, key := range keys {
replicationSet, err := d.ingestersRing.Get(key, ring.Write, descs[:0], nil, nil)
replicationSet, err := d.ingestersRing.Get(key, ring.Write, bufDescs[:0], bufHosts[:0], bufZones[:0])
if err != nil {
return nil, err
}
Expand Down