Skip to content

Commit

Permalink
PR fixes.
Browse files Browse the repository at this point in the history
Signed-off-by: Aleskey Sin <leks.sin@gmail.com>
  • Loading branch information
IKSIN committed Feb 18, 2020
1 parent ec2441d commit 6fcb677
Showing 1 changed file with 6 additions and 11 deletions.
17 changes: 6 additions & 11 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,18 +411,12 @@ func startStreamSeriesSet(
frameTimeoutCtx, cancel := frameCtx(s.responseTimeout)
defer cancel()
var rr *recvResponse

var err error
select {
case <-ctx.Done():
close(done)
err = errors.Wrap(ctx.Err(), fmt.Sprintf("failed to receive any data from %s", s.name))
s.handleErr(err)
s.handleErr(errors.Wrapf(ctx.Err(), "failed to receive any data from %s", s.name), done)
return
case <-frameTimeoutCtx.Done():
close(done)
err = errors.Wrap(frameTimeoutCtx.Err(), fmt.Sprintf("failed to receive any data in %s from %s", s.responseTimeout.String(), s.name))
s.handleErr(err)
s.handleErr(errors.Wrapf(ctx.Err(), "failed to receive any data in %s from %s", s.responseTimeout.String(), s.name), done)
return
case rr = <-rCh:
}
Expand All @@ -434,8 +428,7 @@ func startStreamSeriesSet(

if rr.err != nil {
wrapErr := errors.Wrapf(rr.err, "receive series from %s", s.name)
s.handleErr(wrapErr)
close(done)
s.handleErr(wrapErr, done)
return
}
numResponses++
Expand All @@ -450,8 +443,10 @@ func startStreamSeriesSet(
return s
}

func (s *streamSeriesSet) handleErr(err error) {
func (s *streamSeriesSet) handleErr(err error, done chan struct{}) {
defer close(done)
s.closeSeries()

if s.partialResponse {
level.Warn(s.logger).Log("err", err, "msg", "returning partial response")
s.warnCh.send(storepb.NewWarnSeriesResponse(err))
Expand Down

0 comments on commit 6fcb677

Please sign in to comment.