From 8fa9300fda00a4efaadaa0782e6d4267b5ebcb8f Mon Sep 17 00:00:00 2001
From: Marek Siarkowicz
Date: Tue, 28 Apr 2020 21:51:43 +0200
Subject: [PATCH] Refactor kubeletClient

Move address resolver into client and merge all config setup.
---
 cmd/metrics-server/app/options/options.go |  29 +++--
 pkg/metrics-server/config.go              |  24 +---
 pkg/metrics-server/server.go              |  33 +++---
 pkg/scraper/client.go                     |  52 ++++-----
 pkg/scraper/configs.go                    |  48 ++++----
 pkg/scraper/scraper.go                    |  48 +-------
 pkg/scraper/scraper_test.go               | 132 +++++++---------
 7 files changed, 131 insertions(+), 235 deletions(-)

diff --git a/cmd/metrics-server/app/options/options.go b/cmd/metrics-server/app/options/options.go
index e1fa78dfc..b4de83e73 100644
--- a/cmd/metrics-server/app/options/options.go
+++ b/cmd/metrics-server/app/options/options.go
@@ -112,13 +112,10 @@ func (o Options) MetricsServerConfig() (*metric_server.Config, error) {
 	if err != nil {
 		return nil, err
 	}
-	kubelet := o.kubeletConfig(restConfig)
-	addressResolver := o.addressResolverConfig()
 	return &metric_server.Config{
 		Apiserver:        apiserver,
 		Rest:             restConfig,
-		Kubelet:          kubelet,
-		AddresResolver:   addressResolver,
+		Kubelet:          o.kubeletConfig(restConfig),
 		MetricResolution: o.MetricResolution,
 		ScrapeTimeout:    time.Duration(float64(o.MetricResolution) * 0.90), // scrape timeout is 90% of the scrape interval
 	}, nil
@@ -169,12 +166,28 @@ func (o Options) restConfig() (*rest.Config, error) {
 }
 
 func (o Options) kubeletConfig(restConfig *rest.Config) *scraper.KubeletClientConfig {
-	kubeletRestCfg := rest.CopyConfig(restConfig)
+	config := &scraper.KubeletClientConfig{
+		Scheme:              "https",
+		DefaultPort:         o.KubeletPort,
+		AddressTypePriority: o.addressResolverConfig(),
+		UseNodeStatusPort:   o.KubeletUseNodeStatusPort,
+		RESTConfig:          rest.CopyConfig(restConfig),
+	}
 	if len(o.KubeletCAFile) > 0 {
-		kubeletRestCfg.TLSClientConfig.CAFile = o.KubeletCAFile
-		kubeletRestCfg.TLSClientConfig.CAData = nil
+		config.RESTConfig.TLSClientConfig.CAFile = o.KubeletCAFile
+		config.RESTConfig.TLSClientConfig.CAData = nil
+	}
+	if o.DeprecatedCompletelyInsecureKubelet {
+		config.Scheme = "http"
+		config.RESTConfig = rest.AnonymousClientConfig(config.RESTConfig) // don't use auth to avoid leaking auth details to insecure endpoints
+		config.RESTConfig.TLSClientConfig = rest.TLSClientConfig{}        // empty TLS config --> no TLS
+	}
+	if o.InsecureKubeletTLS {
+		config.RESTConfig.TLSClientConfig.Insecure = true
+		config.RESTConfig.TLSClientConfig.CAData = nil
+		config.RESTConfig.TLSClientConfig.CAFile = ""
 	}
-	return scraper.GetKubeletConfig(kubeletRestCfg, o.KubeletPort, o.KubeletUseNodeStatusPort, o.InsecureKubeletTLS, o.DeprecatedCompletelyInsecureKubelet)
+	return config
 }
 
 func (o Options) addressResolverConfig() []corev1.NodeAddressType {
diff --git a/pkg/metrics-server/config.go b/pkg/metrics-server/config.go
index 2fed9e57e..2469f605d 100644
--- a/pkg/metrics-server/config.go
+++ b/pkg/metrics-server/config.go
@@ -17,7 +17,6 @@ import (
 	"fmt"
 	"time"
 
-	corev1 "k8s.io/api/core/v1"
 	genericapiserver "k8s.io/apiserver/pkg/server"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
@@ -27,14 +26,12 @@ import (
 	"sigs.k8s.io/metrics-server/pkg/api"
 	"sigs.k8s.io/metrics-server/pkg/scraper"
 	"sigs.k8s.io/metrics-server/pkg/storage"
-	"sigs.k8s.io/metrics-server/pkg/utils"
 )
 
 type Config struct {
 	Apiserver        *genericapiserver.Config
 	Rest             *rest.Config
 	Kubelet          *scraper.KubeletClientConfig
-	AddresResolver   []corev1.NodeAddressType
 	MetricResolution time.Duration
 	ScrapeTimeout 
time.Duration } @@ -44,14 +41,12 @@ func (c Config) Complete() (*MetricsServer, error) { if err != nil { return nil, err } - kubeletClient, err := c.kubeletClient() + kubeletClient, err := c.Kubelet.Complete() if err != nil { - return nil, err + return nil, fmt.Errorf("unable to construct a client to connect to the kubelets: %v", err) } - addressResolver := c.addressResolver() - nodes := informer.Core().V1().Nodes() - scrape := scraper.NewScraper(nodes.Lister(), kubeletClient, addressResolver, c.ScrapeTimeout) + scrape := scraper.NewScraper(nodes.Lister(), kubeletClient, c.ScrapeTimeout) scraper.RegisterScraperMetrics(c.ScrapeTimeout) RegisterServerMetrics(c.MetricResolution) @@ -86,16 +81,3 @@ func (c Config) informer() (informers.SharedInformerFactory, error) { // so set the default resync interval to 0 return informers.NewSharedInformerFactory(kubeClient, 0), nil } - -func (c Config) kubeletClient() (scraper.KubeletInterface, error) { - kubeletClient, err := scraper.KubeletClientFor(c.Kubelet) - if err != nil { - return nil, fmt.Errorf("unable to construct a client to connect to the kubelets: %v", err) - } - return kubeletClient, nil -} - -func (c Config) addressResolver() utils.NodeAddressResolver { - // set up an address resolver according to the user's priorities - return utils.NewPriorityNodeAddressResolver(c.AddresResolver) -} diff --git a/pkg/metrics-server/server.go b/pkg/metrics-server/server.go index a3d9f5976..115c6a1c0 100644 --- a/pkg/metrics-server/server.go +++ b/pkg/metrics-server/server.go @@ -78,24 +78,25 @@ func (ms *MetricsServer) RunUntil(stopCh <-chan struct{}) error { if !shutdown { return nil } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go ms.runScrape(ctx) + return ms.GenericAPIServer.PrepareRun().Run(stopCh) +} - go func() { - ticker := time.NewTicker(ms.resolution) - defer ticker.Stop() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - ms.scrape(ctx, time.Now()) - - for { - select { - case startTime := <-ticker.C: - ms.scrape(ctx, startTime) - case <-stopCh: - return - } +func (ms *MetricsServer) runScrape(ctx context.Context) { + ticker := time.NewTicker(ms.resolution) + defer ticker.Stop() + ms.scrape(ctx, time.Now()) + + for { + select { + case startTime := <-ticker.C: + ms.scrape(ctx, startTime) + case <-ctx.Done(): + return } - }() - return ms.GenericAPIServer.PrepareRun().Run(stopCh) + } } func (ms *MetricsServer) scrape(ctx context.Context, startTime time.Time) { diff --git a/pkg/scraper/client.go b/pkg/scraper/client.go index f1f7f5f5e..28f8477e9 100644 --- a/pkg/scraper/client.go +++ b/pkg/scraper/client.go @@ -25,22 +25,28 @@ import ( "github.com/mailru/easyjson" + corev1 "k8s.io/api/core/v1" "k8s.io/klog" + + "sigs.k8s.io/metrics-server/pkg/utils" ) // KubeletInterface knows how to fetch metrics from the Kubelet type KubeletInterface interface { // GetSummary fetches summary metrics from the given Kubelet - GetSummary(ctx context.Context, info NodeInfo) (*Summary, error) + GetSummary(ctx context.Context, node *corev1.Node) (*Summary, error) } type kubeletClient struct { - defaultPort int - kubeletUseNodeStatusPort bool - deprecatedNoTLS bool - client *http.Client + defaultPort int + useNodeStatusPort bool + client *http.Client + scheme string + addrResolver utils.NodeAddressResolver } +var _ KubeletInterface = (*kubeletClient)(nil) + type ErrNotFound struct { endpoint string } @@ -49,11 +55,6 @@ func (err *ErrNotFound) Error() string { return fmt.Sprintf("%q not found", err.endpoint) } 
-func IsNotFoundError(err error) bool { - _, isNotFound := err.(*ErrNotFound) - return isNotFound -} - func (kc *kubeletClient) makeRequestAndGetValue(client *http.Client, req *http.Request, value easyjson.Unmarshaler) error { // TODO(directxman12): support validating certs by hostname response, err := client.Do(req) @@ -84,18 +85,19 @@ func (kc *kubeletClient) makeRequestAndGetValue(client *http.Client, req *http.R return nil } -func (kc *kubeletClient) GetSummary(ctx context.Context, info NodeInfo) (*Summary, error) { - scheme := "https" - if kc.deprecatedNoTLS { - scheme = "http" +func (kc *kubeletClient) GetSummary(ctx context.Context, node *corev1.Node) (*Summary, error) { + port := kc.defaultPort + nodeStatusPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port) + if kc.useNodeStatusPort && nodeStatusPort != 0 { + port = nodeStatusPort } - kubeletPort := kc.defaultPort - if kc.kubeletUseNodeStatusPort && info.KubeletPort != 0 { - kubeletPort = info.KubeletPort + addr, err := kc.addrResolver.NodeAddress(node) + if err != nil { + return nil, fmt.Errorf("unable to extract connection information for node %q: %v", node.Name, err) } url := url.URL{ - Scheme: scheme, - Host: net.JoinHostPort(info.ConnectAddress, strconv.Itoa(kubeletPort)), + Scheme: kc.scheme, + Host: net.JoinHostPort(addr, strconv.Itoa(port)), Path: "/stats/summary", RawQuery: "only_cpu_and_memory=true", } @@ -112,15 +114,3 @@ func (kc *kubeletClient) GetSummary(ctx context.Context, info NodeInfo) (*Summar err = kc.makeRequestAndGetValue(client, req.WithContext(ctx), summary) return summary, err } - -func NewKubeletClient(transport http.RoundTripper, port int, kubeletUseNodeStatusPort bool, deprecatedNoTLS bool) (KubeletInterface, error) { - c := &http.Client{ - Transport: transport, - } - return &kubeletClient{ - defaultPort: port, - kubeletUseNodeStatusPort: kubeletUseNodeStatusPort, - client: c, - deprecatedNoTLS: deprecatedNoTLS, - }, nil -} diff --git a/pkg/scraper/configs.go b/pkg/scraper/configs.go index 4e95ef13c..c56115487 100644 --- a/pkg/scraper/configs.go +++ b/pkg/scraper/configs.go @@ -16,44 +16,38 @@ package scraper import ( "fmt" + "net/http" + corev1 "k8s.io/api/core/v1" "k8s.io/client-go/rest" -) - -// GetKubeletConfig fetches connection config for connecting to the Kubelet. -func GetKubeletConfig(cfg *rest.Config, port int, kubeletUseNodeStatusPort bool, insecureTLS bool, completelyInsecure bool) *KubeletClientConfig { - if completelyInsecure { - cfg = rest.AnonymousClientConfig(cfg) // don't use auth to avoid leaking auth details to insecure endpoints - cfg.TLSClientConfig = rest.TLSClientConfig{} // empty TLS config --> no TLS - } else if insecureTLS { - cfg.TLSClientConfig.Insecure = true - cfg.TLSClientConfig.CAData = nil - cfg.TLSClientConfig.CAFile = "" - } - kubeletConfig := &KubeletClientConfig{ - Port: port, - KubeletUseNodeStatusPort: kubeletUseNodeStatusPort, - RESTConfig: cfg, - DeprecatedCompletelyInsecure: completelyInsecure, - } - return kubeletConfig -} + "sigs.k8s.io/metrics-server/pkg/utils" +) // KubeletClientConfig represents configuration for connecting to Kubelets. type KubeletClientConfig struct { - KubeletUseNodeStatusPort bool - Port int - RESTConfig *rest.Config - DeprecatedCompletelyInsecure bool + RESTConfig *rest.Config + AddressTypePriority []corev1.NodeAddressType + Scheme string + DefaultPort int + UseNodeStatusPort bool } -// KubeletClientFor constructs a new KubeletInterface for the given configuration. 
-func KubeletClientFor(config *KubeletClientConfig) (KubeletInterface, error) {
+// Complete constructs a new kubeletClient for the given configuration.
+func (config KubeletClientConfig) Complete() (*kubeletClient, error) {
 	transport, err := rest.TransportFor(config.RESTConfig)
 	if err != nil {
 		return nil, fmt.Errorf("unable to construct transport: %v", err)
 	}
-	return NewKubeletClient(transport, config.Port, config.KubeletUseNodeStatusPort, config.DeprecatedCompletelyInsecure)
+	c := &http.Client{
+		Transport: transport,
+	}
+	return &kubeletClient{
+		addrResolver:      utils.NewPriorityNodeAddressResolver(config.AddressTypePriority),
+		defaultPort:       config.DefaultPort,
+		client:            c,
+		scheme:            config.Scheme,
+		useNodeStatusPort: config.UseNodeStatusPort,
+	}, nil
 }
diff --git a/pkg/scraper/scraper.go b/pkg/scraper/scraper.go
index 1c92cfe49..9caa72983 100644
--- a/pkg/scraper/scraper.go
+++ b/pkg/scraper/scraper.go
@@ -97,11 +97,10 @@ func RegisterScraperMetrics(scrapeTimeout time.Duration) {
 	legacyregistry.MustRegister(scraperDuration)
 }
 
-func NewScraper(nodeLister v1listers.NodeLister, client KubeletInterface, addrResolver utils.NodeAddressResolver, scrapeTimeout time.Duration) *Scraper {
+func NewScraper(nodeLister v1listers.NodeLister, client KubeletInterface, scrapeTimeout time.Duration) *Scraper {
 	return &Scraper{
 		nodeLister:    nodeLister,
 		kubeletClient: client,
-		addrResolver:  addrResolver,
 		scrapeTimeout: scrapeTimeout,
 	}
 }
@@ -109,7 +108,6 @@ func NewScraper(nodeLister v1listers.NodeLister, client KubeletInterface, addrRe
 type Scraper struct {
 	nodeLister    v1listers.NodeLister
 	kubeletClient KubeletInterface
-	addrResolver  utils.NodeAddressResolver
 	scrapeTimeout time.Duration
 }
 
@@ -118,11 +116,10 @@ type Scraper struct {
 type NodeInfo struct {
 	Name           string
 	ConnectAddress string
-	KubeletPort    int
 }
 
 func (c *Scraper) Scrape(baseCtx context.Context) (*storage.MetricsBatch, error) {
-	nodes, err := c.GetNodes()
+	nodes, err := c.nodeLister.List(labels.Everything())
 	var errs []error
 	if err != nil {
 		// save the error, and continue on in case of partial results
@@ -144,7 +141,7 @@ func (c *Scraper) Scrape(baseCtx context.Context) (*storage.MetricsBatch, error)
 	}
 
 	for _, node := range nodes {
-		go func(node NodeInfo) {
+		go func(node *corev1.Node) {
 			// Prevents network congestion.
			sleepDuration := time.Duration(rand.Intn(delayMs)) * time.Millisecond
 			time.Sleep(sleepDuration)
@@ -153,7 +150,7 @@
 			ctx, cancelTimeout := context.WithTimeout(baseCtx, c.scrapeTimeout-sleepDuration)
 			defer cancelTimeout()
 
-			klog.V(2).Infof("Querying source: {%s %s %d}", node.Name, node.ConnectAddress, node.KubeletPort)
+			klog.V(2).Infof("Querying source: %s", node.Name)
 			metrics, err := c.collectNode(ctx, node)
 			if err != nil {
 				err = fmt.Errorf("unable to fully scrape metrics from node %s: %v", node.Name, err)
@@ -185,7 +182,7 @@ func (c *Scraper) Scrape(baseCtx context.Context) (*storage.MetricsBatch, error)
 	return res, utilerrors.NewAggregate(errs)
 }
 
-func (c *Scraper) collectNode(ctx context.Context, node NodeInfo) (*storage.MetricsBatch, error) {
+func (c *Scraper) collectNode(ctx context.Context, node *corev1.Node) (*storage.MetricsBatch, error) {
 	startTime := myClock.Now()
 	defer func() {
 		scraperDuration.WithLabelValues(node.Name).Observe(float64(myClock.Since(startTime)) / float64(time.Second))
@@ -196,45 +193,12 @@ func (c *Scraper) collectNode(ctx context.Context, node NodeInfo) (*storage.Metr
 
 	if err != nil {
 		scrapeTotal.WithLabelValues("false").Inc()
-		return nil, fmt.Errorf("unable to fetch metrics from Kubelet %s (%s): %v", node.Name, node.ConnectAddress, err)
+		return nil, fmt.Errorf("unable to fetch metrics from node %s: %v", node.Name, err)
 	}
 	scrapeTotal.WithLabelValues("true").Inc()
 	return decodeBatch(summary), nil
 }
 
-func (c *Scraper) GetNodes() ([]NodeInfo, error) {
-	nodes, err := c.nodeLister.List(labels.Everything())
-	if err != nil {
-		return nil, fmt.Errorf("unable to list nodes: %v", err)
-	}
-
-	var errs []error
-	result := make([]NodeInfo, 0, len(nodes))
-	for _, node := range nodes {
-		info, err := c.getNodeInfo(node)
-		if err != nil {
-			errs = append(errs, fmt.Errorf("unable to extract connection information for node %q: %v", node.Name, err))
-			continue
-		}
-		result = append(result, info)
-	}
-	return result, utilerrors.NewAggregate(errs)
-}
-
-func (c *Scraper) getNodeInfo(node *corev1.Node) (NodeInfo, error) {
-	addr, err := c.addrResolver.NodeAddress(node)
-	if err != nil {
-		return NodeInfo{}, err
-	}
-	info := NodeInfo{
-		Name:           node.Name,
-		ConnectAddress: addr,
-		KubeletPort:    int(node.Status.DaemonEndpoints.KubeletEndpoint.Port),
-	}
-
-	return info, nil
-}
-
 type clock interface {
 	Now() time.Time
 	Since(time.Time) time.Duration
diff --git a/pkg/scraper/scraper_test.go b/pkg/scraper/scraper_test.go
index 91fdc7c10..bc5040fb6 100644
--- a/pkg/scraper/scraper_test.go
+++ b/pkg/scraper/scraper_test.go
@@ -21,16 +21,16 @@ import (
 	"testing"
 	"time"
 
-	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/component-base/metrics/testutil"
-	"sigs.k8s.io/metrics-server/pkg/storage"
-	"sigs.k8s.io/metrics-server/pkg/utils"
-
 	. "github.com/onsi/ginkgo"
 	. 
"github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" v1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/component-base/metrics/testutil" + + "sigs.k8s.io/metrics-server/pkg/storage" ) const timeDrift = 50 * time.Millisecond @@ -40,9 +40,9 @@ func TestScraper(t *testing.T) { RunSpecs(t, "Scraper Suite") } -func nodeStats(host string, cpu, memory int, scrapeTime time.Time) NodeStats { +func nodeStats(node *corev1.Node, cpu, memory int, scrapeTime time.Time) NodeStats { return NodeStats{ - NodeName: host, + NodeName: node.Name, CPU: cpuStats(100, scrapeTime.Add(100*time.Millisecond)), Memory: memStats(200, scrapeTime.Add(200*time.Millisecond)), } @@ -52,12 +52,15 @@ var _ = Describe("Scraper", func() { var ( scrapeTime = time.Now() nodeLister fakeNodeLister - resolver utils.NodeAddressResolver client fakeKubeletClient + node1 = makeNode("node1", "node1.somedomain", "10.0.1.2", true) + node2 = makeNode("node-no-host", "", "10.0.1.3", true) + node3 = makeNode("node3", "node3.somedomain", "10.0.1.4", false) + node4 = makeNode("node4", "node4.somedomain", "10.0.1.5", true) ) BeforeEach(func() { summary := &Summary{ - Node: nodeStats("node1", 100, 200, scrapeTime), + Node: nodeStats(node1, 100, 200, scrapeTime), Pods: []PodStats{ podStats("ns1", "pod1", containerStats("container1", 300, 400, scrapeTime.Add(10*time.Millisecond)), @@ -70,20 +73,14 @@ var _ = Describe("Scraper", func() { containerStats("container1", 1100, 1200, scrapeTime.Add(50*time.Millisecond))), }, } - resolver = utils.NewPriorityNodeAddressResolver(utils.DefaultAddressTypePriority) - nodeLister = fakeNodeLister{nodes: []*corev1.Node{ - makeNode("node1", "node1.somedomain", "10.0.1.2", true), - makeNode("node-no-host", "", "10.0.1.3", true), - makeNode("node3", "node3.somedomain", "10.0.1.4", false), - makeNode("node4", "node4.somedomain", "10.0.1.5", true), - }} + nodeLister = fakeNodeLister{nodes: []*corev1.Node{node1, node2, node3, node4}} client = fakeKubeletClient{ - delay: map[string]time.Duration{}, - metrics: map[string]*Summary{ - "node1.somedomain": summary, - "10.0.1.3": {Node: nodeStats("node-no-host", 100, 200, scrapeTime)}, - "node3.somedomain": {Node: nodeStats("node3", 100, 200, scrapeTime)}, - "node4.somedomain": {Node: nodeStats("node4", 100, 200, scrapeTime)}, + delay: map[*corev1.Node]time.Duration{}, + metrics: map[*corev1.Node]*Summary{ + node1: summary, + node2: {Node: nodeStats(node2, 100, 200, scrapeTime)}, + node3: {Node: nodeStats(node3, 100, 200, scrapeTime)}, + node4: {Node: nodeStats(node4, 100, 200, scrapeTime)}, }, } }) @@ -95,7 +92,7 @@ var _ = Describe("Scraper", func() { By("running the scraper with a context timeout of 3*seconds") start := time.Now() - scraper := NewScraper(&nodeLister, &client, resolver, 3*time.Second) + scraper := NewScraper(&nodeLister, &client, 3*time.Second) timeoutCtx, doneWithWork := context.WithTimeout(context.Background(), 4*time.Second) dataBatch, errs := scraper.Scrape(timeoutCtx) doneWithWork() @@ -114,12 +111,12 @@ var _ = Describe("Scraper", func() { Context("when some clients take too long", func() { It("should pass the scrape timeout to the source context, so that sources can time out", func() { By("setting up one source to take 4 seconds, and another to take 2") - client.delay["node1.somedomain"] = 4 * time.Second + client.delay[node1] = 4 * time.Second client.defaultDelay = 2 * time.Second By("running the source scraper with a scrape timeout of 3 seconds") start := 
time.Now()
-		scraper := NewScraper(&nodeLister, &client, resolver, 3*time.Second)
+		scraper := NewScraper(&nodeLister, &client, 3*time.Second)
 		dataBatch, errs := scraper.Scrape(context.Background())
 
 		Expect(errs).To(HaveOccurred())
@@ -138,7 +135,7 @@
 		By("running the source scraper with a scrape timeout of 5 seconds, but a context timeout of 1 second")
 		start := time.Now()
-		scraper := NewScraper(&nodeLister, &client, resolver, 5*time.Second)
+		scraper := NewScraper(&nodeLister, &client, 5*time.Second)
 		timeoutCtx, doneWithWork := context.WithTimeout(context.Background(), 1*time.Second)
 		dataBatch, errs := scraper.Scrape(timeoutCtx)
 		doneWithWork()
@@ -160,11 +157,9 @@
 			now:   time.Time{},
 			later: time.Time{}.Add(time.Second),
 		}
-		nodes := fakeNodeLister{nodes: []*corev1.Node{
-			makeNode("node1", "node1.somedomain", "10.0.1.2", true),
-		}}
+		nodes := fakeNodeLister{nodes: []*corev1.Node{node1}}
 
-		scraper := NewScraper(&nodes, &client, resolver, 3*time.Second)
+		scraper := NewScraper(&nodes, &client, 3*time.Second)
 		_, errs := scraper.Scrape(context.Background())
 
 		Expect(errs).NotTo(HaveOccurred())
@@ -204,86 +199,43 @@
 	})
 
 	It("should continue on error fetching node information for a particular node", func() {
-		By("deleting the IP of a node")
+		By("deleting the first node's addresses and metrics")
 		nodeLister.nodes[0].Status.Addresses = nil
-		scraper := NewScraper(&nodeLister, &client, resolver, 5*time.Second)
+		delete(client.metrics, node1)
+		scraper := NewScraper(&nodeLister, &client, 5*time.Second)
 
-		By("listing the nodes")
-		nodes, err := scraper.GetNodes()
-		Expect(err).To(HaveOccurred())
+		By("running the scraper")
+		dataBatch, errs := scraper.Scrape(context.Background())
+		Expect(errs).To(HaveOccurred())
 
-		By("verifying that a source is present for each node")
-		Expect(nodes).To(HaveLen(3))
+		By("ensuring that all other nodes were scraped")
+		Expect(nodeNames(dataBatch.Nodes)).To(ConsistOf([]string{"node4", "node-no-host", "node3"}))
 	})
 
 	It("should gracefully handle list errors", func() {
 		By("setting a fake error from the lister")
 		nodeLister.listErr = fmt.Errorf("something went wrong, expectedly")
-		scraper := NewScraper(&nodeLister, &client, resolver, 5*time.Second)
-
-		By("listing the sources")
-		_, err := scraper.GetNodes()
-		Expect(err).To(HaveOccurred())
-	})
-
-	It("should prefer addresses according to the order of the types first", func() {
-		By("setting the first node to have multiple addresses and setting all nodeLister to ready")
-		nodeLister.nodes[0].Status.Addresses = []corev1.NodeAddress{
-			{Type: utils.DefaultAddressTypePriority[3], Address: "skip-val1"},
-			{Type: utils.DefaultAddressTypePriority[2], Address: "skip-val2"},
-			{Type: utils.DefaultAddressTypePriority[1], Address: "correct-val"},
-		}
-		scraper := NewScraper(&nodeLister, &client, resolver, 5*time.Second)
-		By("listing all sources")
-		nodes, err := scraper.GetNodes()
-		Expect(err).NotTo(HaveOccurred())
-
-		By("making sure that the first source scrapes from the correct location")
-		Expect(nodes[0].ConnectAddress).To(Equal("correct-val"))
-	})
-
-	It("should prefer the first address that matches within a given type", func() {
-		By("setting the first node to have multiple addresses and setting all nodeLister to ready")
-		nodeLister.nodes[0].Status.Addresses = []corev1.NodeAddress{
-			{Type: utils.DefaultAddressTypePriority[1], Address: "skip-val1"},
-			{Type: utils.DefaultAddressTypePriority[0], Address: "correct-val"},
-			{Type: 
utils.DefaultAddressTypePriority[1], Address: "skip-val2"}, - {Type: utils.DefaultAddressTypePriority[0], Address: "second-val"}, - } - scraper := NewScraper(&nodeLister, &client, resolver, 5*time.Second) - - By("listing all sources") - nodes, err := scraper.GetNodes() - Expect(err).NotTo(HaveOccurred()) - - By("making sure that the first source scrapes from the correct location") - Expect(nodes[0].ConnectAddress).To(Equal("correct-val")) - }) - - It("should return an error if no preferred addresses are found", func() { - By("wiping out the addresses of one of the nodeLister and setting all nodeLister to ready") - nodeLister.nodes[0].Status.Addresses = nil - scraper := NewScraper(&nodeLister, &client, resolver, 5*time.Second) + scraper := NewScraper(&nodeLister, &client, 5*time.Second) - By("asking for scraper for all nodeLister") - _, err := scraper.GetNodes() + By("running the scraper") + _, err := scraper.Scrape(context.Background()) Expect(err).To(HaveOccurred()) }) }) type fakeKubeletClient struct { - delay map[string]time.Duration - metrics map[string]*Summary + delay map[*corev1.Node]time.Duration + metrics map[*corev1.Node]*Summary defaultDelay time.Duration } -func (c *fakeKubeletClient) GetSummary(ctx context.Context, info NodeInfo) (*Summary, error) { - delay, ok := c.delay[info.ConnectAddress] +func (c *fakeKubeletClient) GetSummary(ctx context.Context, node *corev1.Node) (*Summary, error) { + delay, ok := c.delay[node] if !ok { delay = c.defaultDelay } - metrics, ok := c.metrics[info.ConnectAddress] + metrics, ok := c.metrics[node] if !ok { - return nil, fmt.Errorf("Unknown host %q", info.ConnectAddress) + return nil, fmt.Errorf("Unknown node %q", node.Name) } select {
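
Usage sketch (illustrative; not part of the patch): after this refactor a caller builds a
single KubeletClientConfig, completes it into a client, and passes a *corev1.Node straight
to GetSummary, since address and port resolution now live inside the client. Names not in
the diff (restConfig, ctx, node, and the 10250 port value) are assumed to be supplied by
the caller; utils.DefaultAddressTypePriority is the existing default list in pkg/utils.

	config := &scraper.KubeletClientConfig{
		Scheme:              "https",
		DefaultPort:         10250, // assumed here; normally taken from o.KubeletPort
		AddressTypePriority: utils.DefaultAddressTypePriority,
		UseNodeStatusPort:   true,
		RESTConfig:          rest.CopyConfig(restConfig), // assumed *rest.Config from the caller
	}
	kubeletClient, err := config.Complete()
	if err != nil {
		return fmt.Errorf("unable to construct a client to connect to the kubelets: %v", err)
	}
	// The client resolves the node address and picks the port itself,
	// so the scraper no longer needs a NodeInfo intermediate.
	summary, err := kubeletClient.GetSummary(ctx, node) // node is a *corev1.Node from the lister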