Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
Merge pull request #464 from Fei-Guo/master
Browse files Browse the repository at this point in the history
[Incubator][VC]Make DWS request only contain immutable fields
  • Loading branch information
k8s-ci-robot authored Feb 28, 2020
2 parents 67a3189 + 43e90b0 commit ac8554b
Show file tree
Hide file tree
Showing 19 changed files with 574 additions and 509 deletions.
2 changes: 2 additions & 0 deletions incubator/virtualcluster/pkg/syncer/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
restclient "k8s.io/client-go/rest"
clientgocache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
Expand Down Expand Up @@ -291,6 +292,7 @@ func (c *Cluster) Start() error {
func (c *Cluster) WaitForCacheSync() bool {
ca, err := c.getCache()
if err != nil {
klog.Errorf("Fail to get cache: %v", err)
return false
}
return ca.WaitForCacheSync(c.stopCh)
Expand Down
12 changes: 7 additions & 5 deletions incubator/virtualcluster/pkg/syncer/handler/enqueue_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,29 @@ type EnqueueRequestForObject struct {
Queue Queue
}

func (e *EnqueueRequestForObject) enqueue(obj interface{}, event reconciler.EventType) {
func (e *EnqueueRequestForObject) enqueue(obj interface{}) {
o, err := meta.Accessor(obj)
if err != nil {
return
}

r := reconciler.Request{Cluster: e.Cluster, Event: event, Obj: obj}
r := reconciler.Request{}
r.ClusterName = e.Cluster.Name
r.Namespace = o.GetNamespace()
r.Name = o.GetName()
r.UID = string(o.GetUID())

e.Queue.Add(r)
}

func (e *EnqueueRequestForObject) OnAdd(obj interface{}) {
e.enqueue(obj, reconciler.AddEvent)
e.enqueue(obj)
}

func (e *EnqueueRequestForObject) OnUpdate(oldObj, newObj interface{}) {
e.enqueue(newObj, reconciler.UpdateEvent)
e.enqueue(newObj)
}

func (e *EnqueueRequestForObject) OnDelete(obj interface{}) {
e.enqueue(obj, reconciler.DeleteEvent)
e.enqueue(obj)
}
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,12 @@ func (c *MultiClusterController) RequeueObject(clusterName string, obj interface
if cluster == nil {
return fmt.Errorf("could not find cluster %s", clusterName)
}
r := reconciler.Request{Cluster: cluster.GetClientInfo(), Event: event, Obj: obj}
//FIXME: we dont need event here.
r := reconciler.Request{}
r.ClusterName = clusterName
r.Namespace = o.GetNamespace()
r.Name = o.GetName()
r.UID = string(o.GetUID())

c.Queue.Add(r)
return nil
Expand Down Expand Up @@ -369,9 +372,9 @@ func (c *MultiClusterController) processNextWorkItem() bool {
// Return true, don't take a break
return true
}
if c.getCluster(req.Cluster.Name) == nil {
if c.getCluster(req.ClusterName) == nil {
// The virtual cluster has been removed, do not reconcile for its dws requests.
klog.Warningf("The cluster %s has been removed, drop the dws request %v", req.Cluster.Name, req)
klog.Warningf("The cluster %s has been removed, drop the dws request %v", req.ClusterName, req)
c.Queue.Forget(obj)
return true
}
Expand Down
10 changes: 3 additions & 7 deletions incubator/virtualcluster/pkg/syncer/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,13 @@ const (
)

// Request contains the information needed by a multicluster Reconciler to Reconcile:
// a context, namespace, and name.
// It ONLY contains the meta that can uniquely identify an object without any state information which can lead to parallel reconcile.
type Request struct {
Cluster *ClusterInfo
ClusterName string
types.NamespacedName
Event EventType
Obj interface{}
UID string
}

// Result is the return type of a Reconciler's Reconcile method.
// By default, the Request is forgotten after it's been processed,
// but you can also requeue it immediately, or after some time.
type Result reconcile.Result

// Reconciler is the interface used by a Controller to reconcile.
Expand Down
76 changes: 40 additions & 36 deletions incubator/virtualcluster/pkg/syncer/resources/configmap/dws.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,62 +39,68 @@ func (c *controller) StartDWS(stopCh <-chan struct{}) error {

// The reconcile logic for tenant master configMap informer
func (c *controller) Reconcile(request reconciler.Request) (reconciler.Result, error) {
klog.V(4).Infof("reconcile configmap %s/%s %s event for cluster %s", request.Namespace, request.Name, request.Event, request.Cluster.Name)
klog.V(4).Infof("reconcile configmap %s/%s event for cluster %s", request.Namespace, request.Name, request.ClusterName)

switch request.Event {
case reconciler.AddEvent:
err := c.reconcileConfigMapCreate(request.Cluster.Name, request.Namespace, request.Name, request.Obj.(*v1.ConfigMap))
targetNamespace := conversion.ToSuperMasterNamespace(request.ClusterName, request.Namespace)
pConfigMap, err := c.configMapLister.ConfigMaps(targetNamespace).Get(request.Name)
pExists := true
if err != nil {
if !errors.IsNotFound(err) {
return reconciler.Result{Requeue: true}, err
}
pExists = false
}
vExists := true
vConfigMapObj, err := c.multiClusterConfigMapController.Get(request.ClusterName, request.Namespace, request.Name)
if err != nil {
if !errors.IsNotFound(err) {
return reconciler.Result{Requeue: true}, err
}
vExists = false
}

if vExists && !pExists {
vConfigMap := vConfigMapObj.(*v1.ConfigMap)
err := c.reconcileConfigMapCreate(request.ClusterName, targetNamespace, vConfigMap)
if err != nil {
klog.Errorf("failed reconcile configmap %s/%s CREATE of cluster %s %v", request.Namespace, request.Name, request.Cluster.Name, err)
return reconciler.Result{Requeue: true}, nil
klog.Errorf("failed reconcile configmap %s/%s CREATE of cluster %s %v", request.Namespace, request.Name, request.ClusterName, err)
return reconciler.Result{Requeue: true}, err
}
case reconciler.UpdateEvent:
err := c.reconcileConfigMapUpdate(request.Cluster.Name, request.Namespace, request.Name, request.Obj.(*v1.ConfigMap))
} else if !vExists && pExists {
err := c.reconcileConfigMapRemove(request.ClusterName, targetNamespace, request.Name)
if err != nil {
klog.Errorf("failed reconcile configmap %s/%s UPDATE of cluster %s %v", request.Namespace, request.Name, request.Cluster.Name, err)
klog.Errorf("failed reconcile configmap %s/%s DELETE of cluster %s %v", request.Namespace, request.Name, request.ClusterName, err)
return reconciler.Result{Requeue: true}, err
}
case reconciler.DeleteEvent:
err := c.reconcileConfigMapRemove(request.Cluster.Name, request.Namespace, request.Name)
} else if vExists && pExists {
vConfigMap := vConfigMapObj.(*v1.ConfigMap)
err := c.reconcileConfigMapUpdate(request.ClusterName, targetNamespace, pConfigMap, vConfigMap)
if err != nil {
klog.Errorf("failed reconcile configmap %s/%s DELETE of cluster %s %v", request.Namespace, request.Name, request.Cluster.Name, err)
klog.Errorf("failed reconcile configmap %s/%s UPDATE of cluster %s %v", request.Namespace, request.Name, request.ClusterName, err)
return reconciler.Result{Requeue: true}, err
}
} else {
// object is gone.
}
return reconciler.Result{}, nil
}

func (c *controller) reconcileConfigMapCreate(cluster, namespace, name string, configMap *v1.ConfigMap) error {
targetNamespace := conversion.ToSuperMasterNamespace(cluster, namespace)
_, err := c.configMapLister.ConfigMaps(targetNamespace).Get(name)
if err == nil {
return c.reconcileConfigMapUpdate(cluster, namespace, name, configMap)
}

newObj, err := conversion.BuildMetadata(cluster, targetNamespace, configMap)
func (c *controller) reconcileConfigMapCreate(clusterName, targetNamespace string, configMap *v1.ConfigMap) error {
newObj, err := conversion.BuildMetadata(clusterName, targetNamespace, configMap)
if err != nil {
return err
}

_, err = c.configMapClient.ConfigMaps(targetNamespace).Create(newObj.(*v1.ConfigMap))
if errors.IsAlreadyExists(err) {
klog.Infof("configmap %s/%s of cluster %s already exist in super master", namespace, name, cluster)
klog.Infof("configmap %s/%s of cluster %s already exist in super master", targetNamespace, configMap.Name, clusterName)
return nil
}
return err
}

func (c *controller) reconcileConfigMapUpdate(cluster, namespace, name string, vConfigMap *v1.ConfigMap) error {
targetNamespace := conversion.ToSuperMasterNamespace(cluster, namespace)
pConfigMap, err := c.configMapLister.ConfigMaps(targetNamespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}

spec, err := c.multiClusterConfigMapController.GetSpec(cluster)
func (c *controller) reconcileConfigMapUpdate(clusterName, targetNamespace string, pConfigMap, vConfigMap *v1.ConfigMap) error {
spec, err := c.multiClusterConfigMapController.GetSpec(clusterName)
if err != nil {
return err
}
Expand All @@ -105,18 +111,16 @@ func (c *controller) reconcileConfigMapUpdate(cluster, namespace, name string, v
return err
}
}

return nil
}

func (c *controller) reconcileConfigMapRemove(cluster, namespace, name string) error {
targetNamespace := conversion.ToSuperMasterNamespace(cluster, namespace)
func (c *controller) reconcileConfigMapRemove(clusterName, targetNamespace, name string) error {
opts := &metav1.DeleteOptions{
PropagationPolicy: &constants.DefaultDeletionPolicy,
}
err := c.configMapClient.ConfigMaps(targetNamespace).Delete(name, opts)
if errors.IsNotFound(err) {
klog.Warningf("configmap %s/%s of cluster %s not found in super master", namespace, name, cluster)
klog.Warningf("configmap %s/%s of cluster %s not found in super master", targetNamespace, name, clusterName)
return nil
}
return err
Expand Down
73 changes: 38 additions & 35 deletions incubator/virtualcluster/pkg/syncer/resources/endpoints/dws.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,39 +43,53 @@ func (c *controller) Reconcile(request reconciler.Request) (reconciler.Result, e
// For now, we bypass all ep events beside the default kubernetes ep. The tenant/master ep controllers handle ep lifecycle independently.
return reconciler.Result{}, nil
}
klog.Infof("reconcile endpoints %s/%s %s event for cluster %s", request.Namespace, request.Name, request.Event, request.Cluster.Name)
klog.Infof("reconcile endpoints %s/%s for cluster %s", request.Namespace, request.Name, request.ClusterName)
targetNamespace := conversion.ToSuperMasterNamespace(request.ClusterName, request.Namespace)
pEndpoints, err := c.endpointsLister.Endpoints(targetNamespace).Get(request.Name)
pExists := true
if err != nil {
if !errors.IsNotFound(err) {
return reconciler.Result{Requeue: true}, err
}
pExists = false
}
vExists := true
vEndpointsObj, err := c.multiClusterEndpointsController.Get(request.ClusterName, request.Namespace, request.Name)
if err != nil {
if !errors.IsNotFound(err) {
return reconciler.Result{Requeue: true}, err
}
vExists = false
}

switch request.Event {
case reconciler.AddEvent:
err := c.reconcileEndpointsCreate(request.Cluster.Name, request.Namespace, request.Name, request.Obj.(*v1.Endpoints))
if vExists && !pExists {
vEndpoints := vEndpointsObj.(*v1.Endpoints)
err := c.reconcileEndpointsCreate(request.ClusterName, targetNamespace, vEndpoints)
if err != nil {
klog.Errorf("failed reconcile endpoints %s/%s CREATE of cluster %s %v", request.Namespace, request.Name, request.Cluster.Name, err)
klog.Errorf("failed reconcile endpoints %s/%s CREATE of cluster %s %v", request.Namespace, request.Name, request.ClusterName, err)
return reconciler.Result{Requeue: true}, err
}
case reconciler.UpdateEvent:
err := c.reconcileEndpointsUpdate(request.Cluster.Name, request.Namespace, request.Name, request.Obj.(*v1.Endpoints))
} else if !vExists && pExists {
err := c.reconcileEndpointsRemove(request.ClusterName, targetNamespace, request.Name)
if err != nil {
klog.Errorf("failed reconcile endpoints %s/%s CREATE of cluster %s %v", request.Namespace, request.Name, request.Cluster.Name, err)
klog.Errorf("failed reconcile endpoints %s/%s DELETE of cluster %s %v", request.Namespace, request.Name, request.ClusterName, err)
return reconciler.Result{Requeue: true}, err
}
case reconciler.DeleteEvent:
err := c.reconcileEndpointsRemove(request.Cluster.Name, request.Namespace, request.Name, request.Obj.(*v1.Endpoints))
} else if vExists && pExists {
vEndpoints := vEndpointsObj.(*v1.Endpoints)
err := c.reconcileEndpointsUpdate(request.ClusterName, targetNamespace, pEndpoints, vEndpoints)
if err != nil {
klog.Errorf("failed reconcile endpoints %s/%s DELETE of cluster %s %v", request.Namespace, request.Name, request.Cluster.Name, err)
klog.Errorf("failed reconcile endpoints %s/%s UPDATE of cluster %s %v", request.Namespace, request.Name, request.ClusterName, err)
return reconciler.Result{Requeue: true}, err
}
} else {
// object is gone.
}
return reconciler.Result{}, nil
}

func (c *controller) reconcileEndpointsCreate(cluster, namespace, name string, ep *v1.Endpoints) error {
targetNamespace := conversion.ToSuperMasterNamespace(cluster, namespace)
_, err := c.endpointsLister.Endpoints(targetNamespace).Get(name)
if err == nil {
return c.reconcileEndpointsUpdate(cluster, namespace, name, ep)
}

newObj, err := conversion.BuildMetadata(cluster, targetNamespace, ep)
func (c *controller) reconcileEndpointsCreate(clusterName, targetNamespace string, ep *v1.Endpoints) error {
newObj, err := conversion.BuildMetadata(clusterName, targetNamespace, ep)
if err != nil {
return err
}
Expand All @@ -84,23 +98,14 @@ func (c *controller) reconcileEndpointsCreate(cluster, namespace, name string, e

_, err = c.endpointClient.Endpoints(targetNamespace).Create(pEndpoints)
if errors.IsAlreadyExists(err) {
klog.Infof("endpoints %s/%s of cluster %s already exist in super master", namespace, name, cluster)
klog.Infof("endpoints %s/%s of cluster %s already exist in super master", targetNamespace, pEndpoints.Name, clusterName)
return nil
}
return err
}

func (c *controller) reconcileEndpointsUpdate(cluster, namespace, name string, vEP *v1.Endpoints) error {
targetNamespace := conversion.ToSuperMasterNamespace(cluster, namespace)
pEP, err := c.endpointsLister.Endpoints(targetNamespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}

spec, err := c.multiClusterEndpointsController.GetSpec(cluster)
func (c *controller) reconcileEndpointsUpdate(clusterName, targetNamespace string, pEP, vEP *v1.Endpoints) error {
spec, err := c.multiClusterEndpointsController.GetSpec(clusterName)
if err != nil {
return err
}
Expand All @@ -111,18 +116,16 @@ func (c *controller) reconcileEndpointsUpdate(cluster, namespace, name string, v
return err
}
}

return nil
}

func (c *controller) reconcileEndpointsRemove(cluster, namespace, name string, ep *v1.Endpoints) error {
targetNamespace := conversion.ToSuperMasterNamespace(cluster, namespace)
func (c *controller) reconcileEndpointsRemove(clusterName, targetNamespace, name string) error {
opts := &metav1.DeleteOptions{
PropagationPolicy: &constants.DefaultDeletionPolicy,
}
err := c.endpointClient.Endpoints(targetNamespace).Delete(name, opts)
if errors.IsNotFound(err) {
klog.Warningf("endpoints %s/%s of cluster not found in super master", namespace, name)
klog.Warningf("endpoints %s/%s of %s cluster not found in super master", targetNamespace, name, clusterName)
return nil
}
return err
Expand Down
Loading

0 comments on commit ac8554b

Please sign in to comment.