Skip to content

Commit

Permalink
fix: cancel parallel routers
Browse files Browse the repository at this point in the history
  • Loading branch information
dennis-tra committed Feb 9, 2023
1 parent 7f581c8 commit e13d363
Showing 1 changed file with 13 additions and 25 deletions.
38 changes: 13 additions & 25 deletions compparallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ func (r *composableParallel) FindProvidersAsync(ctx context.Context, cid cid.Cid
return r.FindProvidersAsync(ctx, cid, count), nil
},
func() bool {
return atomic.AddInt64(&totalCount, 1) > int64(count) && count != 0
if count == 0 {
return false
}
return atomic.AddInt64(&totalCount, 1) >= int64(count)
},
)

Expand Down Expand Up @@ -369,23 +372,24 @@ func getChannelOrErrorParallel[T any](
return
}

if shouldStop() {
log.Debug("getChannelOrErrorParallel: stopping channel iteration for router ", r.Router,
select {
case <-ctx.Done():
log.Debug("getChannelOrErrorParallel: stopping execution on router on context done inside select for router ", r.Router,
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
" closed channel: ", ok,
)
return
case outCh <- val:
}

select {
case <-ctx.Done():
log.Debug("getChannelOrErrorParallel: stopping execution on router on context done inside select for router ", r.Router,
if shouldStop() {
log.Debug("getChannelOrErrorParallel: stopping channel iteration for router ", r.Router,
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
" closed channel: ", ok,
)
cancelAll()
return
case outCh <- val:
}
}
}
Expand All @@ -402,21 +406,5 @@ func getChannelOrErrorParallel[T any](
log.Debug("getChannelOrErrorParallel: finished executing all routers ", len(routers))
}()

select {
case err, ok := <-errCh:
if !ok {
log.Debug("getChannelOrErrorParallel: closed error channel")
return outCh, routing.ErrNotFound
}
log.Debug("getChannelOrErrorParallel: error on method execution: ", err)

return outCh, err
case <-ctx.Done():
err := ctx.Err()
log.Debug("getChannelOrErrorParallel: context done: ", err)
return outCh, err
default:
log.Debug("getChannelOrErrorParallel: returning channel")
return outCh, nil
}
return outCh, nil
}

0 comments on commit e13d363

Please sign in to comment.