This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

use Context's properly to stop executing when a request is canceled #728

Merged: 11 commits merged into master from reqCancelation on Dec 5, 2017

Conversation

@woodsaj (Member) commented Sep 14, 2017

  • The ctx was already being passed between the different methods
    within the query pipeline, but the context was never being inspected
    to see if it had been canceled. This commit fixes that (a minimal
    sketch of the check is shown below).
  • Now when an error occurs, whether that error is local or on a remote
    peer, the request is immediately canceled.
  • If the client disconnects before a response is returned (whether
    that is due to a timeout or the client canceling the request), that
    cancelation will propagate through and abort any work.
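For readers skimming the diff, the core pattern being added is small. Here is a minimal, hedged sketch of what "inspecting the context" looks like inside a pipeline method (the function and type names are illustrative, not the actual metrictank code):

```go
package main

import (
	"context"
	"fmt"
)

// fetchSeries stands in for any method in the query pipeline: before each
// unit of expensive work it checks whether the request context has already
// been canceled, and bails out early if so.
func fetchSeries(ctx context.Context, keys []string) ([]string, error) {
	results := make([]string, 0, len(keys))
	for _, key := range keys {
		select {
		case <-ctx.Done():
			// client disconnected, the request timed out, or a peer errored
			// and the caller canceled the shared context: stop doing work.
			return nil, ctx.Err()
		default:
		}
		results = append(results, "series:"+key) // placeholder for real work
	}
	return results, nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate a canceled request
	_, err := fetchSeries(ctx, []string{"a", "b"})
	fmt.Println(err) // context.Canceled
}
```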

@woodsaj woodsaj force-pushed the reqCancelation branch 2 times, most recently from 88878c8 to b791d24 on September 15, 2017 03:11
@woodsaj woodsaj changed the title from "WIP: use Context's properly to stop executing when a request is canceled" to "use Context's properly to stop executing when a request is canceled" on Sep 15, 2017
@Dieterbe (Contributor) left a comment:

I'm not done reviewing yet.
An overall comment at this point is that we should be consistent in where we call cancel.
E.g. it is weird that findSeries and getTargets both call cancel, as do getTargetsRemote and getTargetsLocal, but s.findSeriesLocal and s.findSeriesRemote do not.

perhaps we should have all helper functions like getTargetsRemote etc bubble up all errors as fast as they can, and leave all the canceling up to the macaron Handlers only, and they would cancel when they get errors bubbled up into them.

not sure yet if that's the best approach, i just want us to be consistent

wg.Add(len(reqs))
limiter := make(chan struct{}, getTargetsConcurrency)
Contributor:

this would be a bit cleaner if the limiter was factored out like in https://github.com/go-graphite/carbonapi/blob/master/limiter.go. By the way, this is probably my favorite 10 lines of Go code ever.
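For reference, a limiter in the style linked above is roughly the following (paraphrased from memory as a sketch, not the exact carbonapi code):

```go
package example

// limiter caps how many goroutines may do work at once by using a buffered
// channel as a counting semaphore.
type limiter chan struct{}

func newLimiter(n int) limiter { return make(limiter, n) }

// enter blocks until a slot is free.
func (l limiter) enter() { l <- struct{}{} }

// leave releases the slot.
func (l limiter) leave() { <-l }
```

Each worker then brackets its work with l.enter() / defer l.leave(), which reads a bit cleaner than pushing to and popping from the channel at every call site.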

api/config.go Outdated
@@ -45,6 +47,7 @@ func ConfigSetup() {
apiCfg.BoolVar(&multiTenant, "multi-tenant", true, "require x-org-id authentication to auth as a specific org. otherwise orgId 1 is assumed")
apiCfg.StringVar(&fallbackGraphite, "fallback-graphite-addr", "http://localhost:8080", "in case our /render endpoint does not support the requested processing, proxy the request to this graphite")
apiCfg.StringVar(&timeZoneStr, "time-zone", "local", "timezone for interpreting from/until values when needed, specified using [zoneinfo name](https://en.wikipedia.org/wiki/Tz_database#Names_of_time_zones) e.g. 'America/New_York', 'UTC' or 'local' to use local server timezone")
apiCfg.IntVar(&getTargetsConcurrency, "get-targets-concurrency", 20, "maximum number of concurrent threads for fetching data on the local node")
Contributor:

this comment is not clear enough. in particular we need to clarify how this ties in to the store and the store's own tunables.

perhaps "maximum number of concurrent timeseries reads issued to the store (note that a read of an avg-aggregated series counts as 1 but issues 2 series reads)"

Member Author:

This is completely independent of the store. The behaviour is exactly as described: it is a cap on the number of threads used for fetching data on the local node, whether that data comes from the numerous caches or from cassandra.

Member Author:

maybe i just need to add that each execution thread handles only 1 series.

Contributor:

that works

Contributor:

actually let's clarify that this limit is per-request


responses := make(chan getTargetsResp, 1)
getCtx, cancel := context.WithCancel(ctx)
defer cancel()
Contributor:

why this deferred cancel? in particular, triggering cancel() when there was no error. is this a safeguard against programming errors (e.g. that we forgot that there may still be workers running that should have exited)?

Member Author:

https://godoc.org/golang.org/x/net/context#WithCancel

cancel should always be called, otherwise resources will leak. go vet will warn if you don't call cancel, with something like:

api/dataprocessor.go:135: the cancel function is not used on all paths (possible context leak)
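In other words, the deferred cancel is there to satisfy the WithCancel contract (release the derived context's resources on every return path), not only to abort workers on error. A hedged sketch of the shape go vet expects (names are illustrative):

```go
package example

import (
	"context"
	"errors"
)

// doWork stands in for dispatching the per-series fetches.
func doWork(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		return errors.New("fetch failed")
	}
}

func getTargets(ctx context.Context) error {
	getCtx, cancel := context.WithCancel(ctx)
	// go vet's "cancel function is not used on all paths" check is satisfied
	// by deferring cancel immediately; this also frees the context's
	// resources on the success path, where no explicit cancel happens.
	defer cancel()
	if err := doWork(getCtx); err != nil {
		return err // the deferred cancel also aborts any remaining workers
	}
	return nil
}
```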

if o.omitted {
tracing.Failure(span)
tracing.Error(span, errReadTooOld)
return nil, errReadTooOld
Contributor:

is this error still accurate? seems like we can hit this condition when ctx was canceled during processReadQueue, not due to the CRR being too old.

Member Author:

well it is too old in the sense that the CRR outlived the originating "/render" request.

Member Author:

It should also be noted that with the default cassandra-omit-read-timeout config, unless the client waits longer than 60 seconds for a request to complete (not possible in hosted-metrics, as the ingress-controller will time out after 60 seconds), o.omitted will only ever be set for requests that are cancelled (due to timeout or other error)

Contributor:

right. the reason i brought this up is because it seemed useful to separate these 2 cases in the error being logged in the trace, but actually it doesn't seem to matter much so this looks fine.

@@ -459,8 +468,8 @@ func (c *CassandraStore) SearchTable(ctx context.Context, key, table string, sta

crrs := make([]*ChunkReadRequest, 0)

query := func(month, sortKey uint32, q string, p ...interface{}) {
crrs = append(crrs, &ChunkReadRequest{month, sortKey, q, p, pre, nil})
query := func(ctx context.Context, month, sortKey uint32, q string, p ...interface{}) {
Contributor:

since ctx is the same for all invocations of query, we don't need to pass it in each time. that'll clean up the diff a bit. (please rebase -i and fixup into b791d24, so that we maintain a clean history)

for {
select {
case <-ctx.Done():
// request has been canceled, so no need to process the results
Contributor:

correction: "no need to continue processing the results"

}

func handleResp(rsp *http.Response) ([]byte, error) {
defer rsp.Body.Close()
if rsp.StatusCode != 200 {
ioutil.ReadAll(rsp.Body)
Contributor:

is this an unrelated fix to allow for connection reuse? did you just notice in the code or did you observe a symptom that led you to fixing this?

Member Author:

this is just correct behaviour for allowing connection re-use. I noticed it missing so added it.
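For readers unfamiliar with the detail: Go's http.Transport only returns a connection to its idle pool once the response body has been read to completion and closed, so draining the body on non-200 paths is what makes keep-alive reuse possible. The diff drains with ioutil.ReadAll; an equivalent idiom that avoids buffering the discarded bytes is sketched below (the error message is illustrative):

```go
package example

import (
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
)

func handleResp(rsp *http.Response) ([]byte, error) {
	defer rsp.Body.Close()
	if rsp.StatusCode != 200 {
		// drain the body so the underlying connection can be reused
		io.Copy(ioutil.Discard, rsp.Body)
		return nil, fmt.Errorf("request failed with status %d", rsp.StatusCode)
	}
	return ioutil.ReadAll(rsp.Body)
}
```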

@woodsaj woodsaj force-pushed the reqCancelation branch 3 times, most recently from fd5c709 to ee4c050 on September 19, 2017 18:24
@woodsaj (Member Author) commented Sep 19, 2017

an overall comment at this point, is that we should be consistent in where we call cancel.

That is not possible.
findSeriesRemote and findSeriesLocal execute single goroutines. If an error occurs it is returned immediately to the caller (findSeries) and we can call cancel().
getTargetsRemote and getTargetsLocal generate lots of goroutines. If errors occur they are not returned to the caller (getTargets) until all goroutines are complete.

api/graphite.go Outdated
seenDefs := make(map[string]struct{})
var mu sync.Mutex
reqCtx := ctx.Req.Context()
responses := make(chan struct {
Contributor:

afaik types can have a function scope, so it might be worth it to declare a type for that response struct instead of redefining it 3 times
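A small illustration of that suggestion (field names are hypothetical; the point is only that the type can be declared once inside the function and reused for the channel and its reads):

```go
package example

func findSeriesExample(patterns []string) {
	// declared once, scoped to this function only
	type seriesResp struct {
		series []string
		err    error
	}
	responses := make(chan seriesResp, len(patterns))
	for _, p := range patterns {
		go func(p string) {
			responses <- seriesResp{series: []string{p}}
		}(p)
	}
	for range patterns {
		<-responses
	}
}
```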

@Dieterbe (Contributor) commented Sep 20, 2017

an overall comment at this point, is that we should be consistent in where we call cancel.

That is not possible.
findSeriesRemote and findSeriesLocal execute single goroutines. If an error occurs it is returned immediately to the caller (findSeries) and we can call cancel().
getTargetsRemote and getTargetsLocal generate lots of goroutines. If errors occur they are not returned to the caller (getTargets) until all goroutines are complete.

but we can change getTargetsRemote and getTargetsLocal to simply return the err as soon as they hit one, and the caller then calls cancel, canceling all other pending routines.
this seems simple and consistent, though one could argue these 2 functions should clean up the goroutines they create themselves instead of letting the context do it.

update: here's one of the go devs sharing a pattern where it is deemed OK for functions to leak goroutines and cleaning them up by calling cancel in the caller : https://rakyll.org/leakingctx/

@replay (Contributor) left a comment:

That's cool

@woodsaj (Member Author) commented Sep 20, 2017

@Dieterbe I refactored so that getTargetsLocal and getTargetsRemote both return errors to their callers as soon as they happen, allowing the caller to cancel the context

@Dieterbe (Contributor):
getTargetsLocal still calls cancel in 2 places. wouldn't it make sense to remove those, and add a cancel in Server.getData() after calling getTargetsLocal ?

@woodsaj (Member Author) commented Sep 20, 2017

getTargetsLocal still calls cancel in 2 places. wouldn't it make sense to remove those, and add a cancel in Server.getData() after calling getTargetsLocal ?

getTargetsLocal uses its own contextWithCancel, derived from the passed-in context.

If we don't call "cancel()" inside the goroutines that are processing a req, then every req will be dispatched before we even start to read the responses channel for any errors. If there are 1000 reqs, and the first one returns an error, getTarget() will be called for the other 999 reqs before we notice that the first one failed. Because of the limiting, len(reqs) - getTargetsConcurrency reqs will be processed to completion before we read the responses channel.
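To make that argument concrete, here is a hedged sketch (names illustrative, not the actual code) of the dispatch shape being described: the loop below only learns about a failure through the context, so if the worker does not call cancel(), every remaining req is still dispatched and, because of the limiter, many run to completion before anything reads the responses channel.

```go
package example

import (
	"context"
	"errors"
	"sync"
)

type req struct{ key string }
type resp struct {
	series []string
	err    error
}

func getTarget(ctx context.Context, r req) ([]string, error) {
	if ctx.Err() != nil {
		return nil, ctx.Err()
	}
	if r.key == "bad" {
		return nil, errors.New("fetch failed")
	}
	return []string{r.key}, nil
}

func getTargetsLocal(ctx context.Context, reqs []req) ([]string, error) {
	getCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	responses := make(chan resp, len(reqs))
	limiter := make(chan struct{}, 20) // stands in for getTargetsConcurrency
	var wg sync.WaitGroup

dispatch:
	for _, r := range reqs {
		select {
		case <-getCtx.Done():
			break dispatch // a worker failed and called cancel(): stop dispatching
		case limiter <- struct{}{}:
			wg.Add(1)
			go func(r req) {
				defer wg.Done()
				defer func() { <-limiter }()
				series, err := getTarget(getCtx, r)
				if err != nil {
					cancel() // without this, all reqs get dispatched before the error is seen
				}
				responses <- resp{series, err}
			}(r)
		}
	}
	wg.Wait()
	close(responses)

	var out []string
	for r := range responses {
		if r.err != nil {
			return nil, r.err
		}
		out = append(out, r.series...)
	}
	return out, nil
}
```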

@Dieterbe (Contributor) commented Sep 20, 2017

since our call graphs are pretty complicated, and with the various places where we call cancel and check the Done channel, it gets especially complicated I think. I'm not sure what the best way is to keep it easy to understand. For now I just created a manual diagram by going over the source (it would be nice if we could generate this).
[call graph diagram]

red underline means: calls cancel.
everything in a grey ellipse is something that checks the Done channel and can return early.
seems like there are a few left to do (e.g. findSeriesLocal and everything calling into MetricIndex)

@woodsaj (Member Author) commented Sep 21, 2017

i have added the withCancel contexts to metricsIndex and metricsDelete so that requests to peers are immediately canceled if any peer returns an error.

here is a more detailed diagram of the context flow. Each colour represents a different context.
Contexts withCancel are only canceled by the function that creates them and cancelations flow downwards.

[context flow diagram]

@@ -476,46 +545,72 @@ func (s *Server) metricsDelete(ctx *middleware.Context, req models.MetricsDelete
peers := cluster.Manager.MemberList()
peers = append(peers, cluster.Manager.ThisNode())
log.Debug("HTTP metricsDelete for %v across %d instances", req.Query, len(peers))
errors := make([]error, 0)

reqCtx, cancel := context.WithCancel(ctx.Req.Context())
Contributor:

is there a specific naming scheme at play here, e.g. why not call this ctx ?

Contributor:

never mind. i see now that ctx is already used

if err != nil {
tags.Error.Set(span, true)
errorsChan <- err
cancel() // cancel all other requests.
responses <- getTargetsResp{nil, err}
} else {
getTargetDuration.Value(time.Now().Sub(pre))
Contributor:

if getTarget() returned early due to a cancel, then we will report an incorrect (and possibly unrealistically low) getTargetDuration here

Contributor:

@woodsaj ^^ ?

Member Author:

The metric will correctly reflect the time spent running getTarget().
Is getTargetDuration meant to represent something else?

Contributor:

yes. the metric represents the getting of a target. if getTarget is cancelled due to whatever reason and is not actually getting targets, then we shouldn't report this duration as it does not apply.
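A hedged sketch of that suggestion, written as a fragment against the diff shown earlier (the names come from the diff; the getCtx.Err() guard is the proposed change, since a canceled getTarget can return early without an error and would otherwise record an unrealistically low duration):

```go
pre := time.Now()
series, err := s.getTarget(getCtx, req)
if err != nil {
	tags.Error.Set(span, true)
	cancel() // cancel all other requests for this render call
	responses <- getTargetsResp{nil, err}
	return
}
if getCtx.Err() == nil {
	// only record the duration when the fetch actually ran to completion
	getTargetDuration.Value(time.Now().Sub(pre))
}
responses <- getTargetsResp{series, nil}
```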

@Dieterbe (Contributor) commented Sep 22, 2017

generally throughout this new code, cancellations caused by client disconnect result in non-erroneous returns (typically return nil, nil) and ultimately result in an http 499. i notice an exception to this:
in CassandraStore, omitted is set true both when (A) the request can't be served due to saturated resources, and (B) when the client connection is closed, resulting in SearchTable returning an error as well as marking the trace as failed. since getSeriesCachedStore calls panic when it gets an error from s.BackendStore.Search, doRecover will make getTarget() return an error in this case, causing more spans to be marked as failed, which I think is fair for A but not for B. it's also inconsistent, since other places don't log errors when they read from Done; in particular there are 2 such places in SearchTable, so depending on when exactly a context is cancelled, the outcome can vary quite a bit (reporting an error or not)

the end result is that executePlan returns an error and an http 500 is reported instead of a 499. (and executePlan logs the "error" after calling getTargets)

@Dieterbe Dieterbe added this to the 0.8 milestone Sep 30, 2017
woodsaj added 6 commits November 21, 2017 23:36
- The ctx was already being passed between the different methods
  within the query pipeline, but the context was never being inspected
  to see if it had been canceled. This commit fixes that.
- Now when an error occurs, whether that error is local or on a remote
  peer, the request is immediately canceled.
- If the client disconnects before a response is returned (whether
  that is due to a timeout or the client canceling the request), that
  cancelation will propagate through and abort any work.
limit the number of getTargetsLocal threads that can be running at
once. This ensures that if a large query that spans 1000s of series
is executed, 1000s of goroutines won't be run at once to hit the
caches and cassandra.
This also ensures that a single query can't fill the cassandra
read queue. MT only executes cassandra-read-concurrency requests
at a time, so requests will always need to be queued. With this new
approach multiple render requests will share the read queue and their
read requests will be interspersed in the queue, rather than the old
behaviour where a render request would have to complete all of its
reads before the next render request would be processed.

Overall this will lead to a more consistent performance profile
for render requests.
- the functions that spawn background work in new goroutines should
  be responsible for signalling to them that they should shut down.
  So these functions should create a context.WithCancel and call cancel
  when the first error is encountered.
@woodsaj (Member Author) commented Nov 22, 2017

This is ready for review again.

@Dieterbe i have removed the panics and doRecover calls in favour of passing errors. So now, the first error will cause the context to be cancelled triggering all other goroutines to quickly return (without errors).
There are still a few places in the code that panic(), but these are limited to the consolidation package and should not ever be reached. If they do panic, Macaron will catch the panic and return a 500 error to the user (and print the stacktrace in the logs)

@woodsaj woodsaj force-pushed the reqCancelation branch 2 times, most recently from 4da9cc7 to 591a709 on November 22, 2017 13:40
@Dieterbe (Contributor) commented Nov 30, 2017

i have removed the panics and doRecover calls in favour of passing errors. So now, the first error will cause the context to be cancelled triggering all other goroutines to quickly return (without errors).

I don't think it is necessary to remove the panic-doRecover mechanism to achieve that: whether you bubble up an error through a callchain, or take the fast path of panicking and turning the panic into an error, the result is the same: getTarget returns an error to getTargetsLocal, which can then cancel the goroutines.
I'm open for a discussion about removing the doRecover-fast-path (as I think you're generally not a fan of it) but AFAICT this is an issue that is separate from the requirements of this PR, so i suggest to do it in a separate ticket.
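For readers following along, the panic-to-error "fast path" being discussed looks roughly like this (the doRecover name matches the conversation; the body is an illustrative reconstruction, not the exact metrictank code):

```go
package example

import (
	"errors"
	"fmt"
)

// doRecover converts a panic raised anywhere below the deferring function
// into an ordinary error, so deeply nested helpers can panic instead of
// threading an error through every intermediate return value.
func doRecover(errp *error) {
	if e := recover(); e != nil {
		if err, ok := e.(error); ok {
			*errp = err
			return
		}
		*errp = fmt.Errorf("%v", e)
	}
}

func getTargetExample() (series []string, err error) {
	defer doRecover(&err)
	deepHelper() // may panic several frames down
	return series, nil
}

func deepHelper() {
	panic(errors.New("divide of a series with len 3 by a series with len 5"))
}
```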

There are still a few places in the code that panic(), but these are limited to the consolidation package and should not ever be reached.

there are many, many more. In particular the ones in api/dataprocessor.go and in mdata stand out, because many of those are in the code called by getTarget.

r 'panic\(' | rv cmd | rv vendor | rv test.go
consolidation/consolidation.go:	panic(fmt.Sprintf("Consolidator.String(): unknown consolidator %d", c))
consolidation/consolidation.go:		panic("cannot get an archive for no consolidation")
consolidation/consolidation.go:		panic("avg consolidator has no matching Archive(). you need sum and cnt")
consolidation/consolidation.go:	panic(fmt.Sprintf("Consolidator.Archive(): unknown consolidator %q", c))
mdata/aggregator.go:		panic("NewAggregator called without aggregations. this should never happen")
mdata/aggregator.go:		panic("aggregator: boundary < agg.currentBoundary. ts > lastSeen should already have been asserted")
mdata/store_cassandra.go:		panic(fmt.Sprintf("Chunk span invalid: %d", span))
mdata/aggmetric.go:				panic("cannot get an archive for no consolidation")
mdata/aggmetric.go:				panic("avg consolidator has no matching Archive(). you need sum and cnt")
mdata/aggmetric.go:				panic(fmt.Sprintf("internal error: no such consolidator %q with span %d", consolidator, aggSpan))
mdata/aggmetric.go:		panic(fmt.Sprintf("aggmetric %s queried for chunk %d out of %d chunks", a.Key, pos, len(a.Chunks)))
mdata/aggmetric.go:				panic("cannot get an archive for no consolidation")
mdata/aggmetric.go:				panic("avg consolidator has no matching Archive(). you need sum and cnt")
mdata/aggmetric.go:			panic(fmt.Sprintf("AggMetric.GetAggregated(): unknown consolidator %q", consolidator))
mdata/aggmetric.go:	panic(fmt.Sprintf("GetAggregated called with unknown aggSpan %d", aggSpan))
mdata/aggmetric.go:		panic("invalid request. to must > from")
mdata/aggmetric.go:			panic(fmt.Sprintf("FATAL ERROR: this should never happen. Pushing initial value <%d,%f> to new chunk at pos 0 failed: %q", ts, val, err))
mdata/aggmetric.go:				panic(fmt.Sprintf("FATAL ERROR: this should never happen. Pushing initial value <%d,%f> to new chunk at pos %d failed: %q", ts, val, a.CurrentChunkPos, err))
mdata/aggmetric.go:				panic(fmt.Sprintf("FATAL ERROR: this should never happen. Pushing initial value <%d,%f> to new chunk at pos %d failed: %q", ts, val, a.CurrentChunkPos, err))
batch/aggregator.go:		panic("avg() called in aggregator with 0 terms")
batch/aggregator.go:		panic("last() called in aggregator with 0 terms")
batch/aggregator.go:		panic("min() called in aggregator with 0 terms")
batch/aggregator.go:		panic("max() called in aggregator with 0 terms")
stats/registry.go:		panic(fmt.Sprintf(errFmtMetricExists, name, existing))
expr/parse.go:		panic("arg list should start with paren. calling code should have asserted this")
expr/parse.go:		panic("string should start with open quote. calling code should have asserted this")
api/dataprocessor.go:		panic(fmt.Errorf("divide of a series with len %d by a series with len %d", len(pointsA), len(pointsB)))

Macaron will catch the panic and return a 500 error to the user (and print the stacktrace in the logs)

since s.getTargetsLocal() is run in separate goroutines, macaron doesn't have the ability to catch these panics and handle them cleanly. If you remove the doRecover, many of these problems will result in MT crashing, which is bad.

@woodsaj (Member Author) commented Dec 1, 2017

I'll re-add the doRecover to catch the remaining panics that can be thrown inside the getTarget() goroutine. But the refactor to use errors instead of calling panic is going to stay. It is well established that panics should be avoided whenever possible.

https://golang.org/doc/effective_go.html#errors
https://dave.cheney.net/2012/01/18/why-go-gets-exceptions-right

- Instead of calling panic and having to recover the panic later,
  just pass errors through the data fetch (getTargets) pipeline.
  This allows us to correctly cancel requests via the request
  context.
* no need to wrap an if err clause around s.getSeriesFixed(), as the
  output is always correct (in particular, when it returns an error, the
  series is nil)
* add consolidation.ConsolidateContext() wrapper so that the caller
  doesn't need to check the context prior to a call to
  consolidation.Consolidate() (a sketch of this wrapper pattern follows below)
* ditto for divideContext() -> divide()
* ditto for getSeriesFixed, except it doesn't need a wrapper, it was
  already checking the ctx.Done for most of its work (but not all)
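The wrapper pattern described in that last set of commits is small; here is a hedged sketch with guessed signatures (the real consolidation package works on schema.Point slices and a Consolidator, so treat this only as the shape of the idea):

```go
package consolidation

import "context"

// Consolidate stands in for the existing, context-unaware implementation.
func Consolidate(points []float64, aggNum uint32) []float64 {
	// ... actual consolidation work elided ...
	return points
}

// ConsolidateContext checks for cancellation once up front, so callers no
// longer need to wrap every call site in their own ctx.Done() check.
func ConsolidateContext(ctx context.Context, points []float64, aggNum uint32) ([]float64, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}
	return Consolidate(points, aggNum), nil
}
```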
@Dieterbe (Contributor) commented Dec 4, 2017

It is well established that panics should be avoided whenever possible.

and yet, the doRecover mechanism is also an established practice.
You can find this mechanism in plenty of open source projects (e.g. youtube's vitess, bosun, ...), and even the standard library (text/template package)
it is documented at https://github.com/golang/go/wiki/PanicAndRecover

By convention, no explicit panic() should be allowed to cross a package boundary. Indicating error conditions to callers should be done by returning error value. Within a package, however, especially if there are deeply nested calls to non-exported functions, it can be useful (and improve readability) to use panic to indicate error conditions which should be translated into error for the calling function.

that is exactly the use case we use it for. If anything, the thing we're doing improperly here, according to this document, is that we use it across package boundaries.

I may not agree with your reasons, but I am OK with migrating away from this approach.
the main reasons in my view are:

  • supposed to be within 1 package, but we don't have significant stack depth within only the api package
  • it will make the code a bit simpler to reason about (albeit potentially more verbose when all is said and done, but that's a worthy trade-off), especially for newcomers.

I have just pushed 3 commits to clean things up a bit, please let me know what you think.

@Dieterbe (Contributor) commented Dec 4, 2017

If you approve of the 3 commits I've pushed, and you can address the one remark that's still pending, then I think this is good to go. (don't worry about the conflicts, i can resolve them when i merge)

@woodsaj (Member Author) commented Dec 5, 2017

LGTM

@woodsaj (Member Author) commented Dec 5, 2017 via email

@Dieterbe Dieterbe merged commit e8eefaf into master Dec 5, 2017
@Dieterbe (Contributor) commented Dec 5, 2017

ok that works for me

@woodsaj woodsaj deleted the reqCancelation branch December 5, 2017 19:03