Skip to content

Commit

Permalink
optimize: add probe for cluster status
Browse files Browse the repository at this point in the history
  • Loading branch information
wy-lucky committed Dec 16, 2024
1 parent 26a5bcb commit 28a26d2
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 1 deletion.
8 changes: 8 additions & 0 deletions cmd/controller-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Options struct {

NSAutoPropExcludeRegexp string
ClusterJoinTimeout time.Duration
ClusterStatusThreshold time.Duration
MemberObjectEnqueueDelay time.Duration

MaxPodListers int64
Expand Down Expand Up @@ -174,6 +175,13 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers []string, disabl
time.Second*30,
"The period of health check for member clusters. The minimum value is "+MinClusterHealthCheckPeriod.String()+".",
)

flags.DurationVar(
&o.ClusterStatusThreshold,
"cluster-status-threshold",
time.Second*100,
"The threshold of member clusters status change.",
)
}

func (o *Options) addKlogFlags(flags *pflag.FlagSet) {
Expand Down
1 change: 1 addition & 0 deletions cmd/controller-manager/app/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func getComponentConfig(opts *options.Options) (*controllercontext.ComponentConf
MemberObjectEnqueueDelay: opts.MemberObjectEnqueueDelay,
EnableKatalystSupport: opts.EnableKatalystSupport,
ClusterHealthCheckPeriod: opts.ClusterHealthCheckPeriod,
ClusterStatusThreshold: opts.ClusterStatusThreshold,
}

if opts.ClusterHealthCheckPeriod < options.MinClusterHealthCheckPeriod {
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,5 @@ type ComponentConfig struct {
ResourceAggregationNodeFilter []labels.Selector
EnableKatalystSupport bool
ClusterHealthCheckPeriod time.Duration
ClusterStatusThreshold time.Duration
}
87 changes: 87 additions & 0 deletions pkg/controllers/federatedcluster/cluster_status_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package federatedcluster

Check failure on line 1 in pkg/controllers/federatedcluster/cluster_status_cache.go

View workflow job for this annotation

GitHub Actions / lint

Missed header for check (goheader)

import (
"context"
"sync"
"time"

"k8s.io/klog/v2"

fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
)

// clusterStatusStore keeps, per cluster name, the most recently observed
// offline/ready conditions, together with the threshold that
// thresholdAdjustedStatusCondition applies before accepting a status change.
type clusterStatusStore struct {
// clusterStatusData maps a cluster name (string) to its
// *clusterStatusConditionData; it is accessed through get, update and delete.
clusterStatusData sync.Map
// clusterStatusThreshold is how long after a recorded status change the
// conditions already present on the cluster keep being reported.
clusterStatusThreshold time.Duration
}

// clusterStatusConditionData is the cache entry stored for a single cluster.
type clusterStatusConditionData struct {
// offlineCondition is the offline condition as last observed by a probe.
offlineCondition fedcorev1a1.ClusterCondition
// readyCondition is the ready condition as last observed by a probe.
readyCondition fedcorev1a1.ClusterCondition
// probeTimestamp is the time at which the observed statuses last differed
// from the previously cached ones. It is left at its zero value in the entry
// created the first time a cluster is seen.
probeTimestamp time.Time
}

func (c *clusterStatusStore) thresholdAdjustedStatusCondition(
ctx context.Context,
cluster *fedcorev1a1.FederatedCluster,
observedOfflineCondition fedcorev1a1.ClusterCondition,
observedReadyCondition fedcorev1a1.ClusterCondition,
) (fedcorev1a1.ClusterCondition, fedcorev1a1.ClusterCondition) {
logger := klog.FromContext(ctx)

saved := c.get(cluster.Name)
if saved == nil {
// the cluster is just joined
c.update(cluster.Name, &clusterStatusConditionData{
offlineCondition: observedOfflineCondition,
readyCondition: observedReadyCondition,
})
return observedOfflineCondition, observedReadyCondition
}
curOfflineCondition := getClusterCondition(&cluster.Status, fedcorev1a1.ClusterOffline)
curReadyCondition := getClusterCondition(&cluster.Status, fedcorev1a1.ClusterReady)
if curOfflineCondition == nil || curReadyCondition == nil {
return observedOfflineCondition, observedReadyCondition
}

Check warning on line 45 in pkg/controllers/federatedcluster/cluster_status_cache.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/federatedcluster/cluster_status_cache.go#L29-L45

Added lines #L29 - L45 were not covered by tests

now := time.Now()
if saved.offlineCondition.Status != observedOfflineCondition.Status || saved.readyCondition.Status != observedReadyCondition.Status {
// condition status changed, record the probe timestamp
saved = &clusterStatusConditionData{
offlineCondition: observedOfflineCondition,
readyCondition: observedReadyCondition,
probeTimestamp: now,
}
}
c.update(cluster.Name, saved)

if curOfflineCondition.Status != observedOfflineCondition.Status || curReadyCondition.Status != observedReadyCondition.Status {
// threshold not exceeded, return the old status condition
if now.Before(saved.probeTimestamp.Add(c.clusterStatusThreshold)) {
logger.V(3).WithValues("offline", curOfflineCondition.Status, "ready", curReadyCondition.Status).
Info("Threshold not exceeded, return the old status condition")
return *curOfflineCondition, *curReadyCondition
}

Check warning on line 64 in pkg/controllers/federatedcluster/cluster_status_cache.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/federatedcluster/cluster_status_cache.go#L47-L64

Added lines #L47 - L64 were not covered by tests

logger.V(3).WithValues("offline", observedOfflineCondition.Status, "ready", observedReadyCondition.Status).
Info("Cluster status condition changed")

Check warning on line 67 in pkg/controllers/federatedcluster/cluster_status_cache.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/federatedcluster/cluster_status_cache.go#L66-L67

Added lines #L66 - L67 were not covered by tests
}

return observedOfflineCondition, observedReadyCondition

Check warning on line 70 in pkg/controllers/federatedcluster/cluster_status_cache.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/federatedcluster/cluster_status_cache.go#L70

Added line #L70 was not covered by tests
}

func (c *clusterStatusStore) get(cluster string) *clusterStatusConditionData {
condition, ok := c.clusterStatusData.Load(cluster)
if !ok {
return nil
}
return condition.(*clusterStatusConditionData)

Check warning on line 78 in pkg/controllers/federatedcluster/cluster_status_cache.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/federatedcluster/cluster_status_cache.go#L73-L78

Added lines #L73 - L78 were not covered by tests
}

func (c *clusterStatusStore) update(cluster string, data *clusterStatusConditionData) {
c.clusterStatusData.Store(cluster, data)

Check warning on line 82 in pkg/controllers/federatedcluster/cluster_status_cache.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/federatedcluster/cluster_status_cache.go#L81-L82

Added lines #L81 - L82 were not covered by tests
}

func (c *clusterStatusStore) delete(cluster string) {

Check failure on line 85 in pkg/controllers/federatedcluster/cluster_status_cache.go

View workflow job for this annotation

GitHub Actions / lint

func `(*clusterStatusStore).delete` is unused (unused)
c.clusterStatusData.Delete(cluster)

Check warning on line 86 in pkg/controllers/federatedcluster/cluster_status_cache.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/federatedcluster/cluster_status_cache.go#L85-L86

Added lines #L85 - L86 were not covered by tests
}
5 changes: 4 additions & 1 deletion pkg/controllers/federatedcluster/clusterstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,11 @@ func (c *FederatedClusterController) collectIndividualClusterStatus(
}

offlineCondition := getNewClusterOfflineCondition(offlineStatus, conditionTime)
setClusterCondition(&cluster.Status, &offlineCondition)
readyCondition := getNewClusterReadyCondition(readyStatus, readyReason, readyMessage, conditionTime)

offlineCondition, readyCondition = c.clusterStatusCache.thresholdAdjustedStatusCondition(ctx, cluster, offlineCondition, readyCondition)

setClusterCondition(&cluster.Status, &offlineCondition)

Check warning on line 305 in pkg/controllers/federatedcluster/clusterstatus.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/federatedcluster/clusterstatus.go#L302-L305

Added lines #L302 - L305 were not covered by tests
setClusterCondition(&cluster.Status, &readyCondition)

if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
Expand Down
5 changes: 5 additions & 0 deletions pkg/controllers/federatedcluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type FederatedClusterController struct {
clusterHealthCheckConfig *ClusterHealthCheckConfig
clusterJoinTimeout time.Duration
resourceAggregationNodeFilter []labels.Selector
clusterStatusCache clusterStatusStore

lock sync.Mutex
clusterConnectionHashes map[string]string
Expand Down Expand Up @@ -120,6 +121,10 @@ func NewFederatedClusterController(
clusterHealthCheckConfig: &ClusterHealthCheckConfig{
Period: componentConfig.ClusterHealthCheckPeriod,
},
clusterStatusCache: clusterStatusStore{
clusterStatusData: sync.Map{},
clusterStatusThreshold: componentConfig.ClusterStatusThreshold,
},

Check warning on line 127 in pkg/controllers/federatedcluster/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/federatedcluster/controller.go#L124-L127

Added lines #L124 - L127 were not covered by tests

lock: sync.Mutex{},
clusterConnectionHashes: map[string]string{},
Expand Down

0 comments on commit 28a26d2

Please sign in to comment.