Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: support one-to-many model for the leaf node feature #287

Merged
merged 1 commit into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions pkg/clustertree/cluster-manager/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,10 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req
return reconcile.Result{}, fmt.Errorf("new manager with err %v, cluster %s", err, cluster.Name)
}

leafModelHandler := leafUtils.NewLeafModelHandler(cluster, c.Root, mgr.GetClient(), c.RootClientset, leafClient)
leafModelHandler := leafUtils.NewLeafModelHandler(cluster, c.RootClientset, leafClient)
c.LeafModelHandler = leafModelHandler

nodes, err := c.createNode(ctx, cluster, leafClient)
nodes, leafNodeSelectors, err := c.createNode(ctx, cluster, leafClient)
if err != nil {
return reconcile.Result{RequeueAfter: RequeueTime}, fmt.Errorf("create node with err %v, cluster %s", err, cluster.Name)
}
Expand All @@ -206,7 +206,7 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req
c.ManagerCancelFuncs[cluster.Name] = &cancel
c.ControllerManagersLock.Unlock()

if err = c.setupControllers(mgr, cluster, nodes, leafDynamic, leafClient, kosmosClient, config); err != nil {
if err = c.setupControllers(mgr, cluster, nodes, leafDynamic, leafNodeSelectors, leafClient, kosmosClient, config); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to setup cluster %s controllers: %v", cluster.Name, err)
}

Expand Down Expand Up @@ -240,6 +240,7 @@ func (c *ClusterController) setupControllers(
cluster *kosmosv1alpha1.Cluster,
nodes []*corev1.Node,
clientDynamic *dynamic.DynamicClient,
leafNodeSelector map[string]kosmosv1alpha1.NodeSelector,
leafClientset kubernetes.Interface,
kosmosClient kosmosversioned.Interface,
leafRestConfig *rest.Config) error {
Expand All @@ -262,14 +263,15 @@ func (c *ClusterController) setupControllers(
Root: c.Root,
RootClientset: c.RootClientset,
Nodes: nodes,
LeafNodeSelectors: leafNodeSelector,
LeafModelHandler: c.LeafModelHandler,
Cluster: cluster,
}
if err := nodeResourcesController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting %s: %v", controllers.NodeResourcesControllerName, err)
}

nodeLeaseController := controllers.NewNodeLeaseController(leafClientset, c.Root, nodes, c.RootClientset, c.LeafModelHandler)
nodeLeaseController := controllers.NewNodeLeaseController(leafClientset, c.Root, nodes, leafNodeSelector, c.RootClientset, c.LeafModelHandler)
if err := mgr.Add(nodeLeaseController); err != nil {
return fmt.Errorf("error starting %s: %v", controllers.NodeLeaseControllerName, err)
}
Expand Down Expand Up @@ -333,19 +335,19 @@ func (c *ClusterController) setupStorageControllers(mgr manager.Manager, isOne2O
return nil
}

func (c *ClusterController) createNode(ctx context.Context, cluster *kosmosv1alpha1.Cluster, leafClient kubernetes.Interface) ([]*corev1.Node, error) {
func (c *ClusterController) createNode(ctx context.Context, cluster *kosmosv1alpha1.Cluster, leafClient kubernetes.Interface) ([]*corev1.Node, map[string]kosmosv1alpha1.NodeSelector, error) {
serverVersion, err := leafClient.Discovery().ServerVersion()
if err != nil {
klog.Errorf("create node failed, can not connect to leaf %s", cluster.Name)
return nil, err
return nil, nil, err
}

nodes, err := c.LeafModelHandler.CreateNodeInRoot(ctx, cluster, c.Options.ListenPort, serverVersion.GitVersion)
nodes, leafNodeSelectors, err := c.LeafModelHandler.CreateRootNode(ctx, c.Options.ListenPort, serverVersion.GitVersion)
if err != nil {
klog.Errorf("create node for cluster %s failed, err: %v", cluster.Name, err)
return nil, err
return nil, nil, err
}
return nodes, nil
return nodes, leafNodeSelectors, nil
}

func (c *ClusterController) deleteNode(ctx context.Context, cluster *kosmosv1alpha1.Cluster) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"

kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils"
)

Expand All @@ -38,19 +39,21 @@ type NodeLeaseController struct {
leaseInterval time.Duration
statusInterval time.Duration

nodes []*corev1.Node
nodeLock sync.Mutex
nodes []*corev1.Node
LeafNodeSelectors map[string]kosmosv1alpha1.NodeSelector
nodeLock sync.Mutex
}

func NewNodeLeaseController(leafClient kubernetes.Interface, root client.Client, nodes []*corev1.Node, rootClient kubernetes.Interface, LeafModelHandler leafUtils.LeafModelHandler) *NodeLeaseController {
func NewNodeLeaseController(leafClient kubernetes.Interface, root client.Client, nodes []*corev1.Node, LeafNodeSelectors map[string]kosmosv1alpha1.NodeSelector, rootClient kubernetes.Interface, LeafModelHandler leafUtils.LeafModelHandler) *NodeLeaseController {
c := &NodeLeaseController{
leafClient: leafClient,
rootClient: rootClient,
root: root,
nodes: nodes,
LeafModelHandler: LeafModelHandler,
leaseInterval: getRenewInterval(),
statusInterval: DefaultNodeStatusUpdateInterval,
leafClient: leafClient,
rootClient: rootClient,
root: root,
nodes: nodes,
LeafModelHandler: LeafModelHandler,
LeafNodeSelectors: LeafNodeSelectors,
leaseInterval: getRenewInterval(),
statusInterval: DefaultNodeStatusUpdateInterval,
}
return c
}
Expand All @@ -71,15 +74,15 @@ func (c *NodeLeaseController) syncNodeStatus(ctx context.Context) {
}
c.nodeLock.Unlock()

err := c.updateNodeStatus(ctx, nodes)
err := c.updateNodeStatus(ctx, nodes, c.LeafNodeSelectors)
if err != nil {
klog.Errorf(err.Error())
}
}

// nolint
func (c *NodeLeaseController) updateNodeStatus(ctx context.Context, n []*corev1.Node) error {
err := c.LeafModelHandler.UpdateNodeStatus(ctx, n)
func (c *NodeLeaseController) updateNodeStatus(ctx context.Context, n []*corev1.Node, leafNodeSelector map[string]kosmosv1alpha1.NodeSelector) error {
err := c.LeafModelHandler.UpdateRootNodeStatus(ctx, n, leafNodeSelector)
if err != nil {
klog.Errorf("Could not update node status in root cluster,Error: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ type NodeResourcesController struct {
GlobalLeafManager leafUtils.LeafResourceManager
RootClientset kubernetes.Interface

Nodes []*corev1.Node
LeafModelHandler leafUtils.LeafModelHandler
Cluster *kosmosv1alpha1.Cluster
EventRecorder record.EventRecorder
Nodes []*corev1.Node
LeafNodeSelectors map[string]kosmosv1alpha1.NodeSelector
LeafModelHandler leafUtils.LeafModelHandler
Cluster *kosmosv1alpha1.Cluster
EventRecorder record.EventRecorder
}

var predicatesFunc = predicate.Funcs{
Expand Down Expand Up @@ -110,15 +111,15 @@ func (c *NodeResourcesController) Reconcile(ctx context.Context, request reconci
}, fmt.Errorf("cannot get node while update nodeInRoot resources %s, err: %v", rootNode.Name, err)
}

nodesInLeaf, err := c.LeafModelHandler.GetLeafNodes(ctx, rootNode)
nodesInLeaf, err := c.LeafModelHandler.GetLeafNodes(ctx, rootNode, c.LeafNodeSelectors[rootNode.Name])
if err != nil {
klog.Errorf("Could not get node in leaf cluster %s,Error: %v", c.Cluster.Name, err)
return controllerruntime.Result{
RequeueAfter: RequeueTime,
}, err
}

pods, err := c.LeafModelHandler.GetLeafPods(ctx, rootNode)
pods, err := c.LeafModelHandler.GetLeafPods(ctx, rootNode, c.LeafNodeSelectors[rootNode.Name])
if err != nil {
klog.Errorf("Could not list pod in leaf cluster %s,Error: %v", c.Cluster.Name, err)
return controllerruntime.Result{
Expand All @@ -130,7 +131,7 @@ func (c *NodeResourcesController) Reconcile(ctx context.Context, request reconci
clone.Status.Conditions = utils.NodeConditions()

// Node2Node mode should sync leaf node's labels and annotations to root nodeInRoot
if c.LeafModelHandler.GetLeafModelType() == leafUtils.DispersionModel {
if c.LeafModelHandler.GetLeafMode() == leafUtils.Node {
getNode := func(nodes *corev1.NodeList) *corev1.Node {
for _, nodeInLeaf := range nodes.Items {
if nodeInLeaf.Name == rootNode.Name {
Expand All @@ -156,7 +157,7 @@ func (c *NodeResourcesController) Reconcile(ctx context.Context, request reconci
}
}
}

// TODO: aggregate labels and annotations for classificationModel
clusterResources := utils.CalculateClusterResources(nodesInLeaf, pods)
clone.Status.Allocatable = clusterResources
clone.Status.Capacity = clusterResources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/kosmos.io/kosmos/cmd/clustertree/cluster-manager/app/options"
kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/extensions/daemonset"
leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils"
"github.com/kosmos.io/kosmos/pkg/utils"
Expand Down Expand Up @@ -198,7 +199,7 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req
// create pod in leaf
if err != nil {
if errors.IsNotFound(err) {
if err := r.CreatePodInLeafCluster(ctx, lr, &rootpod); err != nil {
if err := r.CreatePodInLeafCluster(ctx, lr, &rootpod, r.GlobalLeafManager.GetClusterNode(rootpod.Spec.NodeName).LeafNodeSelector); err != nil {
klog.Errorf("create pod inleaf error, err: %s", err)
return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil
} else {
Expand All @@ -212,7 +213,7 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req

// update pod in leaf
if podutils.ShouldEnqueue(leafPod, &rootpod) {
if err := r.UpdatePodInLeafCluster(ctx, lr, &rootpod, leafPod); err != nil {
if err := r.UpdatePodInLeafCluster(ctx, lr, &rootpod, leafPod, r.GlobalLeafManager.GetClusterNode(rootpod.Spec.NodeName).LeafNodeSelector); err != nil {
return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil
}
}
Expand Down Expand Up @@ -700,7 +701,7 @@ func (r *RootPodReconciler) createVolumes(ctx context.Context, lr *leafUtils.Lea
return nil
}

func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, pod *corev1.Pod) error {
func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, pod *corev1.Pod, nodeSelector kosmosv1alpha1.NodeSelector) error {
if err := podutils.PopulateEnvironmentVariables(ctx, pod, r.envResourceManager); err != nil {
// span.SetStatus(err)
return err
Expand All @@ -711,7 +712,7 @@ func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leaf
return fmt.Errorf("clusternode info is nil , name: %s", pod.Spec.NodeName)
}

basicPod := podutils.FitPod(pod, lr.IgnoreLabels, clusterNodeInfo.LeafMode == leafUtils.ALL)
basicPod := podutils.FitPod(pod, lr.IgnoreLabels, clusterNodeInfo.LeafMode, nodeSelector)
klog.V(4).Infof("Creating pod %v/%+v", pod.Namespace, pod.Name)

// create ns
Expand Down Expand Up @@ -765,24 +766,28 @@ func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leaf
return nil
}

func (r *RootPodReconciler) UpdatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, rootpod *corev1.Pod, leafpod *corev1.Pod) error {
func (r *RootPodReconciler) UpdatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, rootPod *corev1.Pod, leafPod *corev1.Pod, nodeSelector kosmosv1alpha1.NodeSelector) error {
// TODO: update env
// TODO: update config secret pv pvc ...
klog.V(4).Infof("Updating pod %v/%+v", rootpod.Namespace, rootpod.Name)
klog.V(4).Infof("Updating pod %v/%+v", rootPod.Namespace, rootPod.Name)

if !podutils.IsKosmosPod(leafpod) {
if !podutils.IsKosmosPod(leafPod) {
klog.V(4).Info("Pod is not created by kosmos tree, ignore")
return nil
}
// not used
podutils.FitLabels(leafpod.ObjectMeta.Labels, lr.IgnoreLabels)
podCopy := leafpod.DeepCopy()
podutils.FitLabels(leafPod.ObjectMeta.Labels, lr.IgnoreLabels)
podCopy := leafPod.DeepCopy()
// podutils.GetUpdatedPod updates podCopy's container image, annotations and labels,
// recovers tolerations and affinity, and strips ignored labels.
podutils.GetUpdatedPod(podCopy, rootpod, lr.IgnoreLabels)
if reflect.DeepEqual(leafpod.Spec, podCopy.Spec) &&
reflect.DeepEqual(leafpod.Annotations, podCopy.Annotations) &&
reflect.DeepEqual(leafpod.Labels, podCopy.Labels) {
clusterNodeInfo := r.GlobalLeafManager.GetClusterNode(rootPod.Spec.NodeName)
if clusterNodeInfo == nil {
return fmt.Errorf("clusternode info is nil , name: %s", rootPod.Spec.NodeName)
}
podutils.GetUpdatedPod(podCopy, rootPod, lr.IgnoreLabels, clusterNodeInfo.LeafMode, nodeSelector)
if reflect.DeepEqual(leafPod.Spec, podCopy.Spec) &&
reflect.DeepEqual(leafPod.Annotations, podCopy.Annotations) &&
reflect.DeepEqual(leafPod.Labels, podCopy.Labels) {
return nil
}

Expand All @@ -798,7 +803,7 @@ func (r *RootPodReconciler) UpdatePodInLeafCluster(ctx context.Context, lr *leaf
if err != nil {
return fmt.Errorf("could not update pod: %v", err)
}
klog.V(4).Infof("Update pod %v/%+v success ", rootpod.Namespace, rootpod.Name)
klog.V(4).Infof("Update pod %v/%+v success ", rootPod.Namespace, rootPod.Name)
return nil
}

Expand Down
Loading
Loading