Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Metric tekton_pipelines_controller_pipelinerun_count #4468

Merged
merged 1 commit into from
May 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions pkg/pipelinerunmetrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"go.opencensus.io/tag"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/labels"
"knative.dev/pkg/apis"
"knative.dev/pkg/logging"
Expand Down Expand Up @@ -209,14 +210,21 @@ func nilInsertTag(task, taskrun string) []tag.Mutator {
// DurationAndCount logs the duration of PipelineRun execution and
// count for number of PipelineRuns succeed or failed
// returns an error if its failed to log the metrics
func (r *Recorder) DurationAndCount(pr *v1beta1.PipelineRun) error {
r.mutex.Lock()
defer r.mutex.Unlock()
func (r *Recorder) DurationAndCount(pr *v1beta1.PipelineRun, beforeCondition *apis.Condition) error {

if !r.initialized {
return fmt.Errorf("ignoring the metrics recording for %s , failed to initialize the metrics recorder", pr.Name)
}

afterCondition := pr.Status.GetCondition(apis.ConditionSucceeded)
// To avoid recount
if equality.Semantic.DeepEqual(beforeCondition, afterCondition) {
return nil
}

r.mutex.Lock()
defer r.mutex.Unlock()

duration := time.Duration(0)
if pr.Status.StartTime != nil {
duration = time.Since(pr.Status.StartTime.Time)
Expand Down
104 changes: 90 additions & 14 deletions pkg/pipelinerunmetrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func getConfigContext() context.Context {
func TestUninitializedMetrics(t *testing.T) {
metrics := Recorder{}

if err := metrics.DurationAndCount(&v1beta1.PipelineRun{}); err == nil {
if err := metrics.DurationAndCount(&v1beta1.PipelineRun{}, nil); err == nil {
t.Error("DurationAndCount recording expected to return error but got nil")
}
if err := metrics.RunningPipelineRuns(nil); err == nil {
Expand Down Expand Up @@ -110,12 +110,13 @@ func TestMetricsOnStore(t *testing.T) {

func TestRecordPipelineRunDurationCount(t *testing.T) {
for _, test := range []struct {
name string
pipelineRun *v1beta1.PipelineRun
expectedTags map[string]string
expectedCountTags map[string]string
expectedDuration float64
expectedCount int64
name string
pipelineRun *v1beta1.PipelineRun
expectedDurationTags map[string]string
expectedCountTags map[string]string
expectedDuration float64
expectedCount int64
beforeCondition *apis.Condition
}{{
name: "for succeeded pipeline",
pipelineRun: &v1beta1.PipelineRun{
Expand All @@ -136,7 +137,7 @@ func TestRecordPipelineRunDurationCount(t *testing.T) {
},
},
},
expectedTags: map[string]string{
expectedDurationTags: map[string]string{
"pipeline": "pipeline-1",
"pipelinerun": "pipelinerun-1",
"namespace": "ns",
Expand All @@ -147,6 +148,70 @@ func TestRecordPipelineRunDurationCount(t *testing.T) {
},
expectedDuration: 60,
expectedCount: 1,
beforeCondition: nil,
}, {
name: "for succeeded pipeline different condition",
pipelineRun: &v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{Name: "pipelinerun-1", Namespace: "ns"},
Spec: v1beta1.PipelineRunSpec{
PipelineRef: &v1beta1.PipelineRef{Name: "pipeline-1"},
},
Status: v1beta1.PipelineRunStatus{
Status: duckv1beta1.Status{
Conditions: duckv1beta1.Conditions{{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionTrue,
}},
},
PipelineRunStatusFields: v1beta1.PipelineRunStatusFields{
StartTime: &startTime,
CompletionTime: &completionTime,
},
},
},
expectedDurationTags: map[string]string{
"pipeline": "pipeline-1",
"pipelinerun": "pipelinerun-1",
"namespace": "ns",
"status": "success",
},
expectedCountTags: map[string]string{
"status": "success",
},
expectedDuration: 60,
expectedCount: 1,
beforeCondition: &apis.Condition{
Type: apis.ConditionReady,
Status: corev1.ConditionUnknown,
},
}, {
name: "for succeeded pipeline recount",
pipelineRun: &v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{Name: "pipelinerun-1", Namespace: "ns"},
Spec: v1beta1.PipelineRunSpec{
PipelineRef: &v1beta1.PipelineRef{Name: "pipeline-1"},
},
Status: v1beta1.PipelineRunStatus{
Status: duckv1beta1.Status{
Conditions: duckv1beta1.Conditions{{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionTrue,
}},
},
PipelineRunStatusFields: v1beta1.PipelineRunStatusFields{
StartTime: &startTime,
CompletionTime: &completionTime,
},
},
},
expectedDurationTags: nil,
expectedCountTags: nil,
expectedDuration: 0,
expectedCount: 0,
beforeCondition: &apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionTrue,
},
}, {
name: "for cancelled pipeline",
pipelineRun: &v1beta1.PipelineRun{
Expand All @@ -168,7 +233,7 @@ func TestRecordPipelineRunDurationCount(t *testing.T) {
},
},
},
expectedTags: map[string]string{
expectedDurationTags: map[string]string{
"pipeline": "pipeline-1",
"pipelinerun": "pipelinerun-1",
"namespace": "ns",
Expand All @@ -179,6 +244,7 @@ func TestRecordPipelineRunDurationCount(t *testing.T) {
},
expectedDuration: 60,
expectedCount: 1,
beforeCondition: nil,
}, {
name: "for failed pipeline",
pipelineRun: &v1beta1.PipelineRun{
Expand All @@ -199,7 +265,7 @@ func TestRecordPipelineRunDurationCount(t *testing.T) {
},
},
},
expectedTags: map[string]string{
expectedDurationTags: map[string]string{
"pipeline": "pipeline-1",
"pipelinerun": "pipelinerun-1",
"namespace": "ns",
Expand All @@ -210,6 +276,7 @@ func TestRecordPipelineRunDurationCount(t *testing.T) {
},
expectedDuration: 60,
expectedCount: 1,
beforeCondition: nil,
}, {
name: "for pipeline without start or completion time",
pipelineRun: &v1beta1.PipelineRun{
Expand All @@ -227,7 +294,7 @@ func TestRecordPipelineRunDurationCount(t *testing.T) {
},
},
},
expectedTags: map[string]string{
expectedDurationTags: map[string]string{
"pipeline": "pipeline-1",
"pipelinerun": "pipelinerun-1",
"namespace": "ns",
Expand All @@ -238,6 +305,7 @@ func TestRecordPipelineRunDurationCount(t *testing.T) {
},
expectedDuration: 0,
expectedCount: 1,
beforeCondition: nil,
}} {
t.Run(test.name, func(t *testing.T) {
unregisterMetrics()
Expand All @@ -248,11 +316,19 @@ func TestRecordPipelineRunDurationCount(t *testing.T) {
t.Fatalf("NewRecorder: %v", err)
}

if err := metrics.DurationAndCount(test.pipelineRun); err != nil {
if err := metrics.DurationAndCount(test.pipelineRun, test.beforeCondition); err != nil {
t.Errorf("DurationAndCount: %v", err)
}
metricstest.CheckLastValueData(t, "pipelinerun_duration_seconds", test.expectedTags, test.expectedDuration)
metricstest.CheckCountData(t, "pipelinerun_count", test.expectedCountTags, test.expectedCount)
if test.expectedDurationTags != nil {
metricstest.CheckLastValueData(t, "pipelinerun_duration_seconds", test.expectedDurationTags, test.expectedDuration)
} else {
metricstest.CheckStatsNotReported(t, "pipelinerun_duration_seconds")
}
if test.expectedCountTags != nil {
metricstest.CheckCountData(t, "pipelinerun_count", test.expectedCountTags, test.expectedCount)
} else {
metricstest.CheckStatsNotReported(t, "pipelinerun_count")
}

})
}
Expand Down
25 changes: 19 additions & 6 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,6 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
logger.Errorf("Failed to update Run status for PipelineRun %s: %v", pr.Name, err)
return c.finishReconcileUpdateEmitEvents(ctx, pr, before, err)
}
go func(metrics *pipelinerunmetrics.Recorder) {
err := metrics.DurationAndCount(pr)
if err != nil {
logger.Warnf("Failed to log the metrics : %v", err)
}
}(c.metrics)
return c.finishReconcileUpdateEmitEvents(ctx, pr, before, nil)
}

Expand Down Expand Up @@ -242,6 +236,24 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
return nil
}

func (c *Reconciler) durationAndCountMetrics(ctx context.Context, pr *v1beta1.PipelineRun) {
logger := logging.FromContext(ctx)
if pr.IsDone() {
// We get latest pipelinerun cr already to avoid recount
newPr, err := c.pipelineRunLister.PipelineRuns(pr.Namespace).Get(pr.Name)
Comment on lines +242 to +243
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not super familiar with how/when the lister updates, but wouldn't pulling in the latest status mean that the current pr status and the "before" status always match, since the status is updated during reconcile?

It's would be useful to add a test to make sure we're handling the reconciler state changes properly (i.e. making sure we're avoiding instances where before always equals after) - right now we're really only testing the DurationAndCount func directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reconcile changes status but that isn't applied to the pipelienrun object in Informer cache.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding aligns with @khrm, the status changes but the change is not visible through the lister.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my own knowledge - When would the lister value update? Is it because we're updating only the status that this is safe to do?

I'm a bit confused with the current impl/comments since we're getting the "latest" pipelinerun, but this is really the pipelinerun before the latest status update?

I played around with this change and this appears to be correct (I even tried adding some sleeps to try and trigger a race condition), but it feels like we're relying on non-obvious behavior of the lister. I'd have a lot more peace of mind if we could add a test to ensure that it's safe rely on this lister behavior and be able to catch if this assumption is no longer true (i.e. if we make an unrelated change that would cause the lister to start returning the true latest value).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, sounds good. @khrm please add a unit test including an update during the reconcile in addition to what we have otherwise looks good to me 👍 Thanks!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I share @wlynch concern - this feels like introducing a potential race condition.
Wouldn't it be safer to extract the beforeCondition at the beginning of the reconcile so we can use it later?
That's what we do for events already - see https://github.com/tektoncd/pipeline/blob/7855cb720c3fc59e585c2e230f71e488fc9038be/pkg/reconciler/pipelinerun/pipelinerun.go#L148:L148.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check this comment.

#4468 (comment)

if err != nil {
logger.Errorf("Error getting PipelineRun %s when updating metrics: %w", pr.Name, err)
}
before := newPr.Status.GetCondition(apis.ConditionSucceeded)
go func(metrics *pipelinerunmetrics.Recorder) {
err := metrics.DurationAndCount(pr, before)
if err != nil {
logger.Warnf("Failed to log the metrics : %v", err)
}
}(c.metrics)
}
}

func (c *Reconciler) finishReconcileUpdateEmitEvents(ctx context.Context, pr *v1beta1.PipelineRun, beforeCondition *apis.Condition, previousError error) error {
logger := logging.FromContext(ctx)

Expand Down Expand Up @@ -319,6 +331,7 @@ func (c *Reconciler) resolvePipelineState(
}

func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, getPipelineFunc resources.GetPipeline) error {
defer c.durationAndCountMetrics(ctx, pr)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about creating a copy of the PR condition in a beforeCondition variable here, and pass that to durationAndCountMetrics?

Copy link
Contributor Author

@khrm khrm Feb 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@afrittoli When I was running a test, somehow the same event came twice even when the informer cache was updated. So I decided to use informer cache. Any idea why this happens? @wlynch

Is it due to the controller resync?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@afrittoli @wlynch Please check this comment:

#4468 (comment)

logger := logging.FromContext(ctx)
cfg := config.FromContextOrDefaults(ctx)
pr.SetDefaults(ctx)
Expand Down