diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 0ae99089cc70..9f0926c4f353 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -206,18 +206,16 @@ func (d *Distributor) stopping(_ error) error { // TODO taken from Cortex, see if we can refactor out an usable interface. type streamTracker struct { stream logproto.Stream - minSuccess int maxFailures int - succeeded atomic.Int32 failed atomic.Int32 } // TODO taken from Cortex, see if we can refactor out an usable interface. type pushTracker struct { - samplesPending atomic.Int32 - samplesFailed atomic.Int32 - done chan struct{} - err chan error + pending atomic.Int32 + samplesFailed atomic.Int32 + done chan struct{} + err chan error } // Push a set of streams. @@ -319,7 +317,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log return nil, err } - streams[i].minSuccess = len(replicationSet.Instances) - replicationSet.MaxErrors streams[i].maxFailures = replicationSet.MaxErrors for _, ingester := range replicationSet.Instances { samplesByIngester[ingester.Addr] = append(samplesByIngester[ingester.Addr], &streams[i]) @@ -331,7 +328,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log done: make(chan struct{}, 1), // buffer avoids blocking if caller terminates - sendSamples() only sends once on each err: make(chan error, 1), } - tracker.samplesPending.Store(int32(len(streams))) + tracker.pending.Store(int32(len(samplesByIngester))) for ingester, samples := range samplesByIngester { go func(ingester ring.InstanceDesc, samples []*streamTracker) { // Use a background context to make sure all ingesters get samples even if we return early @@ -393,16 +390,13 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester ring.InstanceDes } if pushTracker.samplesFailed.Inc() == 1 { pushTracker.err <- err - } - } else { - if streamTrackers[i].succeeded.Inc() != int32(streamTrackers[i].minSuccess) { - continue - } - if pushTracker.samplesPending.Dec() == 0 { - pushTracker.done <- struct{}{} + return } } } + if pushTracker.pending.Dec() == 0 { + pushTracker.done <- struct{}{} + } } // TODO taken from Cortex, see if we can refactor out an usable interface.