Improve the logger situation in metrics collection (#5428)
We should not pass the logger into each Scrape call; this change also includes a few other small fixes.

/assign mattmoor @nak3
Part of #5304
vagababov authored and knative-prow-robot committed Sep 8, 2019
1 parent 52acee0 commit 0ef4a10
Showing 5 changed files with 46 additions and 37 deletions.
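
At a high level, the commit stops threading a *zap.SugaredLogger through every Scrape call: the scraper factory now receives a logger once, scoped to the Metric's namespace/name, and the scraper stores it for later use. The sketch below only illustrates that shape — Metric and StatMessage are simplified placeholder types, not the knative-serving definitions, which appear in the diffs that follow.

package main

import (
	"go.uber.org/zap"
)

// Simplified stand-ins for the real knative-serving types.
type Metric struct{ Namespace, Name string }

type StatMessage struct{}

// After this commit, Scrape takes no logger argument...
type StatsScraper interface {
	Scrape() (*StatMessage, error)
}

// ...because the factory receives the logger once, at construction time.
type StatsScraperFactory func(*Metric, *zap.SugaredLogger) (StatsScraper, error)

// The concrete scraper keeps the pre-scoped logger for error reporting.
type serviceScraper struct {
	logger *zap.SugaredLogger
}

func (s *serviceScraper) Scrape() (*StatMessage, error) {
	s.logger.Info("scraping") // uses the stored logger instead of a per-call argument
	return &StatMessage{}, nil
}

func main() {
	var factory StatsScraperFactory = func(m *Metric, l *zap.SugaredLogger) (StatsScraper, error) {
		return &serviceScraper{logger: l}, nil
	}

	m := &Metric{Namespace: "default", Name: "my-revision"}
	// Scope the logger to the metric's key once, as CreateOrUpdate does below.
	l := zap.NewExample().Sugar().With(zap.String("key", m.Namespace+"/"+m.Name))
	scraper, _ := factory(m, l)
	scraper.Scrape()
}

The real change does the same with av1alpha1.Metric and the logkey-scoped logger built in MetricCollector.CreateOrUpdate.
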
12 changes: 7 additions & 5 deletions cmd/autoscaler/main.go
@@ -170,7 +170,8 @@ func main() {
}
}

-func uniScalerFactoryFunc(endpointsInformer corev1informers.EndpointsInformer, metricClient autoscaler.MetricClient) func(decider *autoscaler.Decider) (autoscaler.UniScaler, error) {
+func uniScalerFactoryFunc(endpointsInformer corev1informers.EndpointsInformer,
+metricClient autoscaler.MetricClient) autoscaler.UniScalerFactory {
return func(decider *autoscaler.Decider) (autoscaler.UniScaler, error) {
if v, ok := decider.Labels[serving.ConfigurationLabelKey]; !ok || v == "" {
return nil, fmt.Errorf("label %q not found or empty in Decider %s", serving.ConfigurationLabelKey, decider.Name)
@@ -192,10 +193,11 @@ func uniScalerFactoryFunc(endpointsInformer corev1informers.EndpointsInformer, m
}
}

-func statsScraperFactoryFunc(endpointsLister corev1listers.EndpointsLister) func(metric *av1alpha1.Metric) (autoscaler.StatsScraper, error) {
-return func(metric *av1alpha1.Metric) (autoscaler.StatsScraper, error) {
-podCounter := resources.NewScopedEndpointsCounter(endpointsLister, metric.Namespace, metric.Spec.ScrapeTarget)
-return autoscaler.NewServiceScraper(metric, podCounter)
+func statsScraperFactoryFunc(endpointsLister corev1listers.EndpointsLister) autoscaler.StatsScraperFactory {
+return func(metric *av1alpha1.Metric, l *zap.SugaredLogger) (autoscaler.StatsScraper, error) {
+podCounter := resources.NewScopedEndpointsCounter(
+endpointsLister, metric.Namespace, metric.Spec.ScrapeTarget)
+return autoscaler.NewServiceScraper(metric, podCounter, l)
}
}

8 changes: 5 additions & 3 deletions pkg/autoscaler/collector.go
@@ -22,6 +22,7 @@ import (
"time"

"k8s.io/apimachinery/pkg/types"
+"knative.dev/pkg/logging/logkey"
"knative.dev/serving/pkg/autoscaler/aggregation"

"go.uber.org/zap"
@@ -46,7 +47,7 @@ var (
)

// StatsScraperFactory creates a StatsScraper for a given Metric.
-type StatsScraperFactory func(*av1alpha1.Metric) (StatsScraper, error)
+type StatsScraperFactory func(*av1alpha1.Metric, *zap.SugaredLogger) (StatsScraper, error)

// Stat defines a single measurement at a point in time
type Stat struct {
@@ -127,7 +128,8 @@ func NewMetricCollector(statsScraperFactory StatsScraperFactory, logger *zap.Sug
// it already exist.
// Map access optimized via double-checked locking.
func (c *MetricCollector) CreateOrUpdate(metric *av1alpha1.Metric) error {
-scraper, err := c.statsScraperFactory(metric)
+l := c.logger.With(zap.String(logkey.Key, metric.Namespace+"/"+metric.Name))
+scraper, err := c.statsScraperFactory(metric, l)
if err != nil {
return err
}
@@ -259,7 +261,7 @@ func newCollection(metric *av1alpha1.Metric, scraper StatsScraper, logger *zap.S
scrapeTicker.Stop()
return
case <-scrapeTicker.C:
-message, err := c.getScraper().Scrape(logger)
+message, err := c.getScraper().Scrape()
if err != nil {
copy := metric.DeepCopy()
switch {
5 changes: 2 additions & 3 deletions pkg/autoscaler/collector_test.go
@@ -233,7 +233,6 @@ func TestMetricCollectorRecord(t *testing.T) {
}

func TestMetricCollectorError(t *testing.T) {

testCases := []struct {
name string
scraper *testScraper
@@ -349,7 +348,7 @@ func TestMetricCollectorError(t *testing.T) {
}

func scraperFactory(scraper StatsScraper, err error) StatsScraperFactory {
-return func(*av1alpha1.Metric) (StatsScraper, error) {
+return func(*av1alpha1.Metric, *zap.SugaredLogger) (StatsScraper, error) {
return scraper, err
}
}
@@ -359,6 +358,6 @@ type testScraper struct {
url string
}

-func (s *testScraper) Scrape(logger *zap.SugaredLogger) (*StatMessage, error) {
+func (s *testScraper) Scrape() (*StatMessage, error) {
return s.s()
}
20 changes: 12 additions & 8 deletions pkg/autoscaler/stats_scraper.go
@@ -19,13 +19,14 @@ package autoscaler
import (
"fmt"
"net/http"
+"strconv"
"sync"
"time"

+"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"k8s.io/apimachinery/pkg/types"
-"github.com/pkg/errors"

av1alpha1 "knative.dev/serving/pkg/apis/autoscaling/v1alpha1"
"knative.dev/serving/pkg/apis/networking"
@@ -62,7 +63,7 @@ var (
// StatsScraper defines the interface for collecting Revision metrics
type StatsScraper interface {
// Scrape scrapes the Revision queue metric endpoint.
-Scrape(logger *zap.SugaredLogger) (*StatMessage, error)
+Scrape() (*StatMessage, error)
}

// scrapeClient defines the interface for collecting Revision metrics for a given
@@ -92,22 +93,24 @@ type ServiceScraper struct {
namespace string
metricKey types.NamespacedName
url string
+logger *zap.SugaredLogger
}

// NewServiceScraper creates a new StatsScraper for the Revision which
// the given Metric is responsible for.
-func NewServiceScraper(metric *av1alpha1.Metric, counter resources.ReadyPodCounter) (*ServiceScraper, error) {
+func NewServiceScraper(metric *av1alpha1.Metric, counter resources.ReadyPodCounter, logger *zap.SugaredLogger) (*ServiceScraper, error) {
sClient, err := newHTTPScrapeClient(cacheDisabledClient)
if err != nil {
return nil, err
}
-return newServiceScraperWithClient(metric, counter, sClient)
+return newServiceScraperWithClient(metric, counter, sClient, logger)
}

func newServiceScraperWithClient(
metric *av1alpha1.Metric,
counter resources.ReadyPodCounter,
-sClient scrapeClient) (*ServiceScraper, error) {
+sClient scrapeClient,
+logger *zap.SugaredLogger) (*ServiceScraper, error) {
if metric == nil {
return nil, errors.New("metric must not be nil")
}
@@ -128,6 +131,7 @@ func newServiceScraperWithClient(
url: urlFromTarget(metric.Spec.ScrapeTarget, metric.ObjectMeta.Namespace),
metricKey: types.NamespacedName{Namespace: metric.Namespace, Name: metric.Name},
namespace: metric.Namespace,
+logger: logger,
}, nil
}

@@ -139,10 +143,10 @@ func urlFromTarget(t, ns string) string {

// Scrape calls the destination service then sends it
// to the given stats channel.
-func (s *ServiceScraper) Scrape(logger *zap.SugaredLogger) (*StatMessage, error) {
+func (s *ServiceScraper) Scrape() (*StatMessage, error) {
readyPodsCount, err := s.counter.ReadyCount()
if err != nil {
-logger.Errorw(ErrFailedGetEndpoints.Error(), zap.Error(err))
+s.logger.Errorw(ErrFailedGetEndpoints.Error(), zap.Error(err))
return nil, ErrFailedGetEndpoints
}

@@ -174,7 +178,7 @@ func (s *ServiceScraper) Scrape(logger *zap.SugaredLogger) (*StatMessage, error)

// Return the inner error, if any.
if err := grp.Wait(); err != nil {
-logger.Errorw(fmt.Sprintf("unsuccessful scrape, sampleSize=%d", sampleSize), zap.Error(err))
+s.logger.Errorw("unsuccessful scrape, sampleSize="+strconv.Itoa(sampleSize), zap.Error(err))
return nil, err
}
close(statCh)
38 changes: 20 additions & 18 deletions pkg/autoscaler/stats_scraper_test.go
@@ -23,6 +23,7 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/pkg/errors"
+"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
. "knative.dev/pkg/logging/testing"
@@ -63,8 +64,10 @@
)

func TestNewServiceScraperWithClientHappyCase(t *testing.T) {
+defer ClearAll()
+logger := TestLogger(t)
client := newTestScrapeClient(testStats, []error{nil})
-if scraper, err := serviceScraperForTest(client); err != nil {
+if scraper, err := serviceScraperForTest(client, logger); err != nil {
t.Fatalf("serviceScraperForTest=%v, want no error", err)
} else {
if scraper.url != testURL {
@@ -83,6 +86,8 @@ func TestNewServiceScraperWithClientErrorCases(t *testing.T) {
client := newTestScrapeClient(testStats, []error{nil})
lister := kubeInformer.Core().V1().Endpoints().Lister()
counter := resources.NewScopedEndpointsCounter(lister, testNamespace, testService)
+logger := TestLogger(t)
+defer ClearAll()

testCases := []struct {
name string
@@ -116,7 +121,7 @@

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
-if _, err := newServiceScraperWithClient(test.metric, test.counter, test.client); err != nil {
+if _, err := newServiceScraperWithClient(test.metric, test.counter, test.client, logger); err != nil {
got := err.Error()
want := test.expectedErr
if got != want {
@@ -134,7 +139,7 @@ func TestScrapeReportStatWhenAllCallsSucceed(t *testing.T) {
logger := TestLogger(t)

client := newTestScrapeClient(testStats, []error{nil})
-scraper, err := serviceScraperForTest(client)
+scraper, err := serviceScraperForTest(client, logger)
if err != nil {
t.Fatalf("serviceScraperForTest=%v, want no error", err)
}
@@ -144,7 +149,7 @@

// Scrape will set a timestamp bigger than this.
now := time.Now()
-got, err := scraper.Scrape(logger)
+got, err := scraper.Scrape()
if err != nil {
t.Fatalf("unexpected error from scraper.Scrape(logger): %v", err)
}
@@ -181,71 +186,68 @@ func TestScrapeReportStatWhenAllCallsSucceed(t *testing.T) {
func TestScrapeReportErrorCannotFindEnoughPods(t *testing.T) {
defer ClearAll()
logger := TestLogger(t)

client := newTestScrapeClient(testStats[2:], []error{nil})
-scraper, err := serviceScraperForTest(client)
+scraper, err := serviceScraperForTest(client, logger)
if err != nil {
t.Fatalf("serviceScraperForTest=%v, want no error", err)
}

// Make an Endpoints with 2 pods.
endpoints(2, testService)

-_, err = scraper.Scrape(logger)
+_, err = scraper.Scrape()
if err == nil {
-t.Errorf("scrape.Scrape(logger) = nil, expected an error")
+t.Errorf("scrape.Scrape() = nil, expected an error")
}
}

func TestScrapeReportErrorIfAnyFails(t *testing.T) {
defer ClearAll()
logger := TestLogger(t)

errTest := errors.New("test")

// 1 success and 10 failures so one scrape fails permanently through retries.
client := newTestScrapeClient(testStats, []error{nil,
errTest, errTest, errTest, errTest, errTest, errTest, errTest, errTest, errTest, errTest})
-scraper, err := serviceScraperForTest(client)
+scraper, err := serviceScraperForTest(client, logger)
if err != nil {
t.Fatalf("serviceScraperForTest=%v, want no error", err)
}

// Make an Endpoints with 2 pods.
endpoints(2, testService)

-_, err = scraper.Scrape(logger)
+_, err = scraper.Scrape()
if errors.Cause(err) != errTest {
-t.Errorf("scraper.Scrape(logger) = %v, want %v", err, errTest)
+t.Errorf("scraper.Scrape() = %v, want %v", err, errTest)
}
}

func TestScrapeDoNotScrapeIfNoPodsFound(t *testing.T) {
defer ClearAll()
logger := TestLogger(t)

client := newTestScrapeClient(testStats, nil)
-scraper, err := serviceScraperForTest(client)
+scraper, err := serviceScraperForTest(client, logger)
if err != nil {
t.Fatalf("serviceScraperForTest=%v, want no error", err)
}

// Make an Endpoints with 0 pods.
endpoints(0, testService)

-stat, err := scraper.Scrape(logger)
+stat, err := scraper.Scrape()
if err != nil {
-t.Fatalf("got error from scraper.Scrape(logger) = %v", err)
+t.Fatalf("scraper.Scrape() returned error: %v", err)
}
if stat != nil {
t.Error("Received unexpected StatMessage.")
}
}

-func serviceScraperForTest(sClient scrapeClient) (*ServiceScraper, error) {
+func serviceScraperForTest(sClient scrapeClient, l *zap.SugaredLogger) (*ServiceScraper, error) {
metric := testMetric()
counter := resources.NewScopedEndpointsCounter(kubeInformer.Core().V1().Endpoints().Lister(), testNamespace, testService)
-return newServiceScraperWithClient(metric, counter, sClient)
+return newServiceScraperWithClient(metric, counter, sClient, l)
}

func testMetric() *av1alpha1.Metric {
