diff --git a/api/cluster.go b/api/cluster.go index b9cfd8c0e7..e81b7d5b01 100644 --- a/api/cluster.go +++ b/api/cluster.go @@ -184,7 +184,8 @@ func (s *Server) indexDelete(ctx *middleware.Context, req models.IndexDelete) { } // peerQuery takes a request and the path to request it on, then fans it out -// across the cluster, except to the local peer. +// across the cluster, except to the local peer. If any peer fails requests to +// other peers are aborted. // ctx: request context // data: request to be submitted // name: name to be used in logging & tracing @@ -197,11 +198,13 @@ func (s *Server) peerQuery(ctx context.Context, data cluster.Traceable, name, pa } log.Debug("HTTP %s across %d instances", name, len(peers)-1) - result := make([][]byte, 0, len(peers)-1) + reqCtx, cancel := context.WithCancel(ctx) + defer cancel() - var errors []error - var errLock sync.Mutex - var resLock sync.Mutex + responses := make(chan struct { + data []byte + err error + }, len(peers)) var wg sync.WaitGroup for _, peer := range peers { if peer.IsLocal() { @@ -211,24 +214,29 @@ func (s *Server) peerQuery(ctx context.Context, data cluster.Traceable, name, pa go func(peer cluster.Node) { defer wg.Done() log.Debug("HTTP Render querying %s%s", peer.Name, path) - buf, err := peer.Post(ctx, name, path, data) + buf, err := peer.Post(reqCtx, name, path, data) if err != nil { + cancel() log.Error(4, "HTTP Render error querying %s%s: %q", peer.Name, path, err) - errLock.Lock() - errors = append(errors, err) - errLock.Unlock() - return } - - resLock.Lock() - result = append(result, buf) - resLock.Unlock() + responses <- struct { + data []byte + err error + }{buf, err} }(peer) } - wg.Wait() + // wait for all list goroutines to end, then close our responses channel + go func() { + wg.Wait() + close(responses) + }() - if len(errors) > 0 { - return nil, errors[0] + result := make([][]byte, 0, len(peers)-1) + for resp := range responses { + if resp.err != nil { + return nil, resp.err + } + result = append(result, 
resp.data) } return result, nil diff --git a/api/config.go b/api/config.go index 718ee718e6..64d5c5653e 100644 --- a/api/config.go +++ b/api/config.go @@ -27,6 +27,8 @@ var ( fallbackGraphite string timeZoneStr string + getTargetsConcurrency int + graphiteProxy *httputil.ReverseProxy timeZone *time.Location ) @@ -45,6 +47,7 @@ func ConfigSetup() { apiCfg.BoolVar(&multiTenant, "multi-tenant", true, "require x-org-id authentication to auth as a specific org. otherwise orgId 1 is assumed") apiCfg.StringVar(&fallbackGraphite, "fallback-graphite-addr", "http://localhost:8080", "in case our /render endpoint does not support the requested processing, proxy the request to this graphite") apiCfg.StringVar(&timeZoneStr, "time-zone", "local", "timezone for interpreting from/until values when needed, specified using [zoneinfo name](https://en.wikipedia.org/wiki/Tz_database#Names_of_time_zones) e.g. 'America/New_York', 'UTC' or 'local' to use local server timezone") + apiCfg.IntVar(&getTargetsConcurrency, "get-targets-concurrency", 20, "maximum number of concurrent threads for fetching data on the local node. 
Each thread handles a single series.") globalconf.Register("http", apiCfg) } diff --git a/api/dataprocessor.go b/api/dataprocessor.go index 96a2697e20..2473a678fb 100644 --- a/api/dataprocessor.go +++ b/api/dataprocessor.go @@ -39,6 +39,20 @@ func doRecover(errp *error) { return } +type limiter chan struct{} + +func (l limiter) enter() { l <- struct{}{} } +func (l limiter) leave() { <-l } + +func newLimiter(l int) limiter { + return make(chan struct{}, l) +} + +type getTargetsResp struct { + series []models.Series + err error +} + // Fix assures all points are nicely aligned (quantized) and padded with nulls in case there's gaps in data // graphite does this quantization before storing, we may want to do that as well at some point // note: values are quantized to the right because we can't lie about the future: @@ -102,6 +116,17 @@ func Fix(in []schema.Point, from, to, interval uint32) []schema.Point { return out } +// divideContext wraps a Consolidate() call with a context.Context condition +func divideContext(ctx context.Context, pointsA, pointsB []schema.Point) []schema.Point { + select { + case <-ctx.Done(): + //request canceled + return nil + default: + } + return divide(pointsA, pointsB) +} + func divide(pointsA, pointsB []schema.Point) []schema.Point { if len(pointsA) != len(pointsB) { panic(fmt.Errorf("divide of a series with len %d by a series with len %d", len(pointsA), len(pointsB))) @@ -124,26 +149,21 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se } } - var mu sync.Mutex var wg sync.WaitGroup - out := make([]models.Series, 0) - errs := make([]error, 0) - + responses := make(chan getTargetsResp, 1) + getCtx, cancel := context.WithCancel(ctx) + defer cancel() if len(localReqs) > 0 { wg.Add(1) go func() { // the only errors returned are from us catching panics, so we should treat them // all as internalServerErrors - series, err := s.getTargetsLocal(ctx, localReqs) - mu.Lock() + series, err := s.getTargetsLocal(getCtx, 
localReqs) if err != nil { - errs = append(errs, err) - } - if len(series) > 0 { - out = append(out, series...) + cancel() } - mu.Unlock() + responses <- getTargetsResp{series, err} wg.Done() }() } @@ -151,92 +171,114 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se wg.Add(1) go func() { // all errors returned returned are *response.Error. - series, err := s.getTargetsRemote(ctx, remoteReqs) - mu.Lock() + series, err := s.getTargetsRemote(getCtx, remoteReqs) if err != nil { - errs = append(errs, err) + cancel() } - if len(series) > 0 { - out = append(out, series...) - } - mu.Unlock() + responses <- getTargetsResp{series, err} wg.Done() }() } - wg.Wait() - var err error - if len(errs) > 0 { - err = errs[0] + + // wait for all getTargets goroutines to end, then close our responses channel + go func() { + wg.Wait() + close(responses) + }() + + out := make([]models.Series, 0) + for resp := range responses { + if resp.err != nil { + return nil, resp.err + } + out = append(out, resp.series...) 
} log.Debug("DP getTargets: %d series found on cluster", len(out)) - return out, err + return out, nil } func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]models.Req) ([]models.Series, error) { - seriesChan := make(chan []models.Series, len(remoteReqs)) - errorsChan := make(chan error, len(remoteReqs)) + responses := make(chan getTargetsResp, len(remoteReqs)) + rCtx, cancel := context.WithCancel(ctx) + defer cancel() wg := sync.WaitGroup{} wg.Add(len(remoteReqs)) for _, nodeReqs := range remoteReqs { log.Debug("DP getTargetsRemote: handling %d reqs from %s", len(nodeReqs), nodeReqs[0].Node.Name) - go func(ctx context.Context, reqs []models.Req) { + go func(reqs []models.Req) { defer wg.Done() node := reqs[0].Node - buf, err := node.Post(ctx, "getTargetsRemote", "/getdata", models.GetData{Requests: reqs}) + buf, err := node.Post(rCtx, "getTargetsRemote", "/getdata", models.GetData{Requests: reqs}) if err != nil { - errorsChan <- err + cancel() + responses <- getTargetsResp{nil, err} return } var resp models.GetDataResp _, err = resp.UnmarshalMsg(buf) if err != nil { + cancel() log.Error(3, "DP getTargetsRemote: error unmarshaling body from %s/getdata: %q", node.Name, err) - errorsChan <- err + responses <- getTargetsResp{nil, err} return } log.Debug("DP getTargetsRemote: %s returned %d series", node.Name, len(resp.Series)) - seriesChan <- resp.Series - }(ctx, nodeReqs) + responses <- getTargetsResp{resp.Series, nil} + }(nodeReqs) } + + // wait for all getTargetsRemote goroutines to end, then close our responses channel go func() { wg.Wait() - close(seriesChan) - close(errorsChan) + close(responses) }() + out := make([]models.Series, 0) - var err error - for series := range seriesChan { - out = append(out, series...) + for resp := range responses { + if resp.err != nil { + return nil, resp.err + } + out = append(out, resp.series...) 
} log.Debug("DP getTargetsRemote: total of %d series found on peers", len(out)) - for e := range errorsChan { - err = e - break - } - return out, err + return out, nil } // error is the error of the first failing target request func (s *Server) getTargetsLocal(ctx context.Context, reqs []models.Req) ([]models.Series, error) { log.Debug("DP getTargetsLocal: handling %d reqs locally", len(reqs)) - seriesChan := make(chan models.Series, len(reqs)) - errorsChan := make(chan error, len(reqs)) - // TODO: abort pending requests on error, maybe use context, maybe timeouts too - wg := sync.WaitGroup{} - wg.Add(len(reqs)) + responses := make(chan getTargetsResp, len(reqs)) + + var wg sync.WaitGroup + reqLimiter := newLimiter(getTargetsConcurrency) + + rCtx, cancel := context.WithCancel(ctx) + defer cancel() +LOOP: for _, req := range reqs { - go func(ctx context.Context, wg *sync.WaitGroup, req models.Req) { - ctx, span := tracing.NewSpan(ctx, s.Tracer, "getTargetsLocal") + // check to see if the request has been canceled, if so abort now. + select { + case <-rCtx.Done(): + //request canceled + break LOOP + default: + } + // if there are already getDataConcurrency goroutines running, then block + // until a slot becomes free. + reqLimiter.enter() + wg.Add(1) + go func(req models.Req) { + rCtx, span := tracing.NewSpan(rCtx, s.Tracer, "getTargetsLocal") req.Trace(span) - defer span.Finish() pre := time.Now() - points, interval, err := s.getTarget(ctx, req) + points, interval, err := s.getTarget(rCtx, req) if err != nil { tags.Error.Set(span, true) - errorsChan <- err + cancel() // cancel all other requests. 
+ responses <- getTargetsResp{nil, err} } else { getTargetDuration.Value(time.Now().Sub(pre)) - seriesChan <- models.Series{ + responses <- getTargetsResp{[]models.Series{{ Target: req.Target, // always simply the metric name from index Datapoints: points, Interval: interval, @@ -245,27 +287,27 @@ func (s *Server) getTargetsLocal(ctx context.Context, reqs []models.Req) ([]mode QueryTo: req.To, QueryCons: req.ConsReq, Consolidator: req.Consolidator, - } + }}, nil} } wg.Done() - }(ctx, &wg, req) + // pop an item of our limiter so that other requests can be processed. + reqLimiter.leave() + span.Finish() + }(req) } go func() { wg.Wait() - close(seriesChan) - close(errorsChan) + close(responses) }() out := make([]models.Series, 0, len(reqs)) - var err error - for series := range seriesChan { - out = append(out, series) + for resp := range responses { + if resp.err != nil { + return nil, resp.err + } + out = append(out, resp.series...) } log.Debug("DP getTargetsLocal: %d series found locally", len(out)) - for e := range errorsChan { - err = e - break - } - return out, err + return out, nil } @@ -285,28 +327,55 @@ func (s *Server) getTarget(ctx context.Context, req models.Req) (points []schema } if !readRollup && !normalize { - return s.getSeriesFixed(ctx, req, consolidation.None), req.OutInterval, nil + fixed, err := s.getSeriesFixed(ctx, req, consolidation.None) + return fixed, req.OutInterval, err } else if !readRollup && normalize { - return consolidation.Consolidate(s.getSeriesFixed(ctx, req, consolidation.None), req.AggNum, req.Consolidator), req.OutInterval, nil + fixed, err := s.getSeriesFixed(ctx, req, consolidation.None) + if err != nil { + return nil, req.OutInterval, err + } + return consolidation.ConsolidateContext(ctx, fixed, req.AggNum, req.Consolidator), req.OutInterval, nil } else if readRollup && !normalize { if req.Consolidator == consolidation.Avg { - return divide( - s.getSeriesFixed(ctx, req, consolidation.Sum), - s.getSeriesFixed(ctx, req, 
consolidation.Cnt), + sumFixed, err := s.getSeriesFixed(ctx, req, consolidation.Sum) + if err != nil { + return nil, req.OutInterval, err + } + cntFixed, err := s.getSeriesFixed(ctx, req, consolidation.Cnt) + if err != nil { + return nil, req.OutInterval, err + } + return divideContext( + ctx, + sumFixed, + cntFixed, ), req.OutInterval, nil } else { - return s.getSeriesFixed(ctx, req, req.Consolidator), req.OutInterval, nil + fixed, err := s.getSeriesFixed(ctx, req, req.Consolidator) + return fixed, req.OutInterval, err } } else { // readRollup && normalize if req.Consolidator == consolidation.Avg { - return divide( - consolidation.Consolidate(s.getSeriesFixed(ctx, req, consolidation.Sum), req.AggNum, consolidation.Sum), - consolidation.Consolidate(s.getSeriesFixed(ctx, req, consolidation.Cnt), req.AggNum, consolidation.Sum), + sumFixed, err := s.getSeriesFixed(ctx, req, consolidation.Sum) + if err != nil { + return nil, req.OutInterval, err + } + cntFixed, err := s.getSeriesFixed(ctx, req, consolidation.Cnt) + if err != nil { + return nil, req.OutInterval, err + } + return divideContext( + ctx, + consolidation.Consolidate(sumFixed, req.AggNum, consolidation.Sum), + consolidation.Consolidate(cntFixed, req.AggNum, consolidation.Sum), ), req.OutInterval, nil } else { - return consolidation.Consolidate( - s.getSeriesFixed(ctx, req, req.Consolidator), req.AggNum, req.Consolidator), req.OutInterval, nil + fixed, err := s.getSeriesFixed(ctx, req, req.Consolidator) + if err != nil { + return nil, req.OutInterval, err + } + return consolidation.ConsolidateContext(ctx, fixed, req.AggNum, req.Consolidator), req.OutInterval, nil } } } @@ -321,30 +390,55 @@ func AggMetricKey(key, archive string, aggSpan uint32) string { return fmt.Sprintf("%s_%s_%d", key, archive, aggSpan) } -func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidator consolidation.Consolidator) []schema.Point { +func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, 
consolidator consolidation.Consolidator) ([]schema.Point, error) { + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } rctx := newRequestContext(ctx, &req, consolidator) - res := s.getSeries(rctx) + res, err := s.getSeries(rctx) + if err != nil { + return nil, err + } + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } res.Points = append(s.itersToPoints(rctx, res.Iters), res.Points...) - return Fix(res.Points, req.From, req.To, req.ArchInterval) + return Fix(res.Points, req.From, req.To, req.ArchInterval), nil } -func (s *Server) getSeries(ctx *requestContext) mdata.Result { +func (s *Server) getSeries(ctx *requestContext) (mdata.Result, error) { res := s.getSeriesAggMetrics(ctx) + select { + case <-ctx.ctx.Done(): + //request canceled + return res, nil + default: + } + log.Debug("oldest from aggmetrics is %d", res.Oldest) span := opentracing.SpanFromContext(ctx.ctx) span.SetTag("oldest_in_ring", res.Oldest) if res.Oldest <= ctx.From { reqSpanMem.ValueUint32(ctx.To - ctx.From) - return res + return res, nil } // if oldest < to -> search until oldest, we already have the rest from mem // if to < oldest -> no need to search until oldest, only search until to until := util.Min(res.Oldest, ctx.To) - - res.Iters = append(s.getSeriesCachedStore(ctx, until), res.Iters...) - return res + fromCache, err := s.getSeriesCachedStore(ctx, until) + if err != nil { + return res, err + } + res.Iters = append(fromCache, res.Iters...) 
+ return res, nil } // getSeries returns points from mem (and cassandra if needed), within the range from (inclusive) - to (exclusive) @@ -394,7 +488,7 @@ func (s *Server) getSeriesAggMetrics(ctx *requestContext) mdata.Result { } // will only fetch until until, but uses ctx.To for debug logging -func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) []chunk.Iter { +func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]chunk.Iter, error) { var iters []chunk.Iter var prevts uint32 @@ -416,6 +510,14 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) []chunk cacheRes := s.Cache.Search(ctx.ctx, key, ctx.From, until) log.Debug("cache: result start %d, end %d", len(cacheRes.Start), len(cacheRes.End)) + // check to see if the request has been canceled, if so abort now. + select { + case <-ctx.ctx.Done(): + //request canceled + return iters, nil + default: + } + for _, itgen := range cacheRes.Start { iter, err := itgen.Get() prevts = itgen.Ts @@ -423,17 +525,32 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) []chunk // TODO(replay) figure out what to do if one piece is corrupt tracing.Failure(span) tracing.Errorf(span, "itergen: error getting iter from Start list %+v", err) - continue + return iters, err } iters = append(iters, *iter) } + // check to see if the request has been canceled, if so abort now. + select { + case <-ctx.ctx.Done(): + //request canceled + return iters, nil + default: + } + // the request cannot completely be served from cache, it will require cassandra involvement if !cacheRes.Complete { if cacheRes.From != cacheRes.Until { storeIterGens, err := s.BackendStore.Search(ctx.ctx, key, ctx.Req.TTL, cacheRes.From, cacheRes.Until) if err != nil { - panic(err) + return iters, err + } + // check to see if the request has been canceled, if so abort now. 
+ select { + case <-ctx.ctx.Done(): + //request canceled + return iters, nil + default: } for _, itgen := range storeIterGens { @@ -442,7 +559,7 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) []chunk // TODO(replay) figure out what to do if one piece is corrupt tracing.Failure(span) tracing.Errorf(span, "itergen: error getting iter from cassandra slice %+v", err) - continue + return iters, err } // it's important that the itgens get added in chronological order, // currently we rely on cassandra returning results in order @@ -458,13 +575,13 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) []chunk if err != nil { // TODO(replay) figure out what to do if one piece is corrupt log.Error(3, "itergen: error getting iter from cache result end slice %+v", err) - continue + return iters, err } iters = append(iters, *it) } } - return iters + return iters, nil } // check for duplicate series names for the same query. If found merge the results. 
diff --git a/api/dataprocessor_test.go b/api/dataprocessor_test.go index 894e93c14e..efa2bfb22a 100644 --- a/api/dataprocessor_test.go +++ b/api/dataprocessor_test.go @@ -368,7 +368,10 @@ func TestGetSeriesFixed(t *testing.T) { metric.Add(40+offset, 50) // this point will always be quantized to 50 req := models.NewReq(name, name, name, from, to, 1000, 10, consolidation.Avg, 0, cluster.Manager.ThisNode(), 0, 0) req.ArchInterval = 10 - points := srv.getSeriesFixed(test.NewContext(), req, consolidation.None) + points, err := srv.getSeriesFixed(test.NewContext(), req, consolidation.None) + if err != nil { + t.Errorf("case %q - error: %s", name, err) + } if !reflect.DeepEqual(expected, points) { t.Errorf("case %q - exp: %v - got %v", name, expected, points) } @@ -637,7 +640,10 @@ func TestGetSeriesCachedStore(t *testing.T) { req := reqRaw(metric, from, to, span, 1, consolidation.None, 0, 0) req.ArchInterval = 1 ctx := newRequestContext(test.NewContext(), &req, consolidation.None) - iters := srv.getSeriesCachedStore(ctx, to) + iters, err := srv.getSeriesCachedStore(ctx, to) + if err != nil { + t.Fatalf("Pattern %s From %d To %d: error %s", pattern, from, to, err) + } // expecting the first returned timestamp to be the T0 of the chunk containing "from" expectResFrom := from - (from % span) diff --git a/api/graphite.go b/api/graphite.go index a9707ca900..e2a10b5489 100644 --- a/api/graphite.go +++ b/api/graphite.go @@ -59,49 +59,72 @@ func (s *Server) findSeries(ctx context.Context, orgId int, patterns []string, s return nil, err } log.Debug("HTTP findSeries for %v across %d instances", patterns, len(peers)) - errors := make([]error, 0) - series := make([]Series, 0) - - var mu sync.Mutex var wg sync.WaitGroup + + responses := make(chan struct { + series []Series + err error + }, 1) + findCtx, cancel := context.WithCancel(ctx) + defer cancel() for _, peer := range peers { log.Debug("HTTP findSeries getting results from %s", peer.Name) wg.Add(1) if peer.IsLocal() { go func() 
{ - result, err := s.findSeriesLocal(ctx, orgId, patterns, seenAfter) - mu.Lock() + result, err := s.findSeriesLocal(findCtx, orgId, patterns, seenAfter) if err != nil { - errors = append(errors, err) + // cancel requests on all other peers. + cancel() } - series = append(series, result...) - mu.Unlock() + responses <- struct { + series []Series + err error + }{result, err} wg.Done() }() } else { go func(peer cluster.Node) { - result, err := s.findSeriesRemote(ctx, orgId, patterns, seenAfter, peer) - mu.Lock() + result, err := s.findSeriesRemote(findCtx, orgId, patterns, seenAfter, peer) if err != nil { - errors = append(errors, err) + // cancel requests on all other peers. + cancel() } - series = append(series, result...) - mu.Unlock() + responses <- struct { + series []Series + err error + }{result, err} wg.Done() }(peer) } } - wg.Wait() - if len(errors) > 0 { - err = errors[0] + + // wait for all findSeries goroutines to end, then close our responses channel + go func() { + wg.Wait() + close(responses) + }() + + series := make([]Series, 0) + for resp := range responses { + if resp.err != nil { + return nil, resp.err + } + series = append(series, resp.series...) 
} - return series, err + return series, nil } func (s *Server) findSeriesLocal(ctx context.Context, orgId int, patterns []string, seenAfter int64) ([]Series, error) { result := make([]Series, 0) for _, pattern := range patterns { + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } _, span := tracing.NewSpan(ctx, s.Tracer, "findSeriesLocal") span.SetTag("org", orgId) span.SetTag("pattern", pattern) @@ -133,6 +156,12 @@ func (s *Server) findSeriesRemote(ctx context.Context, orgId int, patterns []str log.Error(4, "HTTP Render error querying %s/index/find: %q", peer.Name, err) return nil, err } + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } resp := models.NewIndexFindResp() _, err = resp.UnmarshalMsg(buf) if err != nil { @@ -249,6 +278,15 @@ func (s *Server) renderMetrics(ctx *middleware.Context, request models.GraphiteR return } + // check to see if the request has been canceled, if so abort now. + select { + case <-newctx.Done(): + //request canceled + response.Write(ctx, response.RequestCanceledErr) + return + default: + } + noDataPoints := true for _, o := range out { if len(o.Datapoints) != 0 { @@ -279,11 +317,22 @@ func (s *Server) metricsFind(ctx *middleware.Context, request models.GraphiteFin return } nodes := make([]idx.Node, 0) - series, err := s.findSeries(ctx.Req.Context(), ctx.OrgId, []string{request.Query}, int64(fromUnix)) + reqCtx := ctx.Req.Context() + series, err := s.findSeries(reqCtx, ctx.OrgId, []string{request.Query}, int64(fromUnix)) if err != nil { response.Write(ctx, response.WrapError(err)) return } + + // check to see if the request has been canceled, if so abort now. + select { + case <-reqCtx.Done(): + //request canceled + response.Write(ctx, response.RequestCanceledErr) + return + default: + } + seenPaths := make(map[string]struct{}) // different nodes may have overlapping data in their index. // maybe because they used to receive a certain shard but now dont. 
or because they host metrics under branches @@ -320,6 +369,12 @@ func (s *Server) listRemote(ctx context.Context, orgId int, peer cluster.Node) ( log.Error(4, "HTTP IndexJson() error querying %s/index/list: %q", peer.Name, err) return nil, err } + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } result := make([]idx.Archive, 0) for len(buf) != 0 { var def idx.Archive @@ -339,53 +394,67 @@ func (s *Server) metricsIndex(ctx *middleware.Context) { response.Write(ctx, response.WrapError(err)) return } - errors := make([]error, 0) - series := make([]idx.Archive, 0) - seenDefs := make(map[string]struct{}) - var mu sync.Mutex + reqCtx, cancel := context.WithCancel(ctx.Req.Context()) + defer cancel() + responses := make(chan struct { + series []idx.Archive + err error + }, 1) var wg sync.WaitGroup for _, peer := range peers { wg.Add(1) if peer.IsLocal() { go func() { result := s.listLocal(ctx.OrgId) - mu.Lock() - for _, def := range result { - if _, ok := seenDefs[def.Id]; !ok { - series = append(series, def) - seenDefs[def.Id] = struct{}{} - } - } - mu.Unlock() + responses <- struct { + series []idx.Archive + err error + }{result, nil} wg.Done() }() } else { go func(peer cluster.Node) { - result, err := s.listRemote(ctx.Req.Context(), ctx.OrgId, peer) - mu.Lock() + result, err := s.listRemote(reqCtx, ctx.OrgId, peer) if err != nil { - errors = append(errors, err) - } - for _, def := range result { - if _, ok := seenDefs[def.Id]; !ok { - series = append(series, def) - seenDefs[def.Id] = struct{}{} - } + cancel() } - mu.Unlock() + responses <- struct { + series []idx.Archive + err error + }{result, err} wg.Done() }(peer) } } - wg.Wait() - if len(errors) > 0 { - err = errors[0] + + // wait for all list goroutines to end, then close our responses channel + go func() { + wg.Wait() + close(responses) + }() + + series := make([]idx.Archive, 0) + seenDefs := make(map[string]struct{}) + for resp := range responses { + if resp.err != nil { + 
response.Write(ctx, response.WrapError(resp.err)) + return + } + for _, def := range resp.series { + if _, ok := seenDefs[def.Id]; !ok { + series = append(series, def) + seenDefs[def.Id] = struct{}{} + } + } } - if err != nil { - log.Error(3, "HTTP IndexJson() %s", err.Error()) - response.Write(ctx, response.WrapError(err)) + // check to see if the request has been canceled, if so abort now. + select { + case <-reqCtx.Done(): + //request canceled + response.Write(ctx, response.RequestCanceledErr) return + default: } response.Write(ctx, response.NewFastJson(200, models.MetricNames(series))) @@ -479,9 +548,14 @@ func (s *Server) metricsDelete(ctx *middleware.Context, req models.MetricsDelete peers := cluster.Manager.MemberList() peers = append(peers, cluster.Manager.ThisNode()) log.Debug("HTTP metricsDelete for %v across %d instances", req.Query, len(peers)) - errors := make([]error, 0) + + reqCtx, cancel := context.WithCancel(ctx.Req.Context()) + defer cancel() deleted := 0 - var mu sync.Mutex + responses := make(chan struct { + deleted int + err error + }, len(peers)) var wg sync.WaitGroup for _, peer := range peers { log.Debug("HTTP metricsDelete getting results from %s", peer.Name) @@ -489,36 +563,57 @@ func (s *Server) metricsDelete(ctx *middleware.Context, req models.MetricsDelete if peer.IsLocal() { go func() { result, err := s.metricsDeleteLocal(ctx.OrgId, req.Query) - mu.Lock() + var e error if err != nil { - // errors can be due to bad user input or corrupt index. 
+ cancel() if strings.Contains(err.Error(), "Index is corrupt") { - errors = append(errors, response.NewError(http.StatusInternalServerError, err.Error())) + e = response.NewError(http.StatusInternalServerError, err.Error()) } else { - errors = append(errors, response.NewError(http.StatusBadRequest, err.Error())) + e = response.NewError(http.StatusBadRequest, err.Error()) } } - deleted += result - mu.Unlock() + responses <- struct { + deleted int + err error + }{result, e} wg.Done() }() } else { go func(peer cluster.Node) { - result, err := s.metricsDeleteRemote(ctx.Req.Context(), ctx.OrgId, req.Query, peer) - mu.Lock() + result, err := s.metricsDeleteRemote(reqCtx, ctx.OrgId, req.Query, peer) if err != nil { - errors = append(errors, err) + cancel() } - deleted += result - mu.Unlock() + responses <- struct { + deleted int + err error + }{result, err} wg.Done() }(peer) } } - wg.Wait() - var err error - if len(errors) > 0 { - response.Write(ctx, response.WrapError(err)) + + // wait for all metricsDelete goroutines to end, then close our responses channel + go func() { + wg.Wait() + close(responses) + }() + + for resp := range responses { + if resp.err != nil { + response.Write(ctx, response.WrapError(resp.err)) + return + } + deleted += resp.deleted + } + + // check to see if the request has been canceled, if so abort now. + select { + case <-reqCtx.Done(): + //request canceled + response.Write(ctx, response.RequestCanceledErr) + return + default: } resp := models.MetricsDeleteResp{ @@ -545,6 +640,14 @@ func (s *Server) metricsDeleteRemote(ctx context.Context, orgId int, query strin log.Error(4, "HTTP metricDelete error querying %s/index/delete: %q", peer.Name, err) return 0, err } + + select { + case <-ctx.Done(): + //request canceled + return 0, nil + default: + } + resp := models.MetricsDeleteResp{} _, err = resp.UnmarshalMsg(buf) if err != nil { @@ -568,6 +671,12 @@ func (s *Server) executePlan(ctx context.Context, orgId int, plan expr.Plan) ([] // e.g. 
target=movingAvg(foo.*, "1h")&target=foo.* // note that in this case we fetch foo.* twice. can be optimized later for _, r := range plan.Reqs { + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } series, err := s.findSeries(ctx, orgId, []string{r.Query}, int64(r.From)) if err != nil { return nil, err @@ -600,6 +709,13 @@ func (s *Server) executePlan(ctx context.Context, orgId int, plan expr.Plan) ([] } } + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } + reqRenderSeriesCount.Value(len(reqs)) if len(reqs) == 0 { return nil, nil @@ -626,6 +742,7 @@ func (s *Server) executePlan(ctx context.Context, orgId int, plan expr.Plan) ([] log.Error(3, "HTTP Render %s", err.Error()) return nil, err } + out = mergeSeries(out) // instead of waiting for all data to come in and then start processing everything, we could consider starting processing earlier, at the risk of doing needless work @@ -684,13 +801,21 @@ func (s *Server) graphiteTagDetails(ctx *middleware.Context, request models.Grap response.Write(ctx, response.NewError(http.StatusBadRequest, "not tag specified")) return } - - tagValues, err := s.clusterTagDetails(ctx.Req.Context(), ctx.OrgId, tag, request.Filter, request.From) + reqCtx := ctx.Req.Context() + tagValues, err := s.clusterTagDetails(reqCtx, ctx.OrgId, tag, request.Filter, request.From) if err != nil { response.Write(ctx, response.WrapError(err)) return } + select { + case <-reqCtx.Done(): + //request canceled + response.Write(ctx, response.RequestCanceledErr) + return + default: + } + resp := models.GraphiteTagDetailsResp{ Tag: tag, Values: make([]models.GraphiteTagDetailsValueResp, 0, len(tagValues)), @@ -714,12 +839,24 @@ func (s *Server) clusterTagDetails(ctx context.Context, orgId int, tag, filter s if result == nil { result = make(map[string]uint64) } + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } data := models.IndexTagDetails{OrgId: orgId, Tag: 
tag, Filter: filter, From: from} bufs, err := s.peerQuery(ctx, data, "clusterTagDetails", "/index/tag_details") if err != nil { return nil, err } + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } resp := models.IndexTagDetailsResp{} for _, buf := range bufs { _, err = resp.UnmarshalMsg(buf) @@ -735,12 +872,21 @@ func (s *Server) clusterTagDetails(ctx context.Context, orgId int, tag, filter s } func (s *Server) graphiteTagFindSeries(ctx *middleware.Context, request models.GraphiteTagFindSeries) { - series, err := s.clusterFindByTag(ctx.Req.Context(), ctx.OrgId, request.Expr, request.From) + reqCtx := ctx.Req.Context() + series, err := s.clusterFindByTag(reqCtx, ctx.OrgId, request.Expr, request.From) if err != nil { response.Write(ctx, response.WrapError(err)) return } + select { + case <-reqCtx.Done(): + //request canceled + response.Write(ctx, response.RequestCanceledErr) + return + default: + } + response.Write(ctx, response.NewJson(200, series, "")) } @@ -752,6 +898,13 @@ func (s *Server) clusterFindByTag(ctx context.Context, orgId int, expressions [] return nil, err } + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } + for _, series := range result { seriesSet[series] = struct{}{} } @@ -762,6 +915,13 @@ func (s *Server) clusterFindByTag(ctx context.Context, orgId int, expressions [] return nil, err } + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } + resp := models.IndexFindByTagResp{} for _, buf := range bufs { _, err = resp.UnmarshalMsg(buf) @@ -782,12 +942,21 @@ func (s *Server) clusterFindByTag(ctx context.Context, orgId int, expressions [] } func (s *Server) graphiteTags(ctx *middleware.Context, request models.GraphiteTags) { - tags, err := s.clusterTags(ctx.Req.Context(), ctx.OrgId, request.Filter, request.From) + reqCtx := ctx.Req.Context() + tags, err := s.clusterTags(reqCtx, ctx.OrgId, request.Filter, request.From) if err != nil { 
response.Write(ctx, response.WrapError(err)) return } + select { + case <-reqCtx.Done(): + //request canceled + response.Write(ctx, response.RequestCanceledErr) + return + default: + } + var resp models.GraphiteTagsResp for _, tag := range tags { resp = append(resp, models.GraphiteTagResp{Tag: tag}) @@ -800,6 +969,12 @@ func (s *Server) clusterTags(ctx context.Context, orgId int, filter string, from if err != nil { return nil, err } + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } tagSet := make(map[string]struct{}, len(result)) for _, tag := range result { @@ -812,6 +987,13 @@ func (s *Server) clusterTags(ctx context.Context, orgId int, filter string, from return nil, err } + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } + resp := models.IndexTagsResp{} for _, buf := range bufs { _, err = resp.UnmarshalMsg(buf) diff --git a/api/response/error.go b/api/response/error.go index a58ce7cc65..3d7723eb83 100644 --- a/api/response/error.go +++ b/api/response/error.go @@ -51,3 +51,5 @@ func (r *ErrorResp) Headers() (headers map[string]string) { headers = map[string]string{"content-type": "text/plain"} return headers } + +var RequestCanceledErr = NewError(499, "request canceled") diff --git a/cluster/config.go b/cluster/config.go index cdacd080b7..e926454f04 100644 --- a/cluster/config.go +++ b/cluster/config.go @@ -23,7 +23,8 @@ var ( httpTimeout time.Duration minAvailableShards int - client http.Client + client http.Client + transport *http.Transport ) func ConfigSetup() { @@ -57,17 +58,17 @@ func ConfigProcess() { clusterPort = addr.Port Mode = ModeType(mode) - + transport = &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + Proxy: http.ProxyFromEnvironment, + Dial: (&net.Dialer{ + Timeout: time.Second * 5, + KeepAlive: 30 * time.Second, + }).Dial, + TLSHandshakeTimeout: time.Second, + } client = http.Client{ - Transport: &http.Transport{ - TLSClientConfig: 
&tls.Config{InsecureSkipVerify: true}, - Proxy: http.ProxyFromEnvironment, - Dial: (&net.Dialer{ - Timeout: time.Second * 5, - KeepAlive: 30 * time.Second, - }).Dial, - TLSHandshakeTimeout: time.Second, - }, - Timeout: httpTimeout, + Transport: transport, + Timeout: httpTimeout, } } diff --git a/cluster/node.go b/cluster/node.go index 42b90e06ad..b88cf0d7f3 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -152,17 +152,45 @@ func (n Node) Post(ctx context.Context, name, path string, body Traceable) (ret log.Error(3, "CLU failed to inject span into headers: %s", err) } req.Header.Add("Content-Type", "application/json") - rsp, err := client.Do(req) - if err != nil { - log.Error(3, "CLU Node: %s unreachable. %s", n.Name, err.Error()) - return nil, NewError(http.StatusServiceUnavailable, fmt.Errorf("cluster node unavailable")) + + c := make(chan struct { + r *http.Response + err error + }, 1) + + go func() { + rsp, err := client.Do(req) + c <- struct { + r *http.Response + err error + }{rsp, err} + }() + + // wait for either our results from the http request or if our context has been canceled + // then abort the http request. + select { + case <-ctx.Done(): + log.Debug("CLU Node: context canceled. terminating request to peer %s", n.Name) + transport.CancelRequest(req) + <-c // Wait for client.Do but ignore result + case resp := <-c: + err := resp.err + rsp := resp.r + if err != nil { + tags.Error.Set(span, true) + log.Error(3, "CLU Node: %s unreachable. 
%s", n.Name, err.Error()) + return nil, NewError(http.StatusServiceUnavailable, fmt.Errorf("cluster node unavailable")) + } + return handleResp(rsp) } - return handleResp(rsp) + + return nil, nil } func handleResp(rsp *http.Response) ([]byte, error) { defer rsp.Body.Close() if rsp.StatusCode != 200 { + ioutil.ReadAll(rsp.Body) return nil, NewError(rsp.StatusCode, fmt.Errorf(rsp.Status)) } return ioutil.ReadAll(rsp.Body) diff --git a/consolidation/consolidate.go b/consolidation/consolidate.go index 6fa5989af1..cfabda329a 100644 --- a/consolidation/consolidate.go +++ b/consolidation/consolidate.go @@ -1,9 +1,22 @@ package consolidation import ( + "context" + "gopkg.in/raintank/schema.v1" ) +// ConsolidateContext wraps a Consolidate() call with a context.Context condition +func ConsolidateContext(ctx context.Context, in []schema.Point, aggNum uint32, consolidator Consolidator) []schema.Point { + select { + case <-ctx.Done(): + //request canceled + return nil + default: + } + return Consolidate(in, aggNum, consolidator) +} + // Consolidate consolidates `in`, aggNum points at a time via the given function // note: the returned slice repurposes in's backing array. func Consolidate(in []schema.Point, aggNum uint32, consolidator Consolidator) []schema.Point { diff --git a/docker/docker-cluster/metrictank.ini b/docker/docker-cluster/metrictank.ini index 048972fdaa..df241c1721 100644 --- a/docker/docker-cluster/metrictank.ini +++ b/docker/docker-cluster/metrictank.ini @@ -146,6 +146,8 @@ fallback-graphite-addr = http://graphite log-min-dur = 5min # timezone for interpreting from/until values when needed, specified using [zoneinfo name](https://en.wikipedia.org/wiki/Tz_database#Names_of_time_zones) e.g. 'America/New_York', 'UTC' or 'local' to use local server timezone. time-zone = local +# maximum number of concurrent threads for fetching data on the local node. Each thread handles a single series. 
+get-targets-concurrency = 20 ## metric data inputs ## diff --git a/docker/docker-dev-custom-cfg-kafka/metrictank.ini b/docker/docker-dev-custom-cfg-kafka/metrictank.ini index 7cb6f96242..3c938a4a02 100644 --- a/docker/docker-dev-custom-cfg-kafka/metrictank.ini +++ b/docker/docker-dev-custom-cfg-kafka/metrictank.ini @@ -146,6 +146,8 @@ fallback-graphite-addr = http://graphite log-min-dur = 5min # timezone for interpreting from/until values when needed, specified using [zoneinfo name](https://en.wikipedia.org/wiki/Tz_database#Names_of_time_zones) e.g. 'America/New_York', 'UTC' or 'local' to use local server timezone. time-zone = local +# maximum number of concurrent threads for fetching data on the local node. Each thread handles a single series. +get-targets-concurrency = 20 ## metric data inputs ## diff --git a/docs/config.md b/docs/config.md index 2200cfa6e2..720fd68859 100644 --- a/docs/config.md +++ b/docs/config.md @@ -187,6 +187,8 @@ fallback-graphite-addr = http://localhost:8080 log-min-dur = 5min # timezone for interpreting from/until values when needed, specified using [zoneinfo name](https://en.wikipedia.org/wiki/Tz_database#Names_of_time_zones) e.g. 'America/New_York', 'UTC' or 'local' to use local server timezone. 
time-zone = local +# maximum number of concurrent threads for fetching data on the local node +get-targets-concurrency = 20 ``` ## metric data inputs ## diff --git a/mdata/cwr.go b/mdata/cwr.go index eea3b8c873..c9c97026e5 100644 --- a/mdata/cwr.go +++ b/mdata/cwr.go @@ -1,6 +1,7 @@ package mdata import ( + "context" "time" "github.com/grafana/metrictank/mdata/chunk" @@ -13,6 +14,7 @@ type ChunkReadRequest struct { p []interface{} timestamp time.Time out chan outcome + ctx context.Context } type ChunkWriteRequest struct { diff --git a/mdata/store_cassandra.go b/mdata/store_cassandra.go index 493c2bc050..9c15773182 100644 --- a/mdata/store_cassandra.go +++ b/mdata/store_cassandra.go @@ -44,6 +44,7 @@ var ( errReadQueueFull = errors.New("the read queue is full") errReadTooOld = errors.New("the read is too old") errTableNotFound = errors.New("table for given TTL not found") + errCtxCanceled = errors.New("context canceled") // metric store.cassandra.get.exec is the duration of getting from cassandra store cassGetExecDuration = stats.NewLatencyHistogram15s32("store.cassandra.get.exec") @@ -406,7 +407,7 @@ type outcome struct { month uint32 sortKey uint32 i *gocql.Iter - omitted bool + err error } type asc []outcome @@ -416,15 +417,29 @@ func (o asc) Less(i, j int) bool { return o[i].sortKey < o[j].sortKey } func (c *CassandraStore) processReadQueue() { for crr := range c.readQueue { + // check to see if the request has been canceled, if so abort now. 
+ select { + case <-crr.ctx.Done(): + //request canceled + crr.out <- outcome{err: errCtxCanceled} + continue + default: + } waitDuration := time.Since(crr.timestamp) cassGetWaitDuration.Value(waitDuration) if waitDuration > c.omitReadTimeout { cassOmitOldRead.Inc() - crr.out <- outcome{omitted: true} + crr.out <- outcome{err: errReadTooOld} continue } + pre := time.Now() - iter := outcome{crr.month, crr.sortKey, c.Session.Query(crr.q, crr.p...).Iter(), false} + iter := outcome{ + month: crr.month, + sortKey: crr.sortKey, + i: c.Session.Query(crr.q, crr.p...).Iter(), + err: nil, + } cassGetExecDuration.Value(time.Since(pre)) crr.out <- iter } @@ -460,7 +475,7 @@ func (c *CassandraStore) SearchTable(ctx context.Context, key, table string, sta crrs := make([]*ChunkReadRequest, 0) query := func(month, sortKey uint32, q string, p ...interface{}) { - crrs = append(crrs, &ChunkReadRequest{month, sortKey, q, p, pre, nil}) + crrs = append(crrs, &ChunkReadRequest{month, sortKey, q, p, pre, nil, ctx}) } start_month := start - (start % Month_sec) // starting row has to be at, or before, requested start @@ -503,6 +518,10 @@ func (c *CassandraStore) SearchTable(ctx context.Context, key, table string, sta for i := range crrs { crrs[i].out = results select { + case <-ctx.Done(): + // request has been canceled, so no need to continue queuing reads. + // reads already queued will be aborted when read from the queue. 
+ return nil, nil case c.readQueue <- crrs[i]: default: cassReadQueueFull.Inc() @@ -514,17 +533,28 @@ func (c *CassandraStore) SearchTable(ctx context.Context, key, table string, sta outcomes := make([]outcome, 0, numQueries) seen := 0 - for o := range results { - if o.omitted { - tracing.Failure(span) - tracing.Error(span, errReadTooOld) - return nil, errReadTooOld - } - seen += 1 - outcomes = append(outcomes, o) - if seen == numQueries { - close(results) - break +LOOP: + for { + select { + case <-ctx.Done(): + // request has been canceled, so no need to continue processing results + return nil, nil + case o := <-results: + if o.err != nil { + if o.err == errCtxCanceled { + // context was canceled, return immediately. + return nil, nil + } + tracing.Failure(span) + tracing.Error(span, o.err) + return nil, o.err + } + seen += 1 + outcomes = append(outcomes, o) + if seen == numQueries { + close(results) + break LOOP + } } } cassGetChunksDuration.Value(time.Since(pre)) diff --git a/metrictank-sample.ini b/metrictank-sample.ini index ce503eca8f..b8d98465c5 100644 --- a/metrictank-sample.ini +++ b/metrictank-sample.ini @@ -149,6 +149,8 @@ fallback-graphite-addr = http://localhost:8080 log-min-dur = 5min # timezone for interpreting from/until values when needed, specified using [zoneinfo name](https://en.wikipedia.org/wiki/Tz_database#Names_of_time_zones) e.g. 'America/New_York', 'UTC' or 'local' to use local server timezone. time-zone = local +# maximum number of concurrent threads for fetching data on the local node. Each thread handles a single series. 
+get-targets-concurrency = 20 ## metric data inputs ## diff --git a/scripts/config/metrictank-docker.ini b/scripts/config/metrictank-docker.ini index 32dade5fe4..5b12044b74 100644 --- a/scripts/config/metrictank-docker.ini +++ b/scripts/config/metrictank-docker.ini @@ -146,6 +146,8 @@ fallback-graphite-addr = http://graphite log-min-dur = 5min # timezone for interpreting from/until values when needed, specified using [zoneinfo name](https://en.wikipedia.org/wiki/Tz_database#Names_of_time_zones) e.g. 'America/New_York', 'UTC' or 'local' to use local server timezone. time-zone = local +# maximum number of concurrent threads for fetching data on the local node. Each thread handles a single series. +get-targets-concurrency = 20 ## metric data inputs ## diff --git a/scripts/config/metrictank-package.ini b/scripts/config/metrictank-package.ini index 401708b22a..77b1780078 100644 --- a/scripts/config/metrictank-package.ini +++ b/scripts/config/metrictank-package.ini @@ -146,6 +146,8 @@ fallback-graphite-addr = http://localhost:8080 log-min-dur = 5min # timezone for interpreting from/until values when needed, specified using [zoneinfo name](https://en.wikipedia.org/wiki/Tz_database#Names_of_time_zones) e.g. 'America/New_York', 'UTC' or 'local' to use local server timezone. time-zone = local +# maximum number of concurrent threads for fetching data on the local node. Each thread handles a single series. +get-targets-concurrency = 20 ## metric data inputs ##