Skip to content

Commit

Permalink
Use DPC Reconciler to assign cluster IP to cluster interface
Browse files Browse the repository at this point in the history
NIM subscribes to EdgeNodeClusterStatus published by zedkube
and passes this to DPCManager and further into DPCReconciler.
Cluster IP address is assigned to the cluster interface statically
(directly using netlink, not via dhcpcd).

zedkube subscribes to DeviceNetworkStatus to determine if the cluster
IP address is assigned and ready to use. Once assigned, it will update
EdgeNodeClusterStatus with ClusterIPIsReady set to true, which the
script cluster-init.sh is waiting for before it starts the procedure
of node joining the cluster.

Signed-off-by: Milan Lenco <milan@zededa.com>
  • Loading branch information
milan-zededa authored and eriknordmark committed Dec 3, 2024
1 parent 634b692 commit f79132c
Show file tree
Hide file tree
Showing 11 changed files with 311 additions and 88 deletions.
17 changes: 5 additions & 12 deletions pkg/pillar/cmd/monitor/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,29 +416,22 @@ func (ctx *monitor) process(ps *pubsub.PubSub) {
watches := make([]pubsub.ChannelWatch, 0)
for i := range ctx.subscriptions {
sub := ctx.subscriptions[i]
watches = append(watches, pubsub.ChannelWatch{
Chan: reflect.ValueOf(sub.MsgChan()),
Callback: func(value interface{}) {
change, ok := value.(pubsub.Change)
if !ok {
return
}
sub.ProcessChange(change)
},
})
watches = append(watches, pubsub.WatchAndProcessSubChanges(sub))
}

watches = append(watches, pubsub.ChannelWatch{
Chan: reflect.ValueOf(stillRunning.C),
Callback: func(_ interface{}) {
Callback: func(_ interface{}) (exit bool) {
ps.StillRunning(agentName, warningTime, errorTime)
return false
},
})

watches = append(watches, pubsub.ChannelWatch{
Chan: reflect.ValueOf(ctx.clientConnected),
Callback: func(_ interface{}) {
Callback: func(_ interface{}) (exit bool) {
ctx.handleClientConnected()
return false
},
})

Expand Down
38 changes: 38 additions & 0 deletions pkg/pillar/cmd/nim/nim.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type nim struct {
subOnboardStatus pubsub.Subscription
subWwanStatus pubsub.Subscription
subNetworkInstanceConfig pubsub.Subscription
subEdgeNodeClusterStatus pubsub.Subscription

// Publications
pubDummyDevicePortConfig pubsub.Publication // For logging
Expand Down Expand Up @@ -326,6 +327,9 @@ func (n *nim) run(ctx context.Context) (err error) {
case change := <-n.subControllerCert.MsgChan():
n.subControllerCert.ProcessChange(change)

case change := <-n.subEdgeNodeClusterStatus.MsgChan():
n.subEdgeNodeClusterStatus.ProcessChange(change)

case change := <-n.subEdgeNodeCert.MsgChan():
n.subEdgeNodeCert.ProcessChange(change)

Expand Down Expand Up @@ -697,6 +701,23 @@ func (n *nim) initSubscriptions() (err error) {
if err != nil {
return err
}

// Subscribe to EdgeNodeClusterStatus to get the cluster interface and the cluster
// IP address which DPC Reconciler should assign statically.
n.subEdgeNodeClusterStatus, err = n.PubSub.NewSubscription(pubsub.SubscriptionOptions{
AgentName: "zedkube",
MyAgentName: agentName,
TopicImpl: types.EdgeNodeClusterStatus{},
Activate: false,
CreateHandler: n.handleEdgeNodeClusterStatusCreate,
ModifyHandler: n.handleEdgeNodeClusterStatusModify,
DeleteHandler: n.handleEdgeNodeClusterStatusDelete,
WarningTime: warningTime,
ErrorTime: errorTime,
})
if err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -908,6 +929,23 @@ func (n *nim) handleNetworkInstanceUpdate() {
n.dpcManager.UpdateFlowlogState(flowlogEnabled)
}

func (n *nim) handleEdgeNodeClusterStatusCreate(_ interface{}, _ string,
statusArg interface{}) {
status := statusArg.(types.EdgeNodeClusterStatus)
n.dpcManager.UpdateClusterStatus(status)
}

func (n *nim) handleEdgeNodeClusterStatusModify(_ interface{}, _ string,
statusArg, _ interface{}) {
status := statusArg.(types.EdgeNodeClusterStatus)
n.dpcManager.UpdateClusterStatus(status)
}

func (n *nim) handleEdgeNodeClusterStatusDelete(_ interface{}, _ string, _ interface{}) {
// Apply empty cluster status, which effectively removes the cluster IP.
n.dpcManager.UpdateClusterStatus(types.EdgeNodeClusterStatus{})
}

func (n *nim) isDeviceOnboarded() bool {
obj, err := n.subOnboardStatus.Get("global")
if err != nil {
Expand Down
14 changes: 3 additions & 11 deletions pkg/pillar/cmd/usbmanager/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,14 @@ func (usbCtx *usbmanagerContext) process(ps *pubsub.PubSub) {
watches := make([]pubsub.ChannelWatch, 0)
for i := range usbCtx.subscriptions {
sub := usbCtx.subscriptions[i]
watches = append(watches, pubsub.ChannelWatch{
Chan: reflect.ValueOf(sub.MsgChan()),
Callback: func(value interface{}) {
change, ok := value.(pubsub.Change)
if !ok {
return
}
sub.ProcessChange(change)
},
})
watches = append(watches, pubsub.WatchAndProcessSubChanges(sub))
}

watches = append(watches, pubsub.ChannelWatch{
Chan: reflect.ValueOf(stillRunning.C),
Callback: func(_ interface{}) {
Callback: func(_ interface{}) (exit bool) {
ps.StillRunning(agentName, warningTime, errorTime)
return false
},
})

Expand Down
8 changes: 5 additions & 3 deletions pkg/pillar/cmd/zedkube/zedkube.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,9 +325,11 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
zedkubeCtx.subZedAgentStatus = subZedAgentStatus
subZedAgentStatus.Activate()

// This will wait for kubernetes, longhorn, etc. to be ready
// XXX temp, this will be changed in later cluster PR
err = kubeapi.WaitForKubernetes(agentName, ps, stillRunning)
err = kubeapi.WaitForKubernetes(agentName, ps, stillRunning,
// Make sure we keep ClusterIPIsReady up to date while we wait
// for Kubernetes to come up.
pubsub.WatchAndProcessSubChanges(subEdgeNodeClusterConfig),
pubsub.WatchAndProcessSubChanges(subDeviceNetworkStatus))
if err != nil {
log.Errorf("zedkube: WaitForKubenetes %v", err)
}
Expand Down
34 changes: 27 additions & 7 deletions pkg/pillar/dpcmanager/dpcmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type DpcManager struct {
enableLastResort bool
devUUID uuid.UUID
flowlogEnabled bool
clusterStatus types.EdgeNodeClusterStatus
// Boot-time configuration
dpclPresentAtBoot bool

Expand Down Expand Up @@ -191,17 +192,19 @@ const (
commandUpdateDevUUID
commandProcessWwanStatus
commandUpdateFlowlogState
commandUpdateClusterStatus
)

type inputCommand struct {
cmd command
dpc types.DevicePortConfig // for commandAddDPC and commandDelDPC
gcp types.ConfigItemValueMap // for commandUpdateGCP
aa types.AssignableAdapters // for commandUpdateAA
rs types.RadioSilence // for commandUpdateRS
devUUID uuid.UUID // for commandUpdateDevUUID
wwanStatus types.WwanStatus // for commandProcessWwanStatus
flowlogEnabled bool // for commandUpdateFlowlogState
dpc types.DevicePortConfig // for commandAddDPC and commandDelDPC
gcp types.ConfigItemValueMap // for commandUpdateGCP
aa types.AssignableAdapters // for commandUpdateAA
rs types.RadioSilence // for commandUpdateRS
devUUID uuid.UUID // for commandUpdateDevUUID
wwanStatus types.WwanStatus // for commandProcessWwanStatus
flowlogEnabled bool // for commandUpdateFlowlogState
clusterStatus types.EdgeNodeClusterStatus // for commandUpdateClusterStatus
}

type dpcVerify struct {
Expand Down Expand Up @@ -276,6 +279,8 @@ func (m *DpcManager) run(ctx context.Context) {
m.processWwanStatus(ctx, inputCmd.wwanStatus)
case commandUpdateFlowlogState:
m.doUpdateFlowlogState(ctx, inputCmd.flowlogEnabled)
case commandUpdateClusterStatus:
m.doUpdateClusterStatus(ctx, inputCmd.clusterStatus)
}
m.resumeVerifyIfAsyncDone(ctx)

Expand Down Expand Up @@ -412,6 +417,7 @@ func (m *DpcManager) reconcilerArgs() dpcreconciler.Args {
AA: m.adapters,
RS: m.rsConfig,
FlowlogEnabled: m.flowlogEnabled,
ClusterStatus: m.clusterStatus,
}
if m.currentDPC() != nil {
args.DPC = *m.currentDPC()
Expand Down Expand Up @@ -491,6 +497,14 @@ func (m *DpcManager) UpdateFlowlogState(flowlogEnabled bool) {
}
}

// UpdateClusterStatus : apply an updated cluster status.
func (m *DpcManager) UpdateClusterStatus(status types.EdgeNodeClusterStatus) {
m.inputCommands <- inputCommand{
cmd: commandUpdateClusterStatus,
clusterStatus: status,
}
}

// GetDNS returns device network state information.
func (m *DpcManager) GetDNS() types.DeviceNetworkStatus {
return m.deviceNetStatus
Expand Down Expand Up @@ -629,3 +643,9 @@ func (m *DpcManager) doUpdateFlowlogState(ctx context.Context, flowlogEnabled bo
m.flowlogEnabled = flowlogEnabled
m.reconcileStatus = m.DpcReconciler.Reconcile(ctx, m.reconcilerArgs())
}

func (m *DpcManager) doUpdateClusterStatus(ctx context.Context,
status types.EdgeNodeClusterStatus) {
m.clusterStatus = status
m.reconcileStatus = m.DpcReconciler.Reconcile(ctx, m.reconcilerArgs())
}
2 changes: 2 additions & 0 deletions pkg/pillar/dpcreconciler/dpcreconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type Args struct {
GCP types.ConfigItemValueMap
// True if flow logging is enabled in at least one network instance.
FlowlogEnabled bool
// Cluster network status used when edge node is part of a kubernetes cluster.
ClusterStatus types.EdgeNodeClusterStatus
}

// ReconcileStatus : state data related to config reconciliation.
Expand Down
Loading

0 comments on commit f79132c

Please sign in to comment.