Skip to content

Commit

Permalink
make *http.Client configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Ramlot <42113979+inteon@users.noreply.github.com>
  • Loading branch information
inteon committed Jan 8, 2023
1 parent b756161 commit d5ec191
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 26 deletions.
12 changes: 11 additions & 1 deletion pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cache
import (
"context"
"fmt"
"net/http"
"reflect"
"time"

Expand Down Expand Up @@ -158,6 +159,15 @@ var defaultResyncTime = 10 * time.Hour

// New initializes and returns a new Cache.
func New(config *rest.Config, opts Options) (Cache, error) {
httpClient, err := rest.HTTPClientFor(config)
if err != nil {
return nil, err
}
return NewForConfigAndClient(config, httpClient, opts)
}

// NewForConfigAndClient initializes and returns a new Cache.
func NewForConfigAndClient(config *rest.Config, httpClient *http.Client, opts Options) (Cache, error) {
opts, err := defaultOpts(config, opts)
if err != nil {
return nil, err
Expand All @@ -183,7 +193,7 @@ func New(config *rest.Config, opts Options) (Cache, error) {

return &informerCache{
scheme: opts.Scheme,
InformersMap: internal.NewInformersMap(config, &internal.InformersMapOptions{
InformersMap: internal.NewInformersMapForConfigAndClient(config, httpClient, &internal.InformersMapOptions{
Scheme: opts.Scheme,
Mapper: opts.Mapper,
ResyncPeriod: *opts.Resync,
Expand Down
38 changes: 32 additions & 6 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"math/rand"
"net/http"
"sync"
"time"

Expand Down Expand Up @@ -60,11 +61,33 @@ type InformersMapOptionsByGVK struct {
}

// NewInformersMap creates a new InformersMap that can create informers under the hood.
// Deprecated: use NewInformersMapForConfig instead.
func NewInformersMap(config *rest.Config, options *InformersMapOptions) *InformersMap {
informersMap, err := NewInformersMapForConfig(config, options)
if err != nil {
panic(fmt.Sprintf("unable to create informers map: %v", err))
}
return informersMap
}

// NewInformersMapForConfig creates a new InformersMap that can create informers under the hood.
// It differs from NewInformersMap in that allows you to pass in a custom http.Client.
func NewInformersMapForConfig(config *rest.Config, options *InformersMapOptions) (*InformersMap, error) {
httpClient, err := rest.HTTPClientFor(config)
if err != nil {
return nil, err
}
return NewInformersMapForConfigAndClient(config, httpClient, options), nil
}

// NewInformersMapForConfigAndClient creates a new InformersMap that can create informers under the hood.
// It differs from NewInformersMapForConfig in that it allows you to pass in a custom http.Client.
func NewInformersMapForConfigAndClient(config *rest.Config, h *http.Client, options *InformersMapOptions) *InformersMap {
return &InformersMap{
config: config,
scheme: options.Scheme,
mapper: options.Mapper,
httpClient: h,
config: config,
scheme: options.Scheme,
mapper: options.Mapper,
informers: informers{
Structured: make(map[schema.GroupVersionKind]*MapEntry),
Unstructured: make(map[schema.GroupVersionKind]*MapEntry),
Expand Down Expand Up @@ -99,6 +122,9 @@ type informers struct {
// InformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
// It uses a standard parameter codec constructed based on the given generated Scheme.
type InformersMap struct {
// httpClient is used to create a new REST client
httpClient *http.Client

// scheme maps runtime.Objects to GroupVersionKinds
scheme *runtime.Scheme

Expand Down Expand Up @@ -343,7 +369,7 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
// we should remove it and use the one that the dynamic client sets for us.
cfg := rest.CopyConfig(ip.config)
cfg.NegotiatedSerializer = nil
dynamicClient, err := dynamic.NewForConfig(cfg)
dynamicClient, err := dynamic.NewForConfigAndClient(cfg, ip.httpClient)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -373,7 +399,7 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
cfg.NegotiatedSerializer = nil

// Grab the metadata metadataClient.
metadataClient, err := metadata.NewForConfig(cfg)
metadataClient, err := metadata.NewForConfigAndClient(cfg, ip.httpClient)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -414,7 +440,7 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
// Structured.
//
default:
client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
client, err := apiutil.RESTClientForGVKAndClient(gvk, false, ip.config, ip.codecs, ip.httpClient)
if err != nil {
return nil, err
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/client/apiutil/apimachinery.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package apiutil

import (
"fmt"
"net/http"
"reflect"
"sync"

Expand Down Expand Up @@ -122,6 +123,14 @@ func RESTClientForGVK(gvk schema.GroupVersionKind, isUnstructured bool, baseConf
return rest.RESTClientFor(createRestConfig(gvk, isUnstructured, baseConfig, codecs))
}

// RESTClientForGVKAndClient constructs a new rest.Interface capable of accessing the resource associated
// with the given GroupVersionKind. The REST client will be configured to use the negotiated serializer from
// baseConfig, if set, otherwise a default serializer will be set.
// Unlike RESTClientForGVK, this function allows the caller to specify a custom http.Client.
func RESTClientForGVKAndClient(gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory, httpClient *http.Client) (rest.Interface, error) {
return rest.RESTClientForConfigAndClient(createRestConfig(gvk, isUnstructured, baseConfig, codecs), httpClient)
}

// serializerWithDecodedGVK is a CodecFactory that overrides the DecoderToVersion of a WithoutConversionCodecFactory
// in order to avoid clearing the GVK from the decoded object.
//
Expand Down
15 changes: 14 additions & 1 deletion pkg/client/apiutil/dynamicrestmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package apiutil

import (
"net/http"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -76,7 +77,19 @@ func WithCustomMapper(newMapper func() (meta.RESTMapper, error)) DynamicRESTMapp
// RESTMapper dynamically discovers resource types at runtime. opts
// configure the RESTMapper.
func NewDynamicRESTMapper(cfg *rest.Config, opts ...DynamicRESTMapperOption) (meta.RESTMapper, error) {
client, err := discovery.NewDiscoveryClientForConfig(cfg)
httpClient, err := rest.HTTPClientFor(cfg)
if err != nil {
return nil, err
}
return NewDynamicRESTMapperForConfigAndClient(cfg, httpClient, opts...)
}

// NewDynamicRESTMapperForConfigAndClient returns a dynamic RESTMapper for cfg. The dynamic
// RESTMapper dynamically discovers resource types at runtime. opts
// configure the RESTMapper.
// Unlike NewDynamicRESTMapper, this function allows you to specify a custom http client.
func NewDynamicRESTMapperForConfigAndClient(cfg *rest.Config, httpClient *http.Client, opts ...DynamicRESTMapperOption) (meta.RESTMapper, error) {
client, err := discovery.NewDiscoveryClientForConfigAndClient(cfg, httpClient)
if err != nil {
return nil, err
}
Expand Down
39 changes: 32 additions & 7 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"net/http"
"strings"

"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -74,14 +75,37 @@ type Options struct {
// case of unstructured types, the group, version, and kind will be extracted
// from the corresponding fields on the object.
func New(config *rest.Config, options Options) (Client, error) {
return newClient(config, options)
httpClient, err := rest.HTTPClientFor(config)
if err != nil {
return nil, err
}
return newClient(config, httpClient, options)
}

func newClient(config *rest.Config, options Options) (*client, error) {
// NewForConfigAndClient returns a new Client using the provided config and Options.
// The returned client reads *and* writes directly from the server
// (it doesn't use object caches). It understands how to work with
// normal types (both custom resources and aggregated/built-in resources),
// as well as unstructured types.
//
// In the case of normal types, the scheme will be used to look up the
// corresponding group, version, and kind for the given type. In the
// case of unstructured types, the group, version, and kind will be extracted
// from the corresponding fields on the object.
// Unlike New, this function allows you to provide your own http.Client.
func NewForConfigAndClient(config *rest.Config, httpClient *http.Client, options Options) (Client, error) {
return newClient(config, httpClient, options)
}

func newClient(config *rest.Config, httpClient *http.Client, options Options) (*client, error) {
if config == nil {
return nil, fmt.Errorf("must provide non-nil rest.Config to client.New")
}

if httpClient == nil {
return nil, fmt.Errorf("must provide non-nil http.Client to client.NewForConfigAndClient")
}

if !options.Opts.SuppressWarnings {
// surface warnings
logger := log.Log.WithName("KubeAPIWarningLogger")
Expand Down Expand Up @@ -113,16 +137,17 @@ func newClient(config *rest.Config, options Options) (*client, error) {
}

resources := &clientRestResources{
config: config,
scheme: options.Scheme,
mapper: options.Mapper,
codecs: serializer.NewCodecFactory(options.Scheme),
httpClient: httpClient,
config: config,
scheme: options.Scheme,
mapper: options.Mapper,
codecs: serializer.NewCodecFactory(options.Scheme),

structuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta),
unstructuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta),
}

rawMetaClient, err := metadata.NewForConfig(config)
rawMetaClient, err := metadata.NewForConfigAndClient(config, httpClient)
if err != nil {
return nil, fmt.Errorf("unable to construct metadata-only client for use as part of client: %w", err)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/client/client_rest_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package client

import (
"net/http"
"strings"
"sync"

Expand All @@ -32,6 +33,9 @@ import (

// clientRestResources creates and stores rest clients and metadata for Kubernetes types.
type clientRestResources struct {
// httpClient is the http client to use for requests
httpClient *http.Client

// config is the rest.Config to talk to an apiserver
config *rest.Config

Expand Down Expand Up @@ -59,7 +63,7 @@ func (c *clientRestResources) newResource(gvk schema.GroupVersionKind, isList, i
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
}

client, err := apiutil.RESTClientForGVK(gvk, isUnstructured, c.config, c.codecs)
client, err := apiutil.RESTClientForGVKAndClient(gvk, isUnstructured, c.config, c.codecs, c.httpClient)
if err != nil {
return nil, err
}
Expand Down
15 changes: 13 additions & 2 deletions pkg/client/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package client

import (
"context"
"net/http"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -29,11 +30,21 @@ import (

// NewWithWatch returns a new WithWatch.
func NewWithWatch(config *rest.Config, options Options) (WithWatch, error) {
client, err := newClient(config, options)
httpClient, err := rest.HTTPClientFor(config)
if err != nil {
return nil, err
}
dynamicClient, err := dynamic.NewForConfig(config)
return NewWithWatchForConfigAndClient(config, httpClient, options)
}

// NewWithWatchForConfigAndClient is a client that can watch objects.
// It differs from NewForConfigAndClient in that it returns a client that can watch objects.
func NewWithWatchForConfigAndClient(config *rest.Config, httpClient *http.Client, options Options) (WithWatch, error) {
client, err := newClient(config, httpClient, options)
if err != nil {
return nil, err
}
dynamicClient, err := dynamic.NewForConfigAndClient(config, httpClient)
if err != nil {
return nil, err
}
Expand Down
37 changes: 30 additions & 7 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cluster
import (
"context"
"errors"
"net/http"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -105,6 +106,11 @@ type Options struct {
// will only hold objects from the desired namespace.
Namespace string

// HTTPClient is the http client that will be used to create the default
// Cache and Client. If not set the rest.HTTPClientFor function will be used
// to create the http client.
HTTPClient *http.Client

// NewCache is the function that will create the cache to be used
// by the manager. If not set this will use the default new cache function.
NewCache cache.NewCacheFunc
Expand Down Expand Up @@ -153,7 +159,7 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) {
for _, opt := range opts {
opt(&options)
}
options = setOptionsDefaults(options)
options = setOptionsDefaults(options, config)

// Create the mapper provider
mapper, err := options.MapperProvider(config)
Expand Down Expand Up @@ -206,31 +212,47 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) {
}

// setOptionsDefaults set default values for Options fields.
func setOptionsDefaults(options Options) Options {
func setOptionsDefaults(options Options, config *rest.Config) Options {
if options.HTTPClient == nil {
httpClient, err := rest.HTTPClientFor(config)
if err != nil {
panic(err)
}
options.HTTPClient = httpClient
}

// Use the Kubernetes client-go scheme if none is specified
if options.Scheme == nil {
options.Scheme = scheme.Scheme
}

if options.MapperProvider == nil {
options.MapperProvider = func(c *rest.Config) (meta.RESTMapper, error) {
return apiutil.NewDynamicRESTMapper(c)
return apiutil.NewDynamicRESTMapperForConfigAndClient(c, options.HTTPClient)
}
}

// Allow users to define how to create a new client
if options.NewClient == nil {
options.NewClient = DefaultNewClient
options.NewClient = func(cache cache.Cache, config *rest.Config, clientOptions client.Options, uncachedObjects ...client.Object) (client.Client, error) {
return ClientBuilderWithOptions(ClientOptions{
HTTPClient: options.HTTPClient,
})(cache, config, clientOptions, uncachedObjects...)
}
}

// Allow newCache to be mocked
if options.NewCache == nil {
options.NewCache = cache.New
options.NewCache = func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
return cache.NewForConfigAndClient(config, options.HTTPClient, opts)
}
}

// Allow newRecorderProvider to be mocked
if options.newRecorderProvider == nil {
options.newRecorderProvider = intrec.NewProvider
options.newRecorderProvider = func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster intrec.EventBroadcasterProducer) (*intrec.Provider, error) {
return intrec.NewProviderForConfigAndClient(config, options.HTTPClient, scheme, logger, makeBroadcaster)
}
}

// This is duplicated with pkg/manager, we need it here to provide
Expand Down Expand Up @@ -260,6 +282,7 @@ type NewClientFunc func(cache cache.Cache, config *rest.Config, options client.O
type ClientOptions struct {
UncachedObjects []client.Object
CacheUnstructured bool
HTTPClient *http.Client
}

// DefaultNewClient creates the default caching client, that will never cache Unstructured.
Expand All @@ -273,7 +296,7 @@ func ClientBuilderWithOptions(options ClientOptions) NewClientFunc {
return func(cache cache.Cache, config *rest.Config, clientOpts client.Options, uncachedObjects ...client.Object) (client.Client, error) {
options.UncachedObjects = append(options.UncachedObjects, uncachedObjects...)

c, err := client.New(config, clientOpts)
c, err := client.NewForConfigAndClient(config, options.HTTPClient, clientOpts)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit d5ec191

Please sign in to comment.