Skip to content

Commit

Permalink
Remove controller.Events
Browse files Browse the repository at this point in the history
Since events.Recorder implements the k8s EventRecorder interface, there
is no need of the controller.Events any more. The reconcilers can embed
the k8s EventRecorder and use a events.Recorder recorder.

Update the events.Recorder to embed a k8s event recorder to pass events
to k8s along with an external recorder.

The trace events are passed to k8s recorder as a normal event since it
only accepts normal and warning event types.

Update tests to use testenv with suite_test.

Signed-off-by: Sunny <darkowlzz@protonmail.com>
  • Loading branch information
darkowlzz committed Oct 18, 2021
1 parent 43b0ffb commit 1510ec0
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 115 deletions.
97 changes: 0 additions & 97 deletions runtime/controller/events.go

This file was deleted.

39 changes: 33 additions & 6 deletions runtime/events/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,27 @@ import (
"k8s.io/apimachinery/pkg/runtime"
kuberecorder "k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/reference"
ctrl "sigs.k8s.io/controller-runtime"
)

// Recorder posts events to the webhook address.
// Recorder posts events to the kubernetes API and any other event recorder webhook address, like the GitOps Toolkit
// notification-controller.
//
// Use it by embedding EventRecorder in reconciler struct:
//
// import (
// ...
// kuberecorder "k8s.io/client-go/tools/record"
// ...
// )
//
// type MyTypeReconciler {
// client.Client
// // ... etc.
// kuberecorder.EventRecorder
// }
//
// Use NewRecorder to create a working Recorder.
type Recorder struct {
// URL address of the events endpoint.
Webhook string
Expand All @@ -45,6 +63,9 @@ type Recorder struct {
// Retryable HTTP client.
Client *retryablehttp.Client

// EventRecorder is the kubernetes event recorder.
EventRecorder kuberecorder.EventRecorder

// Scheme of the recorded objects.
Scheme *runtime.Scheme

Expand All @@ -54,9 +75,10 @@ type Recorder struct {

var _ kuberecorder.EventRecorder = &Recorder{}

// NewRecorder creates an event Recorder with default settings.
// The recorder performs automatic retries for connection errors and 500-range response codes.
func NewRecorder(scheme *runtime.Scheme, log logr.Logger, webhook, reportingController string) (*Recorder, error) {
// NewRecorder creates an event Recorder with k8s event recorder and an external event recorder based on the given
// webhook. The recorder performs automatic retries for connection errors and 500-range response codes from the external
// recorder.
func NewRecorder(mgr ctrl.Manager, log logr.Logger, webhook, reportingController string) (*Recorder, error) {
if _, err := url.Parse(webhook); err != nil {
return nil, err
}
Expand All @@ -67,10 +89,11 @@ func NewRecorder(scheme *runtime.Scheme, log logr.Logger, webhook, reportingCont
httpClient.Logger = nil

return &Recorder{
Scheme: scheme,
Scheme: mgr.GetScheme(),
Webhook: webhook,
ReportingController: reportingController,
Client: httpClient,
EventRecorder: mgr.GetEventRecorderFor(reportingController),
Log: log,
}, nil
}
Expand Down Expand Up @@ -101,11 +124,15 @@ func (r *Recorder) AnnotatedEventf(
severity := eventTypeToSeverity(eventtype)

// Do not send trace events to notification controller,
// traces are persisted as Kubernetes events only.
// traces are persisted as Kubernetes events only as normal events.
if severity == EventSeverityTrace {
r.EventRecorder.AnnotatedEventf(object, annotations, corev1.EventTypeNormal, reason, messageFmt, args...)
return
}

// Forward the event to the k8s recorder.
r.EventRecorder.AnnotatedEventf(object, annotations, eventtype, reason, messageFmt, args...)

if r.Client == nil {
r.Log.Error(nil, "retryable HTTP client has not been initialized")
return
Expand Down
16 changes: 4 additions & 12 deletions runtime/events/recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (

"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
)

Expand All @@ -50,9 +48,7 @@ func TestEventRecorder_AnnotatedEventf(t *testing.T) {
}))
defer ts.Close()

scheme := runtime.NewScheme()
require.NoError(t, clientgoscheme.AddToScheme(scheme))
eventRecorder, err := NewRecorder(scheme, ctrl.Log, ts.URL, "test-controller")
eventRecorder, err := NewRecorder(env, ctrl.Log, ts.URL, "test-controller")
require.NoError(t, err)

obj := &corev1.ConfigMap{}
Expand Down Expand Up @@ -86,9 +82,7 @@ func TestEventRecorder_AnnotatedEventf_Retry(t *testing.T) {
}))
defer ts.Close()

scheme := runtime.NewScheme()
require.NoError(t, clientgoscheme.AddToScheme(scheme))
eventRecorder, err := NewRecorder(scheme, ctrl.Log, ts.URL, "test-controller")
eventRecorder, err := NewRecorder(env, ctrl.Log, ts.URL, "test-controller")
require.NoError(t, err)
eventRecorder.Client.RetryMax = 2

Expand All @@ -115,16 +109,14 @@ func TestEventRecorder_AnnotatedEventf_RateLimited(t *testing.T) {
}))
defer ts.Close()

scheme := runtime.NewScheme()
require.NoError(t, clientgoscheme.AddToScheme(scheme))
eventRecorder, err := NewRecorder(scheme, ctrl.Log, ts.URL, "test-controller")
eventRecorder, err := NewRecorder(env, ctrl.Log, ts.URL, "test-controller")
require.NoError(t, err)
eventRecorder.Client.RetryMax = 2

obj := &corev1.ConfigMap{}
obj.Namespace = "gitops-system"
obj.Name = "webapp"

eventRecorder.AnnotatedEventf(obj, nil, "sync", "sync %s", obj.Name)
eventRecorder.AnnotatedEventf(obj, nil, corev1.EventTypeNormal, "sync", "sync %s", obj.Name)
require.Equal(t, 1, requestCount)
}
76 changes: 76 additions & 0 deletions runtime/events/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package events

import (
"fmt"
"os"
"testing"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/fluxcd/pkg/runtime/testenv"
)

var (
env *testenv.Environment
ctx = ctrl.SetupSignalHandler()
)

func TestMain(m *testing.M) {
scheme := runtime.NewScheme()
utilruntime.Must(corev1.AddToScheme(scheme))

env = testenv.New(
testenv.WithScheme(scheme),
)

go func() {
fmt.Println("Starting the test environment")
if err := env.Start(ctx); err != nil {
panic(fmt.Sprintf("Failed to start the test environment manager: %v", err))
}
}()
<-env.Manager.Elected()

// Create test namespace.
ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "gitops-system",
},
}
if err := env.Client.Create(ctx, ns); err != nil {
panic(fmt.Sprintf("Failed to create gitops-system namespace: %v", err))
}

code := m.Run()

if err := env.Client.Delete(ctx, ns); err != nil {
panic(fmt.Sprintf("Failed to delete gitops-system namespace: %v", err))
}

fmt.Println("Stopping the test environment")
if err := env.Stop(); err != nil {
panic(fmt.Sprintf("Failed to stop the test environment: %v", err))
}

os.Exit(code)
}

0 comments on commit 1510ec0

Please sign in to comment.