Skip to content

Commit

Permalink
Fix go routines leaks (#564)
Browse files Browse the repository at this point in the history
* Fix contexts, use "fresh" context after reload of config

* Transport and use the ctx to synced-json

* Fix possible leaks in tests

* Fix more leaks

* Draft

* Fix OIDC related jwks renewal without cleanup; recreate on changed jwks only

* Extend oidc test to count goroutines

* Fix data race which is related to early jsonSync on startup

* Fixup wait time for numgoroutines

* remove obsolete test for testing leaks

* Add changelog entry

* fixup changelog

* rm goleak dep

* Test numGoroutines explicitly by stack funcName

* Apply suggestions from code review

Co-authored-by: Marcel Ludwig <marcel.ludwig@avenga.com>
Co-authored-by: Johannes Koch <53434855+johakoch@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 9, 2022
1 parent e9ab32c commit b70d3e4
Show file tree
Hide file tree
Showing 21 changed files with 202 additions and 57 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Unreleased changes are available as `avenga/couper:edge` container.
* Basic Auth client authentication with OAuth2 (client ID and secret must be URL encoded) ([#537](https://github.com/avenga/couper/pull/537))
* Config validation, e.g. label-uniqueness checks ([#563](https://github.com/avenga/couper/pull/563))
* [OIDC](https://docs.couper.io/configuration/block/oidc) not using referenced backends, if only specific backends (`configuration_backend`, `jwks_uri_backend`, `token_backend`, `userinfo_backend`) were configured ([#570](https://github.com/avenga/couper/pull/570))
* [OIDC](https://docs.couper.io/configuration/block/oidc) configuration related go-routine leak after retrieving a new payload due to config ttl ([#564](https://github.com/avenga/couper/pull/564))

* **Removed**
* Endpoint path normalization to better match OpenAPI behavior ([#526](https://github.com/avenga/couper/pull/526))
Expand Down
5 changes: 3 additions & 2 deletions accesscontrol/jwk/jwks.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package jwk

import (
"context"
"encoding/json"
"fmt"
"net/http"
Expand Down Expand Up @@ -30,7 +31,7 @@ type JWKS struct {
syncedJSON *jsn.SyncedJSON
}

func NewJWKS(uri string, ttl string, maxStale string, transport http.RoundTripper) (*JWKS, error) {
func NewJWKS(ctx context.Context, uri string, ttl string, maxStale string, transport http.RoundTripper) (*JWKS, error) {
timetolive, err := config.ParseDuration("jwks_ttl", ttl, time.Hour)
if err != nil {
return nil, err
Expand All @@ -47,7 +48,7 @@ func NewJWKS(uri string, ttl string, maxStale string, transport http.RoundTrippe
}

jwks := &JWKS{}
jwks.syncedJSON, err = jsn.NewSyncedJSON(file, "jwks_url", uri, transport, "jwks", timetolive, maxStaleTime, jwks)
jwks.syncedJSON, err = jsn.NewSyncedJSON(ctx, file, "jwks_url", uri, transport, "jwks", timetolive, maxStaleTime, jwks)
return jwks, err
}

Expand Down
10 changes: 5 additions & 5 deletions accesscontrol/jwk/jwks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func Test_JWKS(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(subT *testing.T) {
_, err := jwk.NewJWKS(tt.url, "", "", backend)
_, err := jwk.NewJWKS(context.TODO(), tt.url, "", "", backend)
if err == nil && tt.error != "" {
subT.Errorf("Missing error:\n\tWant: %v\n\tGot: %v", tt.error, nil)
}
Expand All @@ -64,7 +64,7 @@ func Test_JWKS_Load(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(subT *testing.T) {
jwks, err := jwk.NewJWKS("file:"+tt.file, "", "", nil)
jwks, err := jwk.NewJWKS(context.TODO(), "file:"+tt.file, "", "", nil)
helper.Must(err)
_, err = jwks.Data()
if err != nil && tt.expParsed {
Expand Down Expand Up @@ -93,7 +93,7 @@ func Test_JWKS_GetSigKeyForToken(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(subT *testing.T) {
helper := test.New(subT)
jwks, err := jwk.NewJWKS("file:"+tt.file, "", "", nil)
jwks, err := jwk.NewJWKS(context.TODO(), "file:"+tt.file, "", "", nil)
helper.Must(err)
_, err = jwks.Data()
helper.Must(err)
Expand Down Expand Up @@ -145,7 +145,7 @@ func Test_JWKS_GetKey(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(subT *testing.T) {
helper := test.New(subT)
jwks, err := jwk.NewJWKS("file:"+tt.file, "", "", nil)
jwks, err := jwk.NewJWKS(context.TODO(), "file:"+tt.file, "", "", nil)
helper.Must(err)
_, err = jwks.Data()
helper.Must(err)
Expand Down Expand Up @@ -175,7 +175,7 @@ func Test_JWKS_LoadSynced(t *testing.T) {
io.Copy(writer, bytes.NewReader(f))
}))

jwks, err := jwk.NewJWKS(jwksOrigin.URL, "10s", "", http.DefaultTransport)
jwks, err := jwk.NewJWKS(context.TODO(), jwksOrigin.URL, "10s", "", http.DefaultTransport)
helper.Must(err)

wg := sync.WaitGroup{}
Expand Down
4 changes: 4 additions & 0 deletions accesscontrol/jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,10 @@ func TestJwtConfig(t *testing.T) {
tmpStoreCh := make(chan struct{})
defer close(tmpStoreCh)

ctx, cancel := context.WithCancel(conf.Context)
conf.Context = ctx
defer cancel()

_, err = runtime.NewServerConfiguration(conf, logger, cache.New(logger, tmpStoreCh))
}

Expand Down
6 changes: 6 additions & 0 deletions command/verify.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package command

import (
"context"

"github.com/avenga/couper/cache"
"github.com/avenga/couper/config"
"github.com/avenga/couper/config/configload"
Expand Down Expand Up @@ -36,6 +38,10 @@ func (v Verify) Execute(args Args, conf *config.Couper, logger *logrus.Entry) er

tmpMemStore := cache.New(logger, tmpStoreCh)

ctx, cancel := context.WithCancel(cf.Context)
cf.Context = ctx
defer cancel()

_, err = runtime.NewServerConfiguration(cf, logger, tmpMemStore)
if err != nil {
logger.WithError(err).Error()
Expand Down
8 changes: 8 additions & 0 deletions config/configload/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ func TestHealthCheck(t *testing.T) {
memStore := cache.New(log, closeCh)

if conf != nil {
ctx, cancel := context.WithCancel(conf.Context)
conf.Context = ctx
defer cancel()

_, err = runtime.NewServerConfiguration(conf, log, memStore)
}

Expand Down Expand Up @@ -257,6 +261,10 @@ func TestRateLimit(t *testing.T) {
memStore := cache.New(log, closeCh)

if conf != nil {
ctx, cancel := context.WithCancel(conf.Context)
conf.Context = ctx
defer cancel()

_, err = runtime.NewServerConfiguration(conf, log, memStore)
}

Expand Down
4 changes: 4 additions & 0 deletions config/configload/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ func TestLabels(t *testing.T) {
tmpStoreCh := make(chan struct{})
defer close(tmpStoreCh)

ctx, cancel := context.WithCancel(conf.Context)
conf.Context = ctx
defer cancel()

_, err = runtime.NewServerConfiguration(conf, log, cache.New(log, tmpStoreCh))
}

Expand Down
5 changes: 5 additions & 0 deletions config/runtime/access_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ func TestDuplicateEndpoint(t *testing.T) {
logger := log.WithContext(context.TODO())
tmpStoreCh := make(chan struct{})
defer close(tmpStoreCh)

ctx, cancel := context.WithCancel(conf.Context)
conf.Context = ctx
defer cancel()

server, err := runtime.NewServerConfiguration(conf, logger, cache.New(logger, tmpStoreCh))

if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions config/runtime/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func configureOidcConfigs(conf *config.Couper, confCtx *hcl.EvalContext, log *lo
}
}

oidcConfig, err := oidc.NewConfig(oidcConf, backends)
oidcConfig, err := oidc.NewConfig(conf.Context, oidcConf, backends)
if err != nil {
return nil, confErr.With(err)
}
Expand Down Expand Up @@ -583,7 +583,7 @@ func configureJWKS(jwtConf *config.JWT, confContext *hcl.EvalContext, log *logru
return nil, err
}

return jwk.NewJWKS(jwtConf.JWKsURL, jwtConf.JWKsTTL, jwtConf.JWKsMaxStale, backend)
return jwk.NewJWKS(conf.Context, jwtConf.JWKsURL, jwtConf.JWKsTTL, jwtConf.JWKsMaxStale, backend)
}

type protectedOptions struct {
Expand Down
65 changes: 42 additions & 23 deletions eval/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type Context struct {
jwtSigningConfigs map[string]*lib.JWTSigningConfig
saml []*config.SAML
syncedVariables *SyncedVariables

cloneMu sync.RWMutex
}

func NewContext(srcBytes [][]byte, defaults *config.Defaults, environment string) *Context {
Expand Down Expand Up @@ -112,6 +114,9 @@ func (c *Context) Value(key interface{}) interface{} {

func (c *Context) WithClientRequest(req *http.Request) *Context {
c.backendsFn.Do(func() {
c.cloneMu.Lock()
defer c.cloneMu.Unlock()

if c.memStore == nil {
return
}
Expand All @@ -124,17 +129,7 @@ func (c *Context) WithClientRequest(req *http.Request) *Context {
}
})

ctx := &Context{
backends: c.backends,
eval: c.cloneEvalContext(),
inner: c.inner,
memStore: c.memStore,
memorize: make(map[string]interface{}),
oauth2: c.oauth2,
jwtSigningConfigs: c.jwtSigningConfigs,
saml: c.saml[:],
syncedVariables: NewSyncedVariables(),
}
ctx := c.clone()

if rc := req.Context(); rc != nil {
rc = context.WithValue(rc, request.ContextVariablesSynced, ctx.syncedVariables)
Expand Down Expand Up @@ -195,17 +190,7 @@ func (c *Context) WithClientRequest(req *http.Request) *Context {
}

func (c *Context) WithBeresp(beresp *http.Response, readBody bool) *Context {
ctx := &Context{
backends: c.backends,
eval: c.cloneEvalContext(),
inner: c.inner,
memStore: c.memStore,
memorize: c.memorize,
oauth2: c.oauth2,
jwtSigningConfigs: c.jwtSigningConfigs,
saml: c.saml[:],
syncedVariables: c.syncedVariables,
}
ctx := c.clone()
ctx.inner = context.WithValue(c.inner, request.ContextType, ctx)

resps := make(ContextMap)
Expand Down Expand Up @@ -234,6 +219,25 @@ func (c *Context) WithBeresp(beresp *http.Response, readBody bool) *Context {
return ctx
}

// clone returns a new copy of Context with possible field updates in mind.
// Especially during startup some requests may be fired which use this cloned base Context.
func (c *Context) clone() *Context {
c.cloneMu.RLock()
defer c.cloneMu.RUnlock()

return &Context{
backends: c.backends,
eval: c.cloneEvalContext(),
inner: c.inner,
memStore: c.memStore,
memorize: make(map[string]interface{}),
oauth2: c.oauth2,
jwtSigningConfigs: c.jwtSigningConfigs,
saml: c.saml[:],
syncedVariables: NewSyncedVariables(),
}
}

func newBerespValues(ctx context.Context, readBody bool, beresp *http.Response) (name string, bereqVal cty.Value, berespVal cty.Value) {
bereq := beresp.Request
name = "default"
Expand Down Expand Up @@ -296,7 +300,7 @@ func newBerespValues(ctx context.Context, readBody bool, beresp *http.Response)

func (c *Context) syncBackendVariables() map[string]cty.Value {
backendsVariable := make(map[string]cty.Value)
for _, backend := range c.backends {
for _, backend := range c.backends[:] {
b, ok := backend.(seetie.Object)
if !ok {
continue
Expand All @@ -310,6 +314,9 @@ func (c *Context) syncBackendVariables() map[string]cty.Value {

// WithJWTSigningConfigs initially sets up the lib.FnJWTSign function.
func (c *Context) WithJWTSigningConfigs(configs map[string]*lib.JWTSigningConfig) *Context {
c.cloneMu.Lock()
defer c.cloneMu.Unlock()

c.jwtSigningConfigs = configs
if c.jwtSigningConfigs == nil {
c.jwtSigningConfigs = make(map[string]*lib.JWTSigningConfig)
Expand All @@ -320,6 +327,9 @@ func (c *Context) WithJWTSigningConfigs(configs map[string]*lib.JWTSigningConfig

// WithOAuth2AC adds the OAuth2AC config structs.
func (c *Context) WithOAuth2AC(os []*config.OAuth2AC) *Context {
c.cloneMu.Lock()
defer c.cloneMu.Unlock()

if c.oauth2 == nil {
c.oauth2 = make(map[string]config.OAuth2Authorization)
}
Expand All @@ -331,6 +341,9 @@ func (c *Context) WithOAuth2AC(os []*config.OAuth2AC) *Context {

// WithOidcConfig adds the OidcConfig config structs.
func (c *Context) WithOidcConfig(confs oidc.Configs) *Context {
c.cloneMu.Lock()
defer c.cloneMu.Unlock()

if c.oauth2 == nil {
c.oauth2 = make(map[string]config.OAuth2Authorization)
}
Expand All @@ -341,12 +354,18 @@ func (c *Context) WithOidcConfig(confs oidc.Configs) *Context {
}

func (c *Context) WithMemStore(store *cache.MemoryStore) *Context {
c.cloneMu.Lock()
defer c.cloneMu.Unlock()

c.memStore = store
return c
}

// WithSAML initially set up the saml configuration.
func (c *Context) WithSAML(s []*config.SAML) *Context {
c.cloneMu.Lock()
defer c.cloneMu.Unlock()

c.saml = s
if c.saml == nil {
c.saml = make([]*config.SAML, 0)
Expand Down
4 changes: 4 additions & 0 deletions eval/lib/oauth2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ definitions {
defer close(quitCh)
memStore := cache.New(logger, quitCh)

ctx, cancel := context.WithCancel(couperConf.Context)
couperConf.Context = ctx
defer cancel()

_, err = runtime.NewServerConfiguration(couperConf, logger, memStore)
helper.Must(err)

Expand Down
37 changes: 37 additions & 0 deletions internal/test/goroutine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package test

import (
"bufio"
"bytes"
"io"
"regexp"
"runtime/pprof"
"strconv"
)

func NumGoroutines(filter string) (numRoutine int) {
profile := pprof.Lookup("goroutine")
profileBuf := &bytes.Buffer{}
_ = profile.WriteTo(profileBuf, 1)
pr := bufio.NewReader(profileBuf)

stackRegex := regexp.MustCompile(`(\d+)\s@\s0x`)
for {
line, _, readErr := pr.ReadLine()
if readErr != nil {
if readErr == io.EOF {
break
}
panic(readErr)
}
match := stackRegex.FindSubmatch(line)
if len(match) > 1 {
numRoutine, _ = strconv.Atoi(string(match[1]))
continue
}
if bytes.Contains(line, []byte(filter)) {
return numRoutine
}
}
return -1
}
Loading

0 comments on commit b70d3e4

Please sign in to comment.