Skip to content

Commit

Permalink
✨ Send available condition events for managed cluster (#450)
Browse files Browse the repository at this point in the history
* Send available condition events for managed cluster

Signed-off-by: zhujian <jiazhu@redhat.com>

* Send available condition events for managed cluster

Signed-off-by: zhujian <jiazhu@redhat.com>

* Rename event reporting component

Signed-off-by: zhujian <jiazhu@redhat.com>

---------

Signed-off-by: zhujian <jiazhu@redhat.com>
  • Loading branch information
zhujian7 authored May 6, 2024
1 parent 5fc1dbd commit c749b42
Show file tree
Hide file tree
Showing 12 changed files with 295 additions and 41 deletions.
22 changes: 22 additions & 0 deletions pkg/common/helpers/event_recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package helpers

import (
"context"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
kevents "k8s.io/client-go/tools/events"
)

// NewEventRecorder creates a new event recorder for the given controller, it will also log the events
func NewEventRecorder(ctx context.Context, scheme *runtime.Scheme,
kubeClient kubernetes.Interface, controllerName string) (kevents.EventRecorder, error) {
broadcaster := kevents.NewBroadcaster(&kevents.EventSinkImpl{Interface: kubeClient.EventsV1()})
err := broadcaster.StartRecordingToSinkWithContext(ctx)
if err != nil {
return nil, nil
}
broadcaster.StartStructuredLogging(0)
recorder := broadcaster.NewRecorder(scheme, controllerName)
return recorder, nil
}
72 changes: 72 additions & 0 deletions pkg/common/helpers/event_recorder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package helpers

import (
"context"
"testing"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
fakekube "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"

clusterscheme "open-cluster-management.io/api/client/cluster/clientset/versioned/scheme"
workscheme "open-cluster-management.io/api/client/work/clientset/versioned/scheme"
clusterv1 "open-cluster-management.io/api/cluster/v1"

testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
)

func TestNewEventRecorder(t *testing.T) {
tests := []struct {
name string
scheme *runtime.Scheme
wait time.Duration
validateActions func(t *testing.T, actions []clienttesting.Action)
}{
{
name: "test new event recorder, scheme not match ",
scheme: workscheme.Scheme,
wait: 100 * time.Millisecond,
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertNoActions(t, actions)
},
},
{
name: "test new event recorder, scheme match",
scheme: clusterscheme.Scheme,
wait: 100 * time.Millisecond,
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "create")
},
},
{
name: "test new event recorder, scheme match, no wait",
scheme: clusterscheme.Scheme,
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertNoActions(t, actions)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.TODO()
kubeClient := fakekube.NewSimpleClientset()
recorder, err := NewEventRecorder(ctx, tt.scheme, kubeClient, "test")
if err != nil {
t.Errorf("NewEventRecorder() error = %v", err)
return
}

object := &clusterv1.ManagedCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
}
recorder.Eventf(object, nil, corev1.EventTypeNormal, "test", "test", "")
time.Sleep(tt.wait)
tt.validateActions(t, kubeClient.Actions())
})
}
}
8 changes: 3 additions & 5 deletions pkg/placement/controllers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
"github.com/openshift/library-go/pkg/controller/controllercmd"
"k8s.io/apiserver/pkg/server/mux"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/events"
"k8s.io/utils/clock"

clusterclient "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterscheme "open-cluster-management.io/api/client/cluster/clientset/versioned/scheme"
clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"

"open-cluster-management.io/ocm/pkg/common/helpers"
"open-cluster-management.io/ocm/pkg/placement/controllers/metrics"
"open-cluster-management.io/ocm/pkg/placement/controllers/scheduling"
"open-cluster-management.io/ocm/pkg/placement/debugger"
Expand Down Expand Up @@ -44,13 +44,11 @@ func RunControllerManagerWithInformers(
clusterClient clusterclient.Interface,
clusterInformers clusterinformers.SharedInformerFactory,
) error {
broadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: kubeClient.EventsV1()})

if err := broadcaster.StartRecordingToSinkWithContext(ctx); err != nil {
recorder, err := helpers.NewEventRecorder(ctx, clusterscheme.Scheme, kubeClient, "placement-controller")
if err != nil {
return err
}

recorder := broadcaster.NewRecorder(clusterscheme.Scheme, "placementController")
metrics := metrics.NewScheduleMetrics(clock.RealClock{})

scheduler := scheduling.NewPluginScheduler(
Expand Down
27 changes: 15 additions & 12 deletions pkg/registration/hub/lease/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import (
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
coordv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
coordinformers "k8s.io/client-go/informers/coordination/v1"
"k8s.io/client-go/kubernetes"
coordlisters "k8s.io/client-go/listers/coordination/v1"
kevents "k8s.io/client-go/tools/events"
"k8s.io/utils/pointer"

clientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
Expand All @@ -34,11 +36,11 @@ var (

// leaseController checks the lease of managed clusters on hub cluster to determine whether a managed cluster is available.
type leaseController struct {
kubeClient kubernetes.Interface
patcher patcher.Patcher[*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]
clusterLister clusterv1listers.ManagedClusterLister
leaseLister coordlisters.LeaseLister
eventRecorder events.Recorder
kubeClient kubernetes.Interface
patcher patcher.Patcher[*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]
clusterLister clusterv1listers.ManagedClusterLister
leaseLister coordlisters.LeaseLister
mcEventRecorder kevents.EventRecorder
}

// NewClusterLeaseController creates a cluster lease controller on hub cluster.
Expand All @@ -47,15 +49,16 @@ func NewClusterLeaseController(
clusterClient clientset.Interface,
clusterInformer clusterv1informer.ManagedClusterInformer,
leaseInformer coordinformers.LeaseInformer,
recorder events.Recorder) factory.Controller {
recorder events.Recorder,
mcEventRecorder kevents.EventRecorder) factory.Controller {
c := &leaseController{
kubeClient: kubeClient,
patcher: patcher.NewPatcher[
*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus](
clusterClient.ClusterV1().ManagedClusters()),
clusterLister: clusterInformer.Lister(),
leaseLister: leaseInformer.Lister(),
eventRecorder: recorder.WithComponentSuffix("managed-cluster-lease-controller"),
clusterLister: clusterInformer.Lister(),
leaseLister: leaseInformer.Lister(),
mcEventRecorder: mcEventRecorder,
}
return factory.New().
WithFilteredEventsInformersQueueKeysFunc(
Expand Down Expand Up @@ -147,9 +150,9 @@ func (c *leaseController) updateClusterStatus(ctx context.Context, cluster *clus

updated, err := c.patcher.PatchStatus(ctx, newCluster, newCluster.Status, cluster.Status)
if updated {
c.eventRecorder.Eventf("ManagedClusterAvailableConditionUpdated",
"update managed cluster %q available condition to unknown, due to its lease is not updated constantly",
cluster.Name)
newCluster.SetNamespace(newCluster.Name)
c.mcEventRecorder.Eventf(newCluster, nil, corev1.EventTypeWarning, "AvailableUnknown", "AvailableUnknown",
"The managed cluster (%s) cannot connect to the hub cluster.", cluster.Name)
}

return err
Expand Down
43 changes: 34 additions & 9 deletions pkg/registration/hub/lease/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ import (
clienttesting "k8s.io/client-go/testing"

clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
clusterscheme "open-cluster-management.io/api/client/cluster/clientset/versioned/scheme"
clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
clusterv1 "open-cluster-management.io/api/cluster/v1"
v1 "open-cluster-management.io/api/cluster/v1"
"open-cluster-management.io/sdk-go/pkg/patcher"

"open-cluster-management.io/ocm/pkg/common/helpers"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
)
Expand Down Expand Up @@ -57,7 +59,7 @@ func TestSync(t *testing.T) {
testinghelpers.NewManagedClusterLease("managed-cluster-lease", now.Add(-5*time.Minute)),
testinghelpers.NewManagedClusterLease(fmt.Sprintf("cluster-lease-%s", testinghelpers.TestManagedClusterName), now.Add(-5*time.Minute)),
},
validateActions: func(t *testing.T, leaseActions, clusterActions []clienttesting.Action) {
validateActions: func(t *testing.T, hubKubeActions, clusterActions []clienttesting.Action) {
expected := metav1.Condition{
Type: clusterv1.ManagedClusterConditionAvailable,
Status: metav1.ConditionUnknown,
Expand All @@ -72,6 +74,22 @@ func TestSync(t *testing.T) {
t.Fatal(err)
}
testingcommon.AssertCondition(t, managedCluster.Status.Conditions, expected)

if len(hubKubeActions) != 1 {
t.Errorf("Expected 1 event created in the sync loop, actual %d",
len(hubKubeActions))
}
actionEvent := hubKubeActions[0]
if actionEvent.GetResource().Resource != "events" {
t.Errorf("Expected event created, actual %s", actionEvent.GetResource())
}
if actionEvent.GetNamespace() != testinghelpers.TestManagedClusterName {
t.Errorf("Expected event created in namespace %s, actual %s",
testinghelpers.TestManagedClusterName, actionEvent.GetNamespace())
}
if actionEvent.GetVerb() != "create" {
t.Errorf("Expected event created, actual %s", actionEvent.GetVerb())
}
},
},
{
Expand Down Expand Up @@ -123,31 +141,38 @@ func TestSync(t *testing.T) {
}
}

leaseClient := kubefake.NewSimpleClientset(c.clusterLeases...)
leaseInformerFactory := kubeinformers.NewSharedInformerFactory(leaseClient, time.Minute*10)
hubClient := kubefake.NewSimpleClientset(c.clusterLeases...)
leaseInformerFactory := kubeinformers.NewSharedInformerFactory(hubClient, time.Minute*10)
leaseStore := leaseInformerFactory.Coordination().V1().Leases().Informer().GetStore()
for _, lease := range c.clusterLeases {
if err := leaseStore.Add(lease); err != nil {
t.Fatal(err)
}
}

ctx := context.TODO()
syncCtx := testingcommon.NewFakeSyncContext(t, testinghelpers.TestManagedClusterName)

mcEventRecorder, err := helpers.NewEventRecorder(ctx, clusterscheme.Scheme, hubClient, "test")
if err != nil {
t.Fatal(err)
}
ctrl := &leaseController{
kubeClient: leaseClient,
kubeClient: hubClient,
patcher: patcher.NewPatcher[
*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus](
clusterClient.ClusterV1().ManagedClusters()),
clusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
leaseLister: leaseInformerFactory.Coordination().V1().Leases().Lister(),
eventRecorder: syncCtx.Recorder(),
clusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
leaseLister: leaseInformerFactory.Coordination().V1().Leases().Lister(),
mcEventRecorder: mcEventRecorder,
}
syncErr := ctrl.sync(context.TODO(), syncCtx)
if syncErr != nil {
t.Errorf("unexpected err: %v", syncErr)
}
c.validateActions(t, leaseClient.Actions(), clusterClient.Actions())

// wait for the event to be recorded
time.Sleep(100 * time.Millisecond)
c.validateActions(t, hubClient.Actions(), clusterClient.Actions())
})
}
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/registration/hub/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
addonclient "open-cluster-management.io/api/client/addon/clientset/versioned"
addoninformers "open-cluster-management.io/api/client/addon/informers/externalversions"
clusterv1client "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterscheme "open-cluster-management.io/api/client/cluster/clientset/versioned/scheme"
clusterv1informers "open-cluster-management.io/api/client/cluster/informers/externalversions"
workv1client "open-cluster-management.io/api/client/work/clientset/versioned"
workv1informers "open-cluster-management.io/api/client/work/informers/externalversions"
clusterv1 "open-cluster-management.io/api/cluster/v1"
ocmfeature "open-cluster-management.io/api/feature"

commonhelpers "open-cluster-management.io/ocm/pkg/common/helpers"
"open-cluster-management.io/ocm/pkg/features"
"open-cluster-management.io/ocm/pkg/registration/helpers"
"open-cluster-management.io/ocm/pkg/registration/hub/addon"
Expand Down Expand Up @@ -186,12 +188,17 @@ func (m *HubManagerOptions) RunControllerManagerWithInformers(
)
}

mcRecorder, err := commonhelpers.NewEventRecorder(ctx, clusterscheme.Scheme, kubeClient, "registration-controller")
if err != nil {
return err
}
leaseController := lease.NewClusterLeaseController(
kubeClient,
clusterClient,
clusterInformers.Cluster().V1().ManagedClusters(),
kubeInformers.Coordination().V1().Leases(),
controllerContext.EventRecorder,
mcRecorder,
)

clockSyncController := lease.NewClockSyncController(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ rules:
- apiGroups: ["addon.open-cluster-management.io"]
resources: ["managedclusteraddons/status"]
verbs: ["patch", "update"]
# Allow agent to send events to the hub
- apiGroups: ["events.k8s.io"]
resources: ["events"]
verbs: ["create"]
23 changes: 21 additions & 2 deletions pkg/registration/spoke/managedcluster/claim_reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@ import (
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
kubeinformers "k8s.io/client-go/informers"
fakekube "k8s.io/client-go/kubernetes/fake"
kubefake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"

clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
clusterscheme "open-cluster-management.io/api/client/cluster/clientset/versioned/scheme"
clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
clusterv1 "open-cluster-management.io/api/cluster/v1"
clusterv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1"
ocmfeature "open-cluster-management.io/api/feature"

"open-cluster-management.io/ocm/pkg/common/helpers"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
"open-cluster-management.io/ocm/pkg/features"
testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
Expand Down Expand Up @@ -107,6 +110,13 @@ func TestSync(t *testing.T) {
}
}

fakeHubClient := fakekube.NewSimpleClientset()
ctx := context.TODO()
hubEventRecorder, err := helpers.NewEventRecorder(ctx,
clusterscheme.Scheme, fakeHubClient, "test")
if err != nil {
t.Fatal(err)
}
ctrl := newManagedClusterStatusController(
testinghelpers.TestManagedClusterName,
clusterClient,
Expand All @@ -116,9 +126,10 @@ func TestSync(t *testing.T) {
kubeInformerFactory.Core().V1().Nodes(),
20,
eventstesting.NewTestingEventRecorder(t),
hubEventRecorder,
)

syncErr := ctrl.sync(context.TODO(), testingcommon.NewFakeSyncContext(t, ""))
syncErr := ctrl.sync(ctx, testingcommon.NewFakeSyncContext(t, ""))
testingcommon.AssertError(t, syncErr, c.expectedErr)

c.validateActions(t, clusterClient.Actions())
Expand Down Expand Up @@ -330,6 +341,13 @@ func TestExposeClaims(t *testing.T) {
c.maxCustomClusterClaims = 20
}

fakeHubClient := fakekube.NewSimpleClientset()
ctx := context.TODO()
hubEventRecorder, err := helpers.NewEventRecorder(ctx,
clusterscheme.Scheme, fakeHubClient, "test")
if err != nil {
t.Fatal(err)
}
ctrl := newManagedClusterStatusController(
testinghelpers.TestManagedClusterName,
clusterClient,
Expand All @@ -339,9 +357,10 @@ func TestExposeClaims(t *testing.T) {
kubeInformerFactory.Core().V1().Nodes(),
c.maxCustomClusterClaims,
eventstesting.NewTestingEventRecorder(t),
hubEventRecorder,
)

syncErr := ctrl.sync(context.TODO(), testingcommon.NewFakeSyncContext(t, c.cluster.Name))
syncErr := ctrl.sync(ctx, testingcommon.NewFakeSyncContext(t, c.cluster.Name))
testingcommon.AssertError(t, syncErr, c.expectedErr)

c.validateActions(t, clusterClient.Actions())
Expand Down
Loading

0 comments on commit c749b42

Please sign in to comment.