diff --git a/pkg/apis/autoscaling/v1alpha1/metric_lifecycle.go b/pkg/apis/autoscaling/v1alpha1/metric_lifecycle.go index e785084716b5..61f73d60ef8c 100644 --- a/pkg/apis/autoscaling/v1alpha1/metric_lifecycle.go +++ b/pkg/apis/autoscaling/v1alpha1/metric_lifecycle.go @@ -18,9 +18,56 @@ package v1alpha1 import ( "k8s.io/apimachinery/pkg/runtime/schema" + "knative.dev/pkg/apis" + duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1" +) + +const ( + // MetricConditionReady is set when the Metric's latest + // underlying revision has reported readiness. + MetricConditionReady = apis.ConditionReady +) + +var condSet = apis.NewLivingConditionSet( + MetricConditionReady, ) // GetGroupVersionKind implements OwnerRefable. func (m *Metric) GetGroupVersionKind() schema.GroupVersionKind { return SchemeGroupVersion.WithKind("Metric") } + +// GetCondition gets the condition `t`. +func (ms *MetricStatus) GetCondition(t apis.ConditionType) *apis.Condition { + return condSet.Manage(ms).GetCondition(t) +} + +// InitializeConditions initializes the conditions of the Metric. +func (ms *MetricStatus) InitializeConditions() { + condSet.Manage(ms).InitializeConditions() +} + +// MarkMetricReady marks the metric status as ready +func (ms *MetricStatus) MarkMetricReady() { + condSet.Manage(ms).MarkTrue(MetricConditionReady) +} + +// MarkMetricNotReady marks the metric status as ready == Unknown +func (ms *MetricStatus) MarkMetricNotReady(reason, message string) { + condSet.Manage(ms).MarkUnknown(MetricConditionReady, reason, message) +} + +// MarkMetricFailed marks the metric status as failed +func (ms *MetricStatus) MarkMetricFailed(reason, message string) { + condSet.Manage(ms).MarkFalse(MetricConditionReady, reason, message) +} + +// IsReady looks at the conditions and if the condition MetricConditionReady +// is true +func (ms *MetricStatus) IsReady() bool { + return condSet.Manage(ms.duck()).IsHappy() +} + +func (ms *MetricStatus) duck() *duckv1beta1.Status { + return (*duckv1beta1.Status)(&ms.Status) +} diff --git a/pkg/apis/autoscaling/v1alpha1/metric_lifecycle_test.go b/pkg/apis/autoscaling/v1alpha1/metric_lifecycle_test.go index a8598e6eda43..e3f825282031 100644 --- a/pkg/apis/autoscaling/v1alpha1/metric_lifecycle_test.go +++ b/pkg/apis/autoscaling/v1alpha1/metric_lifecycle_test.go @@ -18,19 +18,158 @@ package v1alpha1 import ( "testing" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/schema" - + "knative.dev/pkg/apis" + "knative.dev/pkg/apis/duck" + duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1" + apitest "knative.dev/pkg/apis/testing" "knative.dev/serving/pkg/apis/autoscaling" ) +func TestMetricDuckTypes(t *testing.T) { + tests := []struct { + name string + t duck.Implementable + }{{ + name: "conditions", + t: &duckv1beta1.Conditions{}, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + err := duck.VerifyType(&Metric{}, test.t) + if err != nil { + t.Errorf("VerifyType(Metric, %T) = %v", test.t, err) + } + }) + } +} + +func TestMetricIsReady(t *testing.T) { + cases := []struct { + name string + status MetricStatus + isReady bool + }{{ + name: "empty status should not be ready", + status: MetricStatus{}, + isReady: false, + }, { + name: "Different condition type should not be ready", + status: MetricStatus{ + Status: duckv1beta1.Status{ + Conditions: duckv1beta1.Conditions{{ + Type: "FooCondition", + Status: corev1.ConditionTrue, + }}, + }, + }, + isReady: false, + }, { + name: "False condition status should not be ready", + status: MetricStatus{ + Status: duckv1beta1.Status{ + Conditions: duckv1beta1.Conditions{{ + Type: MetricConditionReady, + Status: corev1.ConditionFalse, + }}, + }, + }, + isReady: false, + }, { + name: "Unknown condition status should not be ready", + status: MetricStatus{ + Status: duckv1beta1.Status{ + Conditions: duckv1beta1.Conditions{{ + Type: MetricConditionReady, + Status: corev1.ConditionUnknown, + }}, + }, + }, + isReady: false, + }, { + name: "Missing condition status should not be ready", + status: MetricStatus{ + Status: duckv1beta1.Status{ + Conditions: duckv1beta1.Conditions{{ + Type: MetricConditionReady, + }}, + }, + }, + isReady: false, + }, { + name: "True condition status should be ready", + status: MetricStatus{ + Status: duckv1beta1.Status{ + Conditions: duckv1beta1.Conditions{{ + Type: MetricConditionReady, + Status: corev1.ConditionTrue, + }}, + }, + }, + isReady: true, + }} + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if e, a := tc.isReady, tc.status.IsReady(); e != a { + t.Errorf("Ready = %v, want: %v", a, e) + } + }) + } +} + +func TestMetricGetSetCondition(t *testing.T) { + ms := &MetricStatus{} + if a := ms.GetCondition(MetricConditionReady); a != nil { + t.Errorf("empty MetricStatus returned %v when expected nil", a) + } + mc := &apis.Condition{ + Type: MetricConditionReady, + Status: corev1.ConditionTrue, + } + ms.MarkMetricReady() + if diff := cmp.Diff(mc, ms.GetCondition(MetricConditionReady), cmpopts.IgnoreFields(apis.Condition{}, "LastTransitionTime")); diff != "" { + t.Errorf("GetCondition refs diff (-want +got): %v", diff) + } +} + +func TestTypicalFlowWithMetricCondition(t *testing.T) { + m := &MetricStatus{} + m.InitializeConditions() + apitest.CheckConditionOngoing(m.duck(), MetricConditionReady, t) + + const ( + wantReason = "reason" + wantMessage = "the error message" + ) + m.MarkMetricFailed(wantReason, wantMessage) + apitest.CheckConditionFailed(m.duck(), MetricConditionReady, t) + if got := m.GetCondition(MetricConditionReady); got == nil || got.Reason != wantReason || got.Message != wantMessage { + t.Errorf("MarkMetricFailed = %v, wantReason %v, wantMessage %v", got, wantReason, wantMessage) + } + + m.MarkMetricNotReady(wantReason, wantMessage) + apitest.CheckConditionOngoing(m.duck(), MetricConditionReady, t) + if got := m.GetCondition(MetricConditionReady); got == nil || got.Reason != wantReason || got.Message != wantMessage { + t.Errorf("MarkMetricNotReady = %v, wantReason %v, wantMessage %v", got, wantReason, wantMessage) + } + + m.MarkMetricReady() + apitest.CheckConditionSucceeded(m.duck(), MetricConditionReady, t) +} + func TestMetricGetGroupVersionKind(t *testing.T) { - m := &Metric{} + r := &Metric{} want := schema.GroupVersionKind{ Group: autoscaling.InternalGroupName, Version: "v1alpha1", Kind: "Metric", } - if got := m.GetGroupVersionKind(); got != want { + if got := r.GetGroupVersionKind(); got != want { t.Errorf("got: %v, want: %v", got, want) } } diff --git a/pkg/apis/autoscaling/v1alpha1/metric_types.go b/pkg/apis/autoscaling/v1alpha1/metric_types.go index b2592481b90d..376b05c5b114 100644 --- a/pkg/apis/autoscaling/v1alpha1/metric_types.go +++ b/pkg/apis/autoscaling/v1alpha1/metric_types.go @@ -21,6 +21,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/apis" + duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1" "knative.dev/pkg/kmeta" ) @@ -63,7 +64,9 @@ type MetricSpec struct { } // MetricStatus reflects the status of metric collection for this specific entity. -type MetricStatus struct{} +type MetricStatus struct { + duckv1beta1.Status `json:",inline"` +} // MetricList is a list of Metric resources // diff --git a/pkg/apis/autoscaling/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/autoscaling/v1alpha1/zz_generated.deepcopy.go index dee422bd5a85..435a5a8642e1 100644 --- a/pkg/apis/autoscaling/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/autoscaling/v1alpha1/zz_generated.deepcopy.go @@ -31,7 +31,7 @@ func (in *Metric) DeepCopyInto(out *Metric) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) out.Spec = in.Spec - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) return } @@ -105,6 +105,7 @@ func (in *MetricSpec) DeepCopy() *MetricSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *MetricStatus) DeepCopyInto(out *MetricStatus) { *out = *in + in.Status.DeepCopyInto(&out.Status) return } diff --git a/pkg/autoscaler/collector.go b/pkg/autoscaler/collector.go index 7dadd93ff215..0b91eae93ced 100644 --- a/pkg/autoscaler/collector.go +++ b/pkg/autoscaler/collector.go @@ -259,9 +259,19 @@ func newCollection(metric *av1alpha1.Metric, scraper StatsScraper, logger *zap.S scrapeTicker.Stop() return case <-scrapeTicker.C: - message, err := c.getScraper().Scrape() + message, err := c.getScraper().Scrape(logger) if err != nil { + copy := metric.DeepCopy() + switch { + case err == ErrFailedGetEndpoints: + copy.Status.MarkMetricNotReady("NoEndpoints", ErrFailedGetEndpoints.Error()) + case err == ErrDidNotReceiveStat: + copy.Status.MarkMetricFailed("DidNotReceiveStat", ErrDidNotReceiveStat.Error()) + default: + copy.Status.MarkMetricNotReady("CreateOrUpdateFailed", "Collector has failed.") + } logger.Errorw("Failed to scrape metrics", zap.Error(err)) + c.updateMetric(copy) } if message != nil { c.record(message.Stat) diff --git a/pkg/autoscaler/collector_test.go b/pkg/autoscaler/collector_test.go index 8e15b7829510..c05d84f74f12 100644 --- a/pkg/autoscaler/collector_test.go +++ b/pkg/autoscaler/collector_test.go @@ -22,11 +22,16 @@ import ( "time" "github.com/google/go-cmp/cmp" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1" . "knative.dev/pkg/logging/testing" av1alpha1 "knative.dev/serving/pkg/apis/autoscaling/v1alpha1" + "knative.dev/serving/pkg/apis/serving" ) var ( @@ -227,6 +232,122 @@ func TestMetricCollectorRecord(t *testing.T) { } } +func TestMetricCollectorError(t *testing.T) { + + testCases := []struct { + name string + scraper *testScraper + metric *av1alpha1.Metric + expectedMetricStatus duckv1beta1.Status + }{{ + name: "Failed to get endpoints scraper error", + scraper: &testScraper{ + s: func() (*StatMessage, error) { + return nil, ErrFailedGetEndpoints + }, + }, + metric: &av1alpha1.Metric{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNamespace, + Name: testRevision, + Labels: map[string]string{ + serving.RevisionLabelKey: testRevision, + }, + }, + Spec: av1alpha1.MetricSpec{ + ScrapeTarget: testRevision + "-zhudex", + }, + }, + expectedMetricStatus: duckv1beta1.Status{ + Conditions: duckv1beta1.Conditions{{ + Type: av1alpha1.MetricConditionReady, + Status: corev1.ConditionUnknown, + Reason: "NoEndpoints", + Message: ErrFailedGetEndpoints.Error(), + }}, + }, + }, { + name: "Did not receive stat scraper error", + scraper: &testScraper{ + s: func() (*StatMessage, error) { + return nil, ErrDidNotReceiveStat + }, + }, + metric: &av1alpha1.Metric{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNamespace, + Name: testRevision, + Labels: map[string]string{ + serving.RevisionLabelKey: testRevision, + }, + }, + Spec: av1alpha1.MetricSpec{ + ScrapeTarget: testRevision + "-zhudex", + }, + }, + expectedMetricStatus: duckv1beta1.Status{ + Conditions: duckv1beta1.Conditions{{ + Type: av1alpha1.MetricConditionReady, + Status: corev1.ConditionFalse, + Reason: "DidNotReceiveStat", + Message: ErrDidNotReceiveStat.Error(), + }}, + }, + }, { + name: "Other scraper error", + scraper: &testScraper{ + s: func() (*StatMessage, error) { + return nil, errors.New("foo") + }, + }, + metric: &av1alpha1.Metric{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNamespace, + Name: testRevision, + Labels: map[string]string{ + serving.RevisionLabelKey: testRevision, + }, + }, + Spec: av1alpha1.MetricSpec{ + ScrapeTarget: testRevision + "-zhudex", + }, + }, + expectedMetricStatus: duckv1beta1.Status{ + Conditions: duckv1beta1.Conditions{{ + Type: av1alpha1.MetricConditionReady, + Status: corev1.ConditionUnknown, + Reason: "CreateOrUpdateFailed", + Message: "Collector has failed.", + }}, + }, + }} + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + defer ClearAll() + logger := TestLogger(t) + factory := scraperFactory(test.scraper, nil) + coll := NewMetricCollector(factory, logger) + coll.CreateOrUpdate(test.metric) + key := types.NamespacedName{Namespace: test.metric.Namespace, Name: test.metric.Name} + + var got duckv1beta1.Status + wait.PollImmediate(10*time.Millisecond, 2*time.Second, func() (bool, error) { + collection, ok := coll.collections[key] + if ok { + got = collection.currentMetric().Status.Status + return equality.Semantic.DeepEqual(got, test.expectedMetricStatus), nil + } + return false, nil + }) + if !equality.Semantic.DeepEqual(got, test.expectedMetricStatus) { + t.Errorf("Got = %#v, want: %#v, diff:\n%q", got, test.expectedMetricStatus, cmp.Diff(got, test.expectedMetricStatus)) + } + coll.Delete(test.metric.Namespace, test.metric.Name) + }) + } +} + func scraperFactory(scraper StatsScraper, err error) StatsScraperFactory { return func(*av1alpha1.Metric) (StatsScraper, error) { return scraper, err @@ -238,6 +359,6 @@ type testScraper struct { url string } -func (s *testScraper) Scrape() (*StatMessage, error) { +func (s *testScraper) Scrape(logger *zap.SugaredLogger) (*StatMessage, error) { return s.s() } diff --git a/pkg/autoscaler/stats_scraper.go b/pkg/autoscaler/stats_scraper.go index e5fc41152e45..3f18a26c2fb2 100644 --- a/pkg/autoscaler/stats_scraper.go +++ b/pkg/autoscaler/stats_scraper.go @@ -22,10 +22,11 @@ import ( "sync" "time" + "go.uber.org/zap" "golang.org/x/sync/errgroup" "k8s.io/apimachinery/pkg/types" - "github.com/pkg/errors" + av1alpha1 "knative.dev/serving/pkg/apis/autoscaling/v1alpha1" "knative.dev/serving/pkg/apis/networking" "knative.dev/serving/pkg/apis/serving" @@ -48,10 +49,20 @@ const ( scraperMaxRetries = 10 ) +var ( + // ErrFailedGetEndpoints specifies the error returned by scraper when it fails to + // get endpoints. + ErrFailedGetEndpoints = errors.New("failed to get endpoints") + + // ErrDidNotReceiveStat specifies the error returned by scraper when it does not receive + // stat from an unscraped pod + ErrDidNotReceiveStat = errors.New("did not receive stat from an unscraped pod") +) + // StatsScraper defines the interface for collecting Revision metrics type StatsScraper interface { // Scrape scrapes the Revision queue metric endpoint. - Scrape() (*StatMessage, error) + Scrape(logger *zap.SugaredLogger) (*StatMessage, error) } // scrapeClient defines the interface for collecting Revision metrics for a given @@ -128,10 +139,11 @@ func urlFromTarget(t, ns string) string { // Scrape calls the destination service then sends it // to the given stats channel. -func (s *ServiceScraper) Scrape() (*StatMessage, error) { +func (s *ServiceScraper) Scrape(logger *zap.SugaredLogger) (*StatMessage, error) { readyPodsCount, err := s.counter.ReadyCount() if err != nil { - return nil, errors.Wrap(err, "failed to get endpoints") + logger.Errorw(ErrFailedGetEndpoints.Error(), zap.Error(err)) + return nil, ErrFailedGetEndpoints } if readyPodsCount == 0 { @@ -162,7 +174,8 @@ func (s *ServiceScraper) Scrape() (*StatMessage, error) { // Return the inner error, if any. if err := grp.Wait(); err != nil { - return nil, errors.Wrapf(err, "unsuccessful scrape, sampleSize=%d", sampleSize) + logger.Errorw(fmt.Sprintf("unsuccessful scrape, sampleSize=%d", sampleSize), zap.Error(err)) + return nil, err } close(statCh) @@ -220,7 +233,7 @@ func (s *ServiceScraper) tryScrape(scrapedPods *sync.Map) (*Stat, error) { } if _, exists := scrapedPods.LoadOrStore(stat.PodName, struct{}{}); exists { - return nil, errors.New("did not receive stat from an unscraped pod") + return nil, ErrDidNotReceiveStat } return stat, nil diff --git a/pkg/autoscaler/stats_scraper_test.go b/pkg/autoscaler/stats_scraper_test.go index 4c9ac5227db9..70985cd4ab01 100644 --- a/pkg/autoscaler/stats_scraper_test.go +++ b/pkg/autoscaler/stats_scraper_test.go @@ -25,6 +25,7 @@ import ( "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + . "knative.dev/pkg/logging/testing" av1alpha1 "knative.dev/serving/pkg/apis/autoscaling/v1alpha1" "knative.dev/serving/pkg/apis/serving" "knative.dev/serving/pkg/resources" @@ -129,6 +130,9 @@ func TestNewServiceScraperWithClientErrorCases(t *testing.T) { } func TestScrapeReportStatWhenAllCallsSucceed(t *testing.T) { + defer ClearAll() + logger := TestLogger(t) + client := newTestScrapeClient(testStats, []error{nil}) scraper, err := serviceScraperForTest(client) if err != nil { @@ -140,9 +144,9 @@ func TestScrapeReportStatWhenAllCallsSucceed(t *testing.T) { // Scrape will set a timestamp bigger than this. now := time.Now() - got, err := scraper.Scrape() + got, err := scraper.Scrape(logger) if err != nil { - t.Fatalf("unexpected error from scraper.Scrape(): %v", err) + t.Fatalf("unexpected error from scraper.Scrape(logger): %v", err) } if got.Key != testPAKey { @@ -175,6 +179,9 @@ func TestScrapeReportStatWhenAllCallsSucceed(t *testing.T) { } func TestScrapeReportErrorCannotFindEnoughPods(t *testing.T) { + defer ClearAll() + logger := TestLogger(t) + client := newTestScrapeClient(testStats[2:], []error{nil}) scraper, err := serviceScraperForTest(client) if err != nil { @@ -184,13 +191,16 @@ func TestScrapeReportErrorCannotFindEnoughPods(t *testing.T) { // Make an Endpoints with 2 pods. endpoints(2, testService) - _, err = scraper.Scrape() + _, err = scraper.Scrape(logger) if err == nil { - t.Errorf("scrape.Scrape() = nil, expected an error") + t.Errorf("scrape.Scrape(logger) = nil, expected an error") } } func TestScrapeReportErrorIfAnyFails(t *testing.T) { + defer ClearAll() + logger := TestLogger(t) + errTest := errors.New("test") // 1 success and 10 failures so one scrape fails permanently through retries. @@ -204,13 +214,16 @@ func TestScrapeReportErrorIfAnyFails(t *testing.T) { // Make an Endpoints with 2 pods. endpoints(2, testService) - _, err = scraper.Scrape() + _, err = scraper.Scrape(logger) if errors.Cause(err) != errTest { - t.Errorf("scraper.Scrape() = %v, want %v", err, errTest) + t.Errorf("scraper.Scrape(logger) = %v, want %v", err, errTest) } } func TestScrapeDoNotScrapeIfNoPodsFound(t *testing.T) { + defer ClearAll() + logger := TestLogger(t) + client := newTestScrapeClient(testStats, nil) scraper, err := serviceScraperForTest(client) if err != nil { @@ -220,9 +233,9 @@ func TestScrapeDoNotScrapeIfNoPodsFound(t *testing.T) { // Make an Endpoints with 0 pods. endpoints(0, testService) - stat, err := scraper.Scrape() + stat, err := scraper.Scrape(logger) if err != nil { - t.Fatalf("got error from scraper.Scrape() = %v", err) + t.Fatalf("got error from scraper.Scrape(logger) = %v", err) } if stat != nil { t.Error("Received unexpected StatMessage.") diff --git a/pkg/reconciler/metric/metric.go b/pkg/reconciler/metric/metric.go index a568239bfb1f..3e7eb93bf398 100644 --- a/pkg/reconciler/metric/metric.go +++ b/pkg/reconciler/metric/metric.go @@ -60,8 +60,12 @@ func (r *reconciler) Reconcile(ctx context.Context, key string) error { return errors.Wrapf(err, "failed to fetch metric %q", key) } + metric.Status.InitializeConditions() + if err := r.collector.CreateOrUpdate(metric); err != nil { return errors.Wrapf(err, "failed to initiate or update scraping") } + + metric.Status.MarkMetricReady() return nil }