Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix go routines leaks #564

Merged
merged 16 commits into from
Sep 9, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Unreleased changes are available as `avenga/couper:edge` container.
* Basic Auth client authentication with OAuth2 (client ID and secret must be URL encoded) ([#537](https://github.com/avenga/couper/pull/537))
* Config validation, e.g. label-uniqueness checks ([#563](https://github.com/avenga/couper/pull/563))
* [OIDC](https://docs.couper.io/configuration/block/oidc) not using referenced backends, if only specific backends (`configuration_backend`, `jwks_uri_backend`, `token_backend`, `userinfo_backend`) were configured ([#570](https://github.com/avenga/couper/pull/570))
* [OIDC](https://docs.couper.io/configuration/block/oidc) configuration related go-routine leak after retrieving a new payload due to config ttl ([#564](https://github.com/avenga/couper/pull/564))

* **Removed**
* Endpoint path normalization to better match OpenAPI behavior ([#526](https://github.com/avenga/couper/pull/526))
Expand Down
5 changes: 3 additions & 2 deletions accesscontrol/jwk/jwks.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package jwk

import (
"context"
"encoding/json"
"fmt"
"net/http"
Expand Down Expand Up @@ -30,7 +31,7 @@ type JWKS struct {
syncedJSON *jsn.SyncedJSON
}

func NewJWKS(uri string, ttl string, maxStale string, transport http.RoundTripper) (*JWKS, error) {
func NewJWKS(ctx context.Context, uri string, ttl string, maxStale string, transport http.RoundTripper) (*JWKS, error) {
timetolive, err := config.ParseDuration("jwks_ttl", ttl, time.Hour)
if err != nil {
return nil, err
Expand All @@ -47,7 +48,7 @@ func NewJWKS(uri string, ttl string, maxStale string, transport http.RoundTrippe
}

jwks := &JWKS{}
jwks.syncedJSON, err = jsn.NewSyncedJSON(file, "jwks_url", uri, transport, "jwks", timetolive, maxStaleTime, jwks)
jwks.syncedJSON, err = jsn.NewSyncedJSON(ctx, file, "jwks_url", uri, transport, "jwks", timetolive, maxStaleTime, jwks)
return jwks, err
}

Expand Down
10 changes: 5 additions & 5 deletions accesscontrol/jwk/jwks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func Test_JWKS(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(subT *testing.T) {
_, err := jwk.NewJWKS(tt.url, "", "", backend)
_, err := jwk.NewJWKS(context.TODO(), tt.url, "", "", backend)
if err == nil && tt.error != "" {
subT.Errorf("Missing error:\n\tWant: %v\n\tGot: %v", tt.error, nil)
}
Expand All @@ -64,7 +64,7 @@ func Test_JWKS_Load(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(subT *testing.T) {
jwks, err := jwk.NewJWKS("file:"+tt.file, "", "", nil)
jwks, err := jwk.NewJWKS(context.TODO(), "file:"+tt.file, "", "", nil)
helper.Must(err)
_, err = jwks.Data()
if err != nil && tt.expParsed {
Expand Down Expand Up @@ -93,7 +93,7 @@ func Test_JWKS_GetSigKeyForToken(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(subT *testing.T) {
helper := test.New(subT)
jwks, err := jwk.NewJWKS("file:"+tt.file, "", "", nil)
jwks, err := jwk.NewJWKS(context.TODO(), "file:"+tt.file, "", "", nil)
helper.Must(err)
_, err = jwks.Data()
helper.Must(err)
Expand Down Expand Up @@ -145,7 +145,7 @@ func Test_JWKS_GetKey(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(subT *testing.T) {
helper := test.New(subT)
jwks, err := jwk.NewJWKS("file:"+tt.file, "", "", nil)
jwks, err := jwk.NewJWKS(context.TODO(), "file:"+tt.file, "", "", nil)
helper.Must(err)
_, err = jwks.Data()
helper.Must(err)
Expand Down Expand Up @@ -175,7 +175,7 @@ func Test_JWKS_LoadSynced(t *testing.T) {
io.Copy(writer, bytes.NewReader(f))
}))

jwks, err := jwk.NewJWKS(jwksOrigin.URL, "10s", "", http.DefaultTransport)
jwks, err := jwk.NewJWKS(context.TODO(), jwksOrigin.URL, "10s", "", http.DefaultTransport)
helper.Must(err)

wg := sync.WaitGroup{}
Expand Down
4 changes: 4 additions & 0 deletions accesscontrol/jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,10 @@ func TestJwtConfig(t *testing.T) {
tmpStoreCh := make(chan struct{})
defer close(tmpStoreCh)

ctx, cancel := context.WithCancel(conf.Context)
conf.Context = ctx
defer cancel()

_, err = runtime.NewServerConfiguration(conf, logger, cache.New(logger, tmpStoreCh))
}

Expand Down
6 changes: 6 additions & 0 deletions command/verify.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package command

import (
"context"

"github.com/avenga/couper/cache"
"github.com/avenga/couper/config"
"github.com/avenga/couper/config/configload"
Expand Down Expand Up @@ -36,6 +38,10 @@ func (v Verify) Execute(args Args, conf *config.Couper, logger *logrus.Entry) er

tmpMemStore := cache.New(logger, tmpStoreCh)

ctx, cancel := context.WithCancel(cf.Context)
cf.Context = ctx
defer cancel()

_, err = runtime.NewServerConfiguration(cf, logger, tmpMemStore)
if err != nil {
logger.WithError(err).Error()
Expand Down
8 changes: 8 additions & 0 deletions config/configload/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ func TestHealthCheck(t *testing.T) {
memStore := cache.New(log, closeCh)

if conf != nil {
ctx, cancel := context.WithCancel(conf.Context)
conf.Context = ctx
defer cancel()

_, err = runtime.NewServerConfiguration(conf, log, memStore)
}

Expand Down Expand Up @@ -257,6 +261,10 @@ func TestRateLimit(t *testing.T) {
memStore := cache.New(log, closeCh)

if conf != nil {
ctx, cancel := context.WithCancel(conf.Context)
conf.Context = ctx
defer cancel()

_, err = runtime.NewServerConfiguration(conf, log, memStore)
}

Expand Down
4 changes: 4 additions & 0 deletions config/configload/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ func TestLabels(t *testing.T) {
tmpStoreCh := make(chan struct{})
defer close(tmpStoreCh)

ctx, cancel := context.WithCancel(conf.Context)
conf.Context = ctx
defer cancel()

_, err = runtime.NewServerConfiguration(conf, log, cache.New(log, tmpStoreCh))
}

Expand Down
5 changes: 5 additions & 0 deletions config/runtime/access_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ func TestDuplicateEndpoint(t *testing.T) {
logger := log.WithContext(context.TODO())
tmpStoreCh := make(chan struct{})
defer close(tmpStoreCh)

ctx, cancel := context.WithCancel(conf.Context)
conf.Context = ctx
defer cancel()

server, err := runtime.NewServerConfiguration(conf, logger, cache.New(logger, tmpStoreCh))

if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions config/runtime/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func configureOidcConfigs(conf *config.Couper, confCtx *hcl.EvalContext, log *lo
}
}

oidcConfig, err := oidc.NewConfig(oidcConf, backends)
oidcConfig, err := oidc.NewConfig(conf.Context, oidcConf, backends)
if err != nil {
return nil, confErr.With(err)
}
Expand Down Expand Up @@ -583,7 +583,7 @@ func configureJWKS(jwtConf *config.JWT, confContext *hcl.EvalContext, log *logru
return nil, err
}

return jwk.NewJWKS(jwtConf.JWKsURL, jwtConf.JWKsTTL, jwtConf.JWKsMaxStale, backend)
return jwk.NewJWKS(conf.Context, jwtConf.JWKsURL, jwtConf.JWKsTTL, jwtConf.JWKsMaxStale, backend)
}

type protectedOptions struct {
Expand Down
65 changes: 42 additions & 23 deletions eval/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type Context struct {
jwtSigningConfigs map[string]*lib.JWTSigningConfig
saml []*config.SAML
syncedVariables *SyncedVariables

cloneMu sync.RWMutex
}

func NewContext(srcBytes [][]byte, defaults *config.Defaults, environment string) *Context {
Expand Down Expand Up @@ -112,6 +114,9 @@ func (c *Context) Value(key interface{}) interface{} {

func (c *Context) WithClientRequest(req *http.Request) *Context {
c.backendsFn.Do(func() {
c.cloneMu.Lock()
defer c.cloneMu.Unlock()

if c.memStore == nil {
return
}
Expand All @@ -124,17 +129,7 @@ func (c *Context) WithClientRequest(req *http.Request) *Context {
}
})

ctx := &Context{
backends: c.backends,
eval: c.cloneEvalContext(),
inner: c.inner,
memStore: c.memStore,
memorize: make(map[string]interface{}),
oauth2: c.oauth2,
jwtSigningConfigs: c.jwtSigningConfigs,
saml: c.saml[:],
syncedVariables: NewSyncedVariables(),
}
ctx := c.clone()

if rc := req.Context(); rc != nil {
rc = context.WithValue(rc, request.ContextVariablesSynced, ctx.syncedVariables)
Expand Down Expand Up @@ -195,17 +190,7 @@ func (c *Context) WithClientRequest(req *http.Request) *Context {
}

func (c *Context) WithBeresp(beresp *http.Response, readBody bool) *Context {
ctx := &Context{
backends: c.backends,
eval: c.cloneEvalContext(),
inner: c.inner,
memStore: c.memStore,
memorize: c.memorize,
oauth2: c.oauth2,
jwtSigningConfigs: c.jwtSigningConfigs,
saml: c.saml[:],
syncedVariables: c.syncedVariables,
}
ctx := c.clone()
ctx.inner = context.WithValue(c.inner, request.ContextType, ctx)

resps := make(ContextMap)
Expand Down Expand Up @@ -234,6 +219,25 @@ func (c *Context) WithBeresp(beresp *http.Response, readBody bool) *Context {
return ctx
}

// clone returns a new copy of Context with possible field updates in mind.
// Especially during startup some requests may be fired which uses this cloned base Context.
johakoch marked this conversation as resolved.
Show resolved Hide resolved
func (c *Context) clone() *Context {
c.cloneMu.RLock()
defer c.cloneMu.RUnlock()

return &Context{
backends: c.backends,
eval: c.cloneEvalContext(),
inner: c.inner,
memStore: c.memStore,
memorize: make(map[string]interface{}),
oauth2: c.oauth2,
jwtSigningConfigs: c.jwtSigningConfigs,
saml: c.saml[:],
syncedVariables: NewSyncedVariables(),
}
}

func newBerespValues(ctx context.Context, readBody bool, beresp *http.Response) (name string, bereqVal cty.Value, berespVal cty.Value) {
bereq := beresp.Request
name = "default"
Expand Down Expand Up @@ -296,7 +300,7 @@ func newBerespValues(ctx context.Context, readBody bool, beresp *http.Response)

func (c *Context) syncBackendVariables() map[string]cty.Value {
backendsVariable := make(map[string]cty.Value)
for _, backend := range c.backends {
for _, backend := range c.backends[:] {
b, ok := backend.(seetie.Object)
if !ok {
continue
Expand All @@ -310,6 +314,9 @@ func (c *Context) syncBackendVariables() map[string]cty.Value {

// WithJWTSigningConfigs initially sets up the lib.FnJWTSign function.
func (c *Context) WithJWTSigningConfigs(configs map[string]*lib.JWTSigningConfig) *Context {
c.cloneMu.Lock()
defer c.cloneMu.Unlock()

c.jwtSigningConfigs = configs
if c.jwtSigningConfigs == nil {
c.jwtSigningConfigs = make(map[string]*lib.JWTSigningConfig)
Expand All @@ -320,6 +327,9 @@ func (c *Context) WithJWTSigningConfigs(configs map[string]*lib.JWTSigningConfig

// WithOAuth2AC adds the OAuth2AC config structs.
func (c *Context) WithOAuth2AC(os []*config.OAuth2AC) *Context {
c.cloneMu.Lock()
defer c.cloneMu.Unlock()

if c.oauth2 == nil {
c.oauth2 = make(map[string]config.OAuth2Authorization)
}
Expand All @@ -331,6 +341,9 @@ func (c *Context) WithOAuth2AC(os []*config.OAuth2AC) *Context {

// WithOidcConfig adds the OidcConfig config structs.
func (c *Context) WithOidcConfig(confs oidc.Configs) *Context {
c.cloneMu.Lock()
defer c.cloneMu.Unlock()

if c.oauth2 == nil {
c.oauth2 = make(map[string]config.OAuth2Authorization)
}
Expand All @@ -341,12 +354,18 @@ func (c *Context) WithOidcConfig(confs oidc.Configs) *Context {
}

func (c *Context) WithMemStore(store *cache.MemoryStore) *Context {
c.cloneMu.Lock()
defer c.cloneMu.Unlock()

c.memStore = store
return c
}

// WithSAML initially set up the saml configuration.
func (c *Context) WithSAML(s []*config.SAML) *Context {
c.cloneMu.Lock()
defer c.cloneMu.Unlock()

c.saml = s
if c.saml == nil {
c.saml = make([]*config.SAML, 0)
Expand Down
4 changes: 4 additions & 0 deletions eval/lib/oauth2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ definitions {
defer close(quitCh)
memStore := cache.New(logger, quitCh)

ctx, cancel := context.WithCancel(couperConf.Context)
couperConf.Context = ctx
defer cancel()

_, err = runtime.NewServerConfiguration(couperConf, logger, memStore)
helper.Must(err)

Expand Down
37 changes: 37 additions & 0 deletions internal/test/goroutine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package test

import (
"bufio"
"bytes"
"io"
"regexp"
"runtime/pprof"
"strconv"
)

func NumGoroutines(filter string) (numRoutine int) {
profile := pprof.Lookup("goroutine")
profileBuf := &bytes.Buffer{}
_ = profile.WriteTo(profileBuf, 1)
pr := bufio.NewReader(profileBuf)

stackRegex := regexp.MustCompile(`(\d+)\s@\s0x`)
for {
line, _, readErr := pr.ReadLine()
if readErr != nil {
if readErr == io.EOF {
break
}
panic(readErr)
}
match := stackRegex.FindSubmatch(line)
if len(match) > 1 {
numRoutine, _ = strconv.Atoi(string(match[1]))
continue
}
if bytes.Contains(line, []byte(filter)) {
return numRoutine
}
}
return -1
}
Loading