From 2319ad7662cb01b730266c46fe5071fa852fbfb7 Mon Sep 17 00:00:00 2001 From: woodsaj Date: Thu, 14 Sep 2017 23:55:07 +0800 Subject: [PATCH 01/11] use context.Contexts properly stop executing when a request is canceled - The ctx was already being passed between the different methods within the query pipeline, but the context was never being inspected to see if it had been canceled. This commit fixes that. - Now when an error occurs, whether that error is local or on a remote peer, the request is immediately canceled. - If the client disconnects before a response is returned (whether that is due to a timeout or the client canceling the request), that cancelation will propagate through and abort any work. --- api/cluster.go | 3 +- api/dataprocessor.go | 144 ++++++----- api/graphite.go | 238 +++++++++++++----- api/response/error.go | 2 + cluster/config.go | 25 +- cluster/node.go | 38 ++- docker/docker-cluster/metrictank.ini | 2 + .../metrictank.ini | 2 + mdata/cwr.go | 2 + mdata/store_cassandra.go | 44 +++- metrictank-sample.ini | 2 + scripts/config/metrictank-docker.ini | 2 + scripts/config/metrictank-package.ini | 2 + 13 files changed, 343 insertions(+), 163 deletions(-) diff --git a/api/cluster.go b/api/cluster.go index b9cfd8c0e7..3867497f9b 100644 --- a/api/cluster.go +++ b/api/cluster.go @@ -158,7 +158,8 @@ func (s *Server) indexList(ctx *middleware.Context, req models.IndexList) { } func (s *Server) getData(ctx *middleware.Context, request models.GetData) { - series, err := s.getTargetsLocal(ctx.Req.Context(), request.Requests) + reqCtx, cancel := context.WithCancel(ctx.Req.Context()) + series, err := s.getTargetsLocal(reqCtx, cancel, request.Requests) if err != nil { // the only errors returned are from us catching panics, so we should treat them // all as internalServerErrors diff --git a/api/dataprocessor.go b/api/dataprocessor.go index 96a2697e20..2fdf04f445 100644 --- a/api/dataprocessor.go +++ b/api/dataprocessor.go @@ -21,6 +21,11 @@ import ( 
"gopkg.in/raintank/schema.v1" ) +type getTargetsResp struct { + series []models.Series + err error +} + // doRecover is the handler that turns panics into returns from the top level of getTarget. func doRecover(errp *error) { e := recover() @@ -124,26 +129,18 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se } } - var mu sync.Mutex var wg sync.WaitGroup - out := make([]models.Series, 0) - errs := make([]error, 0) - + responses := make(chan getTargetsResp, 1) + getCtx, cancel := context.WithCancel(ctx) + defer cancel() if len(localReqs) > 0 { wg.Add(1) go func() { // the only errors returned are from us catching panics, so we should treat them // all as internalServerErrors - series, err := s.getTargetsLocal(ctx, localReqs) - mu.Lock() - if err != nil { - errs = append(errs, err) - } - if len(series) > 0 { - out = append(out, series...) - } - mu.Unlock() + series, err := s.getTargetsLocal(getCtx, cancel, localReqs) + responses <- getTargetsResp{series, err} wg.Done() }() } @@ -151,81 +148,84 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se wg.Add(1) go func() { // all errors returned returned are *response.Error. - series, err := s.getTargetsRemote(ctx, remoteReqs) - mu.Lock() - if err != nil { - errs = append(errs, err) - } - if len(series) > 0 { - out = append(out, series...) - } - mu.Unlock() + series, err := s.getTargetsRemote(getCtx, cancel, remoteReqs) + responses <- getTargetsResp{series, err} wg.Done() }() } - wg.Wait() - var err error - if len(errs) > 0 { - err = errs[0] + + // wait for all getTargets goroutines to end, then close our responses channel + go func() { + wg.Wait() + close(responses) + }() + + out := make([]models.Series, 0) + for resp := range responses { + if resp.err != nil { + return nil, resp.err + } + out = append(out, resp.series...) 
} log.Debug("DP getTargets: %d series found on cluster", len(out)) - return out, err + return out, nil } -func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]models.Req) ([]models.Series, error) { - seriesChan := make(chan []models.Series, len(remoteReqs)) - errorsChan := make(chan error, len(remoteReqs)) +func (s *Server) getTargetsRemote(ctx context.Context, cancel context.CancelFunc, remoteReqs map[string][]models.Req) ([]models.Series, error) { + responses := make(chan getTargetsResp, 1) + wg := sync.WaitGroup{} wg.Add(len(remoteReqs)) for _, nodeReqs := range remoteReqs { log.Debug("DP getTargetsRemote: handling %d reqs from %s", len(nodeReqs), nodeReqs[0].Node.Name) - go func(ctx context.Context, reqs []models.Req) { + go func(reqs []models.Req) { defer wg.Done() node := reqs[0].Node buf, err := node.Post(ctx, "getTargetsRemote", "/getdata", models.GetData{Requests: reqs}) if err != nil { - errorsChan <- err + cancel() // cancel all other requests. + responses <- getTargetsResp{nil, err} return } var resp models.GetDataResp _, err = resp.UnmarshalMsg(buf) if err != nil { log.Error(3, "DP getTargetsRemote: error unmarshaling body from %s/getdata: %q", node.Name, err) - errorsChan <- err + cancel() // cancel all other requests. + responses <- getTargetsResp{nil, err} return } log.Debug("DP getTargetsRemote: %s returned %d series", node.Name, len(resp.Series)) - seriesChan <- resp.Series - }(ctx, nodeReqs) + responses <- getTargetsResp{resp.Series, nil} + }(nodeReqs) } + + // wait for all getTargetsRemote goroutines to end, then close our responses channel go func() { wg.Wait() - close(seriesChan) - close(errorsChan) + close(responses) }() + out := make([]models.Series, 0) - var err error - for series := range seriesChan { - out = append(out, series...) + for resp := range responses { + if resp.err != nil { + return nil, resp.err + } + out = append(out, resp.series...) 
} log.Debug("DP getTargetsRemote: total of %d series found on peers", len(out)) - for e := range errorsChan { - err = e - break - } - return out, err + return out, nil } // error is the error of the first failing target request -func (s *Server) getTargetsLocal(ctx context.Context, reqs []models.Req) ([]models.Series, error) { +func (s *Server) getTargetsLocal(ctx context.Context, cancel context.CancelFunc, reqs []models.Req) ([]models.Series, error) { log.Debug("DP getTargetsLocal: handling %d reqs locally", len(reqs)) - seriesChan := make(chan models.Series, len(reqs)) - errorsChan := make(chan error, len(reqs)) - // TODO: abort pending requests on error, maybe use context, maybe timeouts too - wg := sync.WaitGroup{} + responses := make(chan getTargetsResp, 1) + + var wg sync.WaitGroup wg.Add(len(reqs)) for _, req := range reqs { - go func(ctx context.Context, wg *sync.WaitGroup, req models.Req) { + go func(req models.Req) { ctx, span := tracing.NewSpan(ctx, s.Tracer, "getTargetsLocal") req.Trace(span) defer span.Finish() @@ -233,10 +233,11 @@ func (s *Server) getTargetsLocal(ctx context.Context, reqs []models.Req) ([]mode points, interval, err := s.getTarget(ctx, req) if err != nil { tags.Error.Set(span, true) - errorsChan <- err + cancel() // cancel all other requests. 
+ responses <- getTargetsResp{nil, err} } else { getTargetDuration.Value(time.Now().Sub(pre)) - seriesChan <- models.Series{ + responses <- getTargetsResp{[]models.Series{{ Target: req.Target, // always simply the metric name from index Datapoints: points, Interval: interval, @@ -245,27 +246,24 @@ func (s *Server) getTargetsLocal(ctx context.Context, reqs []models.Req) ([]mode QueryTo: req.To, QueryCons: req.ConsReq, Consolidator: req.Consolidator, - } + }}, nil} } wg.Done() - }(ctx, &wg, req) + }(req) } go func() { wg.Wait() - close(seriesChan) - close(errorsChan) + close(responses) }() out := make([]models.Series, 0, len(reqs)) - var err error - for series := range seriesChan { - out = append(out, series) + for resp := range responses { + if resp.err != nil { + return nil, resp.err + } + out = append(out, resp.series...) } log.Debug("DP getTargetsLocal: %d series found locally", len(out)) - for e := range errorsChan { - err = e - break - } - return out, err + return out, nil } @@ -339,6 +337,14 @@ func (s *Server) getSeries(ctx *requestContext) mdata.Result { return res } + // check to see if the request has been canceled, if so abort now. + select { + case <-ctx.ctx.Done(): + //request canceled + return res + default: + } + // if oldest < to -> search until oldest, we already have the rest from mem // if to < oldest -> no need to search until oldest, only search until to until := util.Min(res.Oldest, ctx.To) @@ -416,6 +422,14 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) []chunk cacheRes := s.Cache.Search(ctx.ctx, key, ctx.From, until) log.Debug("cache: result start %d, end %d", len(cacheRes.Start), len(cacheRes.End)) + // check to see if the request has been canceled, if so abort now. 
+ select { + case <-ctx.ctx.Done(): + //request canceled + return nil + default: + } + for _, itgen := range cacheRes.Start { iter, err := itgen.Get() prevts = itgen.Ts diff --git a/api/graphite.go b/api/graphite.go index a9707ca900..a48d5edb5d 100644 --- a/api/graphite.go +++ b/api/graphite.go @@ -59,44 +59,61 @@ func (s *Server) findSeries(ctx context.Context, orgId int, patterns []string, s return nil, err } log.Debug("HTTP findSeries for %v across %d instances", patterns, len(peers)) - errors := make([]error, 0) - series := make([]Series, 0) - - var mu sync.Mutex var wg sync.WaitGroup + + responses := make(chan struct { + series []Series + err error + }, 1) + findCtx, cancel := context.WithCancel(ctx) + defer cancel() for _, peer := range peers { log.Debug("HTTP findSeries getting results from %s", peer.Name) wg.Add(1) if peer.IsLocal() { go func() { - result, err := s.findSeriesLocal(ctx, orgId, patterns, seenAfter) - mu.Lock() + result, err := s.findSeriesLocal(findCtx, orgId, patterns, seenAfter) if err != nil { - errors = append(errors, err) + // cancel requests on all other peers. + cancel() } - series = append(series, result...) - mu.Unlock() + responses <- struct { + series []Series + err error + }{result, err} wg.Done() }() } else { go func(peer cluster.Node) { - result, err := s.findSeriesRemote(ctx, orgId, patterns, seenAfter, peer) - mu.Lock() + result, err := s.findSeriesRemote(findCtx, orgId, patterns, seenAfter, peer) if err != nil { - errors = append(errors, err) + // cancel requests on all other peers. + cancel() } - series = append(series, result...) 
- mu.Unlock() + responses <- struct { + series []Series + err error + }{result, err} wg.Done() }(peer) } } - wg.Wait() - if len(errors) > 0 { - err = errors[0] + + // wait for all findSeries goroutines to end, then close our responses channel + go func() { + wg.Wait() + close(responses) + }() + + series := make([]Series, 0) + for resp := range responses { + if resp.err != nil { + return nil, err + } + series = append(series, resp.series...) } - return series, err + return series, nil } func (s *Server) findSeriesLocal(ctx context.Context, orgId int, patterns []string, seenAfter int64) ([]Series, error) { @@ -133,6 +150,12 @@ func (s *Server) findSeriesRemote(ctx context.Context, orgId int, patterns []str log.Error(4, "HTTP Render error querying %s/index/find: %q", peer.Name, err) return nil, err } + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } resp := models.NewIndexFindResp() _, err = resp.UnmarshalMsg(buf) if err != nil { @@ -249,6 +272,15 @@ func (s *Server) renderMetrics(ctx *middleware.Context, request models.GraphiteR return } + // check to see if the request has been canceled, if so abort now. + select { + case <-newctx.Done(): + //request canceled + response.Write(ctx, response.RequestCanceledErr) + return + default: + } + noDataPoints := true for _, o := range out { if len(o.Datapoints) != 0 { @@ -279,11 +311,22 @@ func (s *Server) metricsFind(ctx *middleware.Context, request models.GraphiteFin return } nodes := make([]idx.Node, 0) - series, err := s.findSeries(ctx.Req.Context(), ctx.OrgId, []string{request.Query}, int64(fromUnix)) + reqCtx := ctx.Req.Context() + series, err := s.findSeries(reqCtx, ctx.OrgId, []string{request.Query}, int64(fromUnix)) if err != nil { response.Write(ctx, response.WrapError(err)) return } + + // check to see if the request has been canceled, if so abort now. 
+ select { + case <-reqCtx.Done(): + //request canceled + response.Write(ctx, response.RequestCanceledErr) + return + default: + } + seenPaths := make(map[string]struct{}) // different nodes may have overlapping data in their index. // maybe because they used to receive a certain shard but now dont. or because they host metrics under branches @@ -320,6 +363,12 @@ func (s *Server) listRemote(ctx context.Context, orgId int, peer cluster.Node) ( log.Error(4, "HTTP IndexJson() error querying %s/index/list: %q", peer.Name, err) return nil, err } + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } result := make([]idx.Archive, 0) for len(buf) != 0 { var def idx.Archive @@ -339,53 +388,63 @@ func (s *Server) metricsIndex(ctx *middleware.Context) { response.Write(ctx, response.WrapError(err)) return } - errors := make([]error, 0) - series := make([]idx.Archive, 0) - seenDefs := make(map[string]struct{}) - var mu sync.Mutex + reqCtx := ctx.Req.Context() + responses := make(chan struct { + series []idx.Archive + err error + }, 1) var wg sync.WaitGroup for _, peer := range peers { wg.Add(1) if peer.IsLocal() { go func() { result := s.listLocal(ctx.OrgId) - mu.Lock() - for _, def := range result { - if _, ok := seenDefs[def.Id]; !ok { - series = append(series, def) - seenDefs[def.Id] = struct{}{} - } - } - mu.Unlock() + responses <- struct { + series []idx.Archive + err error + }{result, nil} wg.Done() }() } else { go func(peer cluster.Node) { - result, err := s.listRemote(ctx.Req.Context(), ctx.OrgId, peer) - mu.Lock() - if err != nil { - errors = append(errors, err) - } - for _, def := range result { - if _, ok := seenDefs[def.Id]; !ok { - series = append(series, def) - seenDefs[def.Id] = struct{}{} - } - } - mu.Unlock() + result, err := s.listRemote(reqCtx, ctx.OrgId, peer) + responses <- struct { + series []idx.Archive + err error + }{result, err} wg.Done() }(peer) } } - wg.Wait() - if len(errors) > 0 { - err = errors[0] + + // wait for 
all list goroutines to end, then close our responses channel + go func() { + wg.Wait() + close(responses) + }() + + series := make([]idx.Archive, 0) + seenDefs := make(map[string]struct{}) + for resp := range responses { + if resp.err != nil { + response.Write(ctx, response.WrapError(err)) + return + } + for _, def := range resp.series { + if _, ok := seenDefs[def.Id]; !ok { + series = append(series, def) + seenDefs[def.Id] = struct{}{} + } + } } - if err != nil { - log.Error(3, "HTTP IndexJson() %s", err.Error()) - response.Write(ctx, response.WrapError(err)) + // check to see if the request has been canceled, if so abort now. + select { + case <-reqCtx.Done(): + //request canceled + response.Write(ctx, response.RequestCanceledErr) return + default: } response.Write(ctx, response.NewFastJson(200, models.MetricNames(series))) @@ -479,9 +538,13 @@ func (s *Server) metricsDelete(ctx *middleware.Context, req models.MetricsDelete peers := cluster.Manager.MemberList() peers = append(peers, cluster.Manager.ThisNode()) log.Debug("HTTP metricsDelete for %v across %d instances", req.Query, len(peers)) - errors := make([]error, 0) + + reqCtx := ctx.Req.Context() deleted := 0 - var mu sync.Mutex + responses := make(chan struct { + deleted int + err error + }, 1) var wg sync.WaitGroup for _, peer := range peers { log.Debug("HTTP metricsDelete getting results from %s", peer.Name) @@ -489,36 +552,53 @@ func (s *Server) metricsDelete(ctx *middleware.Context, req models.MetricsDelete if peer.IsLocal() { go func() { result, err := s.metricsDeleteLocal(ctx.OrgId, req.Query) - mu.Lock() + var e error if err != nil { - // errors can be due to bad user input or corrupt index. 
if strings.Contains(err.Error(), "Index is corrupt") { - errors = append(errors, response.NewError(http.StatusInternalServerError, err.Error())) + e = response.NewError(http.StatusInternalServerError, err.Error()) } else { - errors = append(errors, response.NewError(http.StatusBadRequest, err.Error())) + e = response.NewError(http.StatusBadRequest, err.Error()) } } - deleted += result - mu.Unlock() + responses <- struct { + deleted int + err error + }{result, e} wg.Done() }() } else { go func(peer cluster.Node) { - result, err := s.metricsDeleteRemote(ctx.Req.Context(), ctx.OrgId, req.Query, peer) - mu.Lock() - if err != nil { - errors = append(errors, err) - } - deleted += result - mu.Unlock() + result, err := s.metricsDeleteRemote(reqCtx, ctx.OrgId, req.Query, peer) + responses <- struct { + deleted int + err error + }{result, err} wg.Done() }(peer) } } - wg.Wait() - var err error - if len(errors) > 0 { - response.Write(ctx, response.WrapError(err)) + + // wait for all metricsDelete goroutines to end, then close our responses channel + go func() { + wg.Wait() + close(responses) + }() + + for resp := range responses { + if resp.err != nil { + response.Write(ctx, response.WrapError(resp.err)) + return + } + deleted += resp.deleted + } + + // check to see if the request has been canceled, if so abort now. + select { + case <-reqCtx.Done(): + //request canceled + response.Write(ctx, response.RequestCanceledErr) + return + default: } resp := models.MetricsDeleteResp{ @@ -545,6 +625,14 @@ func (s *Server) metricsDeleteRemote(ctx context.Context, orgId int, query strin log.Error(4, "HTTP metricDelete error querying %s/index/delete: %q", peer.Name, err) return 0, err } + + select { + case <-ctx.Done(): + //request canceled + return 0, nil + default: + } + resp := models.MetricsDeleteResp{} _, err = resp.UnmarshalMsg(buf) if err != nil { @@ -568,6 +656,12 @@ func (s *Server) executePlan(ctx context.Context, orgId int, plan expr.Plan) ([] // e.g. 
target=movingAvg(foo.*, "1h")&target=foo.* // note that in this case we fetch foo.* twice. can be optimized later for _, r := range plan.Reqs { + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } series, err := s.findSeries(ctx, orgId, []string{r.Query}, int64(r.From)) if err != nil { return nil, err @@ -600,6 +694,13 @@ func (s *Server) executePlan(ctx context.Context, orgId int, plan expr.Plan) ([] } } + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } + reqRenderSeriesCount.Value(len(reqs)) if len(reqs) == 0 { return nil, nil @@ -626,6 +727,7 @@ func (s *Server) executePlan(ctx context.Context, orgId int, plan expr.Plan) ([] log.Error(3, "HTTP Render %s", err.Error()) return nil, err } + out = mergeSeries(out) // instead of waiting for all data to come in and then start processing everything, we could consider starting processing earlier, at the risk of doing needless work diff --git a/api/response/error.go b/api/response/error.go index a58ce7cc65..3d7723eb83 100644 --- a/api/response/error.go +++ b/api/response/error.go @@ -51,3 +51,5 @@ func (r *ErrorResp) Headers() (headers map[string]string) { headers = map[string]string{"content-type": "text/plain"} return headers } + +var RequestCanceledErr = NewError(499, "request canceled") diff --git a/cluster/config.go b/cluster/config.go index cdacd080b7..e926454f04 100644 --- a/cluster/config.go +++ b/cluster/config.go @@ -23,7 +23,8 @@ var ( httpTimeout time.Duration minAvailableShards int - client http.Client + client http.Client + transport *http.Transport ) func ConfigSetup() { @@ -57,17 +58,17 @@ func ConfigProcess() { clusterPort = addr.Port Mode = ModeType(mode) - + transport = &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + Proxy: http.ProxyFromEnvironment, + Dial: (&net.Dialer{ + Timeout: time.Second * 5, + KeepAlive: 30 * time.Second, + }).Dial, + TLSHandshakeTimeout: time.Second, + } client = http.Client{ - 
Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - Proxy: http.ProxyFromEnvironment, - Dial: (&net.Dialer{ - Timeout: time.Second * 5, - KeepAlive: 30 * time.Second, - }).Dial, - TLSHandshakeTimeout: time.Second, - }, - Timeout: httpTimeout, + Transport: transport, + Timeout: httpTimeout, } } diff --git a/cluster/node.go b/cluster/node.go index 42b90e06ad..b88cf0d7f3 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -152,17 +152,45 @@ func (n Node) Post(ctx context.Context, name, path string, body Traceable) (ret log.Error(3, "CLU failed to inject span into headers: %s", err) } req.Header.Add("Content-Type", "application/json") - rsp, err := client.Do(req) - if err != nil { - log.Error(3, "CLU Node: %s unreachable. %s", n.Name, err.Error()) - return nil, NewError(http.StatusServiceUnavailable, fmt.Errorf("cluster node unavailable")) + + c := make(chan struct { + r *http.Response + err error + }, 1) + + go func() { + rsp, err := client.Do(req) + c <- struct { + r *http.Response + err error + }{rsp, err} + }() + + // wait for either our results from the http request or if out context has been canceled + // then abort the http request. + select { + case <-ctx.Done(): + log.Debug("CLU Node: context canceled. terminating request to peer %s", n.Name) + transport.CancelRequest(req) + <-c // Wait for client.Do but ignore result + case resp := <-c: + err := resp.err + rsp := resp.r + if err != nil { + tags.Error.Set(span, true) + log.Error(3, "CLU Node: %s unreachable. 
%s", n.Name, err.Error()) + return nil, NewError(http.StatusServiceUnavailable, fmt.Errorf("cluster node unavailable")) + } + return handleResp(rsp) } - return handleResp(rsp) + + return nil, nil } func handleResp(rsp *http.Response) ([]byte, error) { defer rsp.Body.Close() if rsp.StatusCode != 200 { + ioutil.ReadAll(rsp.Body) return nil, NewError(rsp.StatusCode, fmt.Errorf(rsp.Status)) } return ioutil.ReadAll(rsp.Body) diff --git a/docker/docker-cluster/metrictank.ini b/docker/docker-cluster/metrictank.ini index 048972fdaa..df241c1721 100644 --- a/docker/docker-cluster/metrictank.ini +++ b/docker/docker-cluster/metrictank.ini @@ -146,6 +146,8 @@ fallback-graphite-addr = http://graphite log-min-dur = 5min # timezone for interpreting from/until values when needed, specified using [zoneinfo name](https://en.wikipedia.org/wiki/Tz_database#Names_of_time_zones) e.g. 'America/New_York', 'UTC' or 'local' to use local server timezone. time-zone = local +# maximum number of concurrent threads for fetching data on the local node. Each thread handles a single series. +get-targets-concurrency = 20 ## metric data inputs ## diff --git a/docker/docker-dev-custom-cfg-kafka/metrictank.ini b/docker/docker-dev-custom-cfg-kafka/metrictank.ini index 7cb6f96242..3c938a4a02 100644 --- a/docker/docker-dev-custom-cfg-kafka/metrictank.ini +++ b/docker/docker-dev-custom-cfg-kafka/metrictank.ini @@ -146,6 +146,8 @@ fallback-graphite-addr = http://graphite log-min-dur = 5min # timezone for interpreting from/until values when needed, specified using [zoneinfo name](https://en.wikipedia.org/wiki/Tz_database#Names_of_time_zones) e.g. 'America/New_York', 'UTC' or 'local' to use local server timezone. time-zone = local +# maximum number of concurrent threads for fetching data on the local node. Each thread handles a single series. 
+get-targets-concurrency = 20 ## metric data inputs ## diff --git a/mdata/cwr.go b/mdata/cwr.go index eea3b8c873..c9c97026e5 100644 --- a/mdata/cwr.go +++ b/mdata/cwr.go @@ -1,6 +1,7 @@ package mdata import ( + "context" "time" "github.com/grafana/metrictank/mdata/chunk" @@ -13,6 +14,7 @@ type ChunkReadRequest struct { p []interface{} timestamp time.Time out chan outcome + ctx context.Context } type ChunkWriteRequest struct { diff --git a/mdata/store_cassandra.go b/mdata/store_cassandra.go index 493c2bc050..24717cfc1e 100644 --- a/mdata/store_cassandra.go +++ b/mdata/store_cassandra.go @@ -416,6 +416,14 @@ func (o asc) Less(i, j int) bool { return o[i].sortKey < o[j].sortKey } func (c *CassandraStore) processReadQueue() { for crr := range c.readQueue { + // check to see if the request has been canceled, if so abort now. + select { + case <-crr.ctx.Done(): + //request canceled + crr.out <- outcome{omitted: true} + continue + default: + } waitDuration := time.Since(crr.timestamp) cassGetWaitDuration.Value(waitDuration) if waitDuration > c.omitReadTimeout { @@ -423,6 +431,7 @@ func (c *CassandraStore) processReadQueue() { crr.out <- outcome{omitted: true} continue } + pre := time.Now() iter := outcome{crr.month, crr.sortKey, c.Session.Query(crr.q, crr.p...).Iter(), false} cassGetExecDuration.Value(time.Since(pre)) @@ -460,7 +469,7 @@ func (c *CassandraStore) SearchTable(ctx context.Context, key, table string, sta crrs := make([]*ChunkReadRequest, 0) query := func(month, sortKey uint32, q string, p ...interface{}) { - crrs = append(crrs, &ChunkReadRequest{month, sortKey, q, p, pre, nil}) + crrs = append(crrs, &ChunkReadRequest{month, sortKey, q, p, pre, nil, ctx}) } start_month := start - (start % Month_sec) // starting row has to be at, or before, requested start @@ -503,6 +512,10 @@ func (c *CassandraStore) SearchTable(ctx context.Context, key, table string, sta for i := range crrs { crrs[i].out = results select { + case <-ctx.Done(): + // request has been canceled, 
so no need to continue queuing reads. + // reads already queued will be aborted when read from the queue. + return nil, nil case c.readQueue <- crrs[i]: default: cassReadQueueFull.Inc() @@ -514,17 +527,24 @@ func (c *CassandraStore) SearchTable(ctx context.Context, key, table string, sta outcomes := make([]outcome, 0, numQueries) seen := 0 - for o := range results { - if o.omitted { - tracing.Failure(span) - tracing.Error(span, errReadTooOld) - return nil, errReadTooOld - } - seen += 1 - outcomes = append(outcomes, o) - if seen == numQueries { - close(results) - break +LOOP: + for { + select { + case <-ctx.Done(): + // request has been canceled, so no need to continue processing results + return nil, nil + case o := <-results: + if o.omitted { + tracing.Failure(span) + tracing.Error(span, errReadTooOld) + return nil, errReadTooOld + } + seen += 1 + outcomes = append(outcomes, o) + if seen == numQueries { + close(results) + break LOOP + } } } cassGetChunksDuration.Value(time.Since(pre)) diff --git a/metrictank-sample.ini b/metrictank-sample.ini index ce503eca8f..b8d98465c5 100644 --- a/metrictank-sample.ini +++ b/metrictank-sample.ini @@ -149,6 +149,8 @@ fallback-graphite-addr = http://localhost:8080 log-min-dur = 5min # timezone for interpreting from/until values when needed, specified using [zoneinfo name](https://en.wikipedia.org/wiki/Tz_database#Names_of_time_zones) e.g. 'America/New_York', 'UTC' or 'local' to use local server timezone. time-zone = local +# maximum number of concurrent threads for fetching data on the local node. Each thread handles a single series. 
+get-targets-concurrency = 20 ## metric data inputs ## diff --git a/scripts/config/metrictank-docker.ini b/scripts/config/metrictank-docker.ini index 32dade5fe4..5b12044b74 100644 --- a/scripts/config/metrictank-docker.ini +++ b/scripts/config/metrictank-docker.ini @@ -146,6 +146,8 @@ fallback-graphite-addr = http://graphite log-min-dur = 5min # timezone for interpreting from/until values when needed, specified using [zoneinfo name](https://en.wikipedia.org/wiki/Tz_database#Names_of_time_zones) e.g. 'America/New_York', 'UTC' or 'local' to use local server timezone. time-zone = local +# maximum number of concurrent threads for fetching data on the local node. Each thread handles a single series. +get-targets-concurrency = 20 ## metric data inputs ## diff --git a/scripts/config/metrictank-package.ini b/scripts/config/metrictank-package.ini index 401708b22a..77b1780078 100644 --- a/scripts/config/metrictank-package.ini +++ b/scripts/config/metrictank-package.ini @@ -146,6 +146,8 @@ fallback-graphite-addr = http://localhost:8080 log-min-dur = 5min # timezone for interpreting from/until values when needed, specified using [zoneinfo name](https://en.wikipedia.org/wiki/Tz_database#Names_of_time_zones) e.g. 'America/New_York', 'UTC' or 'local' to use local server timezone. time-zone = local +# maximum number of concurrent threads for fetching data on the local node. Each thread handles a single series. +get-targets-concurrency = 20 ## metric data inputs ## From c8207b28b9be113a952eda424c9d73780ccc1341 Mon Sep 17 00:00:00 2001 From: woodsaj Date: Fri, 15 Sep 2017 11:37:27 +0800 Subject: [PATCH 02/11] add per request ratelimiting of getTargetsLocal limit the number of getTargetsLocal threads that can be running at once. This ensures that if a large query that spans 1000s of series is executed, 1000s of goroutines wont be run at once to hit the caches and cassandra. This also ensures that a single query cant fill the cassandra read queue. 
Mt only executes cassandra-read-concurrency requests at a time so requests will always need to be queued. With this new approach multiple render requests will share the read queue and their read requests will be interspersed in the queue, rather than the old behaviour where a render request would have to complete all of its reads before the next render request would be processed. Overall this will lead to a more consistent performance profile for render requests. --- api/config.go | 3 +++ api/dataprocessor.go | 17 ++++++++++++++++- docs/config.md | 2 ++ 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/api/config.go b/api/config.go index 718ee718e6..64d5c5653e 100644 --- a/api/config.go +++ b/api/config.go @@ -27,6 +27,8 @@ var ( fallbackGraphite string timeZoneStr string + getTargetsConcurrency int + graphiteProxy *httputil.ReverseProxy timeZone *time.Location ) @@ -45,6 +47,7 @@ func ConfigSetup() { apiCfg.BoolVar(&multiTenant, "multi-tenant", true, "require x-org-id authentication to auth as a specific org. otherwise orgId 1 is assumed") apiCfg.StringVar(&fallbackGraphite, "fallback-graphite-addr", "http://localhost:8080", "in case our /render endpoint does not support the requested processing, proxy the request to this graphite") apiCfg.StringVar(&timeZoneStr, "time-zone", "local", "timezone for interpreting from/until values when needed, specified using [zoneinfo name](https://en.wikipedia.org/wiki/Tz_database#Names_of_time_zones) e.g. 'America/New_York', 'UTC' or 'local' to use local server timezone") + apiCfg.IntVar(&getTargetsConcurrency, "get-targets-concurrency", 20, "maximum number of concurrent threads for fetching data on the local node. 
Each thread handles a single series.") globalconf.Register("http", apiCfg) } diff --git a/api/dataprocessor.go b/api/dataprocessor.go index 2fdf04f445..be6ee6b415 100644 --- a/api/dataprocessor.go +++ b/api/dataprocessor.go @@ -21,6 +21,15 @@ import ( "gopkg.in/raintank/schema.v1" ) +type limiter chan struct{} + +func (l limiter) enter() { l <- struct{}{} } +func (l limiter) leave() { <-l } + +func newLimiter(l int) limiter { + return make(chan struct{}, l) +} + type getTargetsResp struct { series []models.Series err error @@ -224,11 +233,14 @@ func (s *Server) getTargetsLocal(ctx context.Context, cancel context.CancelFunc, var wg sync.WaitGroup wg.Add(len(reqs)) + reqLimiter := newLimiter(getTargetsConcurrency) for _, req := range reqs { + // if there are already getDataConcurrency goroutines running, then block + // until a slot becomes free. + reqLimiter.enter() go func(req models.Req) { ctx, span := tracing.NewSpan(ctx, s.Tracer, "getTargetsLocal") req.Trace(span) - defer span.Finish() pre := time.Now() points, interval, err := s.getTarget(ctx, req) if err != nil { @@ -249,6 +261,9 @@ func (s *Server) getTargetsLocal(ctx context.Context, cancel context.CancelFunc, }}, nil} } wg.Done() + // pop an item of our limiter so that other requests can be processed. + reqLimiter.leave() + span.Finish() }(req) } go func() { diff --git a/docs/config.md b/docs/config.md index 2200cfa6e2..720fd68859 100644 --- a/docs/config.md +++ b/docs/config.md @@ -187,6 +187,8 @@ fallback-graphite-addr = http://localhost:8080 log-min-dur = 5min # timezone for interpreting from/until values when needed, specified using [zoneinfo name](https://en.wikipedia.org/wiki/Tz_database#Names_of_time_zones) e.g. 'America/New_York', 'UTC' or 'local' to use local server timezone. 
time-zone = local +# maximum number of concurrent threads for fetching data on the local node +get-targets-concurrency = 20 ``` ## metric data inputs ## From b4c06482e945665f4643714eea202649b849e5a6 Mon Sep 17 00:00:00 2001 From: woodsaj Date: Thu, 21 Sep 2017 05:27:58 +0800 Subject: [PATCH 03/11] dont pass a cancelFunc to getTargetsRemote/Local --- api/cluster.go | 4 +--- api/dataprocessor.go | 37 ++++++++++++++++++++++++++----------- 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/api/cluster.go b/api/cluster.go index 3867497f9b..1c87e632f0 100644 --- a/api/cluster.go +++ b/api/cluster.go @@ -1,7 +1,6 @@ package api import ( - "context" "errors" "fmt" "net/http" @@ -158,8 +157,7 @@ func (s *Server) indexList(ctx *middleware.Context, req models.IndexList) { } func (s *Server) getData(ctx *middleware.Context, request models.GetData) { - reqCtx, cancel := context.WithCancel(ctx.Req.Context()) - series, err := s.getTargetsLocal(reqCtx, cancel, request.Requests) + series, err := s.getTargetsLocal(ctx.Req.Context(), request.Requests) if err != nil { // the only errors returned are from us catching panics, so we should treat them // all as internalServerErrors diff --git a/api/dataprocessor.go b/api/dataprocessor.go index be6ee6b415..d0805c1619 100644 --- a/api/dataprocessor.go +++ b/api/dataprocessor.go @@ -148,7 +148,10 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se go func() { // the only errors returned are from us catching panics, so we should treat them // all as internalServerErrors - series, err := s.getTargetsLocal(getCtx, cancel, localReqs) + series, err := s.getTargetsLocal(getCtx, localReqs) + if err != nil { + cancel() + } responses <- getTargetsResp{series, err} wg.Done() }() @@ -157,7 +160,10 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se wg.Add(1) go func() { // all errors returned returned are *response.Error. 
- series, err := s.getTargetsRemote(getCtx, cancel, remoteReqs) + series, err := s.getTargetsRemote(getCtx, remoteReqs) + if err != nil { + cancel() + } responses <- getTargetsResp{series, err} wg.Done() }() @@ -180,8 +186,8 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se return out, nil } -func (s *Server) getTargetsRemote(ctx context.Context, cancel context.CancelFunc, remoteReqs map[string][]models.Req) ([]models.Series, error) { - responses := make(chan getTargetsResp, 1) +func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]models.Req) ([]models.Series, error) { + responses := make(chan getTargetsResp, len(remoteReqs)) wg := sync.WaitGroup{} wg.Add(len(remoteReqs)) @@ -192,7 +198,6 @@ func (s *Server) getTargetsRemote(ctx context.Context, cancel context.CancelFunc node := reqs[0].Node buf, err := node.Post(ctx, "getTargetsRemote", "/getdata", models.GetData{Requests: reqs}) if err != nil { - cancel() // cancel all other requests. responses <- getTargetsResp{nil, err} return } @@ -200,7 +205,6 @@ func (s *Server) getTargetsRemote(ctx context.Context, cancel context.CancelFunc _, err = resp.UnmarshalMsg(buf) if err != nil { log.Error(3, "DP getTargetsRemote: error unmarshaling body from %s/getdata: %q", node.Name, err) - cancel() // cancel all other requests. 
responses <- getTargetsResp{nil, err} return } @@ -227,22 +231,33 @@ func (s *Server) getTargetsRemote(ctx context.Context, cancel context.CancelFunc } // error is the error of the first failing target request -func (s *Server) getTargetsLocal(ctx context.Context, cancel context.CancelFunc, reqs []models.Req) ([]models.Series, error) { +func (s *Server) getTargetsLocal(ctx context.Context, reqs []models.Req) ([]models.Series, error) { log.Debug("DP getTargetsLocal: handling %d reqs locally", len(reqs)) - responses := make(chan getTargetsResp, 1) + responses := make(chan getTargetsResp, len(reqs)) var wg sync.WaitGroup - wg.Add(len(reqs)) reqLimiter := newLimiter(getTargetsConcurrency) + + rCtx, cancel := context.WithCancel(ctx) + defer cancel() +LOOP: for _, req := range reqs { + // check to see if the request has been canceled, if so abort now. + select { + case <-rCtx.Done(): + //request canceled + break LOOP + default: + } // if there are already getDataConcurrency goroutines running, then block // until a slot becomes free. reqLimiter.enter() + wg.Add(1) go func(req models.Req) { - ctx, span := tracing.NewSpan(ctx, s.Tracer, "getTargetsLocal") + rCtx, span := tracing.NewSpan(rCtx, s.Tracer, "getTargetsLocal") req.Trace(span) pre := time.Now() - points, interval, err := s.getTarget(ctx, req) + points, interval, err := s.getTarget(rCtx, req) if err != nil { tags.Error.Set(span, true) cancel() // cancel all other requests. 
From f5a5988d47306152d000467ecbe54655feb8a875 Mon Sep 17 00:00:00 2001 From: woodsaj Date: Thu, 21 Sep 2017 17:15:31 +0800 Subject: [PATCH 04/11] cancel context when peer queries fail for metricsIndex and metricsDelete --- api/graphite.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/api/graphite.go b/api/graphite.go index a48d5edb5d..85d1974eec 100644 --- a/api/graphite.go +++ b/api/graphite.go @@ -388,7 +388,8 @@ func (s *Server) metricsIndex(ctx *middleware.Context) { response.Write(ctx, response.WrapError(err)) return } - reqCtx := ctx.Req.Context() + reqCtx, cancel := context.WithCancel(ctx.Req.Context()) + defer cancel() responses := make(chan struct { series []idx.Archive err error @@ -408,6 +409,9 @@ func (s *Server) metricsIndex(ctx *middleware.Context) { } else { go func(peer cluster.Node) { result, err := s.listRemote(reqCtx, ctx.OrgId, peer) + if err != nil { + cancel() + } responses <- struct { series []idx.Archive err error @@ -539,12 +543,13 @@ func (s *Server) metricsDelete(ctx *middleware.Context, req models.MetricsDelete peers = append(peers, cluster.Manager.ThisNode()) log.Debug("HTTP metricsDelete for %v across %d instances", req.Query, len(peers)) - reqCtx := ctx.Req.Context() + reqCtx, cancel := context.WithCancel(ctx.Req.Context()) + defer cancel() deleted := 0 responses := make(chan struct { deleted int err error - }, 1) + }, len(peers)) var wg sync.WaitGroup for _, peer := range peers { log.Debug("HTTP metricsDelete getting results from %s", peer.Name) @@ -554,6 +559,7 @@ func (s *Server) metricsDelete(ctx *middleware.Context, req models.MetricsDelete result, err := s.metricsDeleteLocal(ctx.OrgId, req.Query) var e error if err != nil { + cancel() if strings.Contains(err.Error(), "Index is corrupt") { e = response.NewError(http.StatusInternalServerError, err.Error()) } else { @@ -569,6 +575,9 @@ func (s *Server) metricsDelete(ctx *middleware.Context, req models.MetricsDelete } else { go func(peer 
cluster.Node) { result, err := s.metricsDeleteRemote(reqCtx, ctx.OrgId, req.Query, peer) + if err != nil { + cancel() + } responses <- struct { deleted int err error From d76353913fa7855aaf0590fe448baf82727d659b Mon Sep 17 00:00:00 2001 From: woodsaj Date: Fri, 22 Sep 2017 00:53:21 +0800 Subject: [PATCH 05/11] more consistent context.WithCancel logic - the functions that spawn background work in new goroutines should be responsible for signalling to them that they should shut down. So these functions should create a context.WithCancel and call cancel when the first error is encountered. --- api/dataprocessor.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/api/dataprocessor.go b/api/dataprocessor.go index d0805c1619..7f81e50a37 100644 --- a/api/dataprocessor.go +++ b/api/dataprocessor.go @@ -188,7 +188,8 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]models.Req) ([]models.Series, error) { responses := make(chan getTargetsResp, len(remoteReqs)) - + rCtx, cancel := context.WithCancel(ctx) + defer cancel() wg := sync.WaitGroup{} wg.Add(len(remoteReqs)) for _, nodeReqs := range remoteReqs { @@ -196,14 +197,16 @@ func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]m go func(reqs []models.Req) { defer wg.Done() node := reqs[0].Node - buf, err := node.Post(ctx, "getTargetsRemote", "/getdata", models.GetData{Requests: reqs}) + buf, err := node.Post(rCtx, "getTargetsRemote", "/getdata", models.GetData{Requests: reqs}) if err != nil { + cancel() responses <- getTargetsResp{nil, err} return } var resp models.GetDataResp _, err = resp.UnmarshalMsg(buf) if err != nil { + cancel() log.Error(3, "DP getTargetsRemote: error unmarshaling body from %s/getdata: %q", node.Name, err) responses <- getTargetsResp{nil, err} return From e87b6a1fd29f9754f18d7c9d16778a1186e602cb Mon Sep 17 00:00:00 2001 From: woodsaj Date: 
Fri, 22 Sep 2017 01:34:11 +0800 Subject: [PATCH 06/11] check if context is canceled before searching idx --- api/graphite.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/api/graphite.go b/api/graphite.go index 85d1974eec..483eb79528 100644 --- a/api/graphite.go +++ b/api/graphite.go @@ -119,6 +119,12 @@ func (s *Server) findSeries(ctx context.Context, orgId int, patterns []string, s func (s *Server) findSeriesLocal(ctx context.Context, orgId int, patterns []string, seenAfter int64) ([]Series, error) { result := make([]Series, 0) for _, pattern := range patterns { + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } _, span := tracing.NewSpan(ctx, s.Tracer, "findSeriesLocal") span.SetTag("org", orgId) span.SetTag("pattern", pattern) From e08a3cabe845de37123b63414157807cc6435697 Mon Sep 17 00:00:00 2001 From: woodsaj Date: Wed, 22 Nov 2017 00:02:32 +0800 Subject: [PATCH 07/11] use contexts correctly in tag methods --- api/cluster.go | 43 +++++++++++++++++------------ api/graphite.go | 73 ++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 95 insertions(+), 21 deletions(-) diff --git a/api/cluster.go b/api/cluster.go index 1c87e632f0..e81b7d5b01 100644 --- a/api/cluster.go +++ b/api/cluster.go @@ -1,6 +1,7 @@ package api import ( + "context" "errors" "fmt" "net/http" @@ -183,7 +184,8 @@ func (s *Server) indexDelete(ctx *middleware.Context, req models.IndexDelete) { } // peerQuery takes a request and the path to request it on, then fans it out -// across the cluster, except to the local peer. +// across the cluster, except to the local peer. If any peer fails requests to +// other peers are aborted. 
// ctx: request context // data: request to be submitted // name: name to be used in logging & tracing @@ -196,11 +198,13 @@ func (s *Server) peerQuery(ctx context.Context, data cluster.Traceable, name, pa } log.Debug("HTTP %s across %d instances", name, len(peers)-1) - result := make([][]byte, 0, len(peers)-1) + reqCtx, cancel := context.WithCancel(ctx) + defer cancel() - var errors []error - var errLock sync.Mutex - var resLock sync.Mutex + responses := make(chan struct { + data []byte + err error + }, 1) var wg sync.WaitGroup for _, peer := range peers { if peer.IsLocal() { @@ -210,24 +214,29 @@ func (s *Server) peerQuery(ctx context.Context, data cluster.Traceable, name, pa go func(peer cluster.Node) { defer wg.Done() log.Debug("HTTP Render querying %s%s", peer.Name, path) - buf, err := peer.Post(ctx, name, path, data) + buf, err := peer.Post(reqCtx, name, path, data) if err != nil { + cancel() log.Error(4, "HTTP Render error querying %s%s: %q", peer.Name, path, err) - errLock.Lock() - errors = append(errors, err) - errLock.Unlock() - return } - - resLock.Lock() - result = append(result, buf) - resLock.Unlock() + responses <- struct { + data []byte + err error + }{buf, err} }(peer) } - wg.Wait() + // wait for all list goroutines to end, then close our responses channel + go func() { + wg.Wait() + close(responses) + }() - if len(errors) > 0 { - return nil, errors[0] + result := make([][]byte, 0, len(peers)-1) + for resp := range responses { + if resp.err != nil { + return nil, err + } + result = append(result, resp.data) } return result, nil diff --git a/api/graphite.go b/api/graphite.go index 483eb79528..e2a10b5489 100644 --- a/api/graphite.go +++ b/api/graphite.go @@ -801,13 +801,21 @@ func (s *Server) graphiteTagDetails(ctx *middleware.Context, request models.Grap response.Write(ctx, response.NewError(http.StatusBadRequest, "not tag specified")) return } - - tagValues, err := s.clusterTagDetails(ctx.Req.Context(), ctx.OrgId, tag, request.Filter, request.From) 
+ reqCtx := ctx.Req.Context() + tagValues, err := s.clusterTagDetails(reqCtx, ctx.OrgId, tag, request.Filter, request.From) if err != nil { response.Write(ctx, response.WrapError(err)) return } + select { + case <-reqCtx.Done(): + //request canceled + response.Write(ctx, response.RequestCanceledErr) + return + default: + } + resp := models.GraphiteTagDetailsResp{ Tag: tag, Values: make([]models.GraphiteTagDetailsValueResp, 0, len(tagValues)), @@ -831,12 +839,24 @@ func (s *Server) clusterTagDetails(ctx context.Context, orgId int, tag, filter s if result == nil { result = make(map[string]uint64) } + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } data := models.IndexTagDetails{OrgId: orgId, Tag: tag, Filter: filter, From: from} bufs, err := s.peerQuery(ctx, data, "clusterTagDetails", "/index/tag_details") if err != nil { return nil, err } + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } resp := models.IndexTagDetailsResp{} for _, buf := range bufs { _, err = resp.UnmarshalMsg(buf) @@ -852,12 +872,21 @@ func (s *Server) clusterTagDetails(ctx context.Context, orgId int, tag, filter s } func (s *Server) graphiteTagFindSeries(ctx *middleware.Context, request models.GraphiteTagFindSeries) { - series, err := s.clusterFindByTag(ctx.Req.Context(), ctx.OrgId, request.Expr, request.From) + reqCtx := ctx.Req.Context() + series, err := s.clusterFindByTag(reqCtx, ctx.OrgId, request.Expr, request.From) if err != nil { response.Write(ctx, response.WrapError(err)) return } + select { + case <-reqCtx.Done(): + //request canceled + response.Write(ctx, response.RequestCanceledErr) + return + default: + } + response.Write(ctx, response.NewJson(200, series, "")) } @@ -869,6 +898,13 @@ func (s *Server) clusterFindByTag(ctx context.Context, orgId int, expressions [] return nil, err } + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } + for _, series := range result { 
seriesSet[series] = struct{}{} } @@ -879,6 +915,13 @@ func (s *Server) clusterFindByTag(ctx context.Context, orgId int, expressions [] return nil, err } + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } + resp := models.IndexFindByTagResp{} for _, buf := range bufs { _, err = resp.UnmarshalMsg(buf) @@ -899,12 +942,21 @@ func (s *Server) clusterFindByTag(ctx context.Context, orgId int, expressions [] } func (s *Server) graphiteTags(ctx *middleware.Context, request models.GraphiteTags) { - tags, err := s.clusterTags(ctx.Req.Context(), ctx.OrgId, request.Filter, request.From) + reqCtx := ctx.Req.Context() + tags, err := s.clusterTags(reqCtx, ctx.OrgId, request.Filter, request.From) if err != nil { response.Write(ctx, response.WrapError(err)) return } + select { + case <-reqCtx.Done(): + //request canceled + response.Write(ctx, response.RequestCanceledErr) + return + default: + } + var resp models.GraphiteTagsResp for _, tag := range tags { resp = append(resp, models.GraphiteTagResp{Tag: tag}) @@ -917,6 +969,12 @@ func (s *Server) clusterTags(ctx context.Context, orgId int, filter string, from if err != nil { return nil, err } + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } tagSet := make(map[string]struct{}, len(result)) for _, tag := range result { @@ -929,6 +987,13 @@ func (s *Server) clusterTags(ctx context.Context, orgId int, filter string, from return nil, err } + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } + resp := models.IndexTagsResp{} for _, buf := range bufs { _, err = resp.UnmarshalMsg(buf) From 7d3e8036ff0bfb55bcf848f9f5cb938829a05b19 Mon Sep 17 00:00:00 2001 From: woodsaj Date: Wed, 22 Nov 2017 20:48:42 +0800 Subject: [PATCH 08/11] pass errors instead of panicing in data fetch pipeline - Instead of calling panic and having to recover the panic later, just pass errors throught the data fetch (getTargets) pipeline. 
This allows us to correctly cancel requests via the request context. --- api/dataprocessor.go | 185 ++++++++++++++++++++++++++++---------- api/dataprocessor_test.go | 10 ++- mdata/store_cassandra.go | 24 +++-- 3 files changed, 163 insertions(+), 56 deletions(-) diff --git a/api/dataprocessor.go b/api/dataprocessor.go index 7f81e50a37..b55a8cbebe 100644 --- a/api/dataprocessor.go +++ b/api/dataprocessor.go @@ -21,20 +21,6 @@ import ( "gopkg.in/raintank/schema.v1" ) -type limiter chan struct{} - -func (l limiter) enter() { l <- struct{}{} } -func (l limiter) leave() { <-l } - -func newLimiter(l int) limiter { - return make(chan struct{}, l) -} - -type getTargetsResp struct { - series []models.Series - err error -} - // doRecover is the handler that turns panics into returns from the top level of getTarget. func doRecover(errp *error) { e := recover() @@ -53,6 +39,20 @@ func doRecover(errp *error) { return } +type limiter chan struct{} + +func (l limiter) enter() { l <- struct{}{} } +func (l limiter) leave() { <-l } + +func newLimiter(l int) limiter { + return make(chan struct{}, l) +} + +type getTargetsResp struct { + series []models.Series + err error +} + // Fix assures all points are nicely aligned (quantized) and padded with nulls in case there's gaps in data // graphite does this quantization before storing, we may want to do that as well at some point // note: values are quantized to the right because we can't lie about the future: @@ -301,7 +301,7 @@ LOOP: } func (s *Server) getTarget(ctx context.Context, req models.Req) (points []schema.Point, interval uint32, err error) { - defer doRecover(&err) + doRecover(&err) readRollup := req.Archive != 0 // do we need to read from a downsampled series? normalize := req.AggNum > 1 // do we need to normalize points at runtime? 
// normalize is runtime consolidation but only for the purpose of bringing high-res @@ -316,28 +316,93 @@ func (s *Server) getTarget(ctx context.Context, req models.Req) (points []schema } if !readRollup && !normalize { - return s.getSeriesFixed(ctx, req, consolidation.None), req.OutInterval, nil + fixed, err := s.getSeriesFixed(ctx, req, consolidation.None) + if err != nil { + return nil, req.OutInterval, err + } + return fixed, req.OutInterval, err } else if !readRollup && normalize { - return consolidation.Consolidate(s.getSeriesFixed(ctx, req, consolidation.None), req.AggNum, req.Consolidator), req.OutInterval, nil + fixed, err := s.getSeriesFixed(ctx, req, consolidation.None) + if err != nil { + return nil, req.OutInterval, err + } + select { + case <-ctx.Done(): + //request canceled + return nil, req.OutInterval, nil + default: + } + return consolidation.Consolidate(fixed, req.AggNum, req.Consolidator), req.OutInterval, nil } else if readRollup && !normalize { if req.Consolidator == consolidation.Avg { + sumFixed, err := s.getSeriesFixed(ctx, req, consolidation.Sum) + if err != nil { + return nil, req.OutInterval, err + } + // check to see if the request has been canceled, if so abort now. 
+ select { + case <-ctx.Done(): + //request canceled + return nil, req.OutInterval, nil + default: + } + cntFixed, err := s.getSeriesFixed(ctx, req, consolidation.Cnt) + if err != nil { + return nil, req.OutInterval, err + } + select { + case <-ctx.Done(): + //request canceled + return nil, req.OutInterval, nil + default: + } return divide( - s.getSeriesFixed(ctx, req, consolidation.Sum), - s.getSeriesFixed(ctx, req, consolidation.Cnt), + sumFixed, + cntFixed, ), req.OutInterval, nil } else { - return s.getSeriesFixed(ctx, req, req.Consolidator), req.OutInterval, nil + fixed, err := s.getSeriesFixed(ctx, req, consolidation.None) + return fixed, req.OutInterval, err } } else { // readRollup && normalize if req.Consolidator == consolidation.Avg { + sumFixed, err := s.getSeriesFixed(ctx, req, consolidation.Sum) + if err != nil { + return nil, req.OutInterval, err + } + select { + case <-ctx.Done(): + //request canceled + return nil, req.OutInterval, nil + default: + } + cntFixed, err := s.getSeriesFixed(ctx, req, consolidation.Cnt) + if err != nil { + return nil, req.OutInterval, err + } + select { + case <-ctx.Done(): + //request canceled + return nil, req.OutInterval, nil + default: + } return divide( - consolidation.Consolidate(s.getSeriesFixed(ctx, req, consolidation.Sum), req.AggNum, consolidation.Sum), - consolidation.Consolidate(s.getSeriesFixed(ctx, req, consolidation.Cnt), req.AggNum, consolidation.Sum), + consolidation.Consolidate(sumFixed, req.AggNum, consolidation.Sum), + consolidation.Consolidate(cntFixed, req.AggNum, consolidation.Sum), ), req.OutInterval, nil } else { - return consolidation.Consolidate( - s.getSeriesFixed(ctx, req, req.Consolidator), req.AggNum, req.Consolidator), req.OutInterval, nil + fixed, err := s.getSeriesFixed(ctx, req, req.Consolidator) + if err != nil { + return nil, req.OutInterval, err + } + select { + case <-ctx.Done(): + //request canceled + return nil, req.OutInterval, nil + default: + } + return 
consolidation.Consolidate(fixed, req.AggNum, req.Consolidator), req.OutInterval, nil } } } @@ -352,38 +417,49 @@ func AggMetricKey(key, archive string, aggSpan uint32) string { return fmt.Sprintf("%s_%s_%d", key, archive, aggSpan) } -func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidator consolidation.Consolidator) []schema.Point { +func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidator consolidation.Consolidator) ([]schema.Point, error) { rctx := newRequestContext(ctx, &req, consolidator) - res := s.getSeries(rctx) + res, err := s.getSeries(rctx) + if err != nil { + return nil, err + } + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } res.Points = append(s.itersToPoints(rctx, res.Iters), res.Points...) - return Fix(res.Points, req.From, req.To, req.ArchInterval) + return Fix(res.Points, req.From, req.To, req.ArchInterval), nil } -func (s *Server) getSeries(ctx *requestContext) mdata.Result { +func (s *Server) getSeries(ctx *requestContext) (mdata.Result, error) { res := s.getSeriesAggMetrics(ctx) + select { + case <-ctx.ctx.Done(): + //request canceled + return res, nil + default: + } + log.Debug("oldest from aggmetrics is %d", res.Oldest) span := opentracing.SpanFromContext(ctx.ctx) span.SetTag("oldest_in_ring", res.Oldest) if res.Oldest <= ctx.From { reqSpanMem.ValueUint32(ctx.To - ctx.From) - return res - } - - // check to see if the request has been canceled, if so abort now. - select { - case <-ctx.ctx.Done(): - //request canceled - return res - default: + return res, nil } // if oldest < to -> search until oldest, we already have the rest from mem // if to < oldest -> no need to search until oldest, only search until to until := util.Min(res.Oldest, ctx.To) - - res.Iters = append(s.getSeriesCachedStore(ctx, until), res.Iters...) 
- return res + fromCache, err := s.getSeriesCachedStore(ctx, until) + if err != nil { + return res, err + } + res.Iters = append(fromCache, res.Iters...) + return res, nil } // getSeries returns points from mem (and cassandra if needed), within the range from (inclusive) - to (exclusive) @@ -433,7 +509,7 @@ func (s *Server) getSeriesAggMetrics(ctx *requestContext) mdata.Result { } // will only fetch until until, but uses ctx.To for debug logging -func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) []chunk.Iter { +func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]chunk.Iter, error) { var iters []chunk.Iter var prevts uint32 @@ -459,7 +535,7 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) []chunk select { case <-ctx.ctx.Done(): //request canceled - return nil + return iters, nil default: } @@ -470,17 +546,32 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) []chunk // TODO(replay) figure out what to do if one piece is corrupt tracing.Failure(span) tracing.Errorf(span, "itergen: error getting iter from Start list %+v", err) - continue + return iters, err } iters = append(iters, *iter) } + // check to see if the request has been canceled, if so abort now. + select { + case <-ctx.ctx.Done(): + //request canceled + return iters, nil + default: + } + // the request cannot completely be served from cache, it will require cassandra involvement if !cacheRes.Complete { if cacheRes.From != cacheRes.Until { storeIterGens, err := s.BackendStore.Search(ctx.ctx, key, ctx.Req.TTL, cacheRes.From, cacheRes.Until) if err != nil { - panic(err) + return iters, err + } + // check to see if the request has been canceled, if so abort now. 
+ select { + case <-ctx.ctx.Done(): + //request canceled + return iters, nil + default: } for _, itgen := range storeIterGens { @@ -489,7 +580,7 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) []chunk // TODO(replay) figure out what to do if one piece is corrupt tracing.Failure(span) tracing.Errorf(span, "itergen: error getting iter from cassandra slice %+v", err) - continue + return iters, err } // it's important that the itgens get added in chronological order, // currently we rely on cassandra returning results in order @@ -505,13 +596,13 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) []chunk if err != nil { // TODO(replay) figure out what to do if one piece is corrupt log.Error(3, "itergen: error getting iter from cache result end slice %+v", err) - continue + return iters, err } iters = append(iters, *it) } } - return iters + return iters, nil } // check for duplicate series names for the same query. If found merge the results. 
diff --git a/api/dataprocessor_test.go b/api/dataprocessor_test.go index 894e93c14e..efa2bfb22a 100644 --- a/api/dataprocessor_test.go +++ b/api/dataprocessor_test.go @@ -368,7 +368,10 @@ func TestGetSeriesFixed(t *testing.T) { metric.Add(40+offset, 50) // this point will always be quantized to 50 req := models.NewReq(name, name, name, from, to, 1000, 10, consolidation.Avg, 0, cluster.Manager.ThisNode(), 0, 0) req.ArchInterval = 10 - points := srv.getSeriesFixed(test.NewContext(), req, consolidation.None) + points, err := srv.getSeriesFixed(test.NewContext(), req, consolidation.None) + if err != nil { + t.Errorf("case %q - error: %s", name, err) + } if !reflect.DeepEqual(expected, points) { t.Errorf("case %q - exp: %v - got %v", name, expected, points) } @@ -637,7 +640,10 @@ func TestGetSeriesCachedStore(t *testing.T) { req := reqRaw(metric, from, to, span, 1, consolidation.None, 0, 0) req.ArchInterval = 1 ctx := newRequestContext(test.NewContext(), &req, consolidation.None) - iters := srv.getSeriesCachedStore(ctx, to) + iters, err := srv.getSeriesCachedStore(ctx, to) + if err != nil { + t.Fatalf("Pattern %s From %d To %d: error %s", pattern, from, to, err) + } // expecting the first returned timestamp to be the T0 of the chunk containing "from" expectResFrom := from - (from % span) diff --git a/mdata/store_cassandra.go b/mdata/store_cassandra.go index 24717cfc1e..83e466aea5 100644 --- a/mdata/store_cassandra.go +++ b/mdata/store_cassandra.go @@ -44,6 +44,7 @@ var ( errReadQueueFull = errors.New("the read queue is full") errReadTooOld = errors.New("the read is too old") errTableNotFound = errors.New("table for given TTL not found") + errCxtCanceled = errors.New("context canceled") // metric store.cassandra.get.exec is the duration of getting from cassandra store cassGetExecDuration = stats.NewLatencyHistogram15s32("store.cassandra.get.exec") @@ -406,7 +407,7 @@ type outcome struct { month uint32 sortKey uint32 i *gocql.Iter - omitted bool + err error } type asc 
[]outcome @@ -420,7 +421,7 @@ func (c *CassandraStore) processReadQueue() { select { case <-crr.ctx.Done(): //request canceled - crr.out <- outcome{omitted: true} + crr.out <- outcome{err: errCxtCanceled} continue default: } @@ -428,12 +429,17 @@ func (c *CassandraStore) processReadQueue() { cassGetWaitDuration.Value(waitDuration) if waitDuration > c.omitReadTimeout { cassOmitOldRead.Inc() - crr.out <- outcome{omitted: true} + crr.out <- outcome{err: errReadTooOld} continue } pre := time.Now() - iter := outcome{crr.month, crr.sortKey, c.Session.Query(crr.q, crr.p...).Iter(), false} + iter := outcome{ + month: crr.month, + sortKey: crr.sortKey, + i: c.Session.Query(crr.q, crr.p...).Iter(), + err: nil, + } cassGetExecDuration.Value(time.Since(pre)) crr.out <- iter } @@ -534,10 +540,14 @@ LOOP: // request has been canceled, so no need to continue processing results return nil, nil case o := <-results: - if o.omitted { + if o.err != nil { + if o.err == errCxtCanceled { + // context was canceled, return immediately. + return nil, nil + } tracing.Failure(span) - tracing.Error(span, errReadTooOld) - return nil, errReadTooOld + tracing.Error(span, o.err) + return nil, o.err } seen += 1 outcomes = append(outcomes, o) From 2e98a0d6b2eaf91391f1ebbb3146333c520c58a5 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 4 Dec 2017 17:51:58 -0500 Subject: [PATCH 09/11] fix --- api/dataprocessor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/dataprocessor.go b/api/dataprocessor.go index b55a8cbebe..083a783902 100644 --- a/api/dataprocessor.go +++ b/api/dataprocessor.go @@ -301,7 +301,7 @@ LOOP: } func (s *Server) getTarget(ctx context.Context, req models.Req) (points []schema.Point, interval uint32, err error) { - doRecover(&err) + defer doRecover(&err) readRollup := req.Archive != 0 // do we need to read from a downsampled series? normalize := req.AggNum > 1 // do we need to normalize points at runtime? 
// normalize is runtime consolidation but only for the purpose of bringing high-res From 04b955e13b57c458f22f462fdee929206200e717 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Thu, 30 Nov 2017 12:51:15 -0500 Subject: [PATCH 10/11] cleanup Server.getTarget() * no need to wrap an if err clause around s.getSeriesFixed(), as the output is always correct (in particular, when it returns an error, the series is nil) * add consolidation.ConsolidateContext() wrapper so that the caller doesn't need to check the context prior to a call to consolidation.Consolidate() * ditto for divideContext() -> divide() * ditto for getSeriesFixed, except it doesn't need a wrapper, it was already checking the ctx.Done for most of its work (but not all) --- api/dataprocessor.go | 67 +++++++++++++----------------------- consolidation/consolidate.go | 13 +++++++ 2 files changed, 36 insertions(+), 44 deletions(-) diff --git a/api/dataprocessor.go b/api/dataprocessor.go index 083a783902..2473a678fb 100644 --- a/api/dataprocessor.go +++ b/api/dataprocessor.go @@ -116,6 +116,17 @@ func Fix(in []schema.Point, from, to, interval uint32) []schema.Point { return out } +// divideContext wraps a Consolidate() call with a context.Context condition +func divideContext(ctx context.Context, pointsA, pointsB []schema.Point) []schema.Point { + select { + case <-ctx.Done(): + //request canceled + return nil + default: + } + return divide(pointsA, pointsB) +} + func divide(pointsA, pointsB []schema.Point) []schema.Point { if len(pointsA) != len(pointsB) { panic(fmt.Errorf("divide of a series with len %d by a series with len %d", len(pointsA), len(pointsB))) @@ -317,46 +328,25 @@ func (s *Server) getTarget(ctx context.Context, req models.Req) (points []schema if !readRollup && !normalize { fixed, err := s.getSeriesFixed(ctx, req, consolidation.None) - if err != nil { - return nil, req.OutInterval, err - } return fixed, req.OutInterval, err } else if !readRollup && normalize { fixed, err := 
s.getSeriesFixed(ctx, req, consolidation.None) if err != nil { return nil, req.OutInterval, err } - select { - case <-ctx.Done(): - //request canceled - return nil, req.OutInterval, nil - default: - } - return consolidation.Consolidate(fixed, req.AggNum, req.Consolidator), req.OutInterval, nil + return consolidation.ConsolidateContext(ctx, fixed, req.AggNum, req.Consolidator), req.OutInterval, nil } else if readRollup && !normalize { if req.Consolidator == consolidation.Avg { sumFixed, err := s.getSeriesFixed(ctx, req, consolidation.Sum) if err != nil { return nil, req.OutInterval, err } - // check to see if the request has been canceled, if so abort now. - select { - case <-ctx.Done(): - //request canceled - return nil, req.OutInterval, nil - default: - } cntFixed, err := s.getSeriesFixed(ctx, req, consolidation.Cnt) if err != nil { return nil, req.OutInterval, err } - select { - case <-ctx.Done(): - //request canceled - return nil, req.OutInterval, nil - default: - } - return divide( + return divideContext( + ctx, sumFixed, cntFixed, ), req.OutInterval, nil @@ -371,23 +361,12 @@ func (s *Server) getTarget(ctx context.Context, req models.Req) (points []schema if err != nil { return nil, req.OutInterval, err } - select { - case <-ctx.Done(): - //request canceled - return nil, req.OutInterval, nil - default: - } cntFixed, err := s.getSeriesFixed(ctx, req, consolidation.Cnt) if err != nil { return nil, req.OutInterval, err } - select { - case <-ctx.Done(): - //request canceled - return nil, req.OutInterval, nil - default: - } - return divide( + return divideContext( + ctx, consolidation.Consolidate(sumFixed, req.AggNum, consolidation.Sum), consolidation.Consolidate(cntFixed, req.AggNum, consolidation.Sum), ), req.OutInterval, nil @@ -396,13 +375,7 @@ func (s *Server) getTarget(ctx context.Context, req models.Req) (points []schema if err != nil { return nil, req.OutInterval, err } - select { - case <-ctx.Done(): - //request canceled - return nil, req.OutInterval, nil 
- default: - } - return consolidation.Consolidate(fixed, req.AggNum, req.Consolidator), req.OutInterval, nil + return consolidation.ConsolidateContext(ctx, fixed, req.AggNum, req.Consolidator), req.OutInterval, nil } } } @@ -418,6 +391,12 @@ func AggMetricKey(key, archive string, aggSpan uint32) string { } func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidator consolidation.Consolidator) ([]schema.Point, error) { + select { + case <-ctx.Done(): + //request canceled + return nil, nil + default: + } rctx := newRequestContext(ctx, &req, consolidator) res, err := s.getSeries(rctx) if err != nil { diff --git a/consolidation/consolidate.go b/consolidation/consolidate.go index 6fa5989af1..cfabda329a 100644 --- a/consolidation/consolidate.go +++ b/consolidation/consolidate.go @@ -1,9 +1,22 @@ package consolidation import ( + "context" + "gopkg.in/raintank/schema.v1" ) +// ConsolidateContext wraps a Consolidate() call with a context.Context condition +func ConsolidateContext(ctx context.Context, in []schema.Point, aggNum uint32, consolidator Consolidator) []schema.Point { + select { + case <-ctx.Done(): + //request canceled + return nil + default: + } + return Consolidate(in, aggNum, consolidator) +} + // Consolidate consolidates `in`, aggNum points at a time via the given function // note: the returned slice repurposes in's backing array. 
func Consolidate(in []schema.Point, aggNum uint32, consolidator Consolidator) []schema.Point { From e8eefaf361903c1b35657057dbe355ed91cd56fc Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 4 Dec 2017 18:29:11 -0500 Subject: [PATCH 11/11] typo --- mdata/store_cassandra.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mdata/store_cassandra.go b/mdata/store_cassandra.go index 83e466aea5..9c15773182 100644 --- a/mdata/store_cassandra.go +++ b/mdata/store_cassandra.go @@ -44,7 +44,7 @@ var ( errReadQueueFull = errors.New("the read queue is full") errReadTooOld = errors.New("the read is too old") errTableNotFound = errors.New("table for given TTL not found") - errCxtCanceled = errors.New("context canceled") + errCtxCanceled = errors.New("context canceled") // metric store.cassandra.get.exec is the duration of getting from cassandra store cassGetExecDuration = stats.NewLatencyHistogram15s32("store.cassandra.get.exec") @@ -421,7 +421,7 @@ func (c *CassandraStore) processReadQueue() { select { case <-crr.ctx.Done(): //request canceled - crr.out <- outcome{err: errCxtCanceled} + crr.out <- outcome{err: errCtxCanceled} continue default: } @@ -541,7 +541,7 @@ LOOP: return nil, nil case o := <-results: if o.err != nil { - if o.err == errCxtCanceled { + if o.err == errCtxCanceled { // context was canceled, return immediately. return nil, nil }