Skip to content

Commit

Permalink
Refactor the events
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Aug 11, 2023
1 parent 1cfc74a commit 52da03b
Show file tree
Hide file tree
Showing 23 changed files with 1,263 additions and 813 deletions.
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ require (
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/wzshiming/cmux v0.3.2
github.com/wzshiming/cron v0.2.1
github.com/wzshiming/ctc v1.2.3
github.com/wzshiming/easycel v0.4.0
go.uber.org/atomic v1.11.0
Expand Down Expand Up @@ -95,7 +94,6 @@ require (
github.com/rivo/uniseg v0.4.4 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect
github.com/wzshiming/llrb v0.2.1 // indirect
github.com/wzshiming/trie v0.1.1 // indirect
github.com/wzshiming/winseq v0.0.0-20200112104235-db357dc107ae // indirect
github.com/xlab/treeprint v1.1.0 // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,10 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/wzshiming/cmux v0.3.2 h1:lBEWbfbRqUDdXB6Mro/g35kvCuUEmAgIdpGEuER3bis=
github.com/wzshiming/cmux v0.3.2/go.mod h1:lPhqJN2E3frzkxrPdjesxL09z7nTcuZ6i8Is+2G/Xw4=
github.com/wzshiming/cron v0.2.1 h1:jtJlyGG4Od+xE0A36DtqTeyLhGssE95dsghqLdemXxI=
github.com/wzshiming/cron v0.2.1/go.mod h1:qXPkSl0AQLmjchtcqv7L6X8FSOLqy8hXYIXv9J1cKO4=
github.com/wzshiming/ctc v1.2.3 h1:q+hW3IQNsjIlOFBTGZZZeIXTElFM4grF4spW/errh/c=
github.com/wzshiming/ctc v1.2.3/go.mod h1:2tVAtIY7SUyraSk0JxvwmONNPFL4ARavPuEsg5+KA28=
github.com/wzshiming/easycel v0.4.0 h1:flPpJYS2bXQj1Jgk2yOp5oB6FvflvPM0VCHXx/qOJDk=
github.com/wzshiming/easycel v0.4.0/go.mod h1:qg3oAkmPOLJEUFlFsxBxYbp0cXHgbq2sZEHVq7SmUp8=
github.com/wzshiming/llrb v0.2.1 h1:+T/PE0lrrB2LE8487fYcRlkZct5E9L/Gi8mf6mUtEK8=
github.com/wzshiming/llrb v0.2.1/go.mod h1:xlAM5hpCBIT3dTy/6CbNPxHMR1JfIMSyk2lV68wz3Yw=
github.com/wzshiming/trie v0.1.1 h1:02AaBSZGhs6Aqljp8fz4xq/Mg8omFBPIlrUS0pJ11ks=
github.com/wzshiming/trie v0.1.1/go.mod h1:c9thxXTh4KcGkejt4sUsO4c5GUmWpxeWzOJ7AZJaI+8=
github.com/wzshiming/winseq v0.0.0-20200112104235-db357dc107ae h1:tpXvBXC3hpQBDCc9OojJZCQMVRAbT3TTdUMP8WguXkY=
Expand Down
4 changes: 3 additions & 1 deletion pkg/kwok/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,14 @@ func runE(ctx context.Context, flags *flagpole) error {
}
ctx = log.NewContext(ctx, logger.With("id", id))

enableMetrics := len(metrics) != 0 || slices.Contains(flags.Options.EnableCRDs, v1alpha1.MetricKind)
ctr, err := controllers.NewController(controllers.Config{
Clock: clock.RealClock{},
TypedClient: typedClient,
TypedKwokClient: typedKwokClient,
EnableCNI: flags.Options.EnableCNI,
EnableMetrics: len(metrics) != 0 || slices.Contains(flags.Options.EnableCRDs, v1alpha1.MetricKind),
EnableMetrics: enableMetrics,
EnablePodCache: enableMetrics,
ManageAllNodes: flags.Options.ManageAllNodes,
ManageNodesWithAnnotationSelector: flags.Options.ManageNodesWithAnnotationSelector,
ManageNodesWithLabelSelector: flags.Options.ManageNodesWithLabelSelector,
Expand Down
170 changes: 122 additions & 48 deletions pkg/kwok/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import (
"strings"
"time"

coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
Expand All @@ -43,6 +44,8 @@ import (
"sigs.k8s.io/kwok/pkg/consts"
"sigs.k8s.io/kwok/pkg/log"
"sigs.k8s.io/kwok/pkg/utils/gotpl"
"sigs.k8s.io/kwok/pkg/utils/informer"
"sigs.k8s.io/kwok/pkg/utils/queue"
"sigs.k8s.io/kwok/pkg/utils/slices"
)

Expand Down Expand Up @@ -86,6 +89,8 @@ var (
return consts.Version
},
}

nodeKind = corev1.SchemeGroupVersion.WithKind("Node")
)

// Controller is a fake kubelet implementation that can be used to test
Expand All @@ -96,6 +101,9 @@ type Controller struct {
nodeLeases *NodeLeaseController
broadcaster record.EventBroadcaster
typedClient kubernetes.Interface

nodeCacheGetter informer.Getter[*corev1.Node]
podCacheGetter informer.Getter[*corev1.Pod]
}

// Config is the configuration for the controller
Expand All @@ -121,6 +129,7 @@ type Config struct {
NodeLeaseParallelism uint
ID string
EnableMetrics bool
EnablePodCache bool
}

// NewController creates a new fake kubelet controller
Expand Down Expand Up @@ -155,49 +164,79 @@ func (c *Controller) Start(ctx context.Context) error {
recorder := c.broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "kwok_controller"})

var (
err error
nodeLeases *NodeLeaseController
getNodeOwnerFunc func(nodeName string) []metav1.OwnerReference
nodeLeasesChan chan informer.Event[*coordinationv1.Lease]
onLeaseNodeManageFunc func(nodeName string)
onNodeManagedFunc func(nodeName string)
readOnlyFunc func(nodeName string) bool
nodeSelectorFunc func(node *corev1.Node) bool
)
switch {
case conf.ManageAllNodes:
nodeSelectorFunc = func(node *corev1.Node) bool {
return true
}
case conf.ManageNodesWithAnnotationSelector != "":
selector, err := labels.Parse(conf.ManageNodesWithAnnotationSelector)
if err != nil {
return err
}
nodeSelectorFunc = func(node *corev1.Node) bool {
return selector.Matches(labels.Set(node.Annotations))
}
case conf.ManageNodesWithLabelSelector != "":
// client-go supports label filtering, so return true is ok.
nodeSelectorFunc = func(node *corev1.Node) bool {
return true
}

nodeChan := make(chan informer.Event[*corev1.Node], 1)
nodesCli := conf.TypedClient.CoreV1().Nodes()
nodesInformer := informer.NewInformer[*corev1.Node, *corev1.NodeList](nodesCli)
nodesCache, err := nodesInformer.WatchWithCache(ctx, informer.Option{
LabelSelector: conf.ManageNodesWithLabelSelector,
AnnotationSelector: conf.ManageNodesWithAnnotationSelector,
}, nodeChan)
if err != nil {
return fmt.Errorf("failed to watch nodes: %w", err)
}

podsChan := make(chan informer.Event[*corev1.Pod], 1)
podsCli := conf.TypedClient.CoreV1().Pods(corev1.NamespaceAll)
podsInformer := informer.NewInformer[*corev1.Pod, *corev1.PodList](podsCli)

podWatchOption := informer.Option{
FieldSelector: fields.OneTermNotEqualSelector("spec.nodeName", "").String(),
}

var podsCache informer.Getter[*corev1.Pod]
if conf.EnablePodCache {
podsCache, err = podsInformer.WatchWithCache(ctx, podWatchOption, podsChan)
} else {
err = podsInformer.Watch(ctx, podWatchOption, podsChan)
}
if err != nil {
return fmt.Errorf("failed to watch pods: %w", err)
}

if conf.NodeLeaseDurationSeconds != 0 {
nodeLeasesChan = make(chan informer.Event[*coordinationv1.Lease], 1)
nodeLeasesCli := conf.TypedClient.CoordinationV1().Leases(corev1.NamespaceNodeLease)
nodeLeasesInformer := informer.NewInformer[*coordinationv1.Lease, *coordinationv1.LeaseList](nodeLeasesCli)
err = nodeLeasesInformer.Watch(ctx, informer.Option{}, nodeLeasesChan)
if err != nil {
return fmt.Errorf("failed to watch nodes: %w", err)
}

leaseDuration := time.Duration(conf.NodeLeaseDurationSeconds) * time.Second
// https://github.com/kubernetes/kubernetes/blob/02f4d643eae2e225591702e1bbf432efea453a26/pkg/kubelet/kubelet.go#L199-L200
renewInterval := leaseDuration / 4
// https://github.com/kubernetes/component-helpers/blob/d17b6f1e84500ee7062a26f5327dc73cb3e9374a/apimachinery/lease/controller.go#L100
renewIntervalJitter := 0.04
l, err := NewNodeLeaseController(NodeLeaseControllerConfig{
nodeLeases, err = NewNodeLeaseController(NodeLeaseControllerConfig{
Clock: conf.Clock,
TypedClient: conf.TypedClient,
NodeCacheGetter: nodesCache,
LeaseDurationSeconds: conf.NodeLeaseDurationSeconds,
LeaseParallelism: conf.NodeLeaseParallelism,
RenewInterval: renewInterval,
RenewIntervalJitter: renewIntervalJitter,
LeaseNamespace: corev1.NamespaceNodeLease,
MutateLeaseFunc: setNodeOwnerFunc(func(nodeName string) []metav1.OwnerReference {
return getNodeOwnerFunc(nodeName)
node, ok := nodesCache.Get(nodeName)
if !ok {
return nil
}
ownerReferences := []metav1.OwnerReference{
{
APIVersion: nodeKind.Version,
Kind: nodeKind.Kind,
Name: node.Name,
UID: node.UID,
},
}
return ownerReferences
}),
HolderIdentity: conf.ID,
OnNodeManagedFunc: func(nodeName string) {
Expand All @@ -207,7 +246,6 @@ func (c *Controller) Start(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to create node leases controller: %w", err)
}
nodeLeases = l

// Not holding the lease means the node is not managed
readOnlyFunc = func(nodeName string) bool {
Expand Down Expand Up @@ -292,13 +330,12 @@ func (c *Controller) Start(ctx context.Context) error {
nodes, err := NewNodeController(NodeControllerConfig{
Clock: conf.Clock,
TypedClient: conf.TypedClient,
NodeCacheGetter: nodesCache,
NodeIP: conf.NodeIP,
NodeName: conf.NodeName,
NodePort: conf.NodePort,
DisregardStatusWithAnnotationSelector: conf.DisregardStatusWithAnnotationSelector,
DisregardStatusWithLabelSelector: conf.DisregardStatusWithLabelSelector,
ManageNodesWithLabelSelector: conf.ManageNodesWithLabelSelector,
NodeSelectorFunc: nodeSelectorFunc,
OnNodeManagedFunc: func(nodeName string) {
onNodeManagedFunc(nodeName)
},
Expand All @@ -317,13 +354,13 @@ func (c *Controller) Start(ctx context.Context) error {
Clock: conf.Clock,
EnableCNI: conf.EnableCNI,
TypedClient: conf.TypedClient,
NodeCacheGetter: nodesCache,
NodeIP: conf.NodeIP,
CIDR: conf.CIDR,
DisregardStatusWithAnnotationSelector: conf.DisregardStatusWithAnnotationSelector,
DisregardStatusWithLabelSelector: conf.DisregardStatusWithLabelSelector,
Lifecycle: podLifecycleGetter,
PlayStageParallelism: conf.PodPlayStageParallelism,
Namespace: corev1.NamespaceAll,
NodeGetFunc: nodes.Get,
FuncMap: defaultFuncMap,
Recorder: recorder,
Expand All @@ -334,68 +371,105 @@ func (c *Controller) Start(ctx context.Context) error {
return fmt.Errorf("failed to create pods controller: %w", err)
}

podOnNodeManageQueue := queue.NewQueue[string]()
if nodeLeases != nil {
getNodeOwnerFunc = func(nodeName string) []metav1.OwnerReference {
nodeInfo, ok := nodes.Get(nodeName)
if !ok || nodeInfo == nil {
return nil
}
return nodeInfo.OwnerReferences
}
nodeManageQueue := queue.NewQueue[string]()
onLeaseNodeManageFunc = func(nodeName string) {
// Manage the node and play stage all pods on the node
nodes.Manage(nodeName)
pods.PlayStagePodsOnNode(nodeName)
nodeManageQueue.Add(nodeName)
podOnNodeManageQueue.Add(nodeName)
}

onNodeManagedFunc = func(nodeName string) {
// Try to hold the lease
nodeLeases.TryHold(nodeName)
}

go func() {
for {
nodeName := nodeManageQueue.GetOrWait()
node, ok := nodesCache.Get(nodeName)
if !ok {
logger.Warn("node not found in cache", "node", nodeName)
err := nodesInformer.Sync(ctx, informer.Option{
FieldSelector: fields.OneTermEqualSelector("metadata.name", nodeName).String(),
}, nodeChan)
if err != nil {
logger.Error("failed to update node", err, "node", nodeName)
}
continue
}
nodeChan <- informer.Event[*corev1.Node]{
Type: informer.Sync,
Object: node,
}
}
}()
} else {
onNodeManagedFunc = func(nodeName string) {
// Play stage all pods on the node
pods.PlayStagePodsOnNode(nodeName)
podOnNodeManageQueue.Add(nodeName)
}
}

go func() {
for {
nodeName := podOnNodeManageQueue.GetOrWait()
err = podsInformer.Sync(ctx, informer.Option{
FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(),
}, podsChan)
if err != nil {
logger.Error("failed to update pods on node", err, "node", nodeName)
}
}
}()

c.broadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{Interface: c.typedClient.CoreV1().Events("")})
if nodeLeases != nil {
err := nodeLeases.Start(ctx)
err := nodeLeases.Start(ctx, nodeLeasesChan)
if err != nil {
return fmt.Errorf("failed to start node leases controller: %w", err)
}
}
err = pods.Start(ctx)
err = pods.Start(ctx, podsChan)
if err != nil {
return fmt.Errorf("failed to start pods controller: %w", err)
}
err = nodes.Start(ctx)
err = nodes.Start(ctx, nodeChan)
if err != nil {
return fmt.Errorf("failed to start nodes controller: %w", err)
}

c.pods = pods
c.nodes = nodes
c.nodeLeases = nodeLeases
c.nodeCacheGetter = nodesCache
c.podCacheGetter = podsCache
return nil
}

// GetNode returns the node with the given name
func (c *Controller) GetNode(nodeName string) (*NodeInfo, bool) {
// GetNodeInfo returns the node info for the given node
func (c *Controller) GetNodeInfo(nodeName string) (*NodeInfo, bool) {
return c.nodes.Get(nodeName)
}

// ListNodes returns all nodes
func (c *Controller) ListNodes() []*NodeInfo {
func (c *Controller) ListNodes() []string {
return c.nodes.List()
}

// ListPods returns all pods on the given node
func (c *Controller) ListPods(nodeName string) ([]*PodInfo, bool) {
func (c *Controller) ListPods(nodeName string) ([]log.ObjectRef, bool) {
return c.pods.List(nodeName)
}

// GetPodCache returns the pod cache getter stored on the controller.
//
// The field is assigned at the end of Start, and Start only creates a pod
// cache when Config.EnablePodCache is set; otherwise the getter is left at
// its zero value, so callers should be prepared for an unset result.
func (c *Controller) GetPodCache() informer.Getter[*corev1.Pod] {
return c.podCacheGetter
}

// GetNodeCache returns the node cache getter stored on the controller.
//
// The field is assigned at the end of Start from the cache returned by the
// node informer's WatchWithCache call, so it is unset until Start has
// completed successfully.
func (c *Controller) GetNodeCache() informer.Getter[*corev1.Node] {
return c.nodeCacheGetter
}

// Identity returns a unique identifier for this controller
func Identity() (string, error) {
hostname, err := os.Hostname()
Expand Down
Loading

0 comments on commit 52da03b

Please sign in to comment.