Skip to content

Commit

Permalink
Merge pull request #3132 from ramramu3433/fix-leader-election
Browse files Browse the repository at this point in the history
✨ Add installControllers and uninstallControllers for leaderElectionStopping
  • Loading branch information
kcp-ci-bot committed May 30, 2024
2 parents 62229c4 + 25094fb commit 3fb4c44
Show file tree
Hide file tree
Showing 4 changed files with 447 additions and 207 deletions.
62 changes: 31 additions & 31 deletions pkg/reconciler/cache/replication/replication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,82 +71,82 @@ func NewController(
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
dynamicCacheClient: dynamicCacheClient,

gvrs: map[schema.GroupVersionResource]replicatedGVR{
Gvrs: map[schema.GroupVersionResource]replicatedGVR{
apisv1alpha1.SchemeGroupVersion.WithResource("apiexports"): {
kind: "APIExport",
local: localKcpInformers.Apis().V1alpha1().APIExports().Informer(),
global: globalKcpInformers.Apis().V1alpha1().APIExports().Informer(),
Local: localKcpInformers.Apis().V1alpha1().APIExports().Informer(),
Global: globalKcpInformers.Apis().V1alpha1().APIExports().Informer(),
},
apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas"): {
kind: "APIResourceSchema",
local: localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer(),
global: globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer(),
Local: localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer(),
Global: globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer(),
},
apisv1alpha1.SchemeGroupVersion.WithResource("apiconversions"): {
kind: "APIConversion",
local: localKcpInformers.Apis().V1alpha1().APIConversions().Informer(),
global: globalKcpInformers.Apis().V1alpha1().APIConversions().Informer(),
Local: localKcpInformers.Apis().V1alpha1().APIConversions().Informer(),
Global: globalKcpInformers.Apis().V1alpha1().APIConversions().Informer(),
},
admissionregistrationv1.SchemeGroupVersion.WithResource("mutatingwebhookconfigurations"): {
kind: "MutatingWebhookConfiguration",
local: localKubeInformers.Admissionregistration().V1().MutatingWebhookConfigurations().Informer(),
global: globalKubeInformers.Admissionregistration().V1().MutatingWebhookConfigurations().Informer(),
Local: localKubeInformers.Admissionregistration().V1().MutatingWebhookConfigurations().Informer(),
Global: globalKubeInformers.Admissionregistration().V1().MutatingWebhookConfigurations().Informer(),
},
admissionregistrationv1.SchemeGroupVersion.WithResource("validatingwebhookconfigurations"): {
kind: "ValidatingWebhookConfiguration",
local: localKubeInformers.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer(),
global: globalKubeInformers.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer(),
Local: localKubeInformers.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer(),
Global: globalKubeInformers.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer(),
},
admissionregistrationv1alpha1.SchemeGroupVersion.WithResource("validatingadmissionpolicies"): {
kind: "ValidatingAdmissionPolicy",
local: localKubeInformers.Admissionregistration().V1alpha1().ValidatingAdmissionPolicies().Informer(),
global: globalKubeInformers.Admissionregistration().V1alpha1().ValidatingAdmissionPolicies().Informer(),
Local: localKubeInformers.Admissionregistration().V1alpha1().ValidatingAdmissionPolicies().Informer(),
Global: globalKubeInformers.Admissionregistration().V1alpha1().ValidatingAdmissionPolicies().Informer(),
},
admissionregistrationv1alpha1.SchemeGroupVersion.WithResource("validatingadmissionpolicybindings"): {
kind: "ValidatingAdmissionPolicyBinding",
local: localKubeInformers.Admissionregistration().V1alpha1().ValidatingAdmissionPolicyBindings().Informer(),
global: globalKubeInformers.Admissionregistration().V1alpha1().ValidatingAdmissionPolicyBindings().Informer(),
Local: localKubeInformers.Admissionregistration().V1alpha1().ValidatingAdmissionPolicyBindings().Informer(),
Global: globalKubeInformers.Admissionregistration().V1alpha1().ValidatingAdmissionPolicyBindings().Informer(),
},
corev1alpha1.SchemeGroupVersion.WithResource("shards"): {
kind: "Shard",
local: localKcpInformers.Core().V1alpha1().Shards().Informer(),
global: globalKcpInformers.Core().V1alpha1().Shards().Informer(),
Local: localKcpInformers.Core().V1alpha1().Shards().Informer(),
Global: globalKcpInformers.Core().V1alpha1().Shards().Informer(),
},
corev1alpha1.SchemeGroupVersion.WithResource("logicalclusters"): {
kind: "LogicalCluster",
filter: func(u *unstructured.Unstructured) bool {
return u.GetAnnotations()[core.ReplicateAnnotationKey] != ""
},
local: localKcpInformers.Core().V1alpha1().LogicalClusters().Informer(),
global: globalKcpInformers.Core().V1alpha1().LogicalClusters().Informer(),
Local: localKcpInformers.Core().V1alpha1().LogicalClusters().Informer(),
Global: globalKcpInformers.Core().V1alpha1().LogicalClusters().Informer(),
},
tenancyv1alpha1.SchemeGroupVersion.WithResource("workspacetypes"): {
kind: "WorkspaceType",
local: localKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer(),
global: globalKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer(),
Local: localKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer(),
Global: globalKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer(),
},
rbacv1.SchemeGroupVersion.WithResource("clusterroles"): {
kind: "ClusterRole",
filter: func(u *unstructured.Unstructured) bool {
return u.GetAnnotations()[core.ReplicateAnnotationKey] != ""
},
local: localKubeInformers.Rbac().V1().ClusterRoles().Informer(),
global: globalKubeInformers.Rbac().V1().ClusterRoles().Informer(),
Local: localKubeInformers.Rbac().V1().ClusterRoles().Informer(),
Global: globalKubeInformers.Rbac().V1().ClusterRoles().Informer(),
},
rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings"): {
kind: "ClusterRoleBinding",
filter: func(u *unstructured.Unstructured) bool {
return u.GetAnnotations()[core.ReplicateAnnotationKey] != ""
},
local: localKubeInformers.Rbac().V1().ClusterRoleBindings().Informer(),
global: globalKubeInformers.Rbac().V1().ClusterRoleBindings().Informer(),
Local: localKubeInformers.Rbac().V1().ClusterRoleBindings().Informer(),
Global: globalKubeInformers.Rbac().V1().ClusterRoleBindings().Informer(),
},
},
}

for gvr, info := range c.gvrs {
for gvr, info := range c.Gvrs {
indexers.AddIfNotPresentOrDie(
info.global.GetIndexer(),
info.Global.GetIndexer(),
cache.Indexers{
ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace,
},
Expand All @@ -155,7 +155,7 @@ func NewController(
// shadow gvr to get the right value in the closure
gvr := gvr

_, _ = info.local.AddEventHandler(cache.FilteringResourceEventHandler{
_, _ = info.Local.AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: IsNoSystemClusterName,
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.enqueueObject(obj, gvr) },
Expand All @@ -164,7 +164,7 @@ func NewController(
},
})

_, _ = info.global.AddEventHandler(cache.FilteringResourceEventHandler{
_, _ = info.Global.AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: IsNoSystemClusterName, // not really needed, but cannot harm
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.enqueueCacheObject(obj, gvr) },
Expand Down Expand Up @@ -263,11 +263,11 @@ type controller struct {

dynamicCacheClient kcpdynamic.ClusterInterface

gvrs map[schema.GroupVersionResource]replicatedGVR
Gvrs map[schema.GroupVersionResource]replicatedGVR
}

type replicatedGVR struct {
kind string
filter func(u *unstructured.Unstructured) bool
global, local cache.SharedIndexInformer
Global, Local cache.SharedIndexInformer
}
6 changes: 3 additions & 3 deletions pkg/reconciler/cache/replication/replication_reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ func (c *controller) reconcile(ctx context.Context, gvrKey string) error {
gvr := schema.GroupVersionResource{Version: gvrParts[0], Resource: gvrParts[1], Group: gvrParts[2]}
key := keyParts[1]

info := c.gvrs[gvr]
info := c.Gvrs[gvr]

r := &reconciler{
shardName: c.shardName,
getLocalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) {
key := kcpcache.ToClusterAwareKey(cluster.String(), namespace, name)
obj, exists, err := info.local.GetIndexer().GetByKey(key)
obj, exists, err := info.Local.GetIndexer().GetByKey(key)
if !exists {
return nil, apierrors.NewNotFound(gvr.GroupResource(), name)
} else if err != nil {
Expand All @@ -73,7 +73,7 @@ func (c *controller) reconcile(ctx context.Context, gvrKey string) error {
return u, nil
},
getGlobalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) {
objs, err := info.global.GetIndexer().ByIndex(ByShardAndLogicalClusterAndNamespaceAndName, ShardAndLogicalClusterAndNamespaceKey(c.shardName, cluster, namespace, name))
objs, err := info.Global.GetIndexer().ByIndex(ByShardAndLogicalClusterAndNamespaceAndName, ShardAndLogicalClusterAndNamespaceKey(c.shardName, cluster, namespace, name))
if err != nil {
return nil, err // necessary to avoid non-zero nil interface
}
Expand Down
Loading

0 comments on commit 3fb4c44

Please sign in to comment.