Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚠️ DynamicRestMapper: return NoMatchError when resource doesn't exist #1151

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 10 additions & 48 deletions pkg/client/apiutil/dynamicrestmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package apiutil
import (
"errors"
"sync"
"time"

"golang.org/x/time/rate"
"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -29,34 +28,12 @@ import (
"k8s.io/client-go/restmapper"
)

// ErrRateLimited is returned by a RESTMapper method if the number of API
// calls has exceeded a limit within a certain time period.
type ErrRateLimited struct {
// Duration to wait until the next API call can be made.
Delay time.Duration
}

func (e ErrRateLimited) Error() string {
return "too many API calls to the RESTMapper within a timeframe"
}

// DelayIfRateLimited returns the delay time until the next API call is
// allowed and true if err is of type ErrRateLimited. The zero
// time.Duration value and false are returned if err is not a ErrRateLimited.
func DelayIfRateLimited(err error) (time.Duration, bool) {
timebertt marked this conversation as resolved.
Show resolved Hide resolved
var rlerr ErrRateLimited
if errors.As(err, &rlerr) {
return rlerr.Delay, true
}
return 0, false
}

// dynamicRESTMapper is a RESTMapper that dynamically discovers resource
// types at runtime.
type dynamicRESTMapper struct {
mu sync.RWMutex // protects the following fields
staticMapper meta.RESTMapper
limiter *dynamicLimiter
limiter *rate.Limiter
newMapper func() (meta.RESTMapper, error)

lazy bool
Expand All @@ -70,7 +47,7 @@ type DynamicRESTMapperOption func(*dynamicRESTMapper) error
// WithLimiter sets the RESTMapper's underlying limiter to lim.
func WithLimiter(lim *rate.Limiter) DynamicRESTMapperOption {
return func(drm *dynamicRESTMapper) error {
drm.limiter = &dynamicLimiter{lim}
drm.limiter = lim
return nil
}
}
Expand Down Expand Up @@ -103,9 +80,7 @@ func NewDynamicRESTMapper(cfg *rest.Config, opts ...DynamicRESTMapperOption) (me
return nil, err
}
drm := &dynamicRESTMapper{
limiter: &dynamicLimiter{
rate.NewLimiter(rate.Limit(defaultRefillRate), defaultLimitSize),
},
limiter: rate.NewLimiter(rate.Limit(defaultRefillRate), defaultLimitSize),
newMapper: func() (meta.RESTMapper, error) {
groupResources, err := restmapper.GetAPIGroupResources(client)
if err != nil {
Expand Down Expand Up @@ -161,12 +136,13 @@ func (drm *dynamicRESTMapper) init() (err error) {
// checkAndReload attempts to call the given callback, which is assumed to be dependent
// on the data in the restmapper.
//
// If the callback returns a NoKindMatchError, it will attempt to reload
// If the callback returns an error that matches the given error, it will attempt to reload
// the RESTMapper's data and re-call the callback once that's occurred.
// If the callback returns any other error, the function will return immediately regardless.
//
// It will take care
// ensuring that reloads are rate-limitted and that extraneous calls aren't made.
// It will take care of ensuring that reloads are rate-limited and that extraneous calls
// aren't made. If a reload would exceed the limiters rate, it returns the error return by
// the callback.
// It's thread-safe, and worries about thread-safety for the callback (so the callback does
// not need to attempt to lock the restmapper).
func (drm *dynamicRESTMapper) checkAndReload(needsReloadErr error, checkNeedsReload func() error) error {
Expand Down Expand Up @@ -199,7 +175,9 @@ func (drm *dynamicRESTMapper) checkAndReload(needsReloadErr error, checkNeedsRel
}

// we're still stale, so grab a rate-limit token if we can...
if err := drm.limiter.checkRate(); err != nil {
if !drm.limiter.Allow() {
// return error from static mapper here, we have refreshed often enough (exceeding rate of provided limiter)
// so that client's can handle this the same way as a "normal" NoResourceMatchError / NoKindMatchError
return err
}

Expand Down Expand Up @@ -305,19 +283,3 @@ func (drm *dynamicRESTMapper) ResourceSingularizer(resource string) (string, err
})
return singular, err
}

// dynamicLimiter holds a rate limiter used to throttle chatty RESTMapper users.
type dynamicLimiter struct {
*rate.Limiter
}

// checkRate returns an ErrRateLimited if too many API calls have been made
// within the set limit.
func (b *dynamicLimiter) checkRate() error {
res := b.Reserve()
if res.Delay() == 0 {
return nil
}
res.Cancel()
return ErrRateLimited{res.Delay()}
}
75 changes: 47 additions & 28 deletions pkg/client/apiutil/dynamicrestmapper_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package apiutil_test

import (
"errors"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/format"
"github.com/onsi/gomega/types"
"golang.org/x/time/rate"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -57,53 +58,49 @@ var _ = Describe("Dynamic REST Mapper", func() {
})

It("should reload if not present in the cache", func() {
By("reading successfully once")
By("reading target successfully once")
Expect(callWithTarget()).To(Succeed())
Expect(callWithOther()).NotTo(Succeed())

By("asking for a something that didn't exist previously after adding it to the mapper")
By("reading other not successfully")
count := 0
addToMapper = func(baseMapper *meta.DefaultRESTMapper) {
count++
baseMapper.Add(targetGVK, meta.RESTScopeNamespace)
baseMapper.Add(secondGVK, meta.RESTScopeNamespace)
}
Expect(callWithOther()).To(Succeed())
Expect(callWithTarget()).To(Succeed())
})
Expect(callWithOther()).To(beNoMatchError())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this fail on the first attempt now and only works on the second?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was also failing on the first attempt before (see L62) as the secondGVK is not present by default (see L33)
I just moved down the first callWithOther and added count to ensure, that the RESTMapper really tries to rediscover for the call with the unknown resource.

Expect(count).To(Equal(1), "should reload exactly once")

It("should rate-limit reloads so that we don't get more than a certain number per second", func() {
By("setting a small limit")
*lim = *rate.NewLimiter(rate.Limit(1), 1)

By("forcing a reload after changing the mapper")
By("reading both successfully now")
addToMapper = func(baseMapper *meta.DefaultRESTMapper) {
baseMapper.Add(targetGVK, meta.RESTScopeNamespace)
baseMapper.Add(secondGVK, meta.RESTScopeNamespace)
}
Expect(callWithOther()).To(Succeed())

By("calling another time that would need a requery and failing")
Eventually(func() bool {
return errors.As(callWithTarget(), &apiutil.ErrRateLimited{})
}, "10s").Should(BeTrue())
Expect(callWithTarget()).To(Succeed())
})

It("should rate-limit then allow more at 1rps", func() {
It("should rate-limit then allow more at configured rate", func() {
By("setting a small limit")
*lim = *rate.NewLimiter(rate.Limit(1), 1)
*lim = *rate.NewLimiter(rate.Every(100*time.Millisecond), 1)

By("forcing a reload after changing the mapper")
addToMapper = func(baseMapper *meta.DefaultRESTMapper) {
baseMapper.Add(secondGVK, meta.RESTScopeNamespace)
}

By("calling twice to trigger rate limiting")
Expect(callWithOther()).To(Succeed())
Expect(callWithTarget()).NotTo(Succeed())

// by 2nd call loop should succeed because we canceled our 1st rate-limited token, then waited a full second
By("calling until no longer rate-limited, 2nd call should succeed")
Eventually(func() bool {
return errors.As(callWithTarget(), &apiutil.ErrRateLimited{})
}, "2.5s", "1s").Should(BeFalse())
By("calling another time to trigger rate limiting")
addToMapper = func(baseMapper *meta.DefaultRESTMapper) {
baseMapper.Add(targetGVK, meta.RESTScopeNamespace)
}
// if call consistently fails, we are sure, that it was rate-limited,
// otherwise it would have reloaded and succeeded
Consistently(callWithTarget, "90ms", "10ms").Should(beNoMatchError())

By("calling until no longer rate-limited")
// once call succeeds, we are sure, that it was no longer rate-limited,
// as it was allowed to reload and found matching kind/resource
Eventually(callWithTarget, "30ms", "10ms").Should(And(Succeed(), Not(beNoMatchError())))
})

It("should avoid reloading twice if two requests for the same thing come in", func() {
Expand Down Expand Up @@ -251,3 +248,25 @@ var _ = Describe("Dynamic REST Mapper", func() {
})
})
})

func beNoMatchError() types.GomegaMatcher {
return noMatchErrorMatcher{}
}

type noMatchErrorMatcher struct{}

func (k noMatchErrorMatcher) Match(actual interface{}) (success bool, err error) {
actualErr, actualOk := actual.(error)
if !actualOk {
return false, nil
}

return meta.IsNoMatchError(actualErr), nil
}

func (k noMatchErrorMatcher) FailureMessage(actual interface{}) (message string) {
return format.Message(actual, "to be a NoMatchError")
}
func (k noMatchErrorMatcher) NegatedFailureMessage(actual interface{}) (message string) {
return format.Message(actual, "not to be a NoMatchError")
}