From 7dd0f7fcccd68515f1156b1ae1e01f14f6c6c2bb Mon Sep 17 00:00:00 2001 From: Mathew Heard Date: Wed, 25 May 2022 13:37:38 +1000 Subject: [PATCH] Wait on all ingesters to complete processing before returning success Errors still immediately return an error --- pkg/distributor/distributor.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 0ae99089cc70..44f8be8ca76a 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -386,12 +386,13 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester ring.InstanceDes // // The use of atomic increments here guarantees only a single sendSamples // goroutine will write to either channel. + done := false for i := range streamTrackers { if err != nil { if streamTrackers[i].failed.Inc() <= int32(streamTrackers[i].maxFailures) { continue } - if pushTracker.samplesFailed.Inc() == 1 { + if !done && pushTracker.samplesFailed.Inc() == 1 { pushTracker.err <- err } } else { @@ -399,10 +400,13 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester ring.InstanceDes continue } if pushTracker.samplesPending.Dec() == 0 { - pushTracker.done <- struct{}{} + done = true } } } + if done { + pushTracker.done <- struct{}{} + } } // TODO taken from Cortex, see if we can refactor out an usable interface.