diff --git a/pkg/pillar/cmd/monitor/subscriptions.go b/pkg/pillar/cmd/monitor/subscriptions.go index ae92c77c15..4812f8c76b 100644 --- a/pkg/pillar/cmd/monitor/subscriptions.go +++ b/pkg/pillar/cmd/monitor/subscriptions.go @@ -416,29 +416,22 @@ func (ctx *monitor) process(ps *pubsub.PubSub) { watches := make([]pubsub.ChannelWatch, 0) for i := range ctx.subscriptions { sub := ctx.subscriptions[i] - watches = append(watches, pubsub.ChannelWatch{ - Chan: reflect.ValueOf(sub.MsgChan()), - Callback: func(value interface{}) { - change, ok := value.(pubsub.Change) - if !ok { - return - } - sub.ProcessChange(change) - }, - }) + watches = append(watches, pubsub.WatchAndProcessSubChanges(sub)) } watches = append(watches, pubsub.ChannelWatch{ Chan: reflect.ValueOf(stillRunning.C), - Callback: func(_ interface{}) { + Callback: func(_ interface{}) (exit bool) { ps.StillRunning(agentName, warningTime, errorTime) + return false }, }) watches = append(watches, pubsub.ChannelWatch{ Chan: reflect.ValueOf(ctx.clientConnected), - Callback: func(_ interface{}) { + Callback: func(_ interface{}) (exit bool) { ctx.handleClientConnected() + return false }, }) diff --git a/pkg/pillar/cmd/nim/nim.go b/pkg/pillar/cmd/nim/nim.go index e47b910f29..ac64e16dfa 100644 --- a/pkg/pillar/cmd/nim/nim.go +++ b/pkg/pillar/cmd/nim/nim.go @@ -85,6 +85,7 @@ type nim struct { subOnboardStatus pubsub.Subscription subWwanStatus pubsub.Subscription subNetworkInstanceConfig pubsub.Subscription + subEdgeNodeClusterStatus pubsub.Subscription // Publications pubDummyDevicePortConfig pubsub.Publication // For logging @@ -326,6 +327,9 @@ func (n *nim) run(ctx context.Context) (err error) { case change := <-n.subControllerCert.MsgChan(): n.subControllerCert.ProcessChange(change) + case change := <-n.subEdgeNodeClusterStatus.MsgChan(): + n.subEdgeNodeClusterStatus.ProcessChange(change) + case change := <-n.subEdgeNodeCert.MsgChan(): n.subEdgeNodeCert.ProcessChange(change) @@ -697,6 +701,23 @@ func (n *nim) initSubscriptions() (err error) { if err != nil { return err } + + // Subscribe to EdgeNodeClusterStatus to get the cluster interface and the cluster + // IP address which DPC Reconciler should assign statically. + n.subEdgeNodeClusterStatus, err = n.PubSub.NewSubscription(pubsub.SubscriptionOptions{ + AgentName: "zedkube", + MyAgentName: agentName, + TopicImpl: types.EdgeNodeClusterStatus{}, + Activate: false, + CreateHandler: n.handleEdgeNodeClusterStatusCreate, + ModifyHandler: n.handleEdgeNodeClusterStatusModify, + DeleteHandler: n.handleEdgeNodeClusterStatusDelete, + WarningTime: warningTime, + ErrorTime: errorTime, + }) + if err != nil { + return err + } return nil } @@ -908,6 +929,23 @@ func (n *nim) handleNetworkInstanceUpdate() { n.dpcManager.UpdateFlowlogState(flowlogEnabled) } +func (n *nim) handleEdgeNodeClusterStatusCreate(_ interface{}, _ string, + statusArg interface{}) { + status := statusArg.(types.EdgeNodeClusterStatus) + n.dpcManager.UpdateClusterStatus(status) +} + +func (n *nim) handleEdgeNodeClusterStatusModify(_ interface{}, _ string, + statusArg, _ interface{}) { + status := statusArg.(types.EdgeNodeClusterStatus) + n.dpcManager.UpdateClusterStatus(status) +} + +func (n *nim) handleEdgeNodeClusterStatusDelete(_ interface{}, _ string, _ interface{}) { + // Apply empty cluster status, which effectively removes the cluster IP. 
+ n.dpcManager.UpdateClusterStatus(types.EdgeNodeClusterStatus{}) +} + func (n *nim) isDeviceOnboarded() bool { obj, err := n.subOnboardStatus.Get("global") if err != nil { diff --git a/pkg/pillar/cmd/usbmanager/subscriptions.go b/pkg/pillar/cmd/usbmanager/subscriptions.go index 1e6c3bae0b..06515f01c7 100644 --- a/pkg/pillar/cmd/usbmanager/subscriptions.go +++ b/pkg/pillar/cmd/usbmanager/subscriptions.go @@ -18,22 +18,14 @@ func (usbCtx *usbmanagerContext) process(ps *pubsub.PubSub) { watches := make([]pubsub.ChannelWatch, 0) for i := range usbCtx.subscriptions { sub := usbCtx.subscriptions[i] - watches = append(watches, pubsub.ChannelWatch{ - Chan: reflect.ValueOf(sub.MsgChan()), - Callback: func(value interface{}) { - change, ok := value.(pubsub.Change) - if !ok { - return - } - sub.ProcessChange(change) - }, - }) + watches = append(watches, pubsub.WatchAndProcessSubChanges(sub)) } watches = append(watches, pubsub.ChannelWatch{ Chan: reflect.ValueOf(stillRunning.C), - Callback: func(_ interface{}) { + Callback: func(_ interface{}) (exit bool) { ps.StillRunning(agentName, warningTime, errorTime) + return false }, }) diff --git a/pkg/pillar/cmd/zedkube/applogs.go b/pkg/pillar/cmd/zedkube/applogs.go index 1cfa0234f3..2a17d37a48 100644 --- a/pkg/pillar/cmd/zedkube/applogs.go +++ b/pkg/pillar/cmd/zedkube/applogs.go @@ -8,6 +8,7 @@ package zedkube import ( "bufio" "context" + "fmt" "io" "regexp" "strings" @@ -16,32 +17,32 @@ import ( "github.com/lf-edge/eve/pkg/pillar/base" "github.com/lf-edge/eve/pkg/pillar/kubeapi" "github.com/lf-edge/eve/pkg/pillar/types" + uuid "github.com/satori/go.uuid" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" - "k8s.io/client-go/kubernetes" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -// collectAppLogs - collect App logs from pods which covers both containers and virt-launcher pods -func collectAppLogs(ctx *zedkubeContext) { - sub := ctx.subAppInstanceConfig +// collect App logs from pods which covers both containers and virt-launcher pods +func (z *zedkube) collectAppLogs() { + sub := z.subAppInstanceConfig items := sub.GetAll() if len(items) == 0 { return } - if ctx.config == nil { - config, err := kubeapi.GetKubeConfig() - if err != nil { - return - } - ctx.config = config - } - clientset, err := kubernetes.NewForConfig(ctx.config) + clientset, err := getKubeClientSet() if err != nil { log.Errorf("collectAppLogs: can't get clientset %v", err) return } + err = z.getnodeNameAndUUID() + if err != nil { + log.Errorf("collectAppLogs: can't get edgeNodeInfo %v", err) + return + } + // "Thu Aug 17 05:39:04 UTC 2023" timestampRegex := regexp.MustCompile(`(\w{3} \w{3} \d{2} \d{2}:\d{2}:\d{2} \w+ \d{4})`) nowStr := time.Now().String() @@ -50,15 +51,35 @@ func collectAppLogs(ctx *zedkubeContext) { sinceSec = logcollectInterval for _, item := range items { aiconfig := item.(types.AppInstanceConfig) - contName := base.GetAppKubeName(aiconfig.DisplayName, aiconfig.UUIDandVersion.UUID) - + if aiconfig.FixedResources.VirtualizationMode != types.NOHYPER { + continue + } + if aiconfig.DesignatedNodeID != uuid.Nil && aiconfig.DesignatedNodeID.String() != z.nodeuuid { + continue + } + kubeName := base.GetAppKubeName(aiconfig.DisplayName, aiconfig.UUIDandVersion.UUID) + contName := kubeName opt := &corev1.PodLogOptions{} - if ctx.appLogStarted { + if z.appLogStarted { opt = &corev1.PodLogOptions{ SinceSeconds: &sinceSec, } } else { - ctx.appLogStarted = true + z.appLogStarted = true + } + + pods, err := 
clientset.CoreV1().Pods(kubeapi.EVEKubeNameSpace).List(context.TODO(), metav1.ListOptions{ + LabelSelector: fmt.Sprintf("app=%s", kubeName), + }) + if err != nil { + logrus.Errorf("checkReplicaSetMetrics: can't get pod %v", err) + continue + } + for _, pod := range pods.Items { + if strings.HasPrefix(pod.ObjectMeta.Name, kubeName) { + contName = pod.ObjectMeta.Name + break + } } req := clientset.CoreV1().Pods(kubeapi.EVEKubeNameSpace).GetLogs(contName, opt) podLogs, err := req.Stream(context.Background()) @@ -84,7 +105,7 @@ func collectAppLogs(ctx *zedkubeContext) { timeStr = nowStr } // Process and print the log line here - aiLogger := ctx.appContainerLogger.WithFields(logrus.Fields{ + aiLogger := z.appContainerLogger.WithFields(logrus.Fields{ "appuuid": aiconfig.UUIDandVersion.UUID.String(), "containername": contName, "eventtime": timeStr, @@ -92,10 +113,99 @@ func collectAppLogs(ctx *zedkubeContext) { aiLogger.Infof("%s", logLine) } if scanner.Err() != nil { - if scanner.Err() != io.EOF { - log.Errorf("collectAppLogs: pod %s, scanner error %v", contName, scanner.Err()) + if scanner.Err() == io.EOF { + break // Break out of the loop when EOF is reached + } + } + } +} + +func (z *zedkube) checkAppsStatus() { + sub := z.subAppInstanceConfig + items := sub.GetAll() + if len(items) == 0 { + return + } + + err := z.getnodeNameAndUUID() + if err != nil { + log.Errorf("checkAppsStatus: can't get edgeNodeInfo %v", err) + return + } + + u, err := uuid.FromString(z.nodeuuid) + if err != nil { + return + } + + clientset, err := getKubeClientSet() + if err != nil { + log.Errorf("checkAppsStatus: can't get clientset %v", err) + return + } + + options := metav1.ListOptions{ + FieldSelector: fmt.Sprintf("spec.nodeName=%s", z.nodeName), + } + pods, err := clientset.CoreV1().Pods(kubeapi.EVEKubeNameSpace).List(context.TODO(), options) + if err != nil { + log.Errorf("checkAppsStatus: can't get pods %v", err) + return + } + + pub := z.pubENClusterAppStatus + stItmes := pub.GetAll() + var oldStatus *types.ENClusterAppStatus + for _, item := range items { + aiconfig := item.(types.AppInstanceConfig) + if aiconfig.DesignatedNodeID == uuid.Nil { // if not for cluster app, skip + continue + } + encAppStatus := types.ENClusterAppStatus{ + AppUUID: aiconfig.UUIDandVersion.UUID, + IsDNidNode: aiconfig.DesignatedNodeID == u, + } + contName := base.GetAppKubeName(aiconfig.DisplayName, aiconfig.UUIDandVersion.UUID) + + for _, pod := range pods.Items { + contVMIName := "virt-launcher-" + contName + log.Functionf("checkAppsStatus: pod %s, cont %s", pod.Name, contName) + if strings.HasPrefix(pod.Name, contName) || strings.HasPrefix(pod.Name, contVMIName) { + encAppStatus.ScheduledOnThisNode = true + if pod.Status.Phase == corev1.PodRunning { + encAppStatus.StatusRunning = true + } + break + } + } + + for _, st := range stItmes { + aiStatus := st.(types.ENClusterAppStatus) + if aiStatus.AppUUID == aiconfig.UUIDandVersion.UUID { + oldStatus = &aiStatus + break } - break // Break out of the loop when EOF is reached or error occurs } + log.Functionf("checkAppsStatus: devname %s, pod (%d) status %+v, old %+v", z.nodeName, len(pods.Items), encAppStatus, oldStatus) + + if oldStatus == nil || oldStatus.IsDNidNode != encAppStatus.IsDNidNode || + oldStatus.ScheduledOnThisNode != encAppStatus.ScheduledOnThisNode || oldStatus.StatusRunning != encAppStatus.StatusRunning { + log.Functionf("checkAppsStatus: status differ, publish") + z.pubENClusterAppStatus.Publish(aiconfig.Key(), encAppStatus) + } + } +} + +func (z *zedkube) 
getnodeNameAndUUID() error { + if z.nodeuuid == "" || z.nodeName == "" { + NodeInfo, err := z.subEdgeNodeInfo.Get("global") + if err != nil { + log.Errorf("getnodeNameAndUUID: can't get edgeNodeInfo %v", err) + return err + } + enInfo := NodeInfo.(types.EdgeNodeInfo) + z.nodeName = strings.ToLower(enInfo.DeviceName) + z.nodeuuid = enInfo.DeviceID.String() } + return nil } diff --git a/pkg/pillar/cmd/zedkube/clusterstatus.go b/pkg/pillar/cmd/zedkube/clusterstatus.go new file mode 100644 index 0000000000..69ad5c1ef2 --- /dev/null +++ b/pkg/pillar/cmd/zedkube/clusterstatus.go @@ -0,0 +1,269 @@ +// Copyright (c) 2024 Zededa, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//go:build kubevirt + +package zedkube + +import ( + "context" + "fmt" + "net/http" + "reflect" + "time" + + "github.com/lf-edge/eve/pkg/pillar/cipher" + "github.com/lf-edge/eve/pkg/pillar/types" + "github.com/lf-edge/eve/pkg/pillar/utils/netutils" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func handleDNSCreate(ctxArg interface{}, _ string, statusArg interface{}) { + z := ctxArg.(*zedkube) + dns := statusArg.(types.DeviceNetworkStatus) + z.applyDNS(dns) +} + +func handleDNSModify(ctxArg interface{}, _ string, statusArg interface{}, _ interface{}) { + z := ctxArg.(*zedkube) + dns := statusArg.(types.DeviceNetworkStatus) + z.applyDNS(dns) +} + +func (z *zedkube) applyDNS(dns types.DeviceNetworkStatus) { + z.deviceNetworkStatus = dns + changed := z.updateClusterIPReadiness() + if changed { + if z.clusterIPIsReady { + if z.statusServer == nil { + z.startClusterStatusServer() + } + } else { + if z.statusServer != nil { + z.stopClusterStatusServer() + } + } + z.publishKubeConfigStatus() + } +} + +func (z *zedkube) applyClusterConfig(config, oldconfig *types.EdgeNodeClusterConfig) { + noChange := reflect.DeepEqual(config, oldconfig) + if noChange { + log.Noticef("getKubeConfig: no change in cluster config") + return + } + if config == nil { + // Before we let NIM to remove the cluster IP, we need to remove the node + // from the cluster. 
+ z.stopClusterStatusServer() + z.clusterConfig = types.EdgeNodeClusterConfig{} + return + } else { + clusterIPChanged := !netutils.EqualIPNets(z.clusterConfig.ClusterIPPrefix, + config.ClusterIPPrefix) + z.clusterConfig = *config + if clusterIPChanged { + z.stopClusterStatusServer() + z.updateClusterIPReadiness() + if z.clusterIPIsReady { + z.startClusterStatusServer() + } + } + } + z.publishKubeConfigStatus() +} + +// publishKubeConfigStatus publishes the cluster config status +func (z *zedkube) publishKubeConfigStatus() { + status := types.EdgeNodeClusterStatus{ + ClusterName: z.clusterConfig.ClusterName, + ClusterID: z.clusterConfig.ClusterID, + ClusterInterface: z.clusterConfig.ClusterInterface, + ClusterIPPrefix: z.clusterConfig.ClusterIPPrefix, + ClusterIPIsReady: z.clusterIPIsReady, + IsWorkerNode: z.clusterConfig.IsWorkerNode, + JoinServerIP: z.clusterConfig.JoinServerIP, + BootstrapNode: z.clusterConfig.BootstrapNode, + } + + if z.clusterConfig.CipherToken.IsCipher { + decToken, err := z.decryptClusterToken() + if err != nil { + log.Errorf("publishKubeConfigStatus: failed to decrypt cluster token: %v", err) + status.Error = types.ErrorDescription{ + Error: err.Error(), + ErrorTime: time.Now(), + } + } else { + status.EncryptedClusterToken = decToken + } + } else { + log.Errorf("publishKubeConfigStatus: cluster token is not from configitme or encrypted") + } + // publish the cluster status for the kube container + z.pubEdgeNodeClusterStatus.Publish("global", status) +} + +func (z *zedkube) decryptClusterToken() (string, error) { + if !z.clusterConfig.CipherToken.IsCipher { + return "", fmt.Errorf("decryptClusterToken: cluster token is not encrypted") + } + + decryptAvailable := z.subControllerCert != nil && z.subEdgeNodeCert != nil + if !decryptAvailable { + return "", fmt.Errorf("decryptClusterToken: certificates are not available") + } + status, decBlock, err := cipher.GetCipherCredentials( + &cipher.DecryptCipherContext{ + Log: log, + AgentName: agentName, + AgentMetrics: z.cipherMetrics, + PubSubControllerCert: z.subControllerCert, + PubSubEdgeNodeCert: z.subEdgeNodeCert, + }, + z.clusterConfig.CipherToken) + if z.pubCipherBlockStatus != nil { + err2 := z.pubCipherBlockStatus.Publish(status.Key(), status) + if err2 != nil { + return "", fmt.Errorf("decryptClusterToken: publish failed %v", err2) + } + } + if err != nil { + z.cipherMetrics.RecordFailure(log, types.DecryptFailed) + return "", fmt.Errorf("decryptClusterToken: failed to decrypt cluster token: %v", err) + } + + err = z.cipherMetrics.Publish(log, z.pubCipherMetrics, "global") + if err != nil { + log.Errorf("decryptClusterToken: publish failed for cipher metrics: %v", err) + return "", fmt.Errorf("decryptClusterToken: failed to publish cipher metrics: %v", err) + } + + return decBlock.ClusterToken, nil +} + +func (z *zedkube) updateClusterIPReadiness() (changed bool) { + var ready bool + haveClusterIPConfig := z.clusterConfig.ClusterInterface != "" && + z.clusterConfig.ClusterIPPrefix != nil + if haveClusterIPConfig { + for _, port := range z.deviceNetworkStatus.Ports { + if port.InvalidConfig || port.IfName == "" { + continue + } + if port.Logicallabel != z.clusterConfig.ClusterInterface { + continue + } + for _, addr := range port.AddrInfoList { + if addr.Addr.Equal(z.clusterConfig.ClusterIPPrefix.IP) { + ready = true + break + } + } + if ready { + break + } + } + } + if z.clusterIPIsReady != ready { + z.clusterIPIsReady = ready + return true + } + return false +} + +func (z *zedkube) startClusterStatusServer() { + 
if z.statusServer != nil { + // Already running. + return + } + mux := http.NewServeMux() + mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) { + z.clusterStatusHTTPHandler(w, r) + }) + z.statusServer = &http.Server{ + // Listen on the ClusterIPPrefix IP and the ClusterStatusPort + // the firewall rule is explicitly added to allow traffic to this port in kubevirt + // this is documented in pkg/pillar/docs/zedkube.md section "Cluster Status Server" + Addr: z.clusterConfig.ClusterIPPrefix.IP.String() + ":" + types.ClusterStatusPort, + Handler: mux, + } + z.statusServerWG.Add(1) + + // Start the server in a goroutine + go func() { + defer z.statusServerWG.Done() + if err := z.statusServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Errorf("Cluster status server ListenAndServe failed: %v", err) + } + log.Noticef("Cluster status server stopped") + }() +} + +func (z *zedkube) stopClusterStatusServer() { + if z.statusServer == nil { + return + } + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := z.statusServer.Shutdown(shutdownCtx); err != nil { + log.Errorf("Cluster status server shutdown failed: %v", err) + } else { + log.Noticef("Cluster status server shutdown completed") + } + + // Wait for the server goroutine to finish + z.statusServerWG.Wait() + z.statusServer = nil + log.Noticef("Cluster status server goroutine has stopped") +} + +func (z *zedkube) clusterStatusHTTPHandler(w http.ResponseWriter, r *http.Request) { + // Check if the request method is GET + if r.Method != http.MethodGet { + // Respond with 405 Method Not Allowed if the method is not GET + w.WriteHeader(http.StatusMethodNotAllowed) + w.Write([]byte("405 - Method Not Allowed")) + return + } + + clientset, err := getKubeClientSet() + if err != nil { + log.Errorf("clusterStatusHTTPHandler: can't get clientset %v", err) + fmt.Fprint(w, "") + return + } + + err = z.getnodeNameAndUUID() + if err != nil { + log.Errorf("clusterStatusHTTPHandler: Error getting nodeName and nodeUUID") + fmt.Fprint(w, "") + return + } + + node, err := clientset.CoreV1().Nodes().Get(context.Background(), z.nodeName, metav1.GetOptions{}) + if err != nil { + log.Errorf("clusterStatusHTTPHandler: can't get node %v, for %s", err, z.nodeName) + fmt.Fprint(w, "") + return + } + + var isMaster, useEtcd bool + labels := node.GetLabels() + if _, ok := labels["node-role.kubernetes.io/master"]; ok { + isMaster = true + } + if _, ok := labels["node-role.kubernetes.io/etcd"]; ok { + useEtcd = true + } + + if isMaster && useEtcd { + fmt.Fprint(w, "cluster") + return + } + log.Functionf("clusterStatusHTTPHandler: not master or etcd") + fmt.Fprint(w, "") +} diff --git a/pkg/pillar/cmd/zedkube/etherpassthrough.go b/pkg/pillar/cmd/zedkube/etherpassthrough.go index 7a5b804b2b..f53660410f 100644 --- a/pkg/pillar/cmd/zedkube/etherpassthrough.go +++ b/pkg/pillar/cmd/zedkube/etherpassthrough.go @@ -14,7 +14,7 @@ import ( ) // checkIoAdapterEthernet - check and create NAD for direct-attached ethernet -func checkIoAdapterEthernet(ctx *zedkubeContext, aiConfig *types.AppInstanceConfig) error { +func (z *zedkube) checkIoAdapterEthernet(aiConfig *types.AppInstanceConfig) error { if aiConfig.FixedResources.VirtualizationMode != types.NOHYPER { return nil @@ -23,14 +23,14 @@ func checkIoAdapterEthernet(ctx *zedkubeContext, aiConfig *types.AppInstanceConf for _, io := range ioAdapter { if io.Type == types.IoNetEth { nadname := "host-" + io.Name - _, ok := 
ctx.networkInstanceStatusMap.Load(nadname) + _, ok := z.networkInstanceStatusMap.Load(nadname) if !ok { bringupInterface(io.Name) - err := ioEtherCreate(ctx, &io) + err := z.ioEtherCreate(&io) if err != nil { log.Errorf("checkIoAdapterEthernet: create io adapter error %v", err) } - ctx.ioAdapterMap.Store(nadname, true) + z.ioAdapterMap.Store(nadname, true) log.Functionf("ccheckIoAdapterEthernet: nad created %v", nadname) } else { log.Functionf("checkIoAdapterEthernet: nad already exist %v", nadname) @@ -40,7 +40,7 @@ func checkIoAdapterEthernet(ctx *zedkubeContext, aiConfig *types.AppInstanceConf return nil } -func checkDelIoAdapterEthernet(ctx *zedkubeContext, aiConfig *types.AppInstanceConfig) { +func (z *zedkube) checkDelIoAdapterEthernet(aiConfig *types.AppInstanceConfig) { if aiConfig.FixedResources.VirtualizationMode != types.NOHYPER { return @@ -49,10 +49,10 @@ func checkDelIoAdapterEthernet(ctx *zedkubeContext, aiConfig *types.AppInstanceC for _, io := range ioAdapter { if io.Type == types.IoNetEth { nadname := "host-" + io.Name - _, ok := ctx.ioAdapterMap.Load(nadname) + _, ok := z.ioAdapterMap.Load(nadname) if ok { // remove the syncMap entry - ctx.ioAdapterMap.Delete(nadname) + z.ioAdapterMap.Delete(nadname) } // delete the NAD in kubernetes kubeapi.DeleteNAD(log, nadname) @@ -62,7 +62,7 @@ func checkDelIoAdapterEthernet(ctx *zedkubeContext, aiConfig *types.AppInstanceC } // ioEtherCreate - create and send NAD for direct-attached ethernet -func ioEtherCreate(ctx *zedkubeContext, ioAdapt *types.IoAdapter) error { +func (z *zedkube) ioEtherCreate(ioAdapt *types.IoAdapter) error { name := ioAdapt.Name spec := fmt.Sprintf( `{ diff --git a/pkg/pillar/cmd/zedkube/kubestatscollect.go b/pkg/pillar/cmd/zedkube/kubestatscollect.go new file mode 100644 index 0000000000..b3f6f25d0d --- /dev/null +++ b/pkg/pillar/cmd/zedkube/kubestatscollect.go @@ -0,0 +1,308 @@ +// Copyright (c) 2024 Zededa, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +//go:build kubevirt + +package zedkube + +import ( + "context" + "strings" + "time" + + "github.com/lf-edge/eve/pkg/pillar/kubeapi" + "github.com/lf-edge/eve/pkg/pillar/types" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + virtv1 "kubevirt.io/api/core/v1" + "kubevirt.io/client-go/kubecli" +) + +func (z *zedkube) collectKubeStats() { + // we are the elected leader, start collecting kube stats + // regardless if we are in cluster or single node mode + if z.isKubeStatsLeader { + log.Functionf("collectKubeStats: Started collecting kube stats") + + clientset, err := getKubeClientSet() + if err != nil { + log.Errorf("collectKubeStats: can't get clientset %v", err) + return + } + + var podsInfo []types.KubePodInfo + var nodesInfo []types.KubeNodeInfo + var vmisInfo []types.KubeVMIInfo + + // get nodes + nodes, err := getKubeNodes(clientset) + if err != nil { + log.Errorf("collectKubeStats: can't get nodes %v", err) + return + } + for _, node := range nodes { + nodeInfo := getKubeNodeInfo(node) + nodesInfo = append(nodesInfo, *nodeInfo) + } + + // get app pods + pods, err := getAppKubePods(clientset) + if err != nil { + log.Errorf("collectKubeStats: can't get pods %v", err) + return + } + for _, pod := range pods { + if strings.HasPrefix(pod.ObjectMeta.Name, "virt-launcher-") { // skip virt-launcher pods + continue + } + podInfo := getKubePodInfo(pod) + podsInfo = append(podsInfo, *podInfo) + } + + // get VMIs + virtClient, err := getVirtClient() + if err != nil { + log.Errorf("collectKubeStats: can't get virtClient %v", err) + return + } + vmis, err := getAppVMIs(virtClient) + if err != nil { + log.Errorf("collectKubeStats: can't get VMIs %v", err) + return + } + for _, vmi := range vmis { + vmiInfo := getAppVMIInfo(vmi) + vmisInfo = append(vmisInfo, *vmiInfo) + } + + // Publish the cluster info, first w/ nodes and app pods and VMIs + clusterInfo := types.KubeClusterInfo{ + Nodes: nodesInfo, + AppPods: podsInfo, + AppVMIs: vmisInfo, + } + z.pubKubeClusterInfo.Publish("global", clusterInfo) + } +} + +func getKubeNodes(clientset *kubernetes.Clientset) ([]corev1.Node, error) { + nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + log.Errorf("getKubeNodes: can't get nodes %v", err) + return nil, err + } + return nodes.Items, nil +} + +func getKubeNodeInfo(node corev1.Node) *types.KubeNodeInfo { + status := types.KubeNodeStatusUnknown + var lastTransitionTime time.Time + for _, condition := range node.Status.Conditions { + if condition.Type == corev1.NodeReady { + if condition.Status == corev1.ConditionTrue { + status = types.KubeNodeStatusReady + } else if condition.Status == corev1.ConditionFalse { + status = types.KubeNodeStatusNotReady + } else if condition.Status == corev1.ConditionUnknown { + status = types.KubeNodeStatusNotReachable + } + lastTransitionTime = condition.LastTransitionTime.Time + break + } + } + + isMaster := false + usesEtcd := false + if _, ok := node.Labels["node-role.kubernetes.io/master"]; ok { + isMaster = true + } + if _, ok := node.Labels["node-role.kubernetes.io/etcd"]; ok { + usesEtcd = true + } + + // Get creation time + creationTimestamp := node.CreationTimestamp.Time + + // Get API version + kubeletVersion := node.Status.NodeInfo.KubeletVersion + + // Get internal and external IPs + var internalIP, externalIP string + for _, address := range node.Status.Addresses { + if address.Type == corev1.NodeInternalIP { 
+ internalIP = address.Address + } else if address.Type == corev1.NodeExternalIP { + externalIP = address.Address + } + } + + // Check if the node is schedulable + schedulable := !node.Spec.Unschedulable + log.Functionf("getKubeNodeInfo: node %s, status %v, isMaster %v, usesEtcd %v, creationTime %v, lastTrasitionTime %v, kubeletVersion %s, internalIP %s, externalIP %s, schedulable %v", + node.Name, status, isMaster, usesEtcd, creationTimestamp, lastTransitionTime, kubeletVersion, internalIP, externalIP, schedulable) + + nodeInfo := types.KubeNodeInfo{ + Name: node.Name, + Status: status, + IsMaster: isMaster, + UsesEtcd: usesEtcd, + CreationTime: creationTimestamp, + LastTransitionTime: lastTransitionTime, + KubeletVersion: kubeletVersion, + InternalIP: internalIP, + ExternalIP: externalIP, + Schedulable: schedulable, + } + + return &nodeInfo +} + +func getAppKubePods(clientset *kubernetes.Clientset) ([]corev1.Pod, error) { + // List pods in the namespace + pods, err := clientset.CoreV1().Pods(kubeapi.EVEKubeNameSpace).List(context.Background(), metav1.ListOptions{}) + if err != nil { + log.Errorf("getAppKubePods: can't get nodes %v", err) + return nil, err + } + return pods.Items, nil +} + +func getKubePodInfo(pod corev1.Pod) *types.KubePodInfo { + status := pod.Status.Phase + restartCount := int32(0) + var restartTimestamp time.Time + for _, containerStatus := range pod.Status.ContainerStatuses { + restartCount += containerStatus.RestartCount + if containerStatus.LastTerminationState.Terminated != nil { + restartTimestamp = containerStatus.LastTerminationState.Terminated.FinishedAt.Time + } + } + + CreationTimestamp := pod.CreationTimestamp.Time + PodIP := pod.Status.PodIP + NodeName := pod.Spec.NodeName + + log.Functionf("getKubePodInfo: pod %s, status %s, restartCount %d, restartTimestamp %v, creationTime %v, podIP %s, nodeName %s", + pod.Name, status, restartCount, restartTimestamp, CreationTimestamp, PodIP, NodeName) + + podInfo := types.KubePodInfo{ + Name: pod.Name, + Status: convertStringToKubePodStatus(status), + RestartCount: restartCount, + RestartTimestamp: restartTimestamp, + CreationTimestamp: CreationTimestamp, + PodIP: PodIP, + NodeName: NodeName, + } + return &podInfo +} + +func getAppVMIs(virtClient kubecli.KubevirtClient) ([]virtv1.VirtualMachineInstance, error) { + // List pods in the namespace + vmiList, err := virtClient.VirtualMachineInstance(kubeapi.EVEKubeNameSpace).List(context.Background(), &metav1.ListOptions{}) + if err != nil { + log.Errorf("getAppVMIs: can't get nodes %v", err) + return nil, err + } + return vmiList.Items, nil +} + +func getAppVMIInfo(vmi virtv1.VirtualMachineInstance) *types.KubeVMIInfo { + // Extract information from the VMI + name := vmi.Name + creationTime := vmi.CreationTimestamp.Time + phase := vmi.Status.Phase + nodeName := vmi.Status.NodeName + ready := false + var lastTransitionTime time.Time + for _, condition := range vmi.Status.Conditions { + if condition.Type == virtv1.VirtualMachineInstanceReady && condition.Status == corev1.ConditionTrue { + lastTransitionTime = condition.LastTransitionTime.Time + ready = true + break + } + } + + // Log the information + log.Functionf("getAppVMIInfo: VMI %s, createtime %v, phase %s, lastTransitionTime %v, nodeName %s, ready %t", + name, creationTime, phase, lastTransitionTime, nodeName, ready) + + vmiInfo := types.KubeVMIInfo{ + Name: name, + Status: convertStringToKubeVMIStatus(phase), + CreationTime: creationTime, + LastTransitionTime: lastTransitionTime, + IsReady: ready, + NodeName: 
nodeName, + } + + return &vmiInfo +} + +// convertStringToKubePodStatus converts a string status to a KubePodStatus. +func convertStringToKubePodStatus(phase corev1.PodPhase) types.KubePodStatus { + switch phase { + case corev1.PodPending: + return types.KubePodStatusPending + case corev1.PodRunning: + return types.KubePodStatusRunning + case corev1.PodSucceeded: + return types.KubePodStatusSucceeded + case corev1.PodFailed: + return types.KubePodStatusFailed + default: + return types.KubePodStatusUnknown + } +} + +func convertStringToKubeVMIStatus(status virtv1.VirtualMachineInstancePhase) types.KubeVMIStatus { + switch status { + case virtv1.VmPhaseUnset: + return types.KubeVMIStatusUnset + case virtv1.Pending: + return types.KubeVMIStatusPending + case virtv1.Scheduling: + return types.KubeVMIStatusScheduling + case virtv1.Scheduled: + return types.KubeVMIStatusScheduled + case virtv1.Running: + return types.KubeVMIStatusRunning + case virtv1.Succeeded: + return types.KubeVMIStatusSucceeded + case virtv1.Failed: + return types.KubeVMIStatusFailed + default: + return types.KubeVMIStatusUnknown + } +} + +func getKubeClientSet() (*kubernetes.Clientset, error) { + config, err := kubeapi.GetKubeConfig() + if err != nil { + log.Errorf("getKubeClientSet: can't get config %v", err) + return nil, err + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + log.Errorf("getKubeClientSet: can't get clientset %v", err) + return nil, err + } + return clientset, nil +} + +func getVirtClient() (kubecli.KubevirtClient, error) { + config, err := kubeapi.GetKubeConfig() + if err != nil { + log.Errorf("getVirtClient: can't get config %v", err) + return nil, err + } + + virtClient, err := kubecli.GetKubevirtClientFromRESTConfig(config) + if err != nil { + log.Errorf("getVirtClient: can't get client %v", err) + return nil, err + } + return virtClient, nil +} diff --git a/pkg/pillar/cmd/zedkube/lease.go b/pkg/pillar/cmd/zedkube/lease.go new file mode 100644 index 0000000000..9845a6d09f --- /dev/null +++ b/pkg/pillar/cmd/zedkube/lease.go @@ -0,0 +1,129 @@ +// Copyright (c) 2024 Zededa, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +//go:build kubevirt + +package zedkube + +import ( + "context" + "time" + + "github.com/lf-edge/eve/pkg/pillar/kubeapi" + "github.com/lf-edge/eve/pkg/pillar/types" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" +) + +func (z *zedkube) handleLeaderElection() { + var cancelFunc context.CancelFunc + for { + log.Functionf("handleLeaderElection: Waiting for signal") // XXX + select { + case <-z.electionStartCh: + + // Create a cancellable context + baseCtx, cancel := context.WithCancel(context.Background()) + cancelFunc = cancel + + clientset, err := getKubeClientSet() + if err != nil { + z.inKubeLeaderElection = false + log.Errorf("handleLeaderElection: can't get clientset %v", err) + return + } + + err = z.getnodeNameAndUUID() + if err != nil { + z.inKubeLeaderElection = false + log.Errorf("handleLeaderElection: can't get nodeName and UUID %v", err) + return + } + + // Create a new lease lock + lock := &resourcelock.LeaseLock{ + LeaseMeta: metav1.ObjectMeta{ + Name: "eve-kube-stats-leader", + Namespace: kubeapi.EVEKubeNameSpace, + }, + Client: clientset.CoordinationV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: z.nodeName, + }, + } + + // Define the leader election configuration + lec := leaderelection.LeaderElectionConfig{ + Lock: lock, + LeaseDuration: 15 * time.Second, + RenewDeadline: 10 * time.Second, + RetryPeriod: 2 * time.Second, + ReleaseOnCancel: true, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(baseCtx context.Context) { + z.isKubeStatsLeader = true + log.Functionf("handleLeaderElection: Started leading") + }, + OnStoppedLeading: func() { + z.isKubeStatsLeader = false + log.Functionf("handleLeaderElection: Stopped leading") + }, + OnNewLeader: func(identity string) { + log.Functionf("handleLeaderElection: New leader elected: %s", identity) + }, + }, + } + + // Start the leader election in a separate goroutine + go leaderelection.RunOrDie(baseCtx, lec) + log.Noticef("handleLeaderElection: Started leader election for %s", z.nodeName) + + case <-z.electionStopCh: + z.isKubeStatsLeader = false + z.inKubeLeaderElection = false + log.Noticef("handleLeaderElection: Stopped leading signal received") + if cancelFunc != nil { + cancelFunc() + cancelFunc = nil + } + } + } +} + +// SignalStartLeaderElection - to signal the start of leader election +func (z *zedkube) SignalStartLeaderElection() { + z.inKubeLeaderElection = true + select { + case z.electionStartCh <- struct{}{}: + log.Functionf("SignalStartLeaderElection: Signal sent successfully") + default: + log.Warningf("SignalStartLeaderElection: Channel is full, signal not sent") + } +} + +// SignalStopLeaderElection - to signal the stop of leader election +func (z *zedkube) SignalStopLeaderElection() { + select { + case z.electionStopCh <- struct{}{}: + log.Functionf("SignalStopLeaderElection: Signal sent successfully") + default: + log.Warningf("SignalStopLeaderElection: Channel is full, signal not sent") + } +} + +func (z *zedkube) handleControllerStatusChange(status *types.ZedAgentStatus) { + configStatus := status.ConfigGetStatus + + log.Functionf("handleControllerStatusChange: Leader enter, status %v", configStatus) + switch configStatus { + case types.ConfigGetSuccess, types.ConfigGetReadSaved: // either read success or read from saved config + if !z.inKubeLeaderElection { + z.SignalStartLeaderElection() + } + default: + if z.inKubeLeaderElection { + 
z.SignalStopLeaderElection() + } + } +} diff --git a/pkg/pillar/cmd/zedkube/vnc.go b/pkg/pillar/cmd/zedkube/vnc.go index 082864e8a4..bbd42c14b8 100644 --- a/pkg/pillar/cmd/zedkube/vnc.go +++ b/pkg/pillar/cmd/zedkube/vnc.go @@ -21,7 +21,7 @@ import ( ) // runAppVNC - run vnc for kubevirt VMI remote console -func runAppVNC(ctx *zedkubeContext, config *types.AppInstanceConfig) { +func (z *zedkube) runAppVNC(config *types.AppInstanceConfig) { vmconfig := config.FixedResources //vmiName := findXenCfgName(config.UUIDandVersion.UUID.String()) @@ -29,7 +29,7 @@ func runAppVNC(ctx *zedkubeContext, config *types.AppInstanceConfig) { i := 5 for { var err error - vmiName, err = getVMIdomainName(ctx, config) + vmiName, err = z.getVMIdomainName(config) if err != nil { log.Functionf("runAppVNC: get vmi domainname error %v", err) if i >= 0 { @@ -68,8 +68,16 @@ func runAppVNC(ctx *zedkubeContext, config *types.AppInstanceConfig) { log.Functionf("runAppVNC: %v, done", vmiName) } -func getVMIdomainName(ctx *zedkubeContext, config *types.AppInstanceConfig) (string, error) { - virtClient, err := kubecli.GetKubevirtClientFromRESTConfig(ctx.config) +func (z *zedkube) getVMIdomainName(config *types.AppInstanceConfig) (string, error) { + if z.config == nil { + config, err := kubeapi.GetKubeConfig() + if err != nil { + log.Errorf("getVMIs: config is nil") + return "", fmt.Errorf("getVMIs: config get failed error %v", err) + } + z.config = config + } + virtClient, err := kubecli.GetKubevirtClientFromRESTConfig(z.config) if err != nil { log.Errorf("getVMIs: get virtclient error %v", err) return "", err diff --git a/pkg/pillar/cmd/zedkube/zedkube.go b/pkg/pillar/cmd/zedkube/zedkube.go index a2808f002b..6115712ee2 100644 --- a/pkg/pillar/cmd/zedkube/zedkube.go +++ b/pkg/pillar/cmd/zedkube/zedkube.go @@ -6,12 +6,15 @@ package zedkube import ( + "net/http" + "strings" "sync" "time" "github.com/lf-edge/eve/pkg/pillar/agentbase" "github.com/lf-edge/eve/pkg/pillar/agentlog" "github.com/lf-edge/eve/pkg/pillar/base" + "github.com/lf-edge/eve/pkg/pillar/cipher" "github.com/lf-edge/eve/pkg/pillar/kubeapi" "github.com/lf-edge/eve/pkg/pillar/pubsub" "github.com/lf-edge/eve/pkg/pillar/types" @@ -35,16 +38,42 @@ var ( log *base.LogObject ) -type zedkubeContext struct { +type zedkube struct { agentbase.AgentBase globalConfig *types.ConfigItemValueMap subAppInstanceConfig pubsub.Subscription subGlobalConfig pubsub.Subscription + subDeviceNetworkStatus pubsub.Subscription + subEdgeNodeClusterConfig pubsub.Subscription + subEdgeNodeInfo pubsub.Subscription + subZedAgentStatus pubsub.Subscription + + subControllerCert pubsub.Subscription + subEdgeNodeCert pubsub.Subscription + cipherMetrics *cipher.AgentMetrics + pubCipherBlockStatus pubsub.Publication + pubCipherMetrics pubsub.Publication + + pubEdgeNodeClusterStatus pubsub.Publication + pubENClusterAppStatus pubsub.Publication + pubKubeClusterInfo pubsub.Publication + networkInstanceStatusMap sync.Map ioAdapterMap sync.Map + deviceNetworkStatus types.DeviceNetworkStatus + clusterConfig types.EdgeNodeClusterConfig config *rest.Config appLogStarted bool appContainerLogger *logrus.Logger + clusterIPIsReady bool + nodeuuid string + nodeName string + isKubeStatsLeader bool + inKubeLeaderElection bool + electionStartCh chan struct{} + electionStopCh chan struct{} + statusServer *http.Server + statusServerWG sync.WaitGroup } // Run - an zedkube run @@ -52,7 +81,7 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar logger = loggerArg log = logArg - zedkubeCtx 
:= zedkubeContext{ + zedkubeCtx := zedkube{ globalConfig: types.DefaultConfigItemValueMap(), } agentbase.Init(&zedkubeCtx, logger, log, agentName, @@ -84,6 +113,77 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar zedkubeCtx.subAppInstanceConfig = subAppInstanceConfig subAppInstanceConfig.Activate() + // Look for controller certs which will be used for decryption. + subControllerCert, err := ps.NewSubscription(pubsub.SubscriptionOptions{ + AgentName: "zedagent", + MyAgentName: agentName, + TopicImpl: types.ControllerCert{}, + Persistent: true, + Activate: true, + WarningTime: warningTime, + ErrorTime: errorTime, + }) + if err != nil { + log.Fatal(err) + } + zedkubeCtx.subControllerCert = subControllerCert + + // Look for edge node certs which will be used for decryption + subEdgeNodeCert, err := ps.NewSubscription(pubsub.SubscriptionOptions{ + AgentName: "tpmmgr", + MyAgentName: agentName, + TopicImpl: types.EdgeNodeCert{}, + Persistent: true, + Activate: true, + WarningTime: warningTime, + ErrorTime: errorTime, + }) + if err != nil { + log.Fatal(err) + } + zedkubeCtx.subEdgeNodeCert = subEdgeNodeCert + + pubCipherBlockStatus, err := ps.NewPublication( + pubsub.PublicationOptions{ + AgentName: agentName, + TopicType: types.CipherBlockStatus{}, + }) + if err != nil { + log.Fatal(err) + } + zedkubeCtx.pubCipherBlockStatus = pubCipherBlockStatus + pubCipherBlockStatus.ClearRestarted() + + pubEdgeNodeClusterStatus, err := ps.NewPublication( + pubsub.PublicationOptions{ + AgentName: agentName, + TopicType: types.EdgeNodeClusterStatus{}, + }) + if err != nil { + log.Fatal(err) + } + zedkubeCtx.pubEdgeNodeClusterStatus = pubEdgeNodeClusterStatus + + pubENClusterAppStatus, err := ps.NewPublication( + pubsub.PublicationOptions{ + AgentName: agentName, + TopicType: types.ENClusterAppStatus{}, + }) + if err != nil { + log.Fatal(err) + } + zedkubeCtx.pubENClusterAppStatus = pubENClusterAppStatus + + pubKubeClusterInfo, err := ps.NewPublication( + pubsub.PublicationOptions{ + AgentName: agentName, + TopicType: types.KubeClusterInfo{}, + }) + if err != nil { + log.Fatal(err) + } + zedkubeCtx.pubKubeClusterInfo = pubKubeClusterInfo + // Look for global config such as log levels subGlobalConfig, err := ps.NewSubscription(pubsub.SubscriptionOptions{ AgentName: "zedagent", @@ -103,31 +203,173 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar zedkubeCtx.subGlobalConfig = subGlobalConfig subGlobalConfig.Activate() - err = kubeapi.WaitForKubernetes(agentName, ps, stillRunning) + // Watch DNS to learn if the Cluster Interface and Cluster Prefix is ready to use + subDeviceNetworkStatus, err := ps.NewSubscription( + pubsub.SubscriptionOptions{ + AgentName: "nim", + MyAgentName: agentName, + TopicImpl: types.DeviceNetworkStatus{}, + Activate: false, + Ctx: &zedkubeCtx, + CreateHandler: handleDNSCreate, + ModifyHandler: handleDNSModify, + WarningTime: warningTime, + ErrorTime: errorTime, + }) if err != nil { - log.Errorf("zedkube: WaitForKubenetes %v", err) + log.Fatal(err) + } + zedkubeCtx.subDeviceNetworkStatus = subDeviceNetworkStatus + subDeviceNetworkStatus.Activate() + + zedkubeCtx.cipherMetrics = cipher.NewAgentMetrics(agentName) + pubCipherMetrics, err := ps.NewPublication( + pubsub.PublicationOptions{ + AgentName: agentName, + TopicType: types.CipherMetrics{}, + }) + if err != nil { + log.Fatal(err) + } + zedkubeCtx.pubCipherMetrics = pubCipherMetrics + + // start the leader election + zedkubeCtx.electionStartCh = make(chan struct{}) + 
zedkubeCtx.electionStopCh = make(chan struct{}) + go zedkubeCtx.handleLeaderElection() + + // Wait for the certs, which are needed to decrypt the token inside the cluster config. + var controllerCertInitialized, edgenodeCertInitialized bool + for !controllerCertInitialized || !edgenodeCertInitialized { + log.Noticef("zedkube run: waiting for controller cert (initialized=%t), "+ + "edgenode cert (initialized=%t)", controllerCertInitialized, + edgenodeCertInitialized) + select { + case change := <-subControllerCert.MsgChan(): + subControllerCert.ProcessChange(change) + controllerCertInitialized = true + + case change := <-subEdgeNodeCert.MsgChan(): + subEdgeNodeCert.ProcessChange(change) + edgenodeCertInitialized = true + + case <-stillRunning.C: + } + ps.StillRunning(agentName, warningTime, errorTime) } + log.Noticef("zedkube run: controller and edge node certs are ready") + + // EdgeNodeClusterConfig subscription + subEdgeNodeClusterConfig, err := ps.NewSubscription(pubsub.SubscriptionOptions{ + AgentName: "zedagent", + MyAgentName: agentName, + TopicImpl: types.EdgeNodeClusterConfig{}, + Persistent: true, + Activate: false, + Ctx: &zedkubeCtx, + CreateHandler: handleEdgeNodeClusterConfigCreate, + ModifyHandler: handleEdgeNodeClusterConfigModify, + DeleteHandler: handleEdgeNodeClusterConfigDelete, + WarningTime: warningTime, + ErrorTime: errorTime, + }) + if err != nil { + log.Fatal(err) + } + zedkubeCtx.subEdgeNodeClusterConfig = subEdgeNodeClusterConfig + subEdgeNodeClusterConfig.Activate() + zedkubeCtx.config, err = kubeapi.GetKubeConfig() if err != nil { - log.Errorf("zedkube: GetKubeConfi %v", err) + log.Errorf("zedkube: GetKubeConfig %v", err) } else { log.Noticef("zedkube: running") } + // Look for edge node info + subEdgeNodeInfo, err := ps.NewSubscription(pubsub.SubscriptionOptions{ + AgentName: "zedagent", + MyAgentName: agentName, + TopicImpl: types.EdgeNodeInfo{}, + Persistent: true, + Activate: false, + Ctx: &zedkubeCtx, + CreateHandler: handleEdgeNodeInfoCreate, + ModifyHandler: handleEdgeNodeInfoModify, + DeleteHandler: handleEdgeNodeInfoDelete, + WarningTime: warningTime, + ErrorTime: errorTime, + }) + if err != nil { + log.Fatal(err) + } + zedkubeCtx.subEdgeNodeInfo = subEdgeNodeInfo + subEdgeNodeInfo.Activate() + + // subscribe to zedagent status events, for controller connection status + subZedAgentStatus, err := ps.NewSubscription(pubsub.SubscriptionOptions{ + AgentName: "zedagent", + MyAgentName: agentName, + TopicImpl: types.ZedAgentStatus{}, + Activate: false, + Ctx: &zedkubeCtx, + CreateHandler: handleZedAgentStatusCreate, + ModifyHandler: handleZedAgentStatusModify, + DeleteHandler: handleZedAgentStatusDelete, + WarningTime: warningTime, + ErrorTime: errorTime, + }) + if err != nil { + log.Fatal(err) + } + zedkubeCtx.subZedAgentStatus = subZedAgentStatus + subZedAgentStatus.Activate() + + err = kubeapi.WaitForKubernetes(agentName, ps, stillRunning, + // Make sure we keep ClusterIPIsReady up to date while we wait + // for Kubernetes to come up. 
+ pubsub.WatchAndProcessSubChanges(subEdgeNodeClusterConfig), + pubsub.WatchAndProcessSubChanges(subDeviceNetworkStatus)) + if err != nil { + log.Errorf("zedkube: WaitForKubenetes %v", err) + } + appLogTimer := time.NewTimer(logcollectInterval * time.Second) + log.Notice("zedkube online") + for { select { + case change := <-subDeviceNetworkStatus.MsgChan(): + subDeviceNetworkStatus.ProcessChange(change) + case change := <-subAppInstanceConfig.MsgChan(): subAppInstanceConfig.ProcessChange(change) case <-appLogTimer.C: - collectAppLogs(&zedkubeCtx) + zedkubeCtx.collectAppLogs() + zedkubeCtx.checkAppsStatus() + zedkubeCtx.collectKubeStats() appLogTimer = time.NewTimer(logcollectInterval * time.Second) case change := <-subGlobalConfig.MsgChan(): subGlobalConfig.ProcessChange(change) + case change := <-subEdgeNodeClusterConfig.MsgChan(): + subEdgeNodeClusterConfig.ProcessChange(change) + + case change := <-subControllerCert.MsgChan(): + subControllerCert.ProcessChange(change) + + case change := <-subEdgeNodeCert.MsgChan(): + subEdgeNodeCert.ProcessChange(change) + + case change := <-subEdgeNodeInfo.MsgChan(): + subEdgeNodeInfo.ProcessChange(change) + + case change := <-subZedAgentStatus.MsgChan(): + subZedAgentStatus.ProcessChange(change) + case <-stillRunning.C: } ps.StillRunning(agentName, warningTime, errorTime) @@ -136,30 +378,30 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar func handleAppInstanceConfigCreate(ctxArg interface{}, key string, configArg interface{}) { - ctx := ctxArg.(*zedkubeContext) + z := ctxArg.(*zedkube) config := configArg.(types.AppInstanceConfig) log.Functionf("handleAppInstanceConfigCreate(%v) spec for %s", config.UUIDandVersion, config.DisplayName) - err := checkIoAdapterEthernet(ctx, &config) + err := z.checkIoAdapterEthernet(&config) log.Functionf("handleAppInstancConfigModify: genAISpec %v", err) } func handleAppInstanceConfigModify(ctxArg interface{}, key string, configArg interface{}, oldConfigArg interface{}) { - ctx := ctxArg.(*zedkubeContext) + z := ctxArg.(*zedkube) config := configArg.(types.AppInstanceConfig) oldconfig := oldConfigArg.(types.AppInstanceConfig) log.Functionf("handleAppInstancConfigModify(%v) spec for %s", config.UUIDandVersion, config.DisplayName) - err := checkIoAdapterEthernet(ctx, &config) + err := z.checkIoAdapterEthernet(&config) if oldconfig.RemoteConsole != config.RemoteConsole { log.Functionf("handleAppInstancConfigModify: new remote console %v", config.RemoteConsole) - go runAppVNC(ctx, &config) + go z.runAppVNC(&config) } log.Functionf("handleAppInstancConfigModify: genAISpec %v", err) } @@ -168,11 +410,22 @@ func handleAppInstanceConfigDelete(ctxArg interface{}, key string, configArg interface{}) { log.Functionf("handleAppInstanceConfigDelete(%s)", key) - ctx := ctxArg.(*zedkubeContext) + z := ctxArg.(*zedkube) config := configArg.(types.AppInstanceConfig) - checkDelIoAdapterEthernet(ctx, &config) + z.checkDelIoAdapterEthernet(&config) log.Functionf("handleAppInstanceConfigDelete(%s) done", key) + + // remove the cluster app status publication + pub := z.pubENClusterAppStatus + stItmes := pub.GetAll() + for _, st := range stItmes { + aiStatus := st.(types.ENClusterAppStatus) + if aiStatus.AppUUID == config.UUIDandVersion.UUID { + z.pubENClusterAppStatus.Unpublish(config.UUIDandVersion.UUID.String()) + break + } + } } func handleGlobalConfigCreate(ctxArg interface{}, key string, @@ -188,13 +441,107 @@ func handleGlobalConfigModify(ctxArg interface{}, key string, func 
handleGlobalConfigImpl(ctxArg interface{}, key string, statusArg interface{}) { - ctx := ctxArg.(*zedkubeContext) + z := ctxArg.(*zedkube) if key != "global" { log.Functionf("handleGlobalConfigImpl: ignoring %s", key) return } log.Functionf("handleGlobalConfigImpl for %s", key) - _ = agentlog.HandleGlobalConfig(log, ctx.subGlobalConfig, agentName, - ctx.CLIParams().DebugOverride, ctx.Logger()) + _ = agentlog.HandleGlobalConfig(log, z.subGlobalConfig, agentName, + z.CLIParams().DebugOverride, z.Logger()) log.Functionf("handleGlobalConfigImpl(%s): done", key) } + +func handleEdgeNodeClusterConfigCreate(ctxArg interface{}, key string, + configArg interface{}) { + log.Functionf("handleEdgeNodeClusterConfigCreate: %s", key) + handleEdgeNodeClusterConfigImpl(ctxArg, key, configArg, nil) +} + +func handleEdgeNodeClusterConfigModify(ctxArg interface{}, key string, + configArg interface{}, oldConfigArg interface{}) { + log.Functionf("handleEdgeNodeClusterConfigModify: %s", key) + handleEdgeNodeClusterConfigImpl(ctxArg, key, configArg, oldConfigArg) +} + +func handleEdgeNodeClusterConfigImpl(ctxArg interface{}, key string, + configArg interface{}, oldConfigArg interface{}) { + + var config, oldconfig types.EdgeNodeClusterConfig + var oldConfigPtr *types.EdgeNodeClusterConfig + config = configArg.(types.EdgeNodeClusterConfig) + if oldConfigArg != nil { + oldconfig = oldConfigArg.(types.EdgeNodeClusterConfig) + oldConfigPtr = &oldconfig + } + + z := ctxArg.(*zedkube) + log.Functionf("handleEdgeNodeClusterConfigImpl for %s, config %+v, oldconfig %+v", + key, config, oldconfig) + + z.applyClusterConfig(&config, oldConfigPtr) +} + +func handleEdgeNodeClusterConfigDelete(ctxArg interface{}, key string, + statusArg interface{}) { + z := ctxArg.(*zedkube) + log.Functionf("handleEdgeNodeClusterConfigDelete for %s", key) + config := statusArg.(types.EdgeNodeClusterConfig) + z.applyClusterConfig(nil, &config) + z.pubEdgeNodeClusterStatus.Unpublish("global") +} + +// handle zedagent status events, for cloud connectivity +func handleZedAgentStatusCreate(ctxArg interface{}, key string, + statusArg interface{}) { + handleZedAgentStatusImpl(ctxArg, key, statusArg) +} + +func handleZedAgentStatusModify(ctxArg interface{}, key string, + statusArg interface{}, oldStatusArg interface{}) { + handleZedAgentStatusImpl(ctxArg, key, statusArg) +} + +func handleZedAgentStatusImpl(ctxArg interface{}, key string, + statusArg interface{}) { + + z := ctxArg.(*zedkube) + status := statusArg.(types.ZedAgentStatus) + z.handleControllerStatusChange(&status) + log.Functionf("handleZedAgentStatusImpl: for Leader status %v, done", status) +} + +func handleZedAgentStatusDelete(ctxArg interface{}, key string, + statusArg interface{}) { + // do nothing + log.Functionf("handleZedAgentStatusDelete(%s) done", key) +} + +func handleEdgeNodeInfoCreate(ctxArg interface{}, key string, + statusArg interface{}) { + handleEdgeNodeInfoImpl(ctxArg, key, statusArg) +} + +func handleEdgeNodeInfoModify(ctxArg interface{}, key string, + statusArg interface{}, oldStatusArg interface{}) { + handleEdgeNodeInfoImpl(ctxArg, key, statusArg) +} + +func handleEdgeNodeInfoImpl(ctxArg interface{}, key string, + statusArg interface{}) { + z := ctxArg.(*zedkube) + nodeInfo := statusArg.(types.EdgeNodeInfo) + if err := z.getnodeNameAndUUID(); err != nil { + log.Errorf("handleEdgeNodeInfoImpl: getnodeNameAndUUID failed: %v", err) + return + } + + z.nodeName = strings.ToLower(nodeInfo.DeviceName) + z.nodeuuid = nodeInfo.DeviceID.String() +} + +func 
handleEdgeNodeInfoDelete(ctxArg interface{}, key string,
+	statusArg interface{}) {
+	// do nothing
+	log.Functionf("handleEdgeNodeInfoDelete(%s) done", key)
+}
diff --git a/pkg/pillar/devicenetwork/pbr.go b/pkg/pillar/devicenetwork/pbr.go
index 541d48dca7..50308c2158 100644
--- a/pkg/pillar/devicenetwork/pbr.go
+++ b/pkg/pillar/devicenetwork/pbr.go
@@ -4,6 +4,10 @@ package devicenetwork
 
 const (
+	// KubeSvcRT : index of the routing table used for the Kubernetes service prefix.
+	// Only used in the kubevirt mode.
+	KubeSvcRT = 400
+
 	// DPCBaseRTIndex : base index for per-port routing tables used for device
 	// connectivity (between EVE and remote endpoints, such as the controller),
 	// i.e. used for DevicePortConfig (abbreviated to DPC).
diff --git a/pkg/pillar/docs/zedkube.md b/pkg/pillar/docs/zedkube.md
index ad899c8c88..7aad723a52 100644
--- a/pkg/pillar/docs/zedkube.md
+++ b/pkg/pillar/docs/zedkube.md
@@ -1,9 +1,43 @@
-# Clustered eve nodes (zedkube)
+# Clustered eve nodes (aka zedkube)
 
 ## Overview
 
+zedkube is the service in pillar/cmd/zedkube. Its main purpose is to interact with the Kubernetes cluster running in the 'kube' container: it passes some of the pillar requests down to the cluster and relays information from the cluster back to pillar. It covers the cluster-related actions that do not belong to the 'volumemgr' or 'domainmgr' services.
+
 ## Components
 
+### App VNC for remote console
+
+Unlike the KVM image, where QEMU keeps the VNC ports 590x ready for connections into a VM's console, the kubevirt image has to use the 'virtctl' tool, together with the required kubeconfig YAML file, to reach a VMI's console.
+
+zedkube subscribes to AppInstanceConfig and checks whether the user has requested RemoteConsole access. If so, it writes the VMI name and the VNC port number into the file /run/zedkube/vmiVNC.run. The 'kube' container monitors this file and launches 'virtctl vnc' with that VMI and port, which brings up the VNC port 590x and makes it ready for client requests. The user can then use RemoteConsole to connect through 'guacd' to the VNC port, the same as in the KVM image.
+
+### Cluster Status
+
+The kubevirt EVE image supports running Kubernetes either on a single node or in cluster mode (at least 3 nodes). In single-node mode there is no change to the EVE API between the controller and the device. In cluster mode the controller sends 'EdgeNodeClusterConfig' to the device; it is published by 'zedagent', zedkube subscribes to it and in turn publishes 'EdgeNodeClusterStatus'. The 'kube' container monitors 'EdgeNodeClusterStatus' for cluster mode changes. zedkube also subscribes to DeviceNetworkStatus from 'nim' to make sure the cluster prefix is ready for Kubernetes.
+
+### Collect App Container Logs
+
+A timer job walks the AppInstanceConfig subscription list and, for every application that is a native container (without a VM shim layer), looks up the pods matching the application name and uses the Kubernetes pods 'GetLogs' API to fetch the logs produced since the last query. Each log line is emitted with the App UUID, the container name and a timestamp so that newlogd picks it up as part of the app log collection.
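+
+The snippet below is a condensed sketch of the client-go calls involved (the full implementation is in pkg/pillar/cmd/zedkube/applogs.go); the kubeconfig path, namespace, pod name and interval used here are illustrative assumptions only:
+
+```go
+package main
+
+import (
+	"bufio"
+	"context"
+	"fmt"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/clientcmd"
+)
+
+// streamPodLogs fetches the log lines a pod produced during the last
+// sinceSec seconds and hands each line to the caller.
+func streamPodLogs(clientset *kubernetes.Clientset, namespace, podName string,
+	sinceSec int64, emit func(line string)) error {
+	opt := &corev1.PodLogOptions{SinceSeconds: &sinceSec}
+	req := clientset.CoreV1().Pods(namespace).GetLogs(podName, opt)
+	stream, err := req.Stream(context.Background())
+	if err != nil {
+		return err
+	}
+	defer stream.Close()
+	scanner := bufio.NewScanner(stream)
+	for scanner.Scan() {
+		// zedkube tags each line with the app UUID, container name and timestamp.
+		emit(scanner.Text())
+	}
+	return scanner.Err()
+}
+
+func main() {
+	// Illustrative setup: zedkube builds its clientset from kubeapi.GetKubeConfig() instead.
+	config, err := clientcmd.BuildConfigFromFlags("", "/etc/rancher/k3s/k3s.yaml")
+	if err != nil {
+		panic(err)
+	}
+	clientset, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		panic(err)
+	}
+	_ = streamPodLogs(clientset, "eve-kube-app", "my-app-pod", 60, func(line string) {
+		fmt.Println(line)
+	})
+}
+```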
+
+### Publish Cluster Application Status
+
+zedkube periodically queries all the pods running on this device and reports the status of each application present in AppInstanceConfig through the 'ENClusterAppStatus' publication. The main purpose of this status is application migration handling: the App's Designated Node ID may point at this node while the application has actually moved onto another node, or this node may not match the App's Designated Node ID while the application nevertheless appears to be scheduled here. 'zedmanager' listens to ENClusterAppStatus and uses it to decide the 'effective' activate flag of the application.
+
+In the VMI case there is always a pod whose name carries the prefix 'virt-launcher-'; it is only used to determine the migration status of the VMI.
+
+### Cluster Status Server
+
+In Kubernetes cluster mode with multiple HA servers, a starting node joins the cluster by specifying the IP address of the 'bootstrap' server. Even when that IP address is present, the 'bootstrap' node may still be in single-node mode or may not yet have been converted into a cluster server; joining it in that state causes problems for the joining server and later leads to conflicting status, certificates or tokens. To make the join safe, zedkube reports its own cluster status over HTTP: each cluster server runs an HTTP service on the cluster interface, port '12346', URL /status. The service reports the status 'cluster' if the node has both the 'master' and 'etcd' properties. A newly joining server or agent node does not move forward with starting its Kubernetes node until the query over the cluster network returns the 'cluster' status. The 'ClusterStatus' port for this HTTP service is explicitly allowed in the EVE firewall.
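+
+The join-side check can be pictured with the hypothetical poller below; the actual check is implemented in the 'kube' container, so the function name, retry interval and timeout are made up for illustration, and only the port, URL and expected response body follow the convention described above:
+
+```go
+package main
+
+import (
+	"fmt"
+	"io"
+	"net/http"
+	"strings"
+	"time"
+)
+
+// waitForClusterReady polls http://<bootstrap-ip>:12346/status until the
+// bootstrap server reports "cluster", i.e. it carries the master and etcd roles.
+func waitForClusterReady(bootstrapIP string, timeout time.Duration) error {
+	url := fmt.Sprintf("http://%s:12346/status", bootstrapIP)
+	deadline := time.Now().Add(timeout)
+	for time.Now().Before(deadline) {
+		resp, err := http.Get(url)
+		if err == nil {
+			body, _ := io.ReadAll(resp.Body)
+			resp.Body.Close()
+			if strings.TrimSpace(string(body)) == "cluster" {
+				return nil // safe to join the cluster now
+			}
+		}
+		time.Sleep(10 * time.Second)
+	}
+	return fmt.Errorf("bootstrap server %s did not reach cluster state", bootstrapIP)
+}
+
+func main() {
+	if err := waitForClusterReady("10.1.0.10", 10*time.Minute); err != nil {
+		fmt.Println(err)
+	}
+}
+```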
+ +### App Ethernet Passthrough + +When an application uses passthrough for ethernet ports, zedkube creates a special NAD (Network Attachment Definition) for the direct connection, using a name such as 'host-eth1'. It creates the NAD in the kubernetes cluster, and in the case of native container applications, domainmgr uses this NAD when it sets up the application configuration for kubernetes. + +### Kubernetes Stats Collection + +This collection is specific to kubernetes status and stats. Although EVE already has the device info, domain status, etc., kubernetes has a different set of objects: 'nodes', 'pods', 'vmis', cluster storage stats, etc. These are reported by zedkube through the 'KubeClusterInfo' publication, which also includes some simple status for pods not related to EVE apps. + ### kubenodeop kubenodeop handles cordoning, uncordoning, and draining of clustered eve-os nodes. @@ -11,103 +45,113 @@ Any given node could be hosting one or more longhorn volume replicas and thus co A drain operation should be performed before any Node Operation / Node Command which can cause an extended outage of a node such as a reboot, shutdown, reset. kubenodeop handles NodeDrainRequest objects which zedkube subscribes to, initiates the drain, and publishes NodeDrainStatus objects. +## Kubernetes Node Draining + ### kubeapi 1. `kubeapi.GetNodeDrainStatus()` to determine if system supports drain - - HV!=kubevirt: NOTSUPPORTED - - HV=kubevirt will return: - - NOTSUPPORTED if in single node. - - NOTREQUESTED if in cluster mode + - HV!=kubevirt: NOTSUPPORTED + - HV=kubevirt will return: + - NOTSUPPORTED if in single node. + - NOTREQUESTED if in cluster mode 1. `kubeapi.RequestNodeDrain()` to begin a drain ### Drain PubSub setup (node reboot/shutdown) 1. zedagent/handlenodedrain.go:`initNodeDrainPubSub()` - - subscribes to NodeDrainStatus from zedkube - - creates publication of NodeDrainRequest + - subscribes to NodeDrainStatus from zedkube + - creates publication of NodeDrainRequest 1. nodeagent/handlenodedrain.go:`initNodeDrainPubSub()` - - subscribe to NodeDrainStatus from zedkube + - subscribe to NodeDrainStatus from zedkube ### Drain Request path (node reboot/shutdown) 1. zedagent/parseconfig.go:`scheduleDeviceOperation()` - - If `shouldDeferForNodeDrain()` is true - - Set Reboot or shutdown cmd deferred state in zedagentContext + - If `shouldDeferForNodeDrain()` is true + - Set Reboot or shutdown cmd deferred state in zedagentContext 1. zedagent/handlenodedrain.go:`shouldDeferForNodeDrain()` - - NodeDrainStatus == (NOTREQUESTED || FAILEDCORDON || FAILEDDRAIN): - - Drain is requested via `kubeapi.RequestNodeDrain()` - - return Defer - - NodeDrainStatus == (UNKNOWN || NOTSUPPORTED || COMPLETE ) - - return !Defer - - NodeDrainStatus == (REQUESTED || STARTING || CORDONED || DRAINRETRYING ): - - return Defer + - NodeDrainStatus == (NOTREQUESTED || FAILEDCORDON || FAILEDDRAIN): + - Drain is requested via `kubeapi.RequestNodeDrain()` + - return Defer + - NodeDrainStatus == (UNKNOWN || NOTSUPPORTED || COMPLETE ) + - return !Defer + - NodeDrainStatus == (REQUESTED || STARTING || CORDONED || DRAINRETRYING ): + - return Defer
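The defer decision used by both the reboot/shutdown path above and the baseos-update path below can be summarized with the sketch that follows. The `DrainStatus` type and its constants are local stand-ins that mirror the states listed in this document, not the exact pillar types.

```go
// Sketch of shouldDeferForNodeDrain() as described above: request a new drain
// (and defer the operation) when no drain is active or a previous one failed,
// keep deferring while a drain is in progress, and proceed otherwise.
package drainsketch

type DrainStatus int

const (
	UNKNOWN DrainStatus = iota
	NOTSUPPORTED
	NOTREQUESTED
	REQUESTED
	STARTING
	CORDONED
	DRAINRETRYING
	FAILEDCORDON
	FAILEDDRAIN
	COMPLETE
)

func shouldDeferForNodeDrain(status DrainStatus, requestDrain func()) bool {
	switch status {
	case NOTREQUESTED, FAILEDCORDON, FAILEDDRAIN:
		requestDrain() // e.g. kubeapi.RequestNodeDrain()
		return true    // defer the reboot/shutdown/update
	case UNKNOWN, NOTSUPPORTED, COMPLETE:
		return false // no drain needed (or already done), proceed
	default:
		// REQUESTED, STARTING, CORDONED, DRAINRETRYING: drain in progress.
		return true
	}
}
```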
### Drain Status Handler (node reboot/shutdown) 1. zedagent/handlenodedrain.go:`handleNodeDrainStatusImpl()` - - NodeDrainStatus = FAILEDCORDON or FAILEDDRAIN - - Unpublish NodeDrainRequest + - NodeDrainStatus == FAILEDCORDON or FAILEDDRAIN + - Unpublish NodeDrainRequest 1. nodeagent/handlenodedrain.go:`handleNodeDrainStatusImplNA()` - - NodeDrainStatus >= REQUESTED and < COMPLETE - - republish nodeagentstatus with drainInProgress set - - NodeDrainStatus == COMPLETE - - republish nodeagentstatus with drainInProgress cleared + - NodeDrainStatus >= REQUESTED and < COMPLETE + - republish nodeagentstatus with drainInProgress set + - NodeDrainStatus == COMPLETE + - republish nodeagentstatus with drainInProgress cleared 1. zedagent/zedagent.go:`handleNodeAgentStatusImpl()` - - If there is: - - a deferred device op - - nodeagent configctx reports drain complete - - Then process deferred reboot/shutdown + - If there is: + - a deferred device op + - nodeagent configctx reports drain complete + - Then process deferred reboot/shutdown ### Drain PubSub setup (node eveimage-update) 1. baseosmgr/handlenodedrain.go:`initNodeDrainPubSub()` - - subscribe to NodeDrainStatus from zedkube - - setup publication to NodeDrainRequest + - subscribe to NodeDrainStatus from zedkube + - setup publication to NodeDrainRequest ### Drain Request path (node eveimage-update) 1. baseosmgr/handlebaseos.go:`baseOsHandleStatusUpdateUUID()` - - If BaseOs download complete (LOADING||LOADED||INSTALLED), not currently Activated, and new config requested it Activated - - Check `shouldDeferForNodeDrain()`, if defer requested return as Completion will later will complete this BaseOsStatusUpdate. + - If BaseOs download complete (LOADING||LOADED||INSTALLED), not currently Activated, and new config requested it Activated + - Check `shouldDeferForNodeDrain()`; if defer is requested, return, as Completion will later complete this BaseOsStatusUpdate. 1. baseosmgr/handlenodedrain.go:`shouldDeferForNodeDrain()` - - NodeDrainStatus == (NOTREQUESTED || FAILEDCORDON || FAILEDDRAIN): - - save BaseOsId in baseOsMgrContext.deferredBaseOsID - - Drain is requested via `kubeapi.RequestNodeDrain()` - - return Defer - - NodeDrainStatus == (UNKNOWN || NOTSUPPORTED || COMPLETE ) - - return !Defer - - NodeDrainStatus == (REQUESTED || STARTING || CORDONED || DRAINRETRYING ): - - return Defer + - NodeDrainStatus == (NOTREQUESTED || FAILEDCORDON || FAILEDDRAIN): + - save BaseOsId in baseOsMgrContext.deferredBaseOsID + - Drain is requested via `kubeapi.RequestNodeDrain()` + - return Defer + - NodeDrainStatus == (UNKNOWN || NOTSUPPORTED || COMPLETE ) + - return !Defer + - NodeDrainStatus == (REQUESTED || STARTING || CORDONED || DRAINRETRYING ): + - return Defer ### Drain Status Handler (node eve-image update) 1. baseosmgr/handlenodedrain.go:`handleNodeDrainStatusImpl()` - - NodeDrainStatus == FAILEDCORDON or FAILEDDRAIN: - - Unpublish NodeDrainRequest - - NodeDrainStatus == COMPLETE: - - Complete deferred baseOsMgrContext.deferredBaseOsID to `baseOsHandleStatusUpdateUUID()` + - NodeDrainStatus == FAILEDCORDON or FAILEDDRAIN: + - Unpublish NodeDrainRequest + - NodeDrainStatus == COMPLETE: + - Complete deferred baseOsMgrContext.deferredBaseOsID to `baseOsHandleStatusUpdateUUID()` ### General DrainRequest Processing 1. zedkube/zedkube.go:Run() - - sub to NodeDrainRequest from zedagent and baseosmgr - - new publication of NodeDrainStatus - - Init NodeDrainStatus to NOTSUPPORTED + - sub to NodeDrainRequest from zedagent and baseosmgr + - new publication of NodeDrainStatus + - Init NodeDrainStatus to NOTSUPPORTED 1. zedkube/zedkube.go:`handleEdgeNodeClusterConfigImpl()` - - System switching to cluster membership: NodeDrainStatus -> NOTREQUESTED + - System switching to cluster membership: NodeDrainStatus -> NOTREQUESTED 1. zedkube/zedkube.go:`handleEdgeNodeClusterConfigDelete()` - - System switching to single node: NodeDrainStatus -> NOTSUPPORTED + - System switching to single node: NodeDrainStatus -> NOTSUPPORTED 1. zedkube/handlenodedrain.go:`handleNodeDrainRequestImpl()` - - NodeDrainStatus -> REQUESTED + - NodeDrainStatus -> REQUESTED 1. zedkube/kubenodeop.go:`cordonAndDrainNode()` - - NodeDrainStatus -> STARTING - - Retry Cordon up to 10 times (in case k8s api states object changed) - - when retries exhausted: NodeDrainStatus -> FAILEDCORDON - - NodeDrainStatus -> CORDONED - - Retry Drain up to 5 times - - between tries: NodeDrainStatus -> DRAINRETRYING - - on failure: NodeDrainStatus -> FAILEDDRAIN - - NodeDrainStatus -> COMPLETE + - NodeDrainStatus -> STARTING + - Retry Cordon up to 10 times (in case k8s api states object changed) + - when retries exhausted: NodeDrainStatus -> FAILEDCORDON + - NodeDrainStatus -> CORDONED + - Retry Drain up to 5 times + - between tries: NodeDrainStatus -> DRAINRETRYING + - on failure: NodeDrainStatus -> FAILEDDRAIN + - NodeDrainStatus -> COMPLETE + +## Cluster Leader Election + +In the cluster mode, each node/device reports its own info and metrics just as in the KVM image, but the cluster also needs to report the kubernetes stats described above. There is no need for every node to report them, so the cluster elects a leader to act as the cluster reporter. A node is eligible to participate in the election if it can access the cluster and can successfully get the configuration from the controller. + +The election requests a 'lease' named "eve-kube-stats-leader" in the cluster, in the namespace 'eve-kube-app', with the node name of this device as the identity. If the leader loses its connection to the cluster, the 'lease' times out and the next available node is elected. If a node cannot get the configuration from the controller, it stops participating in the election. + +If none of the nodes in the cluster is connected to the cloud, then there is no need to report the stats anyway.
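The vendored client-go leader election package can be used roughly as sketched below. This is not the zedkube implementation: the clientset wiring, timing values and callback bodies are assumptions; only the lease name "eve-kube-stats-leader" and the namespace 'eve-kube-app' come from the description above.

```go
// Sketch: elect the cluster stats reporter via a Lease lock. Assumes a ready
// *kubernetes.Clientset and this node's name as the election identity.
package electionsketch

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func runStatsLeaderElection(ctx context.Context, cs *kubernetes.Clientset, nodeName string) {
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      "eve-kube-stats-leader",
			Namespace: "eve-kube-app",
		},
		Client:     cs.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: nodeName},
	}
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            lock,
		LeaseDuration:   15 * time.Second, // illustrative timing values
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		ReleaseOnCancel: true, // give up the lease when we stop participating
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				// This node is now the reporter: start publishing KubeClusterInfo.
			},
			OnStoppedLeading: func() {
				// Lost or released the lease: stop reporting cluster stats.
			},
			OnNewLeader: func(identity string) {
				// Another node (identity) is the current reporter.
			},
		},
	})
}
```

Cancelling the passed context is one way a node could stop participating (for example when it no longer gets its configuration from the controller); with ReleaseOnCancel set, the lease is then freed for the next eligible node.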
## Debugging @@ -147,7 +191,7 @@ The statefulset must be deleted. - baseOsHandleStatusUpdateUUID - nodedrain-step - kubevirt_node_drain_completion_time_seconds -... - zgrep 'kubevirt_node_drain_completion_time_seconds' /persist/newlog/keepSentQueue/dev.log.1725511530990.gz | jq -r .content | jq -r .msg | cut -d ':' -f 2 - s34.559219 -... + ... + zgrep 'kubevirt_node_drain_completion_time_seconds' /persist/newlog/keepSentQueue/dev.log.1725511530990.gz | jq -r .content | jq -r .msg | cut -d ':' -f 2 + s34.559219 + ... diff --git a/pkg/pillar/dpcmanager/dpcmanager.go b/pkg/pillar/dpcmanager/dpcmanager.go index dca125f4bb..3be72c2bc6 100644 --- a/pkg/pillar/dpcmanager/dpcmanager.go +++ b/pkg/pillar/dpcmanager/dpcmanager.go @@ -97,6 +97,7 @@ type DpcManager struct { enableLastResort bool devUUID uuid.UUID flowlogEnabled bool + clusterStatus types.EdgeNodeClusterStatus // Boot-time configuration dpclPresentAtBoot bool @@ -191,17 +192,19 @@ const ( commandUpdateDevUUID commandProcessWwanStatus commandUpdateFlowlogState + commandUpdateClusterStatus ) type inputCommand struct { cmd command - dpc types.DevicePortConfig // for commandAddDPC and commandDelDPC - gcp types.ConfigItemValueMap // for commandUpdateGCP - aa types.AssignableAdapters // for commandUpdateAA - rs types.RadioSilence // for commandUpdateRS - devUUID uuid.UUID // for commandUpdateDevUUID - wwanStatus types.WwanStatus // for commandProcessWwanStatus - flowlogEnabled bool // for commandUpdateFlowlogState + dpc types.DevicePortConfig // for commandAddDPC and commandDelDPC + gcp types.ConfigItemValueMap // for commandUpdateGCP + aa types.AssignableAdapters // for commandUpdateAA + rs types.RadioSilence // for commandUpdateRS + devUUID uuid.UUID // for commandUpdateDevUUID + wwanStatus types.WwanStatus // for commandProcessWwanStatus + flowlogEnabled bool // for commandUpdateFlowlogState + clusterStatus types.EdgeNodeClusterStatus // for commandUpdateClusterStatus } type dpcVerify struct { @@ -276,6 +279,8 @@ func (m *DpcManager) run(ctx context.Context) { m.processWwanStatus(ctx, inputCmd.wwanStatus) case commandUpdateFlowlogState: m.doUpdateFlowlogState(ctx, inputCmd.flowlogEnabled) + case commandUpdateClusterStatus: + m.doUpdateClusterStatus(ctx, inputCmd.clusterStatus) } m.resumeVerifyIfAsyncDone(ctx) @@ -412,6 +417,7 @@ func (m *DpcManager) reconcilerArgs() dpcreconciler.Args { AA: m.adapters, RS: m.rsConfig, FlowlogEnabled: m.flowlogEnabled, + ClusterStatus: m.clusterStatus, } if m.currentDPC() != nil { args.DPC = *m.currentDPC() @@ -491,6 +497,14 @@ func (m *DpcManager) UpdateFlowlogState(flowlogEnabled bool) { } } +// UpdateClusterStatus : apply an updated cluster status. +func (m *DpcManager) UpdateClusterStatus(status types.EdgeNodeClusterStatus) { + m.inputCommands <- inputCommand{ + cmd: commandUpdateClusterStatus, + clusterStatus: status, + } +} + // GetDNS returns device network state information. 
func (m *DpcManager) GetDNS() types.DeviceNetworkStatus { return m.deviceNetStatus @@ -629,3 +643,9 @@ func (m *DpcManager) doUpdateFlowlogState(ctx context.Context, flowlogEnabled bo m.flowlogEnabled = flowlogEnabled m.reconcileStatus = m.DpcReconciler.Reconcile(ctx, m.reconcilerArgs()) } + +func (m *DpcManager) doUpdateClusterStatus(ctx context.Context, + status types.EdgeNodeClusterStatus) { + m.clusterStatus = status + m.reconcileStatus = m.DpcReconciler.Reconcile(ctx, m.reconcilerArgs()) +} diff --git a/pkg/pillar/dpcreconciler/dpcreconciler.go b/pkg/pillar/dpcreconciler/dpcreconciler.go index 86fb8723ed..44613b47c1 100644 --- a/pkg/pillar/dpcreconciler/dpcreconciler.go +++ b/pkg/pillar/dpcreconciler/dpcreconciler.go @@ -32,6 +32,8 @@ type Args struct { GCP types.ConfigItemValueMap // True if flow logging is enabled in at least one network instance. FlowlogEnabled bool + // Cluster network status used when edge node is part of a kubernetes cluster. + ClusterStatus types.EdgeNodeClusterStatus } // ReconcileStatus : state data related to config reconciliation. diff --git a/pkg/pillar/dpcreconciler/linux.go b/pkg/pillar/dpcreconciler/linux.go index 43ce725e67..a147ab5a2f 100644 --- a/pkg/pillar/dpcreconciler/linux.go +++ b/pkg/pillar/dpcreconciler/linux.go @@ -304,7 +304,8 @@ func (r *LinuxDpcReconciler) watcher(netEvents <-chan netmonitor.Event) { } } if ev.Deleted { - changed := r.updateCurrentRoutes(r.lastArgs.DPC) + changed := r.updateCurrentRoutes(r.lastArgs.DPC, + r.lastArgs.ClusterStatus) if changed { r.addPendingReconcile( L3SG, "interface delete triggered route change", true) @@ -315,13 +316,14 @@ func (r *LinuxDpcReconciler) watcher(netEvents <-chan netmonitor.Event) { if changed { r.addPendingReconcile(L3SG, "address change", true) } - changed = r.updateCurrentRoutes(r.lastArgs.DPC) + changed = r.updateCurrentRoutes(r.lastArgs.DPC, r.lastArgs.ClusterStatus) if changed { r.addPendingReconcile(L3SG, "address change triggered route change", true) } case netmonitor.DNSInfoChange: - newGlobalCfg := r.getIntendedGlobalCfg(r.lastArgs.DPC) + newGlobalCfg := r.getIntendedGlobalCfg(r.lastArgs.DPC, + r.lastArgs.ClusterStatus) prevGlobalCfg := r.intendedState.SubGraph(GlobalSG) if len(prevGlobalCfg.DiffItems(newGlobalCfg)) > 0 { r.addPendingReconcile(GlobalSG, "DNS info change", true) @@ -414,6 +416,10 @@ func (r *LinuxDpcReconciler) Reconcile(ctx context.Context, args Args) Reconcile if r.flowlogStateChanged(args.FlowlogEnabled) { r.addPendingReconcile(ACLsSG, "Flowlog state change", false) } + if r.clusterStatusChanged(args.ClusterStatus) { + // Reconcile all items. 
+ r.addPendingReconcile(GraphName, "Cluster status change", false) + } } if r.pendingReconcile.isPending { reconcileSG = r.pendingReconcile.forSubGraph @@ -445,7 +451,7 @@ func (r *LinuxDpcReconciler) Reconcile(ctx context.Context, args Args) Reconcile var intSG dg.Graph switch reconcileSG { case GlobalSG: - intSG = r.getIntendedGlobalCfg(args.DPC) + intSG = r.getIntendedGlobalCfg(args.DPC, args.ClusterStatus) case NetworkIoSG: intSG = r.getIntendedNetworkIO(args.DPC) case PhysicalIfsSG: @@ -456,11 +462,12 @@ func (r *LinuxDpcReconciler) Reconcile(ctx context.Context, args Args) Reconcile intSG = r.getIntendedLogicalIO(args.DPC) case L3SG: r.rebuildMTUMap(args.DPC) - intSG = r.getIntendedL3Cfg(args.DPC) + intSG = r.getIntendedL3Cfg(args.DPC, args.ClusterStatus) case WirelessSG: intSG = r.getIntendedWirelessCfg(args.DPC, args.AA, args.RS) case ACLsSG: - intSG = r.getIntendedACLs(args.DPC, args.GCP, args.FlowlogEnabled) + intSG = r.getIntendedACLs(args.DPC, args.ClusterStatus, args.GCP, + args.FlowlogEnabled) default: // Only these top-level subgraphs are used for selective-reconcile for now. r.Log.Fatalf("Unexpected SG select for reconcile: %s", reconcileSG) @@ -672,6 +679,14 @@ func (r *LinuxDpcReconciler) flowlogStateChanged(flowlogEnabled bool) bool { return r.lastArgs.FlowlogEnabled != flowlogEnabled } +func (r *LinuxDpcReconciler) clusterStatusChanged( + newStatus types.EdgeNodeClusterStatus) bool { + // DPCReconciler cares only about the networking-related fields of the ClusterStatus. + return r.lastArgs.ClusterStatus.ClusterInterface != newStatus.ClusterInterface || + !netutils.EqualIPNets(r.lastArgs.ClusterStatus.ClusterIPPrefix, + newStatus.ClusterIPPrefix) +} + func (r *LinuxDpcReconciler) updateCurrentState(args Args) (changed bool) { if r.currentState == nil { // Initialize only subgraphs with external items. @@ -690,7 +705,7 @@ func (r *LinuxDpcReconciler) updateCurrentState(args Args) (changed bool) { if addrsChanged := r.updateCurrentAdapterAddrs(args.DPC); addrsChanged { changed = true } - if routesChanged := r.updateCurrentRoutes(args.DPC); routesChanged { + if routesChanged := r.updateCurrentRoutes(args.DPC, args.ClusterStatus); routesChanged { changed = true } return changed @@ -779,7 +794,8 @@ func (r *LinuxDpcReconciler) updateCurrentAdapterAddrs( return false } -func (r *LinuxDpcReconciler) updateCurrentRoutes(dpc types.DevicePortConfig) (changed bool) { +func (r *LinuxDpcReconciler) updateCurrentRoutes(dpc types.DevicePortConfig, + clusterStatus types.EdgeNodeClusterStatus) (changed bool) { sgPath := dg.NewSubGraphPath(L3SG, RoutesSG) currentRoutes := dg.New(dg.InitArgs{Name: RoutesSG}) for _, port := range dpc.Ports { @@ -806,6 +822,20 @@ func (r *LinuxDpcReconciler) updateCurrentRoutes(dpc types.DevicePortConfig) (ch r.Log.Errorf("updateCurrentRoutes: ListRoutes failed for ifIndex %d: %v", ifIndex, err) } + if r.HVTypeKube && clusterStatus.ClusterInterface == port.Logicallabel { + k3sSvcRoutes, err := r.NetworkMonitor.ListRoutes(netmonitor.RouteFilters{ + FilterByTable: true, + Table: devicenetwork.KubeSvcRT, + FilterByIf: true, + IfIndex: ifIndex, + }) + if err == nil { + routes = append(routes, k3sSvcRoutes...) 
+ } else { + r.Log.Errorf("updateCurrentRoutes: ListRoutes failed for ifIndex %d "+ + "and the KubeSvc table: %v", ifIndex, err) + } + } for _, rt := range routes { currentRoutes.PutItem(linux.Route{ Route: rt.Data.(netlink.Route), @@ -831,16 +861,18 @@ func (r *LinuxDpcReconciler) updateIntendedState(args Args) { Description: "Device Connectivity provided using Linux network stack", } r.intendedState = dg.New(graphArgs) - r.intendedState.PutSubGraph(r.getIntendedGlobalCfg(args.DPC)) + r.intendedState.PutSubGraph(r.getIntendedGlobalCfg(args.DPC, args.ClusterStatus)) r.intendedState.PutSubGraph(r.getIntendedNetworkIO(args.DPC)) r.intendedState.PutSubGraph(r.getIntendedPhysicalIfs(args.DPC)) r.intendedState.PutSubGraph(r.getIntendedLogicalIO(args.DPC)) - r.intendedState.PutSubGraph(r.getIntendedL3Cfg(args.DPC)) + r.intendedState.PutSubGraph(r.getIntendedL3Cfg(args.DPC, args.ClusterStatus)) r.intendedState.PutSubGraph(r.getIntendedWirelessCfg(args.DPC, args.AA, args.RS)) - r.intendedState.PutSubGraph(r.getIntendedACLs(args.DPC, args.GCP, args.FlowlogEnabled)) + r.intendedState.PutSubGraph(r.getIntendedACLs(args.DPC, args.ClusterStatus, args.GCP, + args.FlowlogEnabled)) } -func (r *LinuxDpcReconciler) getIntendedGlobalCfg(dpc types.DevicePortConfig) dg.Graph { +func (r *LinuxDpcReconciler) getIntendedGlobalCfg(dpc types.DevicePortConfig, + clusterStatus types.EdgeNodeClusterStatus) dg.Graph { graphArgs := dg.InitArgs{ Name: GlobalSG, Description: "Global configuration", @@ -857,10 +889,14 @@ func (r *LinuxDpcReconciler) getIntendedGlobalCfg(dpc types.DevicePortConfig) dg Priority: devicenetwork.PbrKubeNetworkPrio, Table: unix.RT_TABLE_MAIN, }, nil) + tableForKubeSvc := unix.RT_TABLE_MAIN + if clusterStatus.ClusterInterface != "" { + tableForKubeSvc = devicenetwork.KubeSvcRT + } intendedCfg.PutItem(linux.IPRule{ Dst: kubeSvcCIDR, Priority: devicenetwork.PbrKubeNetworkPrio, - Table: unix.RT_TABLE_MAIN, + Table: tableForKubeSvc, }, nil) } if len(dpc.Ports) == 0 { @@ -1053,20 +1089,22 @@ func (r *LinuxDpcReconciler) getIntendedLogicalIO(dpc types.DevicePortConfig) dg return intendedIO } -func (r *LinuxDpcReconciler) getIntendedL3Cfg(dpc types.DevicePortConfig) dg.Graph { +func (r *LinuxDpcReconciler) getIntendedL3Cfg(dpc types.DevicePortConfig, + clusterStatus types.EdgeNodeClusterStatus) dg.Graph { graphArgs := dg.InitArgs{ Name: L3SG, Description: "Network Layer3 configuration", } intendedL3 := dg.New(graphArgs) - intendedL3.PutSubGraph(r.getIntendedAdapters(dpc)) + intendedL3.PutSubGraph(r.getIntendedAdapters(dpc, clusterStatus)) intendedL3.PutSubGraph(r.getIntendedSrcIPRules(dpc)) - intendedL3.PutSubGraph(r.getIntendedRoutes(dpc)) + intendedL3.PutSubGraph(r.getIntendedRoutes(dpc, clusterStatus)) intendedL3.PutSubGraph(r.getIntendedArps(dpc)) return intendedL3 } -func (r *LinuxDpcReconciler) getIntendedAdapters(dpc types.DevicePortConfig) dg.Graph { +func (r *LinuxDpcReconciler) getIntendedAdapters(dpc types.DevicePortConfig, + clusterStatus types.EdgeNodeClusterStatus) dg.Graph { graphArgs := dg.InitArgs{ Name: AdaptersSG, Description: "L3 configuration assigned to network interfaces", @@ -1082,6 +1120,13 @@ func (r *LinuxDpcReconciler) getIntendedAdapters(dpc types.DevicePortConfig) dg. 
if !port.IsL3Port || port.IfName == "" || port.InvalidConfig { continue } + var staticIPs []*net.IPNet + if r.HVTypeKube { + if port.Logicallabel == clusterStatus.ClusterInterface && + clusterStatus.ClusterIPPrefix != nil { + staticIPs = append(staticIPs, clusterStatus.ClusterIPPrefix) + } + } adapter := linux.Adapter{ LogicalLabel: port.Logicallabel, IfName: port.IfName, @@ -1090,6 +1135,7 @@ func (r *LinuxDpcReconciler) getIntendedAdapters(dpc types.DevicePortConfig) dg. UsedAsVlanParent: dpc.IsPortUsedAsVlanParent(port.Logicallabel), DhcpType: port.Dhcp, MTU: r.intfMTU[port.Logicallabel], + StaticIPs: staticIPs, } intendedAdapters.PutItem(adapter, nil) if port.Dhcp != types.DhcpTypeNone && @@ -1152,7 +1198,8 @@ func (r *LinuxDpcReconciler) getIntendedSrcIPRules(dpc types.DevicePortConfig) d return intendedRules } -func (r *LinuxDpcReconciler) getIntendedRoutes(dpc types.DevicePortConfig) dg.Graph { +func (r *LinuxDpcReconciler) getIntendedRoutes(dpc types.DevicePortConfig, + clusterStatus types.EdgeNodeClusterStatus) dg.Graph { graphArgs := dg.InitArgs{ Name: RoutesSG, Description: "IP routes", @@ -1194,6 +1241,29 @@ func (r *LinuxDpcReconciler) getIntendedRoutes(dpc types.DevicePortConfig) dg.Gr AdapterLL: port.Logicallabel, }, nil) } + if r.HVTypeKube && clusterStatus.ClusterInterface == port.Logicallabel && + clusterStatus.ClusterIPPrefix != nil { + // Ensure that packets destined for K3s services do not use the default route, + // but are instead routed through the cluster port. This guarantees that traffic + // handled by kube-proxy is properly SNATed to the cluster IP. That's the theory + // at least. We're not entirely certain. Without this route, however, + // some Longhorn pods fail to access K3s services when the cluster IP is configured + // on a non-default port. + intendedRoutes.PutItem(linux.Route{ + Route: netlink.Route{ + LinkIndex: ifIndex, + Family: netlink.FAMILY_V4, + Scope: netlink.SCOPE_UNIVERSE, + Protocol: unix.RTPROT_STATIC, + Type: unix.RTN_UNICAST, + Dst: kubeSvcCIDR, + Gw: clusterStatus.ClusterIPPrefix.IP, + Table: devicenetwork.KubeSvcRT, + }, + AdapterIfName: port.IfName, + AdapterLL: port.Logicallabel, + }, nil) + } } return intendedRoutes } @@ -1497,8 +1567,9 @@ func (r *LinuxDpcReconciler) getIntendedWwanConfig(dpc types.DevicePortConfig, return generic.Wwan{Config: config} } -func (r *LinuxDpcReconciler) getIntendedACLs( - dpc types.DevicePortConfig, gcp types.ConfigItemValueMap, withFlowlog bool) dg.Graph { +func (r *LinuxDpcReconciler) getIntendedACLs(dpc types.DevicePortConfig, + clusterStatus types.EdgeNodeClusterStatus, gcp types.ConfigItemValueMap, + withFlowlog bool) dg.Graph { graphArgs := dg.InitArgs{ Name: ACLsSG, Description: "Device-wide ACLs", @@ -1581,7 +1652,7 @@ func (r *LinuxDpcReconciler) getIntendedACLs( } } - r.getIntendedFilterRules(gcp, dpc, intendedIPv4ACLs, intendedIPv6ACLs) + r.getIntendedFilterRules(gcp, dpc, clusterStatus, intendedIPv4ACLs, intendedIPv6ACLs) if withFlowlog { r.getIntendedMarkingRules(dpc, intendedIPv4ACLs, intendedIPv6ACLs) } @@ -1589,7 +1660,8 @@ func (r *LinuxDpcReconciler) getIntendedACLs( } func (r *LinuxDpcReconciler) getIntendedFilterRules(gcp types.ConfigItemValueMap, - dpc types.DevicePortConfig, intendedIPv4ACLs, intendedIPv6ACLs dg.Graph) { + dpc types.DevicePortConfig, clusterStatus types.EdgeNodeClusterStatus, intendedIPv4ACLs, + intendedIPv6ACLs dg.Graph) { // Prepare filter/INPUT rules. 
var inputV4Rules, inputV6Rules []iptables.Rule @@ -1720,6 +1792,62 @@ func (r *LinuxDpcReconciler) getIntendedFilterRules(gcp types.ConfigItemValueMap icmpV6Rule.MatchOpts = []string{"-p", "ipv6-icmp"} inputV6Rules = append(inputV6Rules, icmpV6Rule) + clusterPort := dpc.LookupPortByLogicallabel(clusterStatus.ClusterInterface) + if r.HVTypeKube && clusterPort != nil && !clusterPort.InvalidConfig && + clusterPort.IfName != "" && clusterStatus.ClusterIPPrefix != nil { + // LookupExtInterface in k3s/pkg/agent/flannel/flannel.go will pick + // whatever the first IP address is returned by netlink for the cluster + // interface. This means that VXLAN tunnel may be configured with EVE + // mgmt/app-shared IP instead of the cluster IP and we have to allow it. + // Therefore, we do not use "-d" filter for the VXLAN rule. + vxlanRule := iptables.Rule{ + RuleLabel: "Allow VXLAN", + MatchOpts: []string{"-p", "udp", "-i", clusterPort.IfName, + "--dport", "8472"}, + Target: "ACCEPT", + Description: "Allow VXLAN-encapsulated traffic to enter the device " + + "via cluster interface", + } + etcdRule := iptables.Rule{ + RuleLabel: "Allow etcd traffic", + MatchOpts: []string{"-p", "tcp", "-i", clusterPort.IfName, + "-d", clusterStatus.ClusterIPPrefix.IP.String(), "--dport", "2379:2380"}, + Target: "ACCEPT", + Description: "Allow etcd client and server-to-server communication", + } + k3sMetricsRule := iptables.Rule{ + RuleLabel: "Allow K3s metrics", + MatchOpts: []string{"-p", "tcp", "-i", clusterPort.IfName, + "-d", clusterStatus.ClusterIPPrefix.IP.String(), "--dport", "10250"}, + Target: "ACCEPT", + Description: "Allow traffic carrying K3s metrics to enter the device " + + "via cluster interface", + } + k3sAPIServerRule := iptables.Rule{ + RuleLabel: "Allow K3s API requests", + MatchOpts: []string{"-p", "tcp", "-i", clusterPort.IfName, + "-d", clusterStatus.ClusterIPPrefix.IP.String(), "--dport", "6443"}, + Target: "ACCEPT", + Description: "Allow K3s API requests to enter the device " + + "via cluster interface", + } + clusterStatusRule := iptables.Rule{ + RuleLabel: "Allow access to Cluster Status", + MatchOpts: []string{"-p", "tcp", "-i", clusterPort.IfName, + "-d", clusterStatus.ClusterIPPrefix.IP.String(), "--dport", "12346"}, + Target: "ACCEPT", + Description: "Allow access to Cluster Status via cluster interface", + } + forIPv6 := clusterStatus.ClusterIPPrefix.IP.To4() == nil + if forIPv6 { + inputV6Rules = append(inputV6Rules, vxlanRule, etcdRule, + k3sMetricsRule, k3sAPIServerRule, clusterStatusRule) + } else { + inputV4Rules = append(inputV4Rules, vxlanRule, etcdRule, + k3sMetricsRule, k3sAPIServerRule, clusterStatusRule) + } + } + // Allow all traffic that belongs to an already established connection. allowEstablishedConn := iptables.Rule{ RuleLabel: "Allow established connection", diff --git a/pkg/pillar/dpcreconciler/linuxitems/adapter.go b/pkg/pillar/dpcreconciler/linuxitems/adapter.go index 9dfdd37396..33888de274 100644 --- a/pkg/pillar/dpcreconciler/linuxitems/adapter.go +++ b/pkg/pillar/dpcreconciler/linuxitems/adapter.go @@ -13,6 +13,8 @@ import ( "github.com/lf-edge/eve/pkg/pillar/dpcreconciler/genericitems" "github.com/lf-edge/eve/pkg/pillar/netmonitor" "github.com/lf-edge/eve/pkg/pillar/types" + "github.com/lf-edge/eve/pkg/pillar/utils/generics" + "github.com/lf-edge/eve/pkg/pillar/utils/netutils" "github.com/vishvananda/netlink" ) @@ -39,6 +41,8 @@ type Adapter struct { DhcpType types.DhcpType // MTU : Maximum transmission unit size. 
MTU uint16 + // StaticIPs : IP addresses assigned to the adapter statically. + StaticIPs []*net.IPNet } // Name uses the interface name to identify the adapter. @@ -63,7 +67,8 @@ func (a Adapter) Equal(other depgraph.Item) bool { a.WirelessType == a2.WirelessType && a.UsedAsVlanParent == a2.UsedAsVlanParent && a.DhcpType == a2.DhcpType && - a.MTU == a2.MTU + a.MTU == a2.MTU && + generics.EqualSetsFn(a.StaticIPs, a2.StaticIPs, netutils.EqualIPNets) } // External returns false. @@ -153,8 +158,12 @@ func (c *AdapterConfigurator) Create(ctx context.Context, item depgraph.Item) er c.Log.Error(err) return err } - // Do not proceed with bridging the adapter, just set the MTU. - return c.setAdapterMTU(adapter, link) + // Do not proceed with bridging the adapter, just set the MTU and static IPs. + err = c.setAdapterMTU(adapter, link) + if err != nil { + return err + } + return c.updateAdapterStaticIPs(link, adapter.StaticIPs, nil) } kernIfname := "k" + adapter.IfName _, err = netlink.LinkByName(kernIfname) @@ -233,7 +242,8 @@ func (c *AdapterConfigurator) Create(ctx context.Context, item depgraph.Item) er c.Log.Error(err) return err } - return nil + // Finally, assign statically configured IPs. + return c.updateAdapterStaticIPs(bridge, adapter.StaticIPs, nil) } // Return true if NIM is responsible for creating a Linux bridge for the adapter. @@ -263,6 +273,30 @@ func (c *AdapterConfigurator) setAdapterMTU(adapter Adapter, link netlink.Link) return nil } +func (c *AdapterConfigurator) updateAdapterStaticIPs(link netlink.Link, + newIPs, prevIPs []*net.IPNet) error { + newIPs, obsoleteIPs := generics.DiffSetsFn(newIPs, prevIPs, netutils.EqualIPNets) + for _, ipNet := range obsoleteIPs { + addr := &netlink.Addr{IPNet: ipNet} + if err := netlink.AddrDel(link, addr); err != nil { + err = fmt.Errorf("failed to del addr %v from adapter %s: %v", + ipNet, link.Attrs().Name, err) + c.Log.Error(err) + return err + } + } + for _, ipNet := range newIPs { + addr := &netlink.Addr{IPNet: ipNet} + if err := netlink.AddrAdd(link, addr); err != nil { + err = fmt.Errorf("failed to add addr %v to adapter %s: %v", + ipNet, link.Attrs().Name, err) + c.Log.Error(err) + return err + } + } + return nil +} + // Create alternate MAC address with the group bit toggled. func (c *AdapterConfigurator) alternativeMAC(mac net.HardwareAddr) net.HardwareAddr { var altMacAddr net.HardwareAddr @@ -274,8 +308,12 @@ func (c *AdapterConfigurator) alternativeMAC(mac net.HardwareAddr) net.HardwareA return altMacAddr } -// Modify is able to update the MTU attribute. -func (c *AdapterConfigurator) Modify(_ context.Context, _, newItem depgraph.Item) (err error) { +// Modify is able to update the MTU attribute and the set of static IPs. +func (c *AdapterConfigurator) Modify(_ context.Context, oldItem, newItem depgraph.Item) (err error) { + oldAdapter, isAdapter := oldItem.(Adapter) + if !isAdapter { + return fmt.Errorf("invalid item type %T, expected Adapter", newItem) + } adapter, isAdapter := newItem.(Adapter) if !isAdapter { return fmt.Errorf("invalid item type %T, expected Adapter", newItem) @@ -290,15 +328,30 @@ func (c *AdapterConfigurator) Modify(_ context.Context, _, newItem depgraph.Item c.Log.Error(err) return err } - return c.setAdapterMTU(adapter, adapterLink) + err = c.setAdapterMTU(adapter, adapterLink) + if err != nil { + return err + } + return c.updateAdapterStaticIPs(adapterLink, adapter.StaticIPs, oldAdapter.StaticIPs) } // Delete undoes Create - i.e. 
moves MAC address and ifName back to the interface // and removes the bridge. func (c *AdapterConfigurator) Delete(ctx context.Context, item depgraph.Item) error { adapter := item.(Adapter) + // First, remove all statically assigned IPs. + adapterLink, err := netlink.LinkByName(adapter.IfName) + if err != nil { + err = fmt.Errorf("failed to get adapter %s link: %v", adapter.IfName, err) + c.Log.Error(err) + return err + } + err = c.updateAdapterStaticIPs(adapterLink, nil, adapter.StaticIPs) + if err != nil { + return err + } if !c.isAdapterBridgedByNIM(adapter) { - // Adapter is not bridged by NIM, nothing to undo here. + // Adapter is not bridged by NIM, nothing else to undo here. return nil } // After removing/renaming interfaces it is best to clear the cache. @@ -318,11 +371,8 @@ func (c *AdapterConfigurator) Delete(ctx context.Context, item depgraph.Item) er c.Log.Error(err) return err } - // delete bridge link - attrs := netlink.NewLinkAttrs() - attrs.Name = adapter.IfName - bridge := &netlink.Bridge{LinkAttrs: attrs} - if err := netlink.LinkDel(bridge); err != nil { + // Delete the bridge interface. + if err := netlink.LinkDel(adapterLink); err != nil { err = fmt.Errorf("netlink.LinkDel(%s) failed: %v", adapter.IfName, err) c.Log.Error(err) @@ -358,7 +408,7 @@ func (c *AdapterConfigurator) Delete(ctx context.Context, item depgraph.Item) er } // NeedsRecreate returns true if L2Type or WirelessType have changed. -// On the other hand, Modify is able to update the MTU attribute. +// On the other hand, Modify is able to update the MTU attribute and the set of static IPs. func (c *AdapterConfigurator) NeedsRecreate(oldItem, newItem depgraph.Item) (recreate bool) { oldCfg, isAdapter := oldItem.(Adapter) if !isAdapter { diff --git a/pkg/pillar/go.sum b/pkg/pillar/go.sum index d7f2e324ea..c8ef8ff53e 100644 --- a/pkg/pillar/go.sum +++ b/pkg/pillar/go.sum @@ -994,6 +994,8 @@ github.com/eriknordmark/ipinfo v0.0.0-20230728132417-2d8f4da903d7/go.mod h1:m5kR github.com/eshard/uevent v1.0.2-0.20220110110621-d8d2be286cec h1:6yjNScIckOkANX2IF3j67YJL93cgfan72CJcwkyRsPA= github.com/eshard/uevent v1.0.2-0.20220110110621-d8d2be286cec/go.mod h1:fgPg/9WMh1CvD0dIh08gjDdrgLueDUJH7aPzM0932CY= github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= +github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U= +github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/facebook/time v0.0.0-20240605113323-bdee26e8523f h1:qEpaI5a4QYn7voX3z4pa4Fkyuk7PQ7CUHpargUHZkYE= github.com/facebook/time v0.0.0-20240605113323-bdee26e8523f/go.mod h1:2UFAomOuD2vAK1x68czUtCVjAqmyWCEnAXOlmGqf+G0= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= diff --git a/pkg/pillar/kubeapi/kubeapi.go b/pkg/pillar/kubeapi/kubeapi.go index d8d6a5d47a..fecfb10031 100644 --- a/pkg/pillar/kubeapi/kubeapi.go +++ b/pkg/pillar/kubeapi/kubeapi.go @@ -9,6 +9,7 @@ import ( "context" "fmt" "os" + "reflect" "strings" "time" @@ -125,26 +126,48 @@ func GetKubevirtClientSet(kubeconfig *rest.Config) (KubevirtClientset, error) { */ // WaitForKubernetes : Wait until kubernetes server is ready -func WaitForKubernetes(agentName string, ps *pubsub.PubSub, stillRunning *time.Ticker) error { - checkTimer := time.NewTimer(5 * time.Second) - configFileExist := false +func WaitForKubernetes(agentName string, ps *pubsub.PubSub, stillRunning *time.Ticker, + alsoWatch ...pubsub.ChannelWatch) (err 
error) { + + var watches []pubsub.ChannelWatch + stillRunningWatch := pubsub.ChannelWatch{ + Chan: reflect.ValueOf(stillRunning.C), + Callback: func(_ interface{}) (exit bool) { + ps.StillRunning(agentName, warningTime, errorTime) + return false + }, + } + watches = append(watches, stillRunningWatch) var config *rest.Config - // wait until the Kubernetes server is started - for !configFileExist { - select { - case <-checkTimer.C: + checkTicker := time.NewTicker(5 * time.Second) + startTime := time.Now() + const maxWaitTime = 10 * time.Minute + watches = append(watches, pubsub.ChannelWatch{ + Chan: reflect.ValueOf(checkTicker.C), + Callback: func(_ interface{}) (exit bool) { + currentTime := time.Now() + if currentTime.Sub(startTime) > maxWaitTime { + err = fmt.Errorf("time exceeded 10 minutes") + return true + } if _, err := os.Stat(EVEkubeConfigFile); err == nil { config, err = GetKubeConfig() if err == nil { - configFileExist = true - break + return true } } - checkTimer = time.NewTimer(5 * time.Second) - case <-stillRunning.C: - } - ps.StillRunning(agentName, warningTime, errorTime) + return false + }, + }) + + watches = append(watches, alsoWatch...) + + // wait until the Kubernetes server is started + pubsub.MultiChannelWatch(watches) + + if err != nil { + return err } client, err := kubernetes.NewForConfig(config) @@ -161,17 +184,16 @@ func WaitForKubernetes(agentName string, ps *pubsub.PubSub, stillRunning *time.T readyCh := make(chan bool) go waitForNodeReady(client, readyCh, devUUID) - kubeNodeReady := false - for !kubeNodeReady { - select { - case <-readyCh: - kubeNodeReady = true - break - case <-stillRunning.C: - } - ps.StillRunning(agentName, warningTime, errorTime) - } - + watches = nil + watches = append(watches, stillRunningWatch) + watches = append(watches, pubsub.ChannelWatch{ + Chan: reflect.ValueOf(readyCh), + Callback: func(_ interface{}) (exit bool) { + return true + }, + }) + watches = append(watches, alsoWatch...) + pubsub.MultiChannelWatch(watches) return nil } diff --git a/pkg/pillar/kubeapi/nokube.go b/pkg/pillar/kubeapi/nokube.go index f8f7d86ac5..c090bd9f87 100644 --- a/pkg/pillar/kubeapi/nokube.go +++ b/pkg/pillar/kubeapi/nokube.go @@ -14,8 +14,8 @@ import ( ) // WaitForKubernetes in this file is just stub for non-kubevirt hypervisors. -func WaitForKubernetes( - string, *pubsub.PubSub, *time.Ticker) error { +func WaitForKubernetes(string, *pubsub.PubSub, *time.Ticker, + ...pubsub.ChannelWatch) error { panic("WaitForKubernetes is not built") } diff --git a/pkg/pillar/pubsub/util.go b/pkg/pillar/pubsub/util.go index 28c857c926..34606c96e0 100644 --- a/pkg/pillar/pubsub/util.go +++ b/pkg/pillar/pubsub/util.go @@ -111,8 +111,9 @@ func ConnReadCheck(conn net.Conn) error { type ChannelWatch struct { // Chan is the channel to watch for incoming data Chan reflect.Value - // Callback is the function to call with that data (or empty if no data) - Callback func(value interface{}) + // Callback is the function to call with that data (or empty if no data). + // Return true to terminate MultiChannelWatch. 
+ Callback func(value interface{}) (exitWatch bool) } // MultiChannelWatch allows listening to several receiving channels of different types at the same time @@ -130,10 +131,31 @@ func MultiChannelWatch(watches []ChannelWatch) { for { index, value, _ := reflect.Select(cases) if value.CanInterface() { - watches[index].Callback(value.Interface()) + exit := watches[index].Callback(value.Interface()) + if exit { + return + } } else { - watches[index].Callback(struct{}{}) + exit := watches[index].Callback(struct{}{}) + if exit { + return + } } } +} +// WatchAndProcessSubChanges returns ChannelWatch for use with MultiChannelWatch, +// which simply watches for subscription changes and calls Subscription.ProcessChange() +// to process each. +func WatchAndProcessSubChanges(sub Subscription) ChannelWatch { + return ChannelWatch{ + Chan: reflect.ValueOf(sub.MsgChan()), + Callback: func(value interface{}) (exit bool) { + change, ok := value.(Change) + if ok { + sub.ProcessChange(change) + } + return false + }, + } } diff --git a/pkg/pillar/types/clustertypes.go b/pkg/pillar/types/clustertypes.go index b8ccd8ee73..f3a00af8e7 100644 --- a/pkg/pillar/types/clustertypes.go +++ b/pkg/pillar/types/clustertypes.go @@ -80,6 +80,10 @@ type EdgeNodeClusterStatus struct { // This node needs to be up first before other nodes can join the cluster. This BootstrapNode // will own the 'JoinServerIP' on it's cluster interface. BootstrapNode bool + // EncryptedClusterToken - for kubernetes cluster server token + // This token string is the decrypted from the CipherBlock in the EdgeNodeClusterConfig + // by zedkube using the Controller and Edge-node certificates. See decryptClusterToken() + EncryptedClusterToken string Error ErrorDescription } diff --git a/pkg/pillar/types/zedmanagertypes.go b/pkg/pillar/types/zedmanagertypes.go index 816b2aa4a6..b86faf4101 100644 --- a/pkg/pillar/types/zedmanagertypes.go +++ b/pkg/pillar/types/zedmanagertypes.go @@ -143,6 +143,10 @@ type AppInstanceConfig struct { // allow AppInstance to discover other AppInstances attached to its network instances AllowToDiscover bool + + // XXX Cluster Designated Node Id + // temp, this will be changed to bool in later PR + DesignatedNodeID uuid.UUID } type AppInstanceOpsCmd struct { diff --git a/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/OWNERS b/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/OWNERS new file mode 100644 index 0000000000..908bdacdfe --- /dev/null +++ b/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/OWNERS @@ -0,0 +1,11 @@ +# See the OWNERS docs at https://go.k8s.io/owners + +approvers: + - mikedanese +reviewers: + - wojtek-t + - deads2k + - mikedanese + - ingvagabund +emeritus_approvers: + - timothysc diff --git a/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/healthzadaptor.go b/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/healthzadaptor.go new file mode 100644 index 0000000000..b935372919 --- /dev/null +++ b/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/healthzadaptor.go @@ -0,0 +1,69 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leaderelection + +import ( + "net/http" + "sync" + "time" +) + +// HealthzAdaptor associates the /healthz endpoint with the LeaderElection object. +// It helps deal with the /healthz endpoint being set up prior to the LeaderElection. +// This contains the code needed to act as an adaptor between the leader +// election code the health check code. It allows us to provide health +// status about the leader election. Most specifically about if the leader +// has failed to renew without exiting the process. In that case we should +// report not healthy and rely on the kubelet to take down the process. +type HealthzAdaptor struct { + pointerLock sync.Mutex + le *LeaderElector + timeout time.Duration +} + +// Name returns the name of the health check we are implementing. +func (l *HealthzAdaptor) Name() string { + return "leaderElection" +} + +// Check is called by the healthz endpoint handler. +// It fails (returns an error) if we own the lease but had not been able to renew it. +func (l *HealthzAdaptor) Check(req *http.Request) error { + l.pointerLock.Lock() + defer l.pointerLock.Unlock() + if l.le == nil { + return nil + } + return l.le.Check(l.timeout) +} + +// SetLeaderElection ties a leader election object to a HealthzAdaptor +func (l *HealthzAdaptor) SetLeaderElection(le *LeaderElector) { + l.pointerLock.Lock() + defer l.pointerLock.Unlock() + l.le = le +} + +// NewLeaderHealthzAdaptor creates a basic healthz adaptor to monitor a leader election. +// timeout determines the time beyond the lease expiry to be allowed for timeout. +// checks within the timeout period after the lease expires will still return healthy. +func NewLeaderHealthzAdaptor(timeout time.Duration) *HealthzAdaptor { + result := &HealthzAdaptor{ + timeout: timeout, + } + return result +} diff --git a/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/leaderelection.go b/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/leaderelection.go new file mode 100644 index 0000000000..c64ba9b26b --- /dev/null +++ b/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/leaderelection.go @@ -0,0 +1,418 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package leaderelection implements leader election of a set of endpoints. +// It uses an annotation in the endpoints object to store the record of the +// election state. This implementation does not guarantee that only one +// client is acting as a leader (a.k.a. fencing). +// +// A client only acts on timestamps captured locally to infer the state of the +// leader election. The client does not consider timestamps in the leader +// election record to be accurate because these timestamps may not have been +// produced by a local clock. The implemention does not depend on their +// accuracy and only uses their change to indicate that another client has +// renewed the leader lease. 
Thus the implementation is tolerant to arbitrary +// clock skew, but is not tolerant to arbitrary clock skew rate. +// +// However the level of tolerance to skew rate can be configured by setting +// RenewDeadline and LeaseDuration appropriately. The tolerance expressed as a +// maximum tolerated ratio of time passed on the fastest node to time passed on +// the slowest node can be approximately achieved with a configuration that sets +// the same ratio of LeaseDuration to RenewDeadline. For example if a user wanted +// to tolerate some nodes progressing forward in time twice as fast as other nodes, +// the user could set LeaseDuration to 60 seconds and RenewDeadline to 30 seconds. +// +// While not required, some method of clock synchronization between nodes in the +// cluster is highly recommended. It's important to keep in mind when configuring +// this client that the tolerance to skew rate varies inversely to master +// availability. +// +// Larger clusters often have a more lenient SLA for API latency. This should be +// taken into account when configuring the client. The rate of leader transitions +// should be monitored and RetryPeriod and LeaseDuration should be increased +// until the rate is stable and acceptably low. It's important to keep in mind +// when configuring this client that the tolerance to API latency varies inversely +// to master availability. +// +// DISCLAIMER: this is an alpha API. This library will likely change significantly +// or even be removed entirely in subsequent releases. Depend on this API at +// your own risk. +package leaderelection + +import ( + "bytes" + "context" + "fmt" + "sync" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + rl "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/utils/clock" + + "k8s.io/klog/v2" +) + +const ( + JitterFactor = 1.2 +) + +// NewLeaderElector creates a LeaderElector from a LeaderElectionConfig +func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) { + if lec.LeaseDuration <= lec.RenewDeadline { + return nil, fmt.Errorf("leaseDuration must be greater than renewDeadline") + } + if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) { + return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor") + } + if lec.LeaseDuration < 1 { + return nil, fmt.Errorf("leaseDuration must be greater than zero") + } + if lec.RenewDeadline < 1 { + return nil, fmt.Errorf("renewDeadline must be greater than zero") + } + if lec.RetryPeriod < 1 { + return nil, fmt.Errorf("retryPeriod must be greater than zero") + } + if lec.Callbacks.OnStartedLeading == nil { + return nil, fmt.Errorf("OnStartedLeading callback must not be nil") + } + if lec.Callbacks.OnStoppedLeading == nil { + return nil, fmt.Errorf("OnStoppedLeading callback must not be nil") + } + + if lec.Lock == nil { + return nil, fmt.Errorf("Lock must not be nil.") + } + le := LeaderElector{ + config: lec, + clock: clock.RealClock{}, + metrics: globalMetricsFactory.newLeaderMetrics(), + } + le.metrics.leaderOff(le.config.Name) + return &le, nil +} + +type LeaderElectionConfig struct { + // Lock is the resource that will be used for locking + Lock rl.Interface + + // LeaseDuration is the duration that non-leader candidates will + // wait to force acquire leadership. This is measured against time of + // last observed ack. 
+ // + // A client needs to wait a full LeaseDuration without observing a change to + // the record before it can attempt to take over. When all clients are + // shutdown and a new set of clients are started with different names against + // the same leader record, they must wait the full LeaseDuration before + // attempting to acquire the lease. Thus LeaseDuration should be as short as + // possible (within your tolerance for clock skew rate) to avoid a possible + // long waits in the scenario. + // + // Core clients default this value to 15 seconds. + LeaseDuration time.Duration + // RenewDeadline is the duration that the acting master will retry + // refreshing leadership before giving up. + // + // Core clients default this value to 10 seconds. + RenewDeadline time.Duration + // RetryPeriod is the duration the LeaderElector clients should wait + // between tries of actions. + // + // Core clients default this value to 2 seconds. + RetryPeriod time.Duration + + // Callbacks are callbacks that are triggered during certain lifecycle + // events of the LeaderElector + Callbacks LeaderCallbacks + + // WatchDog is the associated health checker + // WatchDog may be null if it's not needed/configured. + WatchDog *HealthzAdaptor + + // ReleaseOnCancel should be set true if the lock should be released + // when the run context is cancelled. If you set this to true, you must + // ensure all code guarded by this lease has successfully completed + // prior to cancelling the context, or you may have two processes + // simultaneously acting on the critical path. + ReleaseOnCancel bool + + // Name is the name of the resource lock for debugging + Name string +} + +// LeaderCallbacks are callbacks that are triggered during certain +// lifecycle events of the LeaderElector. These are invoked asynchronously. +// +// possible future callbacks: +// - OnChallenge() +type LeaderCallbacks struct { + // OnStartedLeading is called when a LeaderElector client starts leading + OnStartedLeading func(context.Context) + // OnStoppedLeading is called when a LeaderElector client stops leading + OnStoppedLeading func() + // OnNewLeader is called when the client observes a leader that is + // not the previously observed leader. This includes the first observed + // leader when the client starts. + OnNewLeader func(identity string) +} + +// LeaderElector is a leader election client. +type LeaderElector struct { + config LeaderElectionConfig + // internal bookkeeping + observedRecord rl.LeaderElectionRecord + observedRawRecord []byte + observedTime time.Time + // used to implement OnNewLeader(), may lag slightly from the + // value observedRecord.HolderIdentity if the transition has + // not yet been reported. + reportedLeader string + + // clock is wrapper around time to allow for less flaky testing + clock clock.Clock + + // used to lock the observedRecord + observedRecordLock sync.Mutex + + metrics leaderMetricsAdapter +} + +// Run starts the leader election loop. Run will not return +// before leader election loop is stopped by ctx or it has +// stopped holding the leader lease +func (le *LeaderElector) Run(ctx context.Context) { + defer runtime.HandleCrash() + defer func() { + le.config.Callbacks.OnStoppedLeading() + }() + + if !le.acquire(ctx) { + return // ctx signalled done + } + ctx, cancel := context.WithCancel(ctx) + defer cancel() + go le.config.Callbacks.OnStartedLeading(ctx) + le.renew(ctx) +} + +// RunOrDie starts a client with the provided config or panics if the config +// fails to validate. 
RunOrDie blocks until leader election loop is +// stopped by ctx or it has stopped holding the leader lease +func RunOrDie(ctx context.Context, lec LeaderElectionConfig) { + le, err := NewLeaderElector(lec) + if err != nil { + panic(err) + } + if lec.WatchDog != nil { + lec.WatchDog.SetLeaderElection(le) + } + le.Run(ctx) +} + +// GetLeader returns the identity of the last observed leader or returns the empty string if +// no leader has yet been observed. +// This function is for informational purposes. (e.g. monitoring, logs, etc.) +func (le *LeaderElector) GetLeader() string { + return le.getObservedRecord().HolderIdentity +} + +// IsLeader returns true if the last observed leader was this client else returns false. +func (le *LeaderElector) IsLeader() bool { + return le.getObservedRecord().HolderIdentity == le.config.Lock.Identity() +} + +// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds. +// Returns false if ctx signals done. +func (le *LeaderElector) acquire(ctx context.Context) bool { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + succeeded := false + desc := le.config.Lock.Describe() + klog.Infof("attempting to acquire leader lease %v...", desc) + wait.JitterUntil(func() { + succeeded = le.tryAcquireOrRenew(ctx) + le.maybeReportTransition() + if !succeeded { + klog.V(4).Infof("failed to acquire lease %v", desc) + return + } + le.config.Lock.RecordEvent("became leader") + le.metrics.leaderOn(le.config.Name) + klog.Infof("successfully acquired lease %v", desc) + cancel() + }, le.config.RetryPeriod, JitterFactor, true, ctx.Done()) + return succeeded +} + +// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done. +func (le *LeaderElector) renew(ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + wait.Until(func() { + timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline) + defer timeoutCancel() + err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) { + return le.tryAcquireOrRenew(timeoutCtx), nil + }, timeoutCtx.Done()) + + le.maybeReportTransition() + desc := le.config.Lock.Describe() + if err == nil { + klog.V(5).Infof("successfully renewed lease %v", desc) + return + } + le.config.Lock.RecordEvent("stopped leading") + le.metrics.leaderOff(le.config.Name) + klog.Infof("failed to renew lease %v: %v", desc, err) + cancel() + }, le.config.RetryPeriod, ctx.Done()) + + // if we hold the lease, give it up + if le.config.ReleaseOnCancel { + le.release() + } +} + +// release attempts to release the leader lease if we have acquired it. +func (le *LeaderElector) release() bool { + if !le.IsLeader() { + return true + } + now := metav1.Now() + leaderElectionRecord := rl.LeaderElectionRecord{ + LeaderTransitions: le.observedRecord.LeaderTransitions, + LeaseDurationSeconds: 1, + RenewTime: now, + AcquireTime: now, + } + if err := le.config.Lock.Update(context.TODO(), leaderElectionRecord); err != nil { + klog.Errorf("Failed to release lock: %v", err) + return false + } + + le.setObservedRecord(&leaderElectionRecord) + return true +} + +// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired, +// else it tries to renew the lease if it has already been acquired. Returns true +// on success else returns false. 
+func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool { + now := metav1.Now() + leaderElectionRecord := rl.LeaderElectionRecord{ + HolderIdentity: le.config.Lock.Identity(), + LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second), + RenewTime: now, + AcquireTime: now, + } + + // 1. obtain or create the ElectionRecord + oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx) + if err != nil { + if !errors.IsNotFound(err) { + klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err) + return false + } + if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil { + klog.Errorf("error initially creating leader election record: %v", err) + return false + } + + le.setObservedRecord(&leaderElectionRecord) + + return true + } + + // 2. Record obtained, check the Identity & Time + if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) { + le.setObservedRecord(oldLeaderElectionRecord) + + le.observedRawRecord = oldLeaderElectionRawRecord + } + if len(oldLeaderElectionRecord.HolderIdentity) > 0 && + le.observedTime.Add(le.config.LeaseDuration).After(now.Time) && + !le.IsLeader() { + klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity) + return false + } + + // 3. We're going to try to update. The leaderElectionRecord is set to it's default + // here. Let's correct it before updating. + if le.IsLeader() { + leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime + leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + } else { + leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1 + } + + // update the lock itself + if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil { + klog.Errorf("Failed to update lock: %v", err) + return false + } + + le.setObservedRecord(&leaderElectionRecord) + return true +} + +func (le *LeaderElector) maybeReportTransition() { + if le.observedRecord.HolderIdentity == le.reportedLeader { + return + } + le.reportedLeader = le.observedRecord.HolderIdentity + if le.config.Callbacks.OnNewLeader != nil { + go le.config.Callbacks.OnNewLeader(le.reportedLeader) + } +} + +// Check will determine if the current lease is expired by more than timeout. +func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error { + if !le.IsLeader() { + // Currently not concerned with the case that we are hot standby + return nil + } + // If we are more than timeout seconds after the lease duration that is past the timeout + // on the lease renew. Time to start reporting ourselves as unhealthy. We should have + // died but conditions like deadlock can prevent this. (See #70819) + if le.clock.Since(le.observedTime) > le.config.LeaseDuration+maxTolerableExpiredLease { + return fmt.Errorf("failed election to renew leadership on lease %s", le.config.Name) + } + + return nil +} + +// setObservedRecord will set a new observedRecord and update observedTime to the current time. +// Protect critical sections with lock. +func (le *LeaderElector) setObservedRecord(observedRecord *rl.LeaderElectionRecord) { + le.observedRecordLock.Lock() + defer le.observedRecordLock.Unlock() + + le.observedRecord = *observedRecord + le.observedTime = le.clock.Now() +} + +// getObservedRecord returns observersRecord. +// Protect critical sections with lock. 
+func (le *LeaderElector) getObservedRecord() rl.LeaderElectionRecord {
+	le.observedRecordLock.Lock()
+	defer le.observedRecordLock.Unlock()
+
+	return le.observedRecord
+}
diff --git a/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/metrics.go b/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/metrics.go
new file mode 100644
index 0000000000..65917bf88e
--- /dev/null
+++ b/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/metrics.go
@@ -0,0 +1,109 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package leaderelection
+
+import (
+	"sync"
+)
+
+// This file provides abstractions for setting the provider (e.g., prometheus)
+// of metrics.
+
+type leaderMetricsAdapter interface {
+	leaderOn(name string)
+	leaderOff(name string)
+}
+
+// GaugeMetric represents a single numerical value that can arbitrarily go up
+// and down.
+type SwitchMetric interface {
+	On(name string)
+	Off(name string)
+}
+
+type noopMetric struct{}
+
+func (noopMetric) On(name string)  {}
+func (noopMetric) Off(name string) {}
+
+// defaultLeaderMetrics expects the caller to lock before setting any metrics.
+type defaultLeaderMetrics struct {
+	// leader's value indicates if the current process is the owner of name lease
+	leader SwitchMetric
+}
+
+func (m *defaultLeaderMetrics) leaderOn(name string) {
+	if m == nil {
+		return
+	}
+	m.leader.On(name)
+}
+
+func (m *defaultLeaderMetrics) leaderOff(name string) {
+	if m == nil {
+		return
+	}
+	m.leader.Off(name)
+}
+
+type noMetrics struct{}
+
+func (noMetrics) leaderOn(name string)  {}
+func (noMetrics) leaderOff(name string) {}
+
+// MetricsProvider generates various metrics used by the leader election.
+type MetricsProvider interface {
+	NewLeaderMetric() SwitchMetric
+}
+
+type noopMetricsProvider struct{}
+
+func (_ noopMetricsProvider) NewLeaderMetric() SwitchMetric {
+	return noopMetric{}
+}
+
+var globalMetricsFactory = leaderMetricsFactory{
+	metricsProvider: noopMetricsProvider{},
+}
+
+type leaderMetricsFactory struct {
+	metricsProvider MetricsProvider
+
+	onlyOnce sync.Once
+}
+
+func (f *leaderMetricsFactory) setProvider(mp MetricsProvider) {
+	f.onlyOnce.Do(func() {
+		f.metricsProvider = mp
+	})
+}
+
+func (f *leaderMetricsFactory) newLeaderMetrics() leaderMetricsAdapter {
+	mp := f.metricsProvider
+	if mp == (noopMetricsProvider{}) {
+		return noMetrics{}
+	}
+	return &defaultLeaderMetrics{
+		leader: mp.NewLeaderMetric(),
+	}
+}
+
+// SetProvider sets the metrics provider for all subsequently created work
+// queues. Only the first call has an effect.
+func SetProvider(metricsProvider MetricsProvider) {
+	globalMetricsFactory.setProvider(metricsProvider)
+}
diff --git a/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go b/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go
new file mode 100644
index 0000000000..e811fff03c
--- /dev/null
+++ b/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go
@@ -0,0 +1,126 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourcelock
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+
+	"k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
+)
+
+// TODO: This is almost a exact replica of Endpoints lock.
+// going forwards as we self host more and more components
+// and use ConfigMaps as the means to pass that configuration
+// data we will likely move to deprecate the Endpoints lock.
+
+type configMapLock struct {
+	// ConfigMapMeta should contain a Name and a Namespace of a
+	// ConfigMapMeta object that the LeaderElector will attempt to lead.
+	ConfigMapMeta metav1.ObjectMeta
+	Client        corev1client.ConfigMapsGetter
+	LockConfig    ResourceLockConfig
+	cm            *v1.ConfigMap
+}
+
+// Get returns the election record from a ConfigMap Annotation
+func (cml *configMapLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
+	var record LeaderElectionRecord
+	cm, err := cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(ctx, cml.ConfigMapMeta.Name, metav1.GetOptions{})
+	if err != nil {
+		return nil, nil, err
+	}
+	cml.cm = cm
+	if cml.cm.Annotations == nil {
+		cml.cm.Annotations = make(map[string]string)
+	}
+	recordStr, found := cml.cm.Annotations[LeaderElectionRecordAnnotationKey]
+	recordBytes := []byte(recordStr)
+	if found {
+		if err := json.Unmarshal(recordBytes, &record); err != nil {
+			return nil, nil, err
+		}
+	}
+	return &record, recordBytes, nil
+}
+
+// Create attempts to create a LeaderElectionRecord annotation
+func (cml *configMapLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
+	recordBytes, err := json.Marshal(ler)
+	if err != nil {
+		return err
+	}
+	cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Create(ctx, &v1.ConfigMap{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      cml.ConfigMapMeta.Name,
+			Namespace: cml.ConfigMapMeta.Namespace,
+			Annotations: map[string]string{
+				LeaderElectionRecordAnnotationKey: string(recordBytes),
+			},
+		},
+	}, metav1.CreateOptions{})
+	return err
+}
+
+// Update will update an existing annotation on a given resource.
+func (cml *configMapLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
+	if cml.cm == nil {
+		return errors.New("configmap not initialized, call get or create first")
+	}
+	recordBytes, err := json.Marshal(ler)
+	if err != nil {
+		return err
+	}
+	if cml.cm.Annotations == nil {
+		cml.cm.Annotations = make(map[string]string)
+	}
+	cml.cm.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
+	cm, err := cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(ctx, cml.cm, metav1.UpdateOptions{})
+	if err != nil {
+		return err
+	}
+	cml.cm = cm
+	return nil
+}
+
+// RecordEvent in leader election while adding meta-data
+func (cml *configMapLock) RecordEvent(s string) {
+	if cml.LockConfig.EventRecorder == nil {
+		return
+	}
+	events := fmt.Sprintf("%v %v", cml.LockConfig.Identity, s)
+	subject := &v1.ConfigMap{ObjectMeta: cml.cm.ObjectMeta}
+	// Populate the type meta, so we don't have to get it from the schema
+	subject.Kind = "ConfigMap"
+	subject.APIVersion = v1.SchemeGroupVersion.String()
+	cml.LockConfig.EventRecorder.Eventf(subject, v1.EventTypeNormal, "LeaderElection", events)
+}
+
+// Describe is used to convert details on current resource lock
+// into a string
+func (cml *configMapLock) Describe() string {
+	return fmt.Sprintf("%v/%v", cml.ConfigMapMeta.Namespace, cml.ConfigMapMeta.Name)
+}
+
+// Identity returns the Identity of the lock
+func (cml *configMapLock) Identity() string {
+	return cml.LockConfig.Identity
+}
diff --git a/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go b/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go
new file mode 100644
index 0000000000..eb36d2210a
--- /dev/null
+++ b/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go
@@ -0,0 +1,121 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourcelock
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+
+	"k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
+)
+
+type endpointsLock struct {
+	// EndpointsMeta should contain a Name and a Namespace of an
+	// Endpoints object that the LeaderElector will attempt to lead.
+	EndpointsMeta metav1.ObjectMeta
+	Client        corev1client.EndpointsGetter
+	LockConfig    ResourceLockConfig
+	e             *v1.Endpoints
+}
+
+// Get returns the election record from a Endpoints Annotation
+func (el *endpointsLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
+	var record LeaderElectionRecord
+	ep, err := el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(ctx, el.EndpointsMeta.Name, metav1.GetOptions{})
+	if err != nil {
+		return nil, nil, err
+	}
+	el.e = ep
+	if el.e.Annotations == nil {
+		el.e.Annotations = make(map[string]string)
+	}
+	recordStr, found := el.e.Annotations[LeaderElectionRecordAnnotationKey]
+	recordBytes := []byte(recordStr)
+	if found {
+		if err := json.Unmarshal(recordBytes, &record); err != nil {
+			return nil, nil, err
+		}
+	}
+	return &record, recordBytes, nil
+}
+
+// Create attempts to create a LeaderElectionRecord annotation
+func (el *endpointsLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
+	recordBytes, err := json.Marshal(ler)
+	if err != nil {
+		return err
+	}
+	el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Create(ctx, &v1.Endpoints{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      el.EndpointsMeta.Name,
+			Namespace: el.EndpointsMeta.Namespace,
+			Annotations: map[string]string{
+				LeaderElectionRecordAnnotationKey: string(recordBytes),
+			},
+		},
+	}, metav1.CreateOptions{})
+	return err
+}
+
+// Update will update and existing annotation on a given resource.
+func (el *endpointsLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
+	if el.e == nil {
+		return errors.New("endpoint not initialized, call get or create first")
+	}
+	recordBytes, err := json.Marshal(ler)
+	if err != nil {
+		return err
+	}
+	if el.e.Annotations == nil {
+		el.e.Annotations = make(map[string]string)
+	}
+	el.e.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
+	e, err := el.Client.Endpoints(el.EndpointsMeta.Namespace).Update(ctx, el.e, metav1.UpdateOptions{})
+	if err != nil {
+		return err
+	}
+	el.e = e
+	return nil
+}
+
+// RecordEvent in leader election while adding meta-data
+func (el *endpointsLock) RecordEvent(s string) {
+	if el.LockConfig.EventRecorder == nil {
+		return
+	}
+	events := fmt.Sprintf("%v %v", el.LockConfig.Identity, s)
+	subject := &v1.Endpoints{ObjectMeta: el.e.ObjectMeta}
+	// Populate the type meta, so we don't have to get it from the schema
+	subject.Kind = "Endpoints"
+	subject.APIVersion = v1.SchemeGroupVersion.String()
+	el.LockConfig.EventRecorder.Eventf(subject, v1.EventTypeNormal, "LeaderElection", events)
+}
+
+// Describe is used to convert details on current resource lock
+// into a string
+func (el *endpointsLock) Describe() string {
+	return fmt.Sprintf("%v/%v", el.EndpointsMeta.Namespace, el.EndpointsMeta.Name)
+}
+
+// Identity returns the Identity of the lock
+func (el *endpointsLock) Identity() string {
+	return el.LockConfig.Identity
+}
diff --git a/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go b/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go
new file mode 100644
index 0000000000..c6e23bda16
--- /dev/null
+++ b/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go
@@ -0,0 +1,227 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourcelock
+
+import (
+	"context"
+	"fmt"
+	clientset "k8s.io/client-go/kubernetes"
+	restclient "k8s.io/client-go/rest"
+	"time"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	coordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1"
+	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+)
+
+const (
+	LeaderElectionRecordAnnotationKey = "control-plane.alpha.kubernetes.io/leader"
+	endpointsResourceLock             = "endpoints"
+	configMapsResourceLock            = "configmaps"
+	LeasesResourceLock                = "leases"
+	// When using EndpointsLeasesResourceLock, you need to ensure that
+	// API Priority & Fairness is configured with non-default flow-schema
+	// that will catch the necessary operations on leader-election related
+	// endpoint objects.
+	//
+	// The example of such flow scheme could look like this:
+	//   apiVersion: flowcontrol.apiserver.k8s.io/v1beta2
+	//   kind: FlowSchema
+	//   metadata:
+	//     name: my-leader-election
+	//   spec:
+	//     distinguisherMethod:
+	//       type: ByUser
+	//     matchingPrecedence: 200
+	//     priorityLevelConfiguration:
+	//       name: leader-election # reference the PL
+	//     rules:
+	//     - resourceRules:
+	//       - apiGroups:
+	//         - ""
+	//         namespaces:
+	//         - '*'
+	//         resources:
+	//         - endpoints
+	//         verbs:
+	//         - get
+	//         - create
+	//         - update
+	//       subjects:
+	//       - kind: ServiceAccount
+	//         serviceAccount:
+	//           name: '*'
+	//           namespace: kube-system
+	EndpointsLeasesResourceLock = "endpointsleases"
+	// When using EndpointsLeasesResourceLock, you need to ensure that
+	// API Priority & Fairness is configured with non-default flow-schema
+	// that will catch the necessary operations on leader-election related
+	// configmap objects.
+	//
+	// The example of such flow scheme could look like this:
+	//   apiVersion: flowcontrol.apiserver.k8s.io/v1beta2
+	//   kind: FlowSchema
+	//   metadata:
+	//     name: my-leader-election
+	//   spec:
+	//     distinguisherMethod:
+	//       type: ByUser
+	//     matchingPrecedence: 200
+	//     priorityLevelConfiguration:
+	//       name: leader-election # reference the PL
+	//     rules:
+	//     - resourceRules:
+	//       - apiGroups:
+	//         - ""
+	//         namespaces:
+	//         - '*'
+	//         resources:
+	//         - configmaps
+	//         verbs:
+	//         - get
+	//         - create
+	//         - update
+	//       subjects:
+	//       - kind: ServiceAccount
+	//         serviceAccount:
+	//           name: '*'
+	//           namespace: kube-system
+	ConfigMapsLeasesResourceLock = "configmapsleases"
+)
+
+// LeaderElectionRecord is the record that is stored in the leader election annotation.
+// This information should be used for observational purposes only and could be replaced
+// with a random string (e.g. UUID) with only slight modification of this code.
+// TODO(mikedanese): this should potentially be versioned
+type LeaderElectionRecord struct {
+	// HolderIdentity is the ID that owns the lease. If empty, no one owns this lease and
+	// all callers may acquire. Versions of this library prior to Kubernetes 1.14 will not
+	// attempt to acquire leases with empty identities and will wait for the full lease
+	// interval to expire before attempting to reacquire. This value is set to empty when
+	// a client voluntarily steps down.
+	HolderIdentity       string      `json:"holderIdentity"`
+	LeaseDurationSeconds int         `json:"leaseDurationSeconds"`
+	AcquireTime          metav1.Time `json:"acquireTime"`
+	RenewTime            metav1.Time `json:"renewTime"`
+	LeaderTransitions    int         `json:"leaderTransitions"`
+}
+
+// EventRecorder records a change in the ResourceLock.
+type EventRecorder interface {
+	Eventf(obj runtime.Object, eventType, reason, message string, args ...interface{})
+}
+
+// ResourceLockConfig common data that exists across different
+// resource locks
+type ResourceLockConfig struct {
+	// Identity is the unique string identifying a lease holder across
+	// all participants in an election.
+	Identity string
+	// EventRecorder is optional.
+	EventRecorder EventRecorder
+}
+
+// Interface offers a common interface for locking on arbitrary
+// resources used in leader election. The Interface is used
+// to hide the details on specific implementations in order to allow
+// them to change over time. This interface is strictly for use
+// by the leaderelection code.
+type Interface interface {
+	// Get returns the LeaderElectionRecord
+	Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)
+
+	// Create attempts to create a LeaderElectionRecord
+	Create(ctx context.Context, ler LeaderElectionRecord) error
+
+	// Update will update and existing LeaderElectionRecord
+	Update(ctx context.Context, ler LeaderElectionRecord) error
+
+	// RecordEvent is used to record events
+	RecordEvent(string)
+
+	// Identity will return the locks Identity
+	Identity() string
+
+	// Describe is used to convert details on current resource lock
+	// into a string
+	Describe() string
+}
+
+// Manufacture will create a lock of a given type according to the input parameters
+func New(lockType string, ns string, name string, coreClient corev1.CoreV1Interface, coordinationClient coordinationv1.CoordinationV1Interface, rlc ResourceLockConfig) (Interface, error) {
+	endpointsLock := &endpointsLock{
+		EndpointsMeta: metav1.ObjectMeta{
+			Namespace: ns,
+			Name:      name,
+		},
+		Client:     coreClient,
+		LockConfig: rlc,
+	}
+	configmapLock := &configMapLock{
+		ConfigMapMeta: metav1.ObjectMeta{
+			Namespace: ns,
+			Name:      name,
+		},
+		Client:     coreClient,
+		LockConfig: rlc,
+	}
+	leaseLock := &LeaseLock{
+		LeaseMeta: metav1.ObjectMeta{
+			Namespace: ns,
+			Name:      name,
+		},
+		Client:     coordinationClient,
+		LockConfig: rlc,
+	}
+	switch lockType {
+	case endpointsResourceLock:
+		return nil, fmt.Errorf("endpoints lock is removed, migrate to %s", EndpointsLeasesResourceLock)
+	case configMapsResourceLock:
+		return nil, fmt.Errorf("configmaps lock is removed, migrate to %s", ConfigMapsLeasesResourceLock)
+	case LeasesResourceLock:
+		return leaseLock, nil
+	case EndpointsLeasesResourceLock:
+		return &MultiLock{
+			Primary:   endpointsLock,
+			Secondary: leaseLock,
+		}, nil
+	case ConfigMapsLeasesResourceLock:
+		return &MultiLock{
+			Primary:   configmapLock,
+			Secondary: leaseLock,
+		}, nil
+	default:
+		return nil, fmt.Errorf("Invalid lock-type %s", lockType)
+	}
+}
+
+// NewFromKubeconfig will create a lock of a given type according to the input parameters.
+// Timeout set for a client used to contact to Kubernetes should be lower than
+// RenewDeadline to keep a single hung request from forcing a leader loss.
+// Setting it to max(time.Second, RenewDeadline/2) as a reasonable heuristic.
+func NewFromKubeconfig(lockType string, ns string, name string, rlc ResourceLockConfig, kubeconfig *restclient.Config, renewDeadline time.Duration) (Interface, error) {
+	// shallow copy, do not modify the kubeconfig
+	config := *kubeconfig
+	timeout := renewDeadline / 2
+	if timeout < time.Second {
+		timeout = time.Second
+	}
+	config.Timeout = timeout
+	leaderElectionClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "leader-election"))
+	return New(lockType, ns, name, leaderElectionClient.CoreV1(), leaderElectionClient.CoordinationV1(), rlc)
+}
diff --git a/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/leaselock.go b/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/leaselock.go
new file mode 100644
index 0000000000..185ef0e500
--- /dev/null
+++ b/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/leaselock.go
@@ -0,0 +1,139 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourcelock
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+
+	coordinationv1 "k8s.io/api/coordination/v1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1"
+)
+
+type LeaseLock struct {
+	// LeaseMeta should contain a Name and a Namespace of a
+	// LeaseMeta object that the LeaderElector will attempt to lead.
+	LeaseMeta  metav1.ObjectMeta
+	Client     coordinationv1client.LeasesGetter
+	LockConfig ResourceLockConfig
+	lease      *coordinationv1.Lease
+}
+
+// Get returns the election record from a Lease spec
+func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
+	lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{})
+	if err != nil {
+		return nil, nil, err
+	}
+	ll.lease = lease
+	record := LeaseSpecToLeaderElectionRecord(&ll.lease.Spec)
+	recordByte, err := json.Marshal(*record)
+	if err != nil {
+		return nil, nil, err
+	}
+	return record, recordByte, nil
+}
+
+// Create attempts to create a Lease
+func (ll *LeaseLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
+	var err error
+	ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(ctx, &coordinationv1.Lease{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      ll.LeaseMeta.Name,
+			Namespace: ll.LeaseMeta.Namespace,
+		},
+		Spec: LeaderElectionRecordToLeaseSpec(&ler),
+	}, metav1.CreateOptions{})
+	return err
+}
+
+// Update will update an existing Lease spec.
+func (ll *LeaseLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
+	if ll.lease == nil {
+		return errors.New("lease not initialized, call get or create first")
+	}
+	ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler)
+
+	lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{})
+	if err != nil {
+		return err
+	}
+
+	ll.lease = lease
+	return nil
+}
+
+// RecordEvent in leader election while adding meta-data
+func (ll *LeaseLock) RecordEvent(s string) {
+	if ll.LockConfig.EventRecorder == nil {
+		return
+	}
+	events := fmt.Sprintf("%v %v", ll.LockConfig.Identity, s)
+	subject := &coordinationv1.Lease{ObjectMeta: ll.lease.ObjectMeta}
+	// Populate the type meta, so we don't have to get it from the schema
+	subject.Kind = "Lease"
+	subject.APIVersion = coordinationv1.SchemeGroupVersion.String()
+	ll.LockConfig.EventRecorder.Eventf(subject, corev1.EventTypeNormal, "LeaderElection", events)
+}
+
+// Describe is used to convert details on current resource lock
+// into a string
+func (ll *LeaseLock) Describe() string {
+	return fmt.Sprintf("%v/%v", ll.LeaseMeta.Namespace, ll.LeaseMeta.Name)
+}
+
+// Identity returns the Identity of the lock
+func (ll *LeaseLock) Identity() string {
+	return ll.LockConfig.Identity
+}
+
+func LeaseSpecToLeaderElectionRecord(spec *coordinationv1.LeaseSpec) *LeaderElectionRecord {
+	var r LeaderElectionRecord
+	if spec.HolderIdentity != nil {
+		r.HolderIdentity = *spec.HolderIdentity
+	}
+	if spec.LeaseDurationSeconds != nil {
+		r.LeaseDurationSeconds = int(*spec.LeaseDurationSeconds)
+	}
+	if spec.LeaseTransitions != nil {
+		r.LeaderTransitions = int(*spec.LeaseTransitions)
+	}
+	if spec.AcquireTime != nil {
+		r.AcquireTime = metav1.Time{spec.AcquireTime.Time}
+	}
+	if spec.RenewTime != nil {
+		r.RenewTime = metav1.Time{spec.RenewTime.Time}
+	}
+	return &r
+
+}
+
+func LeaderElectionRecordToLeaseSpec(ler *LeaderElectionRecord) coordinationv1.LeaseSpec {
+	leaseDurationSeconds := int32(ler.LeaseDurationSeconds)
+	leaseTransitions := int32(ler.LeaderTransitions)
+	return coordinationv1.LeaseSpec{
+		HolderIdentity:       &ler.HolderIdentity,
+		LeaseDurationSeconds: &leaseDurationSeconds,
+		AcquireTime:          &metav1.MicroTime{ler.AcquireTime.Time},
+		RenewTime:            &metav1.MicroTime{ler.RenewTime.Time},
+		LeaseTransitions:     &leaseTransitions,
+	}
+}
diff --git a/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/multilock.go b/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/multilock.go
new file mode 100644
index 0000000000..5ee1dcbb50
--- /dev/null
+++ b/pkg/pillar/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/multilock.go
@@ -0,0 +1,104 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourcelock
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+)
+
+const (
+	UnknownLeader = "leaderelection.k8s.io/unknown"
+)
+
+// MultiLock is used for lock's migration
+type MultiLock struct {
+	Primary   Interface
+	Secondary Interface
+}
+
+// Get returns the older election record of the lock
+func (ml *MultiLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
+	primary, primaryRaw, err := ml.Primary.Get(ctx)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	secondary, secondaryRaw, err := ml.Secondary.Get(ctx)
+	if err != nil {
+		// Lock is held by old client
+		if apierrors.IsNotFound(err) && primary.HolderIdentity != ml.Identity() {
+			return primary, primaryRaw, nil
+		}
+		return nil, nil, err
+	}
+
+	if primary.HolderIdentity != secondary.HolderIdentity {
+		primary.HolderIdentity = UnknownLeader
+		primaryRaw, err = json.Marshal(primary)
+		if err != nil {
+			return nil, nil, err
+		}
+	}
+	return primary, ConcatRawRecord(primaryRaw, secondaryRaw), nil
+}
+
+// Create attempts to create both primary lock and secondary lock
+func (ml *MultiLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
+	err := ml.Primary.Create(ctx, ler)
+	if err != nil && !apierrors.IsAlreadyExists(err) {
+		return err
+	}
+	return ml.Secondary.Create(ctx, ler)
+}
+
+// Update will update and existing annotation on both two resources.
+func (ml *MultiLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
+	err := ml.Primary.Update(ctx, ler)
+	if err != nil {
+		return err
+	}
+	_, _, err = ml.Secondary.Get(ctx)
+	if err != nil && apierrors.IsNotFound(err) {
+		return ml.Secondary.Create(ctx, ler)
+	}
+	return ml.Secondary.Update(ctx, ler)
+}
+
+// RecordEvent in leader election while adding meta-data
+func (ml *MultiLock) RecordEvent(s string) {
+	ml.Primary.RecordEvent(s)
+	ml.Secondary.RecordEvent(s)
+}
+
+// Describe is used to convert details on current resource lock
+// into a string
+func (ml *MultiLock) Describe() string {
+	return ml.Primary.Describe()
+}
+
+// Identity returns the Identity of the lock
+func (ml *MultiLock) Identity() string {
+	return ml.Primary.Identity()
+}
+
+func ConcatRawRecord(primaryRaw, secondaryRaw []byte) []byte {
+	return bytes.Join([][]byte{primaryRaw, secondaryRaw}, []byte(","))
+}
diff --git a/pkg/pillar/vendor/modules.txt b/pkg/pillar/vendor/modules.txt
index 2c6a09c26e..b9c75fa5c6 100644
--- a/pkg/pillar/vendor/modules.txt
+++ b/pkg/pillar/vendor/modules.txt
@@ -1440,6 +1440,8 @@ k8s.io/client-go/tools/clientcmd
 k8s.io/client-go/tools/clientcmd/api
 k8s.io/client-go/tools/clientcmd/api/latest
 k8s.io/client-go/tools/clientcmd/api/v1
+k8s.io/client-go/tools/leaderelection
+k8s.io/client-go/tools/leaderelection/resourcelock
 k8s.io/client-go/tools/metrics
 k8s.io/client-go/tools/reference
 k8s.io/client-go/transport
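
Illustrative usage sketch (not part of the diff above): the vendored k8s.io/client-go/tools/leaderelection package added here is typically wired up as shown below. The namespace, lease name, durations, and callback bodies are placeholders chosen for the example, not taken from this change; only the client-go types and functions that appear in the vendored files (LeaseLock, ResourceLockConfig, LeaderElectionConfig, RunOrDie) are real API.

package main

import (
	"context"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// runWithLeaderElection runs onLead only while this process holds the lease.
// Namespace and lease name below are hypothetical placeholders.
func runWithLeaderElection(ctx context.Context, cfg *rest.Config, onLead func(context.Context)) error {
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return err
	}
	hostname, _ := os.Hostname()

	// Lease-based lock stored in coordination.k8s.io/v1 Leases.
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Namespace: "example-namespace",
			Name:      "example-agent-lock",
		},
		Client:     clientset.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: hostname},
	}

	// RunOrDie blocks until ctx is cancelled or the lease is lost.
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            lock,
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		ReleaseOnCancel: true,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: onLead,
			OnStoppedLeading: func() {
				// Lost or released the lease; stop leader-only work here.
			},
		},
	})
	return nil
}

Callers usually run this in its own goroutine since RunOrDie blocks. Setting ReleaseOnCancel makes the holder write back a record with an empty HolderIdentity on shutdown (see release() in the vendored leaderelection.go above), so a peer can take over without waiting out the full LeaseDuration.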