PR4408 and networking changes for Clustering #4454

Merged
17 changes: 5 additions & 12 deletions pkg/pillar/cmd/monitor/subscriptions.go
@@ -416,29 +416,22 @@ func (ctx *monitor) process(ps *pubsub.PubSub) {
watches := make([]pubsub.ChannelWatch, 0)
for i := range ctx.subscriptions {
sub := ctx.subscriptions[i]
watches = append(watches, pubsub.ChannelWatch{
Chan: reflect.ValueOf(sub.MsgChan()),
Callback: func(value interface{}) {
change, ok := value.(pubsub.Change)
if !ok {
return
}
sub.ProcessChange(change)
},
})
watches = append(watches, pubsub.WatchAndProcessSubChanges(sub))
}

watches = append(watches, pubsub.ChannelWatch{
Chan: reflect.ValueOf(stillRunning.C),
Callback: func(_ interface{}) {
Callback: func(_ interface{}) (exit bool) {
ps.StillRunning(agentName, warningTime, errorTime)
return false
},
})

watches = append(watches, pubsub.ChannelWatch{
Chan: reflect.ValueOf(ctx.clientConnected),
Callback: func(_ interface{}) {
Callback: func(_ interface{}) (exit bool) {
ctx.handleClientConnected()
return false
},
})

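Both here and in usbmanager's subscriptions.go further down, the hand-rolled ChannelWatch is replaced by the new pubsub.WatchAndProcessSubChanges helper, and ChannelWatch callbacks now return an exit bool so a watch can ask the select loop to stop. A minimal sketch of what such a helper could look like inside the pubsub package, assuming it simply packages the inline callback removed above (the real implementation in pkg/pillar/pubsub may differ):

```go
package pubsub

import "reflect"

// WatchAndProcessSubChanges (sketch): wrap a subscription's message channel in a
// ChannelWatch whose callback forwards every pubsub.Change to ProcessChange and
// never asks the watching select loop to exit.
func WatchAndProcessSubChanges(sub Subscription) ChannelWatch {
	return ChannelWatch{
		Chan: reflect.ValueOf(sub.MsgChan()),
		Callback: func(value interface{}) (exit bool) {
			change, ok := value.(Change)
			if !ok {
				return false
			}
			sub.ProcessChange(change)
			return false
		},
	}
}
```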
38 changes: 38 additions & 0 deletions pkg/pillar/cmd/nim/nim.go
@@ -85,6 +85,7 @@ type nim struct {
subOnboardStatus pubsub.Subscription
subWwanStatus pubsub.Subscription
subNetworkInstanceConfig pubsub.Subscription
subEdgeNodeClusterStatus pubsub.Subscription

// Publications
pubDummyDevicePortConfig pubsub.Publication // For logging
@@ -326,6 +327,9 @@ func (n *nim) run(ctx context.Context) (err error) {
case change := <-n.subControllerCert.MsgChan():
n.subControllerCert.ProcessChange(change)

case change := <-n.subEdgeNodeClusterStatus.MsgChan():
n.subEdgeNodeClusterStatus.ProcessChange(change)

case change := <-n.subEdgeNodeCert.MsgChan():
n.subEdgeNodeCert.ProcessChange(change)

@@ -697,6 +701,23 @@ func (n *nim) initSubscriptions() (err error) {
if err != nil {
return err
}

// Subscribe to EdgeNodeClusterStatus to get the cluster interface and the cluster
// IP address which DPC Reconciler should assign statically.
n.subEdgeNodeClusterStatus, err = n.PubSub.NewSubscription(pubsub.SubscriptionOptions{
AgentName: "zedkube",
MyAgentName: agentName,
TopicImpl: types.EdgeNodeClusterStatus{},
Activate: false,
CreateHandler: n.handleEdgeNodeClusterStatusCreate,
ModifyHandler: n.handleEdgeNodeClusterStatusModify,
DeleteHandler: n.handleEdgeNodeClusterStatusDelete,
WarningTime: warningTime,
ErrorTime: errorTime,
})
if err != nil {
return err
}
return nil
}

@@ -908,6 +929,23 @@ func (n *nim) handleNetworkInstanceUpdate() {
n.dpcManager.UpdateFlowlogState(flowlogEnabled)
}

func (n *nim) handleEdgeNodeClusterStatusCreate(_ interface{}, _ string,
statusArg interface{}) {
status := statusArg.(types.EdgeNodeClusterStatus)
n.dpcManager.UpdateClusterStatus(status)
}

func (n *nim) handleEdgeNodeClusterStatusModify(_ interface{}, _ string,
statusArg, _ interface{}) {
status := statusArg.(types.EdgeNodeClusterStatus)
n.dpcManager.UpdateClusterStatus(status)
}

func (n *nim) handleEdgeNodeClusterStatusDelete(_ interface{}, _ string, _ interface{}) {
// Apply empty cluster status, which effectively removes the cluster IP.
n.dpcManager.UpdateClusterStatus(types.EdgeNodeClusterStatus{})
}

func (n *nim) isDeviceOnboarded() bool {
obj, err := n.subOnboardStatus.Get("global")
if err != nil {
14 changes: 3 additions & 11 deletions pkg/pillar/cmd/usbmanager/subscriptions.go
@@ -18,22 +18,14 @@ func (usbCtx *usbmanagerContext) process(ps *pubsub.PubSub) {
watches := make([]pubsub.ChannelWatch, 0)
for i := range usbCtx.subscriptions {
sub := usbCtx.subscriptions[i]
watches = append(watches, pubsub.ChannelWatch{
Chan: reflect.ValueOf(sub.MsgChan()),
Callback: func(value interface{}) {
change, ok := value.(pubsub.Change)
if !ok {
return
}
sub.ProcessChange(change)
},
})
watches = append(watches, pubsub.WatchAndProcessSubChanges(sub))
}

watches = append(watches, pubsub.ChannelWatch{
Chan: reflect.ValueOf(stillRunning.C),
Callback: func(_ interface{}) {
Callback: func(_ interface{}) (exit bool) {
ps.StillRunning(agentName, warningTime, errorTime)
return false
},
})

150 changes: 130 additions & 20 deletions pkg/pillar/cmd/zedkube/applogs.go
@@ -8,6 +8,7 @@ package zedkube
import (
"bufio"
"context"
"fmt"
"io"
"regexp"
"strings"
@@ -16,32 +17,32 @@ import (
"github.com/lf-edge/eve/pkg/pillar/base"
"github.com/lf-edge/eve/pkg/pillar/kubeapi"
"github.com/lf-edge/eve/pkg/pillar/types"
uuid "github.com/satori/go.uuid"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// collectAppLogs - collect App logs from pods which covers both containers and virt-launcher pods
func collectAppLogs(ctx *zedkubeContext) {
sub := ctx.subAppInstanceConfig
// collect App logs from pods which covers both containers and virt-launcher pods
func (z *zedkube) collectAppLogs() {
sub := z.subAppInstanceConfig
items := sub.GetAll()
if len(items) == 0 {
return
}

if ctx.config == nil {
config, err := kubeapi.GetKubeConfig()
if err != nil {
return
}
ctx.config = config
}
clientset, err := kubernetes.NewForConfig(ctx.config)
clientset, err := getKubeClientSet()
if err != nil {
log.Errorf("collectAppLogs: can't get clientset %v", err)
return
}

err = z.getnodeNameAndUUID()
if err != nil {
log.Errorf("collectAppLogs: can't get edgeNodeInfo %v", err)
return
}

// "Thu Aug 17 05:39:04 UTC 2023"
timestampRegex := regexp.MustCompile(`(\w{3} \w{3} \d{2} \d{2}:\d{2}:\d{2} \w+ \d{4})`)
nowStr := time.Now().String()
@@ -50,15 +51,35 @@ func collectAppLogs(ctx *zedkubeContext) {
sinceSec = logcollectInterval
for _, item := range items {
aiconfig := item.(types.AppInstanceConfig)
contName := base.GetAppKubeName(aiconfig.DisplayName, aiconfig.UUIDandVersion.UUID)

if aiconfig.FixedResources.VirtualizationMode != types.NOHYPER {
continue
}
if aiconfig.DesignatedNodeID != uuid.Nil && aiconfig.DesignatedNodeID.String() != z.nodeuuid {
continue
}
kubeName := base.GetAppKubeName(aiconfig.DisplayName, aiconfig.UUIDandVersion.UUID)
contName := kubeName
opt := &corev1.PodLogOptions{}
if ctx.appLogStarted {
if z.appLogStarted {
opt = &corev1.PodLogOptions{
SinceSeconds: &sinceSec,
}
} else {
ctx.appLogStarted = true
z.appLogStarted = true
}

pods, err := clientset.CoreV1().Pods(kubeapi.EVEKubeNameSpace).List(context.TODO(), metav1.ListOptions{
LabelSelector: fmt.Sprintf("app=%s", kubeName),
})
if err != nil {
logrus.Errorf("checkReplicaSetMetrics: can't get pod %v", err)
continue
}
for _, pod := range pods.Items {
if strings.HasPrefix(pod.ObjectMeta.Name, kubeName) {
contName = pod.ObjectMeta.Name
break
}
}
req := clientset.CoreV1().Pods(kubeapi.EVEKubeNameSpace).GetLogs(contName, opt)
podLogs, err := req.Stream(context.Background())
@@ -84,18 +105,107 @@ func collectAppLogs(ctx *zedkubeContext) {
timeStr = nowStr
}
// Process and print the log line here
aiLogger := ctx.appContainerLogger.WithFields(logrus.Fields{
aiLogger := z.appContainerLogger.WithFields(logrus.Fields{
"appuuid": aiconfig.UUIDandVersion.UUID.String(),
"containername": contName,
"eventtime": timeStr,
})
aiLogger.Infof("%s", logLine)
}
if scanner.Err() != nil {
if scanner.Err() != io.EOF {
log.Errorf("collectAppLogs: pod %s, scanner error %v", contName, scanner.Err())
if scanner.Err() == io.EOF {
break // Break out of the loop when EOF is reached
}
}
}
}

func (z *zedkube) checkAppsStatus() {
sub := z.subAppInstanceConfig
items := sub.GetAll()
if len(items) == 0 {
return
}

err := z.getnodeNameAndUUID()
if err != nil {
log.Errorf("checkAppsStatus: can't get edgeNodeInfo %v", err)
return
}

u, err := uuid.FromString(z.nodeuuid)
if err != nil {
return
}

clientset, err := getKubeClientSet()
if err != nil {
log.Errorf("checkAppsStatus: can't get clientset %v", err)
return
}

options := metav1.ListOptions{
FieldSelector: fmt.Sprintf("spec.nodeName=%s", z.nodeName),
}
pods, err := clientset.CoreV1().Pods(kubeapi.EVEKubeNameSpace).List(context.TODO(), options)
if err != nil {
log.Errorf("checkAppsStatus: can't get pods %v", err)
return
}

pub := z.pubENClusterAppStatus
stItmes := pub.GetAll()
var oldStatus *types.ENClusterAppStatus
for _, item := range items {
aiconfig := item.(types.AppInstanceConfig)
if aiconfig.DesignatedNodeID == uuid.Nil { // if not for cluster app, skip
continue
}
encAppStatus := types.ENClusterAppStatus{
AppUUID: aiconfig.UUIDandVersion.UUID,
IsDNidNode: aiconfig.DesignatedNodeID == u,
}
contName := base.GetAppKubeName(aiconfig.DisplayName, aiconfig.UUIDandVersion.UUID)

for _, pod := range pods.Items {
contVMIName := "virt-launcher-" + contName
log.Functionf("checkAppsStatus: pod %s, cont %s", pod.Name, contName)
if strings.HasPrefix(pod.Name, contName) || strings.HasPrefix(pod.Name, contVMIName) {
encAppStatus.ScheduledOnThisNode = true
if pod.Status.Phase == corev1.PodRunning {
encAppStatus.StatusRunning = true
}
break
}
}

for _, st := range stItmes {
aiStatus := st.(types.ENClusterAppStatus)
if aiStatus.AppUUID == aiconfig.UUIDandVersion.UUID {
oldStatus = &aiStatus
break
}
break // Break out of the loop when EOF is reached or error occurs
}
log.Functionf("checkAppsStatus: devname %s, pod (%d) status %+v, old %+v", z.nodeName, len(pods.Items), encAppStatus, oldStatus)

if oldStatus == nil || oldStatus.IsDNidNode != encAppStatus.IsDNidNode ||
oldStatus.ScheduledOnThisNode != encAppStatus.ScheduledOnThisNode || oldStatus.StatusRunning != encAppStatus.StatusRunning {
log.Functionf("checkAppsStatus: status differ, publish")
z.pubENClusterAppStatus.Publish(aiconfig.Key(), encAppStatus)
}
}
}

func (z *zedkube) getnodeNameAndUUID() error {
if z.nodeuuid == "" || z.nodeName == "" {
NodeInfo, err := z.subEdgeNodeInfo.Get("global")
if err != nil {
log.Errorf("getnodeNameAndUUID: can't get edgeNodeInfo %v", err)
return err
}
enInfo := NodeInfo.(types.EdgeNodeInfo)
z.nodeName = strings.ToLower(enInfo.DeviceName)
z.nodeuuid = enInfo.DeviceID.String()
}
return nil
}
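Both collectAppLogs and checkAppsStatus now obtain their Kubernetes client through a shared getKubeClientSet helper instead of the inline kubeapi.GetKubeConfig/kubernetes.NewForConfig sequence removed above. The helper itself is not shown in this diff; a hypothetical sketch of the shape it presumably has, based on the code it replaces:

```go
package zedkube

import (
	"github.com/lf-edge/eve/pkg/pillar/kubeapi"
	"k8s.io/client-go/kubernetes"
)

// getKubeClientSet (sketch): assumed to wrap the kube config lookup and clientset
// construction that collectAppLogs previously performed inline.
func getKubeClientSet() (*kubernetes.Clientset, error) {
	config, err := kubeapi.GetKubeConfig()
	if err != nil {
		return nil, err
	}
	return kubernetes.NewForConfig(config)
}
```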