Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

use Context's properly to stop executing when a request is canceled #728

Merged
merged 11 commits into from
Dec 5, 2017
42 changes: 25 additions & 17 deletions api/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ func (s *Server) indexDelete(ctx *middleware.Context, req models.IndexDelete) {
}

// peerQuery takes a request and the path to request it on, then fans it out
// across the cluster, except to the local peer.
// across the cluster, except to the local peer. If any peer fails requests to
// other peers are aborted.
// ctx: request context
// data: request to be submitted
// name: name to be used in logging & tracing
Expand All @@ -197,11 +198,13 @@ func (s *Server) peerQuery(ctx context.Context, data cluster.Traceable, name, pa
}
log.Debug("HTTP %s across %d instances", name, len(peers)-1)

result := make([][]byte, 0, len(peers)-1)
reqCtx, cancel := context.WithCancel(ctx)
defer cancel()

var errors []error
var errLock sync.Mutex
var resLock sync.Mutex
responses := make(chan struct {
data []byte
err error
}, 1)
var wg sync.WaitGroup
for _, peer := range peers {
if peer.IsLocal() {
Expand All @@ -211,24 +214,29 @@ func (s *Server) peerQuery(ctx context.Context, data cluster.Traceable, name, pa
go func(peer cluster.Node) {
defer wg.Done()
log.Debug("HTTP Render querying %s%s", peer.Name, path)
buf, err := peer.Post(ctx, name, path, data)
buf, err := peer.Post(reqCtx, name, path, data)
if err != nil {
cancel()
log.Error(4, "HTTP Render error querying %s%s: %q", peer.Name, path, err)
errLock.Lock()
errors = append(errors, err)
errLock.Unlock()
return
}

resLock.Lock()
result = append(result, buf)
resLock.Unlock()
responses <- struct {
data []byte
err error
}{buf, err}
}(peer)
}
wg.Wait()
// wait for all list goroutines to end, then close our responses channel
go func() {
wg.Wait()
close(responses)
}()

if len(errors) > 0 {
return nil, errors[0]
result := make([][]byte, 0, len(peers)-1)
for resp := range responses {
if resp.err != nil {
return nil, err
}
result = append(result, resp.data)
}

return result, nil
Expand Down
3 changes: 3 additions & 0 deletions api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ var (
fallbackGraphite string
timeZoneStr string

getTargetsConcurrency int

graphiteProxy *httputil.ReverseProxy
timeZone *time.Location
)
Expand All @@ -45,6 +47,7 @@ func ConfigSetup() {
apiCfg.BoolVar(&multiTenant, "multi-tenant", true, "require x-org-id authentication to auth as a specific org. otherwise orgId 1 is assumed")
apiCfg.StringVar(&fallbackGraphite, "fallback-graphite-addr", "http://localhost:8080", "in case our /render endpoint does not support the requested processing, proxy the request to this graphite")
apiCfg.StringVar(&timeZoneStr, "time-zone", "local", "timezone for interpreting from/until values when needed, specified using [zoneinfo name](https://en.wikipedia.org/wiki/Tz_database#Names_of_time_zones) e.g. 'America/New_York', 'UTC' or 'local' to use local server timezone")
apiCfg.IntVar(&getTargetsConcurrency, "get-targets-concurrency", 20, "maximum number of concurrent threads for fetching data on the local node. Each thread handles a single series.")
globalconf.Register("http", apiCfg)
}

Expand Down
Loading