Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait on all ingesters to complete processing before returning success #6241

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 9 additions & 15 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,18 +206,16 @@ func (d *Distributor) stopping(_ error) error {
// TODO taken from Cortex, see if we can refactor out an usable interface.
type streamTracker struct {
stream logproto.Stream
minSuccess int
maxFailures int
succeeded atomic.Int32
failed atomic.Int32
}

// TODO taken from Cortex, see if we can refactor out an usable interface.
type pushTracker struct {
samplesPending atomic.Int32
samplesFailed atomic.Int32
done chan struct{}
err chan error
pending atomic.Int32
samplesFailed atomic.Int32
done chan struct{}
err chan error
}

// Push a set of streams.
Expand Down Expand Up @@ -319,7 +317,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return nil, err
}

streams[i].minSuccess = len(replicationSet.Instances) - replicationSet.MaxErrors
streams[i].maxFailures = replicationSet.MaxErrors
for _, ingester := range replicationSet.Instances {
samplesByIngester[ingester.Addr] = append(samplesByIngester[ingester.Addr], &streams[i])
Expand All @@ -331,7 +328,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
done: make(chan struct{}, 1), // buffer avoids blocking if caller terminates - sendSamples() only sends once on each
err: make(chan error, 1),
}
tracker.samplesPending.Store(int32(len(streams)))
tracker.pending.Store(int32(len(samplesByIngester)))
for ingester, samples := range samplesByIngester {
go func(ingester ring.InstanceDesc, samples []*streamTracker) {
// Use a background context to make sure all ingesters get samples even if we return early
Expand Down Expand Up @@ -393,16 +390,13 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester ring.InstanceDes
}
if pushTracker.samplesFailed.Inc() == 1 {
pushTracker.err <- err
}
} else {
if streamTrackers[i].succeeded.Inc() != int32(streamTrackers[i].minSuccess) {
continue
}
if pushTracker.samplesPending.Dec() == 0 {
pushTracker.done <- struct{}{}
return
}
}
}
if pushTracker.pending.Dec() == 0 {
pushTracker.done <- struct{}{}
}
}

// TODO taken from Cortex, see if we can refactor out an usable interface.
Expand Down