Add clock sync condition controller. (#312)
Signed-off-by: xuezhaojun <zxue@redhat.com>
xuezhaojun authored Dec 5, 2023
1 parent 7ceb9a2 commit 5884bc5
Showing 5 changed files with 349 additions and 0 deletions.
150 changes: 150 additions & 0 deletions pkg/registration/hub/lease/clocksynccontroller.go
@@ -0,0 +1,150 @@
package lease

import (
"context"
"time"

"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
coordv1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
coordinformers "k8s.io/client-go/informers/coordination/v1"
coordlisters "k8s.io/client-go/listers/coordination/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

clientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterv1informer "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1"
clusterv1 "open-cluster-management.io/api/cluster/v1"

"open-cluster-management.io/ocm/pkg/common/patcher"
"open-cluster-management.io/ocm/pkg/common/queue"
)

type clockSyncController struct {
patcher patcher.Patcher[*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]
clusterLister clusterv1listers.ManagedClusterLister
leaseLister coordlisters.LeaseLister
eventRecorder events.Recorder
}

const (
clockSyncControllerName = "ClockSyncController"
)

func NewClockSyncController(
clusterClient clientset.Interface,
clusterInformer clusterv1informer.ManagedClusterInformer,
leaseInformer coordinformers.LeaseInformer,
recorder events.Recorder,
) factory.Controller {
c := &clockSyncController{
patcher: patcher.NewPatcher[
*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus](
clusterClient.ClusterV1().ManagedClusters()),
clusterLister: clusterInformer.Lister(),
leaseLister: leaseInformer.Lister(),
eventRecorder: recorder.WithComponentSuffix("managed-cluster-clock-sync-controller"),
}

syncCtx := factory.NewSyncContext(clockSyncControllerName, recorder)
leaseRenewTimeUpdateInformer := renewUpdateInformer(syncCtx.Queue(), leaseInformer)

return factory.New().WithSyncContext(syncCtx).
WithBareInformers(leaseRenewTimeUpdateInformer).
WithSync(c.sync).
ToController(clockSyncControllerName, recorder)
}

func renewUpdateInformer(q workqueue.RateLimitingInterface, leaseInformer coordinformers.LeaseInformer) factory.Informer {
leaseRenewTimeUpdateInformer := leaseInformer.Informer()
queueKeyByLabel := queue.QueueKeyByLabel(clusterv1.ClusterNameLabelKey)
_, err := leaseRenewTimeUpdateInformer.AddEventHandler(&cache.FilteringResourceEventHandler{
FilterFunc: queue.UnionFilter(queue.FileterByLabel(clusterv1.ClusterNameLabelKey), queue.FilterByNames(leaseName)),
Handler: &cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
// only enqueue the event when the lease renew time has been updated
oldLease := oldObj.(*coordv1.Lease)
newLease := newObj.(*coordv1.Lease)
if !oldLease.Spec.RenewTime.Equal(newLease.Spec.RenewTime) {
for _, queueKey := range queueKeyByLabel(newLease) {
q.Add(queueKey)
}
}
},
},
})
if err != nil {
runtime.HandleError(err)
}
return leaseRenewTimeUpdateInformer
}

func (c *clockSyncController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
clusterName := syncCtx.QueueKey()

// the event caused by resync will be filtered because the cluster is not found
cluster, err := c.clusterLister.Get(clusterName)
if errors.IsNotFound(err) {
// the cluster is not found, do nothing
return nil
}
if err != nil {
return err
}

now := time.Now()
observedLease, err := c.leaseLister.Leases(cluster.Name).Get(leaseName)
if err != nil {
return err
}
// When the agent's lease gets renewed, the hub's "now" should be close to the agent's RenewTime.
// If the two times are not close (more than 1 lease duration apart), we assume the clocks are out of sync.
oneLeaseDuration := time.Duration(LeaseDurationSeconds) * time.Second
if err := c.updateClusterStatusClockSynced(ctx, cluster,
now.Sub(observedLease.Spec.RenewTime.Time) < oneLeaseDuration && observedLease.Spec.RenewTime.Time.Sub(now) < oneLeaseDuration); err != nil {
return err
}
return nil
}

func (c *clockSyncController) updateClusterStatusClockSynced(ctx context.Context, cluster *clusterv1.ManagedCluster, synced bool) error {
var desiredStatus metav1.ConditionStatus
var condition metav1.Condition
if synced {
desiredStatus = metav1.ConditionTrue
condition = metav1.Condition{
Type: clusterv1.ManagedClusterConditionClockSynced,
Status: metav1.ConditionTrue,
Reason: "ManagedClusterClockSynced",
Message: "The clock of the managed cluster is synced with the hub.",
}
} else {
desiredStatus = metav1.ConditionFalse
condition = metav1.Condition{
Type: clusterv1.ManagedClusterConditionClockSynced,
Status: metav1.ConditionFalse,
Reason: "ManagedClusterClockOutOfSync",
Message: "The clock of hub and agent is out of sync. This may cause the Unknown status and affect agent functionalities.",
}
}

if meta.IsStatusConditionPresentAndEqual(cluster.Status.Conditions, clusterv1.ManagedClusterConditionClockSynced, desiredStatus) {
// the managed cluster clock synced condition is already in the desired status, do nothing
return nil
}

newCluster := cluster.DeepCopy()
meta.SetStatusCondition(&newCluster.Status.Conditions, condition)

updated, err := c.patcher.PatchStatus(ctx, newCluster, newCluster.Status, cluster.Status)
if updated {
c.eventRecorder.Eventf("ManagedClusterClockSyncedConditionUpdated",
"update managed cluster %q clock synced condition to %v.", cluster.Name, desiredStatus)
}
return err
}
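
The sync above boils down to a symmetric skew check: the ClockSynced condition is True only when the hub's wall clock and the lease's RenewTime differ by less than one lease duration in either direction. A minimal standalone sketch of that check (the helper name clockInSync, the 60-second value, and the example offsets are illustrative, not part of this commit):

package main

import (
	"fmt"
	"time"
)

// clockInSync reports whether hubNow and the agent-reported renewTime differ
// by less than one lease duration, regardless of which clock is ahead.
func clockInSync(hubNow, renewTime time.Time, leaseDuration time.Duration) bool {
	return hubNow.Sub(renewTime) < leaseDuration && renewTime.Sub(hubNow) < leaseDuration
}

func main() {
	now := time.Now()
	oneLeaseDuration := 60 * time.Second // the default lease duration referenced by the tests below

	fmt.Println(clockInSync(now, now.Add(5*time.Second), oneLeaseDuration))    // true: within one lease duration
	fmt.Println(clockInSync(now, now.Add(-120*time.Second), oneLeaseDuration)) // false: two lease durations behind
}

Because the comparison is strict, a skew of exactly one lease duration already counts as out of sync.
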
125 changes: 125 additions & 0 deletions pkg/registration/hub/lease/clocksynccontroller_test.go
@@ -0,0 +1,125 @@
package lease

import (
"context"
"encoding/json"
"testing"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kubeinformers "k8s.io/client-go/informers"
kubefake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"

clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
clusterv1 "open-cluster-management.io/api/cluster/v1"
v1 "open-cluster-management.io/api/cluster/v1"

"open-cluster-management.io/ocm/pkg/common/patcher"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
)

func TestClockSyncController(t *testing.T) {
// cases:
// 1. the hub and agent clocks are close
// 2. the hub and agent clocks are not close
cases := []struct {
name string
clusters []runtime.Object
leases []runtime.Object
validateActions func(t *testing.T, leaseActions, clusterActions []clienttesting.Action)
}{
{
name: "hub and agent clock is close",
clusters: []runtime.Object{
testinghelpers.NewManagedCluster(),
},
leases: []runtime.Object{
testinghelpers.NewManagedClusterLease("managed-cluster-lease", now.Add(5*time.Second)),
},
validateActions: func(t *testing.T, leaseActions, clusterActions []clienttesting.Action) {
expected := metav1.Condition{
Type: clusterv1.ManagedClusterConditionClockSynced,
Status: metav1.ConditionTrue,
Reason: "ManagedClusterClockSynced",
Message: "The clock of the managed cluster is synced with the hub.",
}
testingcommon.AssertActions(t, clusterActions, "patch")
patch := clusterActions[0].(clienttesting.PatchAction).GetPatch()
managedCluster := &clusterv1.ManagedCluster{}
err := json.Unmarshal(patch, managedCluster)
if err != nil {
t.Fatal(err)
}
testingcommon.AssertCondition(t, managedCluster.Status.Conditions, expected)
},
},
{
name: "hub and agent clock is not close",
clusters: []runtime.Object{
testinghelpers.NewManagedCluster(),
},
leases: []runtime.Object{
testinghelpers.NewManagedClusterLease("managed-cluster-lease", now.Add(61*time.Second)),
},
validateActions: func(t *testing.T, leaseActions, clusterActions []clienttesting.Action) {
expected := metav1.Condition{
Type: clusterv1.ManagedClusterConditionClockSynced,
Status: metav1.ConditionFalse,
Reason: "ManagedClusterClockOutOfSync",
Message: "The clock of hub and agent is out of sync. This may cause the Unknown status and affect agent functionalities.",
}
testingcommon.AssertActions(t, clusterActions, "patch")
patch := clusterActions[0].(clienttesting.PatchAction).GetPatch()
managedCluster := &clusterv1.ManagedCluster{}
err := json.Unmarshal(patch, managedCluster)
if err != nil {
t.Fatal(err)
}
testingcommon.AssertCondition(t, managedCluster.Status.Conditions, expected)
},
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.clusters...)
clusterInformerFactory := clusterinformers.NewSharedInformerFactory(clusterClient, time.Minute*10)
clusterStore := clusterInformerFactory.Cluster().V1().ManagedClusters().Informer().GetStore()
for _, cluster := range c.clusters {
if err := clusterStore.Add(cluster); err != nil {
t.Fatal(err)
}
}

leaseClient := kubefake.NewSimpleClientset(c.leases...)
leaseInformerFactory := kubeinformers.NewSharedInformerFactory(leaseClient, time.Minute*10)
leaseStore := leaseInformerFactory.Coordination().V1().Leases().Informer().GetStore()
for _, lease := range c.leases {
if err := leaseStore.Add(lease); err != nil {
t.Fatal(err)
}
}

syncCtx := testingcommon.NewFakeSyncContext(t, testinghelpers.TestManagedClusterName)

controller := &clockSyncController{
patcher: patcher.NewPatcher[
*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus](
clusterClient.ClusterV1().ManagedClusters()),
clusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
leaseLister: leaseInformerFactory.Coordination().V1().Leases().Lister(),
eventRecorder: syncCtx.Recorder(),
}
syncErr := controller.sync(context.TODO(), syncCtx)
if syncErr != nil {
t.Errorf("unexpected err: %v", syncErr)
}
c.validateActions(t, leaseClient.Actions(), clusterClient.Actions())
})
}

}
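
A note on the fixture offsets above: with the 60-second default lease duration, the "close" case's renew time (now + 5s) sits well within one lease duration of the hub clock, while the "not close" case's (now + 61s) sits just outside it. Since sync() samples its own time.Now(), the extra second over 60 leaves a margin so the check still fails even though a moment passes between building the fixtures and running the sync. A quick illustrative calculation (the 50ms delay is an arbitrary stand-in for that gap, not a value from the test):

package main

import (
	"fmt"
	"time"
)

func main() {
	oneLeaseDuration := 60 * time.Second

	fixtureNow := time.Now()
	syncNow := fixtureNow.Add(50 * time.Millisecond) // pretend delay before sync() samples its own clock

	closeRenew := fixtureNow.Add(5 * time.Second)
	farRenew := fixtureNow.Add(61 * time.Second)

	fmt.Println(closeRenew.Sub(syncNow) < oneLeaseDuration) // true: clocks considered in sync
	fmt.Println(farRenew.Sub(syncNow) < oneLeaseDuration)   // false: still over one lease duration ahead
}
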
8 changes: 8 additions & 0 deletions pkg/registration/hub/manager.go
@@ -196,6 +196,13 @@ func (m *HubManagerOptions) RunControllerManagerWithInformers(
controllerContext.EventRecorder,
)

clockSyncController := lease.NewClockSyncController(
clusterClient,
clusterInformers.Cluster().V1().ManagedClusters(),
kubeInformers.Coordination().V1().Leases(),
controllerContext.EventRecorder,
)

managedClusterSetController := managedclusterset.NewManagedClusterSetController(
clusterClient,
clusterInformers.Cluster().V1().ManagedClusters(),
@@ -268,6 +275,7 @@ func (m *HubManagerOptions) RunControllerManagerWithInformers(
go taintController.Run(ctx, 1)
go csrController.Run(ctx, 1)
go leaseController.Run(ctx, 1)
go clockSyncController.Run(ctx, 1)
go managedClusterSetController.Run(ctx, 1)
go managedClusterSetBindingController.Run(ctx, 1)
go clusterroleController.Run(ctx, 1)
57 changes: 57 additions & 0 deletions test/integration/registration/managedcluster_lease_test.go
@@ -165,6 +165,45 @@ var _ = ginkgo.Describe("Cluster Lease Update", func() {
gracePeriod := 2 * 5 * util.TestLeaseDurationSeconds
assertAvailableCondition(managedClusterName, metav1.ConditionUnknown, gracePeriod)
})

ginkgo.It("clock sync condition should work", func() {
// run registration agent
agentOptions := &spoke.SpokeAgentOptions{
BootstrapKubeconfig: bootstrapKubeConfigFile,
HubKubeconfigSecret: hubKubeconfigSecret,
ClusterHealthCheckPeriod: 1 * time.Minute,
}
commOptions := commonoptions.NewAgentOptions()
commOptions.HubKubeconfigDir = hubKubeconfigDir
commOptions.SpokeClusterName = managedClusterName

stop := runAgent("cluster-leasetest", agentOptions, commOptions, spokeCfg)
bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret, util.TestLeaseDurationSeconds)

gracePeriod := 2 * 5 * util.TestLeaseDurationSeconds
assertClockSyncedCondition(managedClusterName, metav1.ConditionTrue, gracePeriod)

// stop the agent so that it does not keep renewing the lease.
stop()

// update the managed cluster lease renew time
now := time.Now()
gomega.Eventually(func() error {
lease, err := util.GetManagedClusterLease(kubeClient, managedClusterName)
if err != nil {
return err
}
// The default lease duration is 60s.
// The renewTime is 2 lease durations before the hub's now, so the clocks should be considered out of sync.
// renewTime + 5 * leaseDuration is still after now, so the available condition should remain true.
lease.Spec.RenewTime = &metav1.MicroTime{Time: now.Add(-120 * time.Second)}
_, err = kubeClient.CoordinationV1().Leases(managedClusterName).Update(context.TODO(), lease, metav1.UpdateOptions{})
return err
}, eventuallyInterval, eventuallyTimeout).ShouldNot(gomega.HaveOccurred())

assertAvailableCondition(managedClusterName, metav1.ConditionTrue, 0)
assertClockSyncedCondition(managedClusterName, metav1.ConditionFalse, 0)
})
})

func bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret string, leaseDuration int32) {
@@ -213,3 +252,21 @@ func updateManagedClusterLeaseDuration(clusterName string, leaseDuration int32)
_, err = clusterClient.ClusterV1().ManagedClusters().Update(context.TODO(), cluster, metav1.UpdateOptions{})
return err
}

func assertClockSyncedCondition(managedClusterName string, status metav1.ConditionStatus, d int) {
<-time.After(time.Duration(d) * time.Second)
gomega.Eventually(func() error {
managedCluster, err := util.GetManagedCluster(clusterClient, managedClusterName)
if err != nil {
return err
}
cond := meta.FindStatusCondition(managedCluster.Status.Conditions, clusterv1.ManagedClusterConditionClockSynced)
if cond == nil {
return fmt.Errorf("available condition is not found")
}
if cond.Status != status {
return fmt.Errorf("expected avaibale condition is %s, but %v", status, cond)
}
return nil
}, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred())
}
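
The renew-time manipulation in the test above exploits two different thresholds: the clock sync controller tolerates at most one lease duration of skew, while the available condition only degrades after a much longer grace period (five lease durations, per the test's own comment). Setting the renew time two lease durations in the past therefore flips ClockSynced to False while Available stays True. A small sketch of that arithmetic (the threshold names are illustrative):

package main

import (
	"fmt"
	"time"
)

func main() {
	leaseDuration := 60 * time.Second      // default lease duration noted in the test
	clockSyncTolerance := leaseDuration    // skew tolerated by the clock sync controller
	availabilityGrace := 5 * leaseDuration // grace period assumed for the available condition, per the test comment

	skew := 120 * time.Second // renewTime is set 2 lease durations before the hub's now

	fmt.Println(skew < clockSyncTolerance) // false -> ManagedClusterConditionClockSynced becomes False
	fmt.Println(skew < availabilityGrace)  // true  -> the available condition remains True
}
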
9 changes: 9 additions & 0 deletions test/integration/util/managedcluster.go
@@ -3,6 +3,7 @@ package util
import (
"context"

coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -21,6 +22,14 @@ func GetManagedCluster(clusterClient clusterclientset.Interface, spokeClusterNam
return spokeCluster, nil
}

func GetManagedClusterLease(kubeClient kubernetes.Interface, spokeClusterName string) (*coordinationv1.Lease, error) {
lease, err := kubeClient.CoordinationV1().Leases(spokeClusterName).Get(context.TODO(), "managed-cluster-lease", metav1.GetOptions{})
if err != nil {
return nil, err
}
return lease, nil
}

func AcceptManagedCluster(clusterClient clusterclientset.Interface, spokeClusterName string) error {
return AcceptManagedClusterWithLeaseDuration(clusterClient, spokeClusterName, 60)
}