Skip to content

Commit

Permalink
context: use fewer goroutines in WithCancel/WithTimeout
Browse files Browse the repository at this point in the history
If the parent context passed to WithCancel or WithTimeout
is a known context implementation (one created by this package),
we attach the child to the parent by editing data structures directly;
otherwise, for unknown parent implementations, we make a
goroutine that watches for the parent to finish and propagates
the cancellation.

A common problem with this scheme, before this CL, is that
users who write custom context implementations to manage
their value sets cause WithCancel/WithTimeout to start
goroutines that would have not been started before.

This CL changes the way we map a parent context back to the
underlying data structure. Instead of walking up through
known context implementations to reach the *cancelCtx,
we look up parent.Value(&cancelCtxKey) to return the
innermost *cancelCtx, which we use if it matches parent.Done().

This way, a custom context implementation wrapping a
*cancelCtx but not changing Done-ness (and not refusing
to return wrapped keys) will not require a goroutine anymore
in WithCancel/WithTimeout.

For #28728.

Change-Id: Idba2f435c81b19fe38d0dbf308458ca87c7381e9
Reviewed-on: https://go-review.googlesource.com/c/go/+/196521
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
  • Loading branch information
rsc committed Sep 26, 2019
1 parent ae024d9 commit 0ad3686
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 17 deletions.
61 changes: 46 additions & 15 deletions src/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"errors"
"internal/reflectlite"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -239,11 +240,24 @@ func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}

// goroutines counts the number of goroutines ever created; for testing.
var goroutines int32

// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
if parent.Done() == nil {
done := parent.Done()
if done == nil {
return // parent is never canceled
}

select {
case <-done:
// parent is already canceled
child.cancel(false, parent.Err())
return
default:
}

if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
Expand All @@ -257,6 +271,7 @@ func propagateCancel(parent Context, child canceler) {
}
p.mu.Unlock()
} else {
atomic.AddInt32(&goroutines, +1)
go func() {
select {
case <-parent.Done():
Expand All @@ -267,22 +282,31 @@ func propagateCancel(parent Context, child canceler) {
}
}

// parentCancelCtx follows a chain of parent references until it finds a
// *cancelCtx. This function understands how each of the concrete types in this
// package represents its parent.
// &cancelCtxKey is the key that a cancelCtx returns itself for.
var cancelCtxKey int

// parentCancelCtx returns the underlying *cancelCtx for parent.
// It does this by looking up parent.Value(&cancelCtxKey) to find
// the innermost enclosing *cancelCtx and then checking whether
// parent.Done() matches that *cancelCtx. (If not, the *cancelCtx
// has been wrapped in a custom implementation providing a
// different done channel, in which case we should not bypass it.)
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
for {
switch c := parent.(type) {
case *cancelCtx:
return c, true
case *timerCtx:
return &c.cancelCtx, true
case *valueCtx:
parent = c.Context
default:
return nil, false
}
done := parent.Done()
if done == closedchan || done == nil {
return nil, false
}
p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
if !ok {
return nil, false
}
p.mu.Lock()
ok = p.done == done
p.mu.Unlock()
if !ok {
return nil, false
}
return p, true
}

// removeChild removes a context from its parent.
Expand Down Expand Up @@ -323,6 +347,13 @@ type cancelCtx struct {
err error // set to non-nil by the first cancel call
}

func (c *cancelCtx) Value(key interface{}) interface{} {
if key == &cancelCtxKey {
return c
}
return c.Context.Value(key)
}

func (c *cancelCtx) Done() <-chan struct{} {
c.mu.Lock()
if c.done == nil {
Expand Down
82 changes: 81 additions & 1 deletion src/context/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
)

Expand All @@ -21,6 +22,7 @@ type testingT interface {
Failed() bool
Fatal(args ...interface{})
Fatalf(format string, args ...interface{})
Helper()
Log(args ...interface{})
Logf(format string, args ...interface{})
Name() string
Expand Down Expand Up @@ -401,7 +403,7 @@ func XTestAllocs(t testingT, testingShort func() bool, testingAllocsPerRun func(
c, _ := WithTimeout(bg, 15*time.Millisecond)
<-c.Done()
},
limit: 8,
limit: 12,
gccgoLimit: 15,
},
{
Expand Down Expand Up @@ -648,3 +650,81 @@ func XTestDeadlineExceededSupportsTimeout(t testingT) {
t.Fatal("wrong value for timeout")
}
}

type myCtx struct {
Context
}

type myDoneCtx struct {
Context
}

func (d *myDoneCtx) Done() <-chan struct{} {
c := make(chan struct{})
return c
}

func XTestCustomContextGoroutines(t testingT) {
g := atomic.LoadInt32(&goroutines)
checkNoGoroutine := func() {
t.Helper()
now := atomic.LoadInt32(&goroutines)
if now != g {
t.Fatalf("%d goroutines created", now-g)
}
}
checkCreatedGoroutine := func() {
t.Helper()
now := atomic.LoadInt32(&goroutines)
if now != g+1 {
t.Fatalf("%d goroutines created, want 1", now-g)
}
g = now
}

_, cancel0 := WithCancel(&myDoneCtx{Background()})
cancel0()
checkCreatedGoroutine()

_, cancel0 = WithTimeout(&myDoneCtx{Background()}, 1*time.Hour)
cancel0()
checkCreatedGoroutine()

checkNoGoroutine()
defer checkNoGoroutine()

ctx1, cancel1 := WithCancel(Background())
defer cancel1()
checkNoGoroutine()

ctx2 := &myCtx{ctx1}
ctx3, cancel3 := WithCancel(ctx2)
defer cancel3()
checkNoGoroutine()

_, cancel3b := WithCancel(&myDoneCtx{ctx2})
defer cancel3b()
checkCreatedGoroutine() // ctx1 is not providing Done, must not be used

ctx4, cancel4 := WithTimeout(ctx3, 1*time.Hour)
defer cancel4()
checkNoGoroutine()

ctx5, cancel5 := WithCancel(ctx4)
defer cancel5()
checkNoGoroutine()

cancel5()
checkNoGoroutine()

_, cancel6 := WithTimeout(ctx5, 1*time.Hour)
defer cancel6()
checkNoGoroutine()

// Check applied to cancelled context.
cancel6()
cancel1()
_, cancel7 := WithCancel(ctx5)
defer cancel7()
checkNoGoroutine()
}
1 change: 1 addition & 0 deletions src/context/x_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ func TestCancelRemoves(t *testing.T) { XTestCancelRemoves(t) }
func TestWithCancelCanceledParent(t *testing.T) { XTestWithCancelCanceledParent(t) }
func TestWithValueChecksKey(t *testing.T) { XTestWithValueChecksKey(t) }
func TestDeadlineExceededSupportsTimeout(t *testing.T) { XTestDeadlineExceededSupportsTimeout(t) }
func TestCustomContextGoroutines(t *testing.T) { XTestCustomContextGoroutines(t) }
2 changes: 1 addition & 1 deletion src/go/build/deps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ var pkgDeps = map[string][]string{
"compress/gzip": {"L4", "compress/flate"},
"compress/lzw": {"L4"},
"compress/zlib": {"L4", "compress/flate"},
"context": {"errors", "internal/reflectlite", "sync", "time"},
"context": {"errors", "internal/reflectlite", "sync", "sync/atomic", "time"},
"database/sql": {"L4", "container/list", "context", "database/sql/driver", "database/sql/internal"},
"database/sql/driver": {"L4", "context", "time", "database/sql/internal"},
"debug/dwarf": {"L4"},
Expand Down

0 comments on commit 0ad3686

Please sign in to comment.