Skip to content

Commit

Permalink
start service informer before build resource interpreter to avoid pan…
Browse files Browse the repository at this point in the history
…ic for unexpected calling when waiting for informer synced
  • Loading branch information
lxtywypc committed Jan 16, 2023
1 parent d697528 commit 8d1eb7a
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 27 deletions.
18 changes: 17 additions & 1 deletion cmd/agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"fmt"
"net"
"strconv"
"time"

"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
Expand Down Expand Up @@ -229,7 +231,21 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
restConfig := mgr.GetConfig()
dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopChan)
resourceInterpreter := resourceinterpreter.NewResourceInterpreter(controlPlaneInformerManager, restConfig)
controlPlaneKubeClientSet := kubeclientset.NewForConfigOrDie(restConfig)

serviceInformer := informers.NewSharedInformerFactory(controlPlaneKubeClientSet, 0).Core().V1().Services()
go serviceInformer.Informer().Run(stopChan)

for !serviceInformer.Informer().HasSynced() {
select {
case <-stopChan:
klog.Errorf("Failed to wait service informer synced")
default:
time.Sleep(time.Second)
}
}

resourceInterpreter := resourceinterpreter.NewResourceInterpreter(controlPlaneInformerManager, serviceInformer.Lister())
if err := mgr.Add(resourceInterpreter); err != nil {
return fmt.Errorf("failed to setup custom resource interpreter: %w", err)
}
Expand Down
16 changes: 15 additions & 1 deletion cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
Expand Down Expand Up @@ -504,6 +505,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
restConfig := mgr.GetConfig()
dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
discoverClientSet := discovery.NewDiscoveryClientForConfigOrDie(restConfig)
kubeClientSet := kubeclientset.NewForConfigOrDie(restConfig)

overrideManager := overridemanager.New(mgr.GetClient(), mgr.GetEventRecorderFor(overridemanager.OverrideManagerName))
skippedResourceConfig := util.NewSkippedResourceConfig()
Expand All @@ -519,7 +521,19 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop

controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopChan)

resourceInterpreter := resourceinterpreter.NewResourceInterpreter(controlPlaneInformerManager, restConfig)
serviceInformer := informers.NewSharedInformerFactory(kubeClientSet, 0).Core().V1().Services()
go serviceInformer.Informer().Run(stopChan)

for !serviceInformer.Informer().HasSynced() {
select {
case <-stopChan:
klog.Errorf("Failed to wait service informer synced")
default:
time.Sleep(time.Second)
}
}

resourceInterpreter := resourceinterpreter.NewResourceInterpreter(controlPlaneInformerManager, serviceInformer.Lister())
if err := mgr.Add(resourceInterpreter); err != nil {
klog.Fatalf("Failed to setup custom resource interpreter: %v", err)
}
Expand Down
20 changes: 2 additions & 18 deletions pkg/resourceinterpreter/customizedinterpreter/customized.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
webhookutil "k8s.io/apiserver/pkg/util/webhook"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
corev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
"k8s.io/kube-aggregator/pkg/apiserver"
utiltrace "k8s.io/utils/trace"
Expand All @@ -37,7 +35,7 @@ type CustomizedInterpreter struct {
}

// NewCustomizedInterpreter return a new CustomizedInterpreter.
func NewCustomizedInterpreter(ctx context.Context, informer genericmanager.SingleClusterInformerManager, config *rest.Config) (*CustomizedInterpreter, error) {
func NewCustomizedInterpreter(informer genericmanager.SingleClusterInformerManager, serviceLister corev1.ServiceLister) (*CustomizedInterpreter, error) {
cm, err := webhookutil.NewClientManager(
[]schema.GroupVersion{configv1alpha1.SchemeGroupVersion},
configv1alpha1.AddToScheme,
Expand All @@ -50,20 +48,6 @@ func NewCustomizedInterpreter(ctx context.Context, informer genericmanager.Singl
return nil, err
}

serviceInformer := informers.NewSharedInformerFactory(kubernetes.NewForConfigOrDie(config), 0).Core().V1().Services()
go serviceInformer.Informer().Run(ctx.Done())

for !serviceInformer.Informer().HasSynced() {
select {
case <-ctx.Done():
return nil, fmt.Errorf("failed to wait service informer synced")
default:
time.Sleep(time.Second)
}
}

serviceLister := serviceInformer.Lister()

cm.SetAuthenticationInfoResolver(authInfoResolver)
cm.SetServiceResolver(apiserver.NewClusterIPServiceResolver(serviceLister))

Expand Down
14 changes: 7 additions & 7 deletions pkg/resourceinterpreter/interpreter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
corev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"

configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
Expand Down Expand Up @@ -51,16 +51,16 @@ type ResourceInterpreter interface {
}

// NewResourceInterpreter builds a new ResourceInterpreter object.
func NewResourceInterpreter(informer genericmanager.SingleClusterInformerManager, config *rest.Config) ResourceInterpreter {
func NewResourceInterpreter(informer genericmanager.SingleClusterInformerManager, serviceLister corev1.ServiceLister) ResourceInterpreter {
return &customResourceInterpreterImpl{
informer: informer,
config: config,
informer: informer,
serviceLister: serviceLister,
}
}

type customResourceInterpreterImpl struct {
informer genericmanager.SingleClusterInformerManager
config *rest.Config
informer genericmanager.SingleClusterInformerManager
serviceLister corev1.ServiceLister

customizedInterpreter *customizedinterpreter.CustomizedInterpreter
defaultInterpreter *defaultinterpreter.DefaultInterpreter
Expand All @@ -71,7 +71,7 @@ type customResourceInterpreterImpl struct {
func (i *customResourceInterpreterImpl) Start(ctx context.Context) (err error) {
klog.Infof("Starting custom resource interpreter.")

i.customizedInterpreter, err = customizedinterpreter.NewCustomizedInterpreter(ctx, i.informer, i.config)
i.customizedInterpreter, err = customizedinterpreter.NewCustomizedInterpreter(i.informer, i.serviceLister)
if err != nil {
return
}
Expand Down

0 comments on commit 8d1eb7a

Please sign in to comment.