Skip to content

Commit

Permalink
Refactor kubeletClient
Browse files Browse the repository at this point in the history
Move address resover into client and merge all config setup.
  • Loading branch information
serathius committed Apr 28, 2020
1 parent d09af53 commit ec48583
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 227 deletions.
29 changes: 21 additions & 8 deletions cmd/metrics-server/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,10 @@ func (o Options) MetricsServerConfig() (*metric_server.Config, error) {
if err != nil {
return nil, err
}
kubelet := o.kubeletConfig(restConfig)
addressResolver := o.addressResolverConfig()
return &metric_server.Config{
Apiserver: apiserver,
Rest: restConfig,
Kubelet: kubelet,
AddresResolver: addressResolver,
Kubelet: o.kubeletConfig(restConfig),
MetricResolution: o.MetricResolution,
ScrapeTimeout: time.Duration(float64(o.MetricResolution) * 0.90), // scrape timeout is 90% of the scrape interval
}, nil
Expand Down Expand Up @@ -167,12 +164,28 @@ func (o Options) restConfig() (*rest.Config, error) {
}

func (o Options) kubeletConfig(restConfig *rest.Config) *scraper.KubeletClientConfig {
kubeletRestCfg := rest.CopyConfig(restConfig)
config := &scraper.KubeletClientConfig{
Port: o.KubeletPort,
AddressTypePriority: o.addressResolverConfig(),
}
if len(o.KubeletCAFile) > 0 {
kubeletRestCfg.TLSClientConfig.CAFile = o.KubeletCAFile
kubeletRestCfg.TLSClientConfig.CAData = nil
config.RESTConfig.TLSClientConfig.CAFile = o.KubeletCAFile
config.RESTConfig.TLSClientConfig.CAData = nil
}
if o.DeprecatedCompletelyInsecureKubelet {
config.Scheme = "http"
config.RESTConfig = rest.AnonymousClientConfig(config.RESTConfig) // don't use auth to avoid leaking auth details to insecure endpoints
config.RESTConfig.TLSClientConfig = rest.TLSClientConfig{} // empty TLS config --> no TLS
} else {
config.Scheme = "https"
config.RESTConfig = rest.CopyConfig(restConfig)
}
if o.InsecureKubeletTLS {
config.RESTConfig.TLSClientConfig.Insecure = true
config.RESTConfig.TLSClientConfig.CAData = nil
config.RESTConfig.TLSClientConfig.CAFile = ""
}
return scraper.GetKubeletConfig(kubeletRestCfg, o.KubeletPort, o.InsecureKubeletTLS, o.DeprecatedCompletelyInsecureKubelet)
return config
}

func (o Options) addressResolverConfig() []corev1.NodeAddressType {
Expand Down
24 changes: 3 additions & 21 deletions pkg/metrics-server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
Expand All @@ -27,14 +26,12 @@ import (
"sigs.k8s.io/metrics-server/pkg/api"
"sigs.k8s.io/metrics-server/pkg/scraper"
"sigs.k8s.io/metrics-server/pkg/storage"
"sigs.k8s.io/metrics-server/pkg/utils"
)

type Config struct {
Apiserver *genericapiserver.Config
Rest *rest.Config
Kubelet *scraper.KubeletClientConfig
AddresResolver []corev1.NodeAddressType
MetricResolution time.Duration
ScrapeTimeout time.Duration
}
Expand All @@ -44,14 +41,12 @@ func (c Config) Complete() (*MetricsServer, error) {
if err != nil {
return nil, err
}
kubeletClient, err := c.kubeletClient()
kubeletClient, err := c.Kubelet.Complete()
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to construct a client to connect to the kubelets: %v", err)
}
addressResolver := c.addressResolver()

nodes := informer.Core().V1().Nodes()
scrape := scraper.NewScraper(nodes.Lister(), kubeletClient, addressResolver, c.ScrapeTimeout)
scrape := scraper.NewScraper(nodes.Lister(), kubeletClient, c.ScrapeTimeout)

scraper.RegisterScraperMetrics(c.ScrapeTimeout)
RegisterServerMetrics(c.MetricResolution)
Expand Down Expand Up @@ -86,16 +81,3 @@ func (c Config) informer() (informers.SharedInformerFactory, error) {
// so set the default resync interval to 0
return informers.NewSharedInformerFactory(kubeClient, 0), nil
}

func (c Config) kubeletClient() (scraper.KubeletInterface, error) {
kubeletClient, err := scraper.KubeletClientFor(c.Kubelet)
if err != nil {
return nil, fmt.Errorf("unable to construct a client to connect to the kubelets: %v", err)
}
return kubeletClient, nil
}

func (c Config) addressResolver() utils.NodeAddressResolver {
// set up an address resolver according to the user's priorities
return utils.NewPriorityNodeAddressResolver(c.AddresResolver)
}
33 changes: 17 additions & 16 deletions pkg/metrics-server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,24 +78,25 @@ func (ms *MetricsServer) RunUntil(stopCh <-chan struct{}) error {
if !shutdown {
return nil
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go ms.runScrape(ctx)
return ms.GenericAPIServer.PrepareRun().Run(stopCh)
}

go func() {
ticker := time.NewTicker(ms.resolution)
defer ticker.Stop()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ms.scrape(ctx, time.Now())

for {
select {
case startTime := <-ticker.C:
ms.scrape(ctx, startTime)
case <-stopCh:
return
}
func (ms *MetricsServer) runScrape(ctx context.Context) {
ticker := time.NewTicker(ms.resolution)
defer ticker.Stop()
ms.scrape(ctx, time.Now())

for {
select {
case startTime := <-ticker.C:
ms.scrape(ctx, startTime)
case <-ctx.Done():
return
}
}()
return ms.GenericAPIServer.PrepareRun().Run(stopCh)
}
}

func (ms *MetricsServer) scrape(ctx context.Context, startTime time.Time) {
Expand Down
42 changes: 16 additions & 26 deletions pkg/scraper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,28 @@ import (

"github.com/mailru/easyjson"

corev1 "k8s.io/api/core/v1"
"k8s.io/klog"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"

"sigs.k8s.io/metrics-server/pkg/utils"
)

// KubeletInterface knows how to fetch metrics from the Kubelet
type KubeletInterface interface {
// GetSummary fetches summary metrics from the given Kubelet
GetSummary(ctx context.Context, host string) (*stats.Summary, error)
GetSummary(ctx context.Context, node *corev1.Node) (*stats.Summary, error)
}

type kubeletClient struct {
port int
deprecatedNoTLS bool
client *http.Client
scheme string
addrResolver utils.NodeAddressResolver
port int
client *http.Client
}

var _ KubeletInterface = (*kubeletClient)(nil)

type ErrNotFound struct {
endpoint string
}
Expand All @@ -49,11 +55,6 @@ func (err *ErrNotFound) Error() string {
return fmt.Sprintf("%q not found", err.endpoint)
}

func IsNotFoundError(err error) bool {
_, isNotFound := err.(*ErrNotFound)
return isNotFound
}

func (kc *kubeletClient) makeRequestAndGetValue(client *http.Client, req *http.Request, value easyjson.Unmarshaler) error {
// TODO(directxman12): support validating certs by hostname
response, err := client.Do(req)
Expand Down Expand Up @@ -84,14 +85,14 @@ func (kc *kubeletClient) makeRequestAndGetValue(client *http.Client, req *http.R
return nil
}

func (kc *kubeletClient) GetSummary(ctx context.Context, host string) (*stats.Summary, error) {
scheme := "https"
if kc.deprecatedNoTLS {
scheme = "http"
func (kc *kubeletClient) GetSummary(ctx context.Context, node *corev1.Node) (*stats.Summary, error) {
addr, err := kc.addrResolver.NodeAddress(node)
if err != nil {
return nil, fmt.Errorf("unable to extract connection information for node %q: %v", node.Name, err)
}
url := url.URL{
Scheme: scheme,
Host: net.JoinHostPort(host, strconv.Itoa(kc.port)),
Scheme: kc.scheme,
Host: net.JoinHostPort(addr, strconv.Itoa(kc.port)),
Path: "/stats/summary",
RawQuery: "only_cpu_and_memory=true",
}
Expand All @@ -108,14 +109,3 @@ func (kc *kubeletClient) GetSummary(ctx context.Context, host string) (*stats.Su
err = kc.makeRequestAndGetValue(client, req.WithContext(ctx), summary)
return summary, err
}

func NewKubeletClient(transport http.RoundTripper, port int, deprecatedNoTLS bool) (KubeletInterface, error) {
c := &http.Client{
Transport: transport,
}
return &kubeletClient{
port: port,
client: c,
deprecatedNoTLS: deprecatedNoTLS,
}, nil
}
44 changes: 19 additions & 25 deletions pkg/scraper/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,36 @@ package scraper

import (
"fmt"
"net/http"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/rest"
)

// GetKubeletConfig fetches connection config for connecting to the Kubelet.
func GetKubeletConfig(cfg *rest.Config, port int, insecureTLS bool, completelyInsecure bool) *KubeletClientConfig {
if completelyInsecure {
cfg = rest.AnonymousClientConfig(cfg) // don't use auth to avoid leaking auth details to insecure endpoints
cfg.TLSClientConfig = rest.TLSClientConfig{} // empty TLS config --> no TLS
} else if insecureTLS {
cfg.TLSClientConfig.Insecure = true
cfg.TLSClientConfig.CAData = nil
cfg.TLSClientConfig.CAFile = ""
}
kubeletConfig := &KubeletClientConfig{
Port: port,
RESTConfig: cfg,
DeprecatedCompletelyInsecure: completelyInsecure,
}

return kubeletConfig
}
"sigs.k8s.io/metrics-server/pkg/utils"
)

// KubeletClientConfig represents configuration for connecting to Kubelets.
type KubeletClientConfig struct {
Port int
RESTConfig *rest.Config
DeprecatedCompletelyInsecure bool
Port int
RESTConfig *rest.Config
AddressTypePriority []corev1.NodeAddressType
Scheme string
}

// KubeletClientFor constructs a new KubeletInterface for the given configuration.
func KubeletClientFor(config *KubeletClientConfig) (KubeletInterface, error) {
// Complete constructs a new kubeletCOnfig for the given configuration.
func (config KubeletClientConfig) Complete() (*kubeletClient, error) {
transport, err := rest.TransportFor(config.RESTConfig)
if err != nil {
return nil, fmt.Errorf("unable to construct transport: %v", err)
}

return NewKubeletClient(transport, config.Port, config.DeprecatedCompletelyInsecure)
c := &http.Client{
Transport: transport,
}
return &kubeletClient{
addrResolver: utils.NewPriorityNodeAddressResolver(config.AddressTypePriority),
port: config.Port,
client: c,
scheme: config.Scheme,
}, nil
}
46 changes: 6 additions & 40 deletions pkg/scraper/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,17 @@ func RegisterScraperMetrics(scrapeTimeout time.Duration) {
legacyregistry.MustRegister(scraperDuration)
}

func NewScraper(nodeLister v1listers.NodeLister, client KubeletInterface, addrResolver utils.NodeAddressResolver, scrapeTimeout time.Duration) *Scraper {
func NewScraper(nodeLister v1listers.NodeLister, client KubeletInterface, scrapeTimeout time.Duration) *Scraper {
return &Scraper{
nodeLister: nodeLister,
kubeletClient: client,
addrResolver: addrResolver,
scrapeTimeout: scrapeTimeout,
}
}

type Scraper struct {
nodeLister v1listers.NodeLister
kubeletClient KubeletInterface
addrResolver utils.NodeAddressResolver
scrapeTimeout time.Duration
}

Expand All @@ -121,7 +119,7 @@ type NodeInfo struct {
}

func (c *Scraper) Scrape(baseCtx context.Context) (*storage.MetricsBatch, error) {
nodes, err := c.GetNodes()
nodes, err := c.nodeLister.List(labels.Everything())
var errs []error
if err != nil {
// save the error, and continue on in case of partial results
Expand All @@ -143,7 +141,7 @@ func (c *Scraper) Scrape(baseCtx context.Context) (*storage.MetricsBatch, error)
}

for _, node := range nodes {
go func(node NodeInfo) {
go func(node *corev1.Node) {
// Prevents network congestion.
sleepDuration := time.Duration(rand.Intn(delayMs)) * time.Millisecond
time.Sleep(sleepDuration)
Expand Down Expand Up @@ -184,55 +182,23 @@ func (c *Scraper) Scrape(baseCtx context.Context) (*storage.MetricsBatch, error)
return res, utilerrors.NewAggregate(errs)
}

func (c *Scraper) collectNode(ctx context.Context, node NodeInfo) (*storage.MetricsBatch, error) {
func (c *Scraper) collectNode(ctx context.Context, node *corev1.Node) (*storage.MetricsBatch, error) {
startTime := myClock.Now()
defer func() {
scraperDuration.WithLabelValues(node.Name).Observe(float64(myClock.Since(startTime)) / float64(time.Second))
summaryRequestLatency.WithLabelValues(node.Name).Observe(float64(myClock.Since(startTime)) / float64(time.Second))
lastScrapeTimestamp.WithLabelValues(node.Name).Set(float64(myClock.Now().Unix()))
}()
summary, err := c.kubeletClient.GetSummary(ctx, node.ConnectAddress)
summary, err := c.kubeletClient.GetSummary(ctx, node)

if err != nil {
scrapeTotal.WithLabelValues("false").Inc()
return nil, fmt.Errorf("unable to fetch metrics from Kubelet %s (%s): %v", node.Name, node.ConnectAddress, err)
return nil, fmt.Errorf("unable to fetch metrics from node %s: %v", node.Name, err)
}
scrapeTotal.WithLabelValues("true").Inc()
return decodeBatch(summary), nil
}

func (c *Scraper) GetNodes() ([]NodeInfo, error) {
nodes, err := c.nodeLister.List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("unable to list nodes: %v", err)
}

var errs []error
result := make([]NodeInfo, 0, len(nodes))
for _, node := range nodes {
info, err := c.getNodeInfo(node)
if err != nil {
errs = append(errs, fmt.Errorf("unable to extract connection information for node %q: %v", node.Name, err))
continue
}
result = append(result, info)
}
return result, utilerrors.NewAggregate(errs)
}

func (c *Scraper) getNodeInfo(node *corev1.Node) (NodeInfo, error) {
addr, err := c.addrResolver.NodeAddress(node)
if err != nil {
return NodeInfo{}, err
}
info := NodeInfo{
Name: node.Name,
ConnectAddress: addr,
}

return info, nil
}

type clock interface {
Now() time.Time
Since(time.Time) time.Duration
Expand Down
Loading

0 comments on commit ec48583

Please sign in to comment.