Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add serviceImport's ips #3832

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,9 +452,15 @@ func startEndpointSliceController(ctx controllerscontext.Context) (enabled bool,
}

func startServiceImportController(ctx controllerscontext.Context) (enabled bool, err error) {
sharedFactory := informers.NewSharedInformerFactory(ctx.KubeClientSet, 0)
serviceLister := sharedFactory.Core().V1().Services().Lister()
controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(ctx.DynamicClientSet, 0, nil)
resourceInterpreter := resourceinterpreter.NewResourceInterpreter(controlPlaneInformerManager, serviceLister)
serviceImportController := &mcs.ServiceImportController{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.ServiceImportControllerName),
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.ServiceImportControllerName),
RESTMapper: ctx.Mgr.GetRESTMapper(),
ResourceInterpreter: resourceInterpreter,
}
if err := serviceImportController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
Expand Down
177 changes: 177 additions & 0 deletions pkg/controllers/mcs/service_import_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,31 @@ package mcs

import (
"context"
"fmt"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
mcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"

workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/restmapper"
)

// ServiceImportControllerName is the controller name that will be used when reporting events.
Expand All @@ -26,6 +36,167 @@ const ServiceImportControllerName = "service-import-controller"
type ServiceImportController struct {
client.Client
EventRecorder record.EventRecorder
RESTMapper meta.RESTMapper
// ResourceInterpreter knows the details of resource structure.
ResourceInterpreter resourceinterpreter.ResourceInterpreter
}

func (c *ServiceImportController) getSvcClusterNameFromControlSvc(svc *corev1.Service) ([]string, error) {
binding := &workv1alpha2.ResourceBinding{
ObjectMeta: metav1.ObjectMeta{
Name: svc.Name + "-" + strings.ToLower(svc.Kind),
Namespace: svc.Namespace,
},
}
if bindErr := c.Get(context.TODO(), client.ObjectKey{Name: binding.Name, Namespace: binding.Namespace}, binding); bindErr != nil {
return nil, bindErr
}

var cNames []string
for _, cluster := range binding.Spec.Clusters {
name := cluster.Name
cNames = append(cNames, name)
}

if len(cNames) == 0 {
return nil, fmt.Errorf("binding.Spec.Clusters is nil")
}
return cNames, nil
}

func (c *ServiceImportController) setSvcImportIpsFromSvc(svcImport *mcsv1alpha1.ServiceImport) error {
derivedSvc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: names.GenerateDerivedServiceName(svcImport.Name),
Namespace: svcImport.Namespace,
},
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Service",
},
}

clusterNames, err := c.getServiceImportPropagateClusterNames(derivedSvc)
if err != nil {
klog.Errorf("Failed to get derived svc %s/%s propagate clusters, err: %v", derivedSvc.Namespace, derivedSvc.Name, err)
return err
}

for _, cName := range clusterNames {
if setErr := c.setClusterIPToMemberServiceImport(derivedSvc, svcImport, cName); setErr != nil {
klog.Errorf("Failed to set %s serviceImport %s/%s clusterIP, %v", cName, derivedSvc.Namespace, derivedSvc.Name, setErr)
return setErr
}
}

return nil
}

func (c *ServiceImportController) getServiceImportPropagateClusterNames(derivedSvc *corev1.Service) ([]string, error) {
clusterNames, clusterErr := c.getSvcClusterNameFromControlSvc(derivedSvc)
if clusterErr != nil {
return nil, clusterErr
}
return clusterNames, nil
}

func (c *ServiceImportController) getClusterIPFromClusterDerivedSvs(derivedSvc *corev1.Service, clusterDynamicClient *util.DynamicClusterClient) (string, error) {
dynamicResource, err := restmapper.GetGroupVersionResource(c.RESTMapper, schema.FromAPIVersionAndKind(derivedSvc.APIVersion, derivedSvc.Kind))
if err != nil {
return "", err
}

var svcLoad *unstructured.Unstructured
var getErr error
if waitErr := wait.PollUntilContextTimeout(context.TODO(), 1*time.Second, 10*time.Second, true, func(ctx context.Context) (bool, error) {
svcLoad, getErr = clusterDynamicClient.DynamicClientSet.Resource(dynamicResource).Namespace(derivedSvc.Namespace).Get(context.TODO(), derivedSvc.Name, metav1.GetOptions{})
if getErr != nil {
klog.Warningf("Failed to get derived service %s/%s from cluster, err: %v", derivedSvc.Namespace, derivedSvc.Name, err)
return false, nil
}
return true, nil
}); waitErr != nil {
klog.Errorf("Failed to get derived service %s/%s from cluster, err: %v", derivedSvc.Namespace, derivedSvc.Name, err)
return "", waitErr
}

clusterIP, _, err := unstructured.NestedString(svcLoad.Object, "spec", "clusterIP")
if err != nil {
return "", err
}
return clusterIP, nil
}

func (c *ServiceImportController) updateClusterServiceImport(svcImport *mcsv1alpha1.ServiceImport, clusterIP string, clusterDynamicClient *util.DynamicClusterClient) error {
dynamicResource, err := restmapper.GetGroupVersionResource(c.RESTMapper, schema.FromAPIVersionAndKind(svcImport.APIVersion, svcImport.Kind))
if err != nil {
return err
}

var clusterSvcImportObj *unstructured.Unstructured
var setErr error
if err := wait.PollUntilContextTimeout(context.TODO(), 1*time.Second, 10*time.Second, true, func(ctx context.Context) (bool, error) {
if clusterSvcImportObj, setErr = clusterDynamicClient.DynamicClientSet.Resource(dynamicResource).Namespace(svcImport.Namespace).Get(context.TODO(),
svcImport.Name, metav1.GetOptions{}); setErr != nil {
klog.Warningf("Failed to get service import %s/%s from cluster, err: %v", svcImport.Namespace, svcImport.Name, err)
return false, nil
}
return true, nil
}); err != nil {
klog.Errorf("Failed to get service import %s/%s from cluster, err: %v", svcImport.Namespace, svcImport.Name, err)
return err
}

unstructuredObj, addErr := c.addUnstructSvcImportClusterIP(clusterSvcImportObj, clusterIP)
if addErr != nil {
return addErr
}

if _, err = clusterDynamicClient.DynamicClientSet.Resource(dynamicResource).Namespace(unstructuredObj.GetNamespace()).Update(context.TODO(), unstructuredObj, metav1.UpdateOptions{}); err != nil {
zishen marked this conversation as resolved.
Show resolved Hide resolved
return err
}
return nil
}

func (c *ServiceImportController) addUnstructSvcImportClusterIP(clusterSvcImportObj *unstructured.Unstructured, clusterIP string) (*unstructured.Unstructured, error) {
nameSpace := clusterSvcImportObj.GetNamespace()
name := clusterSvcImportObj.GetName()

ips, _, err := unstructured.NestedStringSlice(clusterSvcImportObj.Object, "spec", "ips")
if err != nil {
return nil, fmt.Errorf("error get ips from serviceImport: %w", err)
}
if len(ips) != 0 {
klog.Warningf("Service import %s/%s already has ips: %v", nameSpace, name, ips)
return clusterSvcImportObj, nil
}

var data []interface{}
data = append(data, clusterIP)
// !ok could indicate that a cluster ip was not assigned
err = unstructured.SetNestedSlice(clusterSvcImportObj.Object, data, "spec", "ips")
if err != nil {
return nil, fmt.Errorf("error setting ips for serviceImport: %w", err)
}
return clusterSvcImportObj, nil
}

func (c *ServiceImportController) setClusterIPToMemberServiceImport(derivedSvc *corev1.Service, svcImport *mcsv1alpha1.ServiceImport, clusterName string) error {
clusterDynamicClient, err := util.NewClusterDynamicClientSet(clusterName, c.Client)
if err != nil {
return err
}

clusterIP, getClusterIPErr := c.getClusterIPFromClusterDerivedSvs(derivedSvc, clusterDynamicClient)
if getClusterIPErr != nil {
return getClusterIPErr
}

if updateClusterIPErr := c.updateClusterServiceImport(svcImport, clusterIP, clusterDynamicClient); updateClusterIPErr != nil {
return updateClusterIPErr
}
klog.Infof("Set clusterIP(%s) to %s serviceImport %s/%s successfully.", clusterIP, clusterName, svcImport.Namespace, svcImport.Name)
return nil
}

// Reconcile performs a full reconciliation for the object referred to by the Request.
Expand All @@ -49,6 +220,12 @@ func (c *ServiceImportController) Reconcile(ctx context.Context, req controllerr
c.EventRecorder.Eventf(svcImport, corev1.EventTypeWarning, events.EventReasonSyncDerivedServiceFailed, err.Error())
return controllerruntime.Result{Requeue: true}, err
}

if setErr := c.setSvcImportIpsFromSvc(svcImport); setErr != nil {
c.EventRecorder.Eventf(svcImport, corev1.EventTypeWarning, events.EventReasonSetServiceImportIPsFailed, setErr.Error())
return controllerruntime.Result{Requeue: true}, setErr
}

c.EventRecorder.Eventf(svcImport, corev1.EventTypeNormal, events.EventReasonSyncDerivedServiceSucceed, "Sync derived service for serviceImport(%s) succeed.", svcImport.Name)
return controllerruntime.Result{}, nil
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,6 @@ const (
EventReasonSyncDerivedServiceSucceed = "SyncDerivedServiceSucceed"
// EventReasonSyncDerivedServiceFailed indicates that sync derived service failed.
EventReasonSyncDerivedServiceFailed = "SyncDerivedServiceFailed"
// EventReasonSetServiceImportIPsFailed indicates that set serviceImport's ips failed.
EventReasonSetServiceImportIPsFailed = "SetServiceImportIPsFailed"
)
21 changes: 21 additions & 0 deletions pkg/resourceinterpreter/default/native/retain.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
mcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"

"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
Expand All @@ -24,6 +25,7 @@ func getAllDefaultRetentionInterpreter() map[schema.GroupVersionKind]retentionIn
s[corev1.SchemeGroupVersion.WithKind(util.PersistentVolumeClaimKind)] = retainPersistentVolumeClaimFields
s[corev1.SchemeGroupVersion.WithKind(util.PersistentVolumeKind)] = retainPersistentVolumeFields
s[batchv1.SchemeGroupVersion.WithKind(util.JobKind)] = retainJobSelectorFields
s[mcsv1alpha1.SchemeGroupVersion.WithKind(util.ServiceImportKind)] = retainServiceImportIPsFields
return s
}

Expand Down Expand Up @@ -122,3 +124,22 @@ func retainJobSelectorFields(desired, observed *unstructured.Unstructured) (*uns
}
return desired, nil
}

func retainServiceImportIPsFields(desired, observed *unstructured.Unstructured) (*unstructured.Unstructured, error) {
ips, ok, err := unstructured.NestedStringSlice(observed.Object, "spec", "ips")
if err != nil {
return nil, fmt.Errorf("error retrieving ips from serviceImport: %w", err)
}
// !ok could indicate that ips was not assigned
if ok && len(ips) != 0 {
if len(ips) != 1 {
return nil, fmt.Errorf("serviceImport ips length(%d) over 1", len(ips))
}

err = unstructured.SetNestedSlice(desired.Object, []interface{}{ips[0]}, "spec", "ips")
if err != nil {
return nil, fmt.Errorf("error setting ips for serviceImport: %w", err)
}
}
return desired, nil
}
Loading