Skip to content

Commit

Permalink
Add pipeline run support for cloud events
Browse files Browse the repository at this point in the history
Replace the pipeline run controller own config store with the
shared one used by the taskrun controller too. The pipeline run
config store is only useful to the artifact storage, however
the artifact storage loads the config by fetching the configmap
via the kube client, so it does not use the config store.

Attaching the shared config store to the controller, along with
the cloud events client, enables the pipeline run controller to
start sending cloud events for all events where we send k8s events
today (except for error ones).

Add a reconciler unit test to verify that events are sent when
the sink is configured.
  • Loading branch information
afrittoli committed Jul 14, 2020
1 parent 8e9230d commit e51caf3
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 3 deletions.
7 changes: 7 additions & 0 deletions internal/builder/v1beta1/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,13 @@ func PipelineRunNamespace(namespace string) PipelineRunOp {
}
}

// PipelineRunSelfLink adds a SelfLink
func PipelineRunSelfLink(selflink string) PipelineRunOp {
return func(tr *v1beta1.PipelineRun) {
tr.ObjectMeta.SelfLink = selflink
}
}

// PipelineRunSpec sets the PipelineRunSpec, references Pipeline with specified name, to the PipelineRun.
// Any number of PipelineRunSpec modifier can be passed to transform it.
func PipelineRunSpec(name string, ops ...PipelineRunSpecOp) PipelineRunOp {
Expand Down
6 changes: 4 additions & 2 deletions pkg/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"time"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
pipelineclient "github.com/tektoncd/pipeline/pkg/client/injection/client"
conditioninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/condition"
Expand All @@ -31,7 +32,7 @@ import (
pipelinerunreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun"
resourceinformer "github.com/tektoncd/pipeline/pkg/client/resource/injection/informers/resource/v1alpha1/pipelineresource"
"github.com/tektoncd/pipeline/pkg/reconciler"
"github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/config"
cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
Expand Down Expand Up @@ -72,11 +73,12 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
resourceLister: resourceInformer.Lister(),
conditionLister: conditionInformer.Lister(),
timeoutHandler: timeoutHandler,
cloudEventClient: cloudeventclient.Get(ctx),
metrics: metrics,
pvcHandler: volumeclaim.NewPVCHandler(kubeclientset, logger),
}
impl := pipelinerunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options {
configStore := config.NewStore(images, logger.Named("config-store"))
configStore := config.NewStore(logger.Named("config-store"))
configStore.WatchConfigs(cmw)
return controller.Options{
AgentName: pipeline.PipelineRunControllerName,
Expand Down
4 changes: 4 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/tektoncd/pipeline/pkg/contexts"
"github.com/tektoncd/pipeline/pkg/reconciler"
"github.com/tektoncd/pipeline/pkg/reconciler/events"
"github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"github.com/tektoncd/pipeline/pkg/reconciler/pipeline/dag"
"github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/resources"
"github.com/tektoncd/pipeline/pkg/reconciler/taskrun"
Expand Down Expand Up @@ -116,6 +117,7 @@ type Reconciler struct {
clusterTaskLister listers.ClusterTaskLister
resourceLister resourcelisters.PipelineResourceLister
conditionLister listersv1alpha1.ConditionLister
cloudEventClient cloudevent.CEClient
tracker tracker.Interface
timeoutHandler *reconciler.TimeoutSet
metrics *Recorder
Expand All @@ -132,6 +134,8 @@ var (
// resource with the current status of the resource.
func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun) pkgreconciler.Event {
logger := logging.FromContext(ctx)
ctx = cloudevent.ToContext(ctx, c.cloudEventClient)

// Read the initial condition
before := pr.Status.GetCondition(apis.ConditionSucceeded)

Expand Down
122 changes: 122 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
tbv1alpha1 "github.com/tektoncd/pipeline/internal/builder/v1alpha1"
tb "github.com/tektoncd/pipeline/internal/builder/v1beta1"
"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
Expand Down Expand Up @@ -82,12 +83,37 @@ func getRunName(pr *v1beta1.PipelineRun) string {
return strings.Join([]string{pr.Namespace, pr.Name}, "/")
}

func ensureConfigurationConfigMapsExist(d *test.Data) {
var defaultsExists, featureFlagsExists bool
for _, cm := range d.ConfigMaps {
if cm.Name == config.GetDefaultsConfigName() {
defaultsExists = true
}
if cm.Name == config.GetFeatureFlagsConfigName() {
featureFlagsExists = true
}
}
if !defaultsExists {
d.ConfigMaps = append(d.ConfigMaps, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: config.GetDefaultsConfigName(), Namespace: system.GetNamespace()},
Data: map[string]string{},
})
}
if !featureFlagsExists {
d.ConfigMaps = append(d.ConfigMaps, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.GetNamespace()},
Data: map[string]string{},
})
}
}

// getPipelineRunController returns an instance of the PipelineRun controller/reconciler that has been seeded with
// d, where d represents the state of the system (existing resources) needed for the test.
func getPipelineRunController(t *testing.T, d test.Data) (test.Assets, func()) {
//unregisterMetrics()
ctx, _ := ttesting.SetupFakeContext(t)
ctx, cancel := context.WithCancel(ctx)
ensureConfigurationConfigMapsExist(&d)
c, informers := test.SeedTestData(t, ctx, d)
configMapWatcher := configmap.NewInformedWatcher(c.Kube, system.GetNamespace())

Expand All @@ -96,6 +122,9 @@ func getPipelineRunController(t *testing.T, d test.Data) (test.Assets, func()) {
if la, ok := ctl.Reconciler.(reconciler.LeaderAware); ok {
la.Promote(reconciler.UniversalBucket(), func(reconciler.Bucket, types.NamespacedName) {})
}
if err := configMapWatcher.Start(ctx.Done()); err != nil {
t.Fatalf("error starting configmap watcher: %v", err)
}

return test.Assets{
Logger: logging.FromContext(ctx),
Expand Down Expand Up @@ -3759,3 +3788,96 @@ func getTaskRunStatus(t string, status corev1.ConditionStatus) *v1beta1.Pipeline
},
}
}

// TestReconcile_CloudEvents runs reconcile with a cloud event sink configured
// to ensure that events are sent in different cases
func TestReconcile_CloudEvents(t *testing.T) {
names.TestingSeed()

prs := []*v1beta1.PipelineRun{
tb.PipelineRun("test-pipelinerun",
tb.PipelineRunNamespace("foo"),
tb.PipelineRunSelfLink("/pipeline/1234"),
tb.PipelineRunSpec("test-pipeline"),
),
}
ps := []*v1beta1.Pipeline{
tb.Pipeline("test-pipeline",
tb.PipelineNamespace("foo"),
tb.PipelineSpec(tb.PipelineTask("test-1", "test-task")),
),
}
ts := []*v1beta1.Task{
tb.Task("test-task", tb.TaskNamespace("foo"),
tb.TaskSpec(tb.Step("foo", tb.StepName("simple-step"),
tb.StepCommand("/mycmd"), tb.StepEnvVar("foo", "bar"),
))),
}
cms := []*corev1.ConfigMap{
{
ObjectMeta: metav1.ObjectMeta{Name: config.GetDefaultsConfigName(), Namespace: system.GetNamespace()},
Data: map[string]string{
"default-cloud-events-sink": "http://synk:8080",
},
},
}

d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
Tasks: ts,
ConfigMaps: cms,
}

names.TestingSeed()

testAssets, cancel := getPipelineRunController(t, d)
defer cancel()
c := testAssets.Controller
clients := testAssets.Clients

if err := c.Reconciler.Reconcile(context.Background(), "foo/test-pipelinerun"); err != nil {
t.Fatalf("Error reconciling: %s", err)
}

if len(clients.Pipeline.Actions()) == 0 {
t.Fatalf("Expected client to have been used to create a TaskRun but it wasn't")
}

// Check that the PipelineRun was reconciled correctly
reconciledRun, err := clients.Pipeline.TektonV1beta1().PipelineRuns("foo").Get("test-pipelinerun", metav1.GetOptions{})
if err != nil {
t.Fatalf("Somehow had error getting reconciled run out of fake client: %s", err)
}

// This PipelineRun is in progress now and the status should reflect that
condition := reconciledRun.Status.GetCondition(apis.ConditionSucceeded)
if condition == nil || condition.Status != corev1.ConditionUnknown {
t.Errorf("Expected PipelineRun status to be in progress, but was %v", condition)
}
if condition != nil && condition.Reason != v1beta1.PipelineRunReasonRunning.String() {
t.Errorf("Expected reason %q but was %s", v1beta1.PipelineRunReasonRunning.String(), condition.Reason)
}

if len(reconciledRun.Status.TaskRuns) != 1 {
t.Errorf("Expected PipelineRun status to include the TaskRun status items that can run immediately: %v", reconciledRun.Status.TaskRuns)
}

wantEvents := []string{
"Normal Started",
"Normal Running Tasks Completed: 0",
}
err = checkEvents(t, testAssets.Recorder, "reconcile-cloud-events", wantEvents)
if !(err == nil) {
t.Errorf(err.Error())
}
wantCloudEvents := []string{
`(?s)dev.tekton.event.pipelinerun.started.v1.*test-pipelinerun`,
`(?s)dev.tekton.event.pipelinerun.running.v1.*test-pipelinerun`,
}
ceClient := clients.CloudEvents.(cloudevent.FakeClient)
err = checkCloudEvents(t, &ceClient, "reconcile-cloud-events", wantCloudEvents)
if !(err == nil) {
t.Errorf(err.Error())
}
}
1 change: 0 additions & 1 deletion pkg/reconciler/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
KubeClientSet: kubeclientset,
PipelineClientSet: pipelineclientset,
Images: images,

taskRunLister: taskRunInformer.Lister(),
taskLister: taskInformer.Lister(),
clusterTaskLister: clusterTaskInformer.Lister(),
Expand Down

0 comments on commit e51caf3

Please sign in to comment.