diff --git a/compparallel.go b/compparallel.go index c1c3c87..e0b37cc 100644 --- a/compparallel.go +++ b/compparallel.go @@ -95,7 +95,10 @@ func (r *composableParallel) FindProvidersAsync(ctx context.Context, cid cid.Cid return r.FindProvidersAsync(ctx, cid, count), nil }, func() bool { - return atomic.AddInt64(&totalCount, 1) > int64(count) && count != 0 + if count == 0 { + return false + } + return atomic.AddInt64(&totalCount, 1) >= int64(count) }, ) @@ -369,23 +372,24 @@ func getChannelOrErrorParallel[T any]( return } - if shouldStop() { - log.Debug("getChannelOrErrorParallel: stopping channel iteration for router ", r.Router, + select { + case <-ctx.Done(): + log.Debug("getChannelOrErrorParallel: stopping execution on router on context done inside select for router ", r.Router, " with timeout ", r.Timeout, " and ignore errors ", r.IgnoreError, - " closed channel: ", ok, ) return + case outCh <- val: } - select { - case <-ctx.Done(): - log.Debug("getChannelOrErrorParallel: stopping execution on router on context done inside select for router ", r.Router, + if shouldStop() { + log.Debug("getChannelOrErrorParallel: stopping channel iteration for router ", r.Router, " with timeout ", r.Timeout, " and ignore errors ", r.IgnoreError, + " closed channel: ", ok, ) + cancelAll() return - case outCh <- val: } } } diff --git a/version.json b/version.json index 42c14d1..eb3a29f 100644 --- a/version.json +++ b/version.json @@ -1,3 +1,3 @@ { - "version": "v0.6.0" + "version": "v0.6.1" }