Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed eventtype create-delete loop on built in sources #7245

Merged
merged 4 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/reconciler/apiserversource/apiserversource.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1.ApiServerSour
return nil
}

func (r *Reconciler) FinalizeKind(ctx context.Context, source *v1.ApiServerSource) pkgreconciler.Event {
logging.FromContext(ctx).Info("Deleting source")
// Allow for eventtypes to be cleaned up
source.Status.CloudEventAttributes = []duckv1.CloudEventAttributes{}
return nil
}

Comment on lines +137 to +143
Copy link
Member

@pierDipi pierDipi Feb 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This update is not guaranteed to be seen by the other controller (since after FinalizeKind ends the finalizer is removed and with it the resource) and also I think there is (or there should be) an owner reference that should clean up event types associated with this source.

func (r *Reconciler) namespacesFromSelector(src *v1.ApiServerSource) ([]string, error) {
if src.Spec.NamespaceSelector == nil {
return []string{src.Namespace}, nil
Expand Down
71 changes: 71 additions & 0 deletions pkg/reconciler/apiserversource/apiserversource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,12 @@ func TestReconcile(t *testing.T) {
},
WantErr: true,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeWarning, "InternalError", `insufficient permissions: User system:serviceaccount:testnamespace:default cannot get, list, watch resource "namespaces" in API group "" in Namespace "testnamespace"`),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(false)},
SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
}, {
Expand Down Expand Up @@ -193,6 +197,12 @@ func TestReconcile(t *testing.T) {
makeSubjectAccessReview("namespaces", "list", "default"),
makeSubjectAccessReview("namespaces", "watch", "default"),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)},
SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
}, {
Expand Down Expand Up @@ -254,8 +264,12 @@ func TestReconcile(t *testing.T) {
Object: makeAvailableReceiveAdapterWithNamespaces(t, []string{"test-a", "test-b"}, false),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeNormal, "ApiServerSourceDeploymentUpdated", `Deployment "apiserversource-test-apiserver-source-1234" updated`),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)},
SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
}, {
Expand Down Expand Up @@ -320,8 +334,12 @@ func TestReconcile(t *testing.T) {
Object: makeAvailableReceiveAdapterWithNamespaces(t, []string{"test-a", "test-b", "test-c"}, true),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeNormal, "ApiServerSourceDeploymentUpdated", `Deployment "apiserversource-test-apiserver-source-1234" updated`),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)},
SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
}, {
Expand Down Expand Up @@ -373,6 +391,12 @@ func TestReconcile(t *testing.T) {
makeSubjectAccessReview("namespaces", "list", "default"),
makeSubjectAccessReview("namespaces", "watch", "default"),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)},
SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
}, {
Expand Down Expand Up @@ -422,6 +446,12 @@ func TestReconcile(t *testing.T) {
makeSubjectAccessReview("namespaces", "list", "default"),
makeSubjectAccessReview("namespaces", "watch", "default"),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)},
SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
}, {
Expand All @@ -441,9 +471,13 @@ func TestReconcile(t *testing.T) {
},
Key: testNS + "/" + sourceName,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeWarning, "SinkNotFound",
`Sink not found: {"ref":{"kind":"Channel","namespace":"testnamespace","name":"testsink","apiVersion":"messaging.knative.dev/v1"}}`),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: rttestingv1.NewApiServerSource(sourceName, testNS,
rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
Expand Down Expand Up @@ -485,11 +519,15 @@ func TestReconcile(t *testing.T) {
Key: testNS + "/" + sourceName,
WantErr: true,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeNormal, apiserversourceDeploymentCreated,
"Deployment created, error:inducing failure for create deployments"),
Eventf(corev1.EventTypeWarning, "InternalError",
"inducing failure for create deployments"),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: rttestingv1.NewApiServerSource(sourceName, testNS,
rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
Expand Down Expand Up @@ -578,6 +616,12 @@ func TestReconcile(t *testing.T) {
makeSubjectAccessReview("namespaces", "list", "default"),
makeSubjectAccessReview("namespaces", "watch", "default"),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)},
SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
}, {
Expand All @@ -602,8 +646,12 @@ func TestReconcile(t *testing.T) {
},
Key: testNS + "/" + sourceName,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeNormal, "ApiServerSourceDeploymentUpdated", `Deployment "apiserversource-test-apiserver-source-1234" updated`),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: rttestingv1.NewApiServerSource(sourceName, testNS,
rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
Expand Down Expand Up @@ -660,8 +708,12 @@ func TestReconcile(t *testing.T) {
},
Key: testNS + "/" + sourceName,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeNormal, "ApiServerSourceDeploymentUpdated", `Deployment "apiserversource-test-apiserver-source-1234" updated`),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: rttestingv1.NewApiServerSource(sourceName, testNS,
rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
Expand Down Expand Up @@ -718,8 +770,12 @@ func TestReconcile(t *testing.T) {
},
Key: testNS + "/" + sourceName,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeNormal, "ApiServerSourceDeploymentUpdated", `Deployment "apiserversource-test-apiserver-source-1234" updated`),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: rttestingv1.NewApiServerSource(sourceName, testNS,
rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
Expand Down Expand Up @@ -793,6 +849,12 @@ func TestReconcile(t *testing.T) {
rttestingv1.WithApiServerSourceStatusNamespaces([]string{testNS}),
),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WantCreates: []runtime.Object{
makeSubjectAccessReview("namespaces", "get", "default"),
makeSubjectAccessReview("namespaces", "list", "default"),
Expand Down Expand Up @@ -1018,3 +1080,12 @@ func subjectAccessReviewCreateReactor(allowed bool) clientgotesting.ReactionFunc
return false, nil, nil
}
}

func patchFinalizers(name, namespace string) clientgotesting.PatchActionImpl {
action := clientgotesting.PatchActionImpl{}
action.Name = name
action.Namespace = namespace
patch := `{"metadata":{"finalizers":["apiserversources.sources.knative.dev"],"resourceVersion":""}}`
action.Patch = []byte(patch)
return action
}
7 changes: 7 additions & 0 deletions pkg/reconciler/pingsource/pingsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *sourcesv1.PingSo
return nil
}

func (r *Reconciler) FinalizeKind(ctx context.Context, source *sourcesv1.PingSource) pkgreconciler.Event {
logging.FromContext(ctx).Info("Deleting source")
// Allow for eventtypes to be cleaned up
source.Status.CloudEventAttributes = []duckv1.CloudEventAttributes{}
return nil
}

func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *sourcesv1.PingSource) (*appsv1.Deployment, error) {
args := resources.Args{
ConfigEnvVars: r.configAcc.ToEnvVars(),
Expand Down
49 changes: 49 additions & 0 deletions pkg/reconciler/pingsource/pingsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,13 @@ func TestAllCases(t *testing.T) {
),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeWarning, "SinkNotFound",
`Sink not found: {"ref":{"kind":"Channel","namespace":"testnamespace","name":"testsink","apiVersion":"messaging.knative.dev/v1"}}`),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
}, {
Name: "sink ref has no namespace",
Objects: []runtime.Object{
Expand Down Expand Up @@ -183,9 +187,13 @@ func TestAllCases(t *testing.T) {
),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeWarning, "SinkNotFound",
`Sink not found: {"ref":{"kind":"Channel","namespace":"testnamespace","name":"testsink","apiVersion":"messaging.knative.dev/v1"}}`),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
}, {
Name: "error creating deployment",
Objects: []runtime.Object{
Expand All @@ -208,8 +216,12 @@ func TestAllCases(t *testing.T) {
},
Key: testNS + "/" + sourceName,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeWarning, "InternalError", "deployments.apps \"pingsource-mt-adapter\" not found"),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WantErr: true,
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: rtv1.NewPingSource(sourceName, testNS,
Expand Down Expand Up @@ -271,6 +283,12 @@ func TestAllCases(t *testing.T) {
rtv1.WithPingSourceStatusObservedGeneration(generation),
),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
}, {
Name: "Propagate CA certs",
Objects: []runtime.Object{
Expand Down Expand Up @@ -321,6 +339,12 @@ func TestAllCases(t *testing.T) {
rtv1.WithPingSourceStatusObservedGeneration(generation),
),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
}, {
Name: "deployment update due to env",
Objects: []runtime.Object{
Expand All @@ -344,8 +368,12 @@ func TestAllCases(t *testing.T) {
},
Key: testNS + "/" + sourceName,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeNormal, pingSourceDeploymentUpdated, `PingSource adapter deployment updated`),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: rtv1.NewPingSource(sourceName, testNS,
rtv1.WithPingSourceSpec(sourcesv1.PingSourceSpec{
Expand Down Expand Up @@ -411,6 +439,12 @@ func TestAllCases(t *testing.T) {
rtv1.WithPingSourceStatusObservedGeneration(generation),
),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
}, {
Name: "valid with dataBase64",
Objects: []runtime.Object{
Expand Down Expand Up @@ -453,6 +487,12 @@ func TestAllCases(t *testing.T) {
rtv1.WithPingSourceStatusObservedGeneration(generation),
),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
},
}

Expand Down Expand Up @@ -512,3 +552,12 @@ func makeAvailableMTAdapterWithDifferentEnv() *appsv1.Deployment {
WithDeploymentAvailable()(ma)
return ma
}

func patchFinalizers(name, namespace string) clientgotesting.PatchActionImpl {
action := clientgotesting.PatchActionImpl{}
action.Name = name
action.Namespace = namespace
patch := `{"metadata":{"finalizers":["pingsources.sources.knative.dev"],"resourceVersion":""}}`
action.Patch = []byte(patch)
return action
}
7 changes: 7 additions & 0 deletions pkg/reconciler/source/duck/duck.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ import (
"knative.dev/eventing/pkg/reconciler/source/duck/resources"
)

const (
defaultNamespace = "default"
)

type Reconciler struct {
// eventingClientSet allows us to configure Eventing objects
eventingClientSet clientset.Interface
Expand Down Expand Up @@ -211,6 +215,9 @@ func (r *Reconciler) makeEventTypes(ctx context.Context, src *duckv1.Source) []v
CeSchema: schemaURL,
Description: description,
})
if eventType.Spec.Reference.Namespace == "" {
eventType.Spec.Reference.Namespace = defaultNamespace
}
Comment on lines +218 to +220
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to set a namespace, the correct one is src.Namespace, assuming default is wrong

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we had this assumption before.

but I like this suggestion

Copy link
Member

@pierDipi pierDipi Feb 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We did not, that was on the Broker name, which different from the reference namespace here

eventTypes = append(eventTypes, *eventType)
}
return eventTypes
Expand Down
3 changes: 3 additions & 0 deletions pkg/reconciler/source/duck/duck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ func makeEventType(ceType, ceSource string) *v1beta2.EventType {

func makeEventTypeWithReference(ceType, ceSource string, ref *duckv1.KReference) *v1beta2.EventType {
ceSourceURL, _ := apis.ParseURL(ceSource)
if ref.Namespace == "" {
ref.Namespace = "default"
}
return &v1beta2.EventType{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%x", md5.Sum([]byte(ceType+ceSource+sourceUID))),
Expand Down
2 changes: 1 addition & 1 deletion test/rekt/features/apiserversource/data_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func SendsEventsWithEventTypes() *feature.Feature {
})
f.Requirement("ApiServerSource goes ready", apiserversource.IsReady(source))

expectedCeTypes := sets.New(sources.ApiServerSourceEventReferenceModeTypes...)
expectedCeTypes := sets.New(sources.ApiServerSourceEventResourceModeTypes...)

f.Stable("ApiServerSource as event source").
Must("delivers events on broker with URI",
Expand Down
Loading