From 2dabc6bb37f15838d1d69bc7e03dc20bfcd712a5 Mon Sep 17 00:00:00 2001 From: Shiming Zhang Date: Thu, 10 Aug 2023 18:21:31 +0800 Subject: [PATCH] Refactor the events --- go.mod | 2 - go.sum | 4 - pkg/kwok/cmd/root.go | 4 +- pkg/kwok/controllers/controller.go | 170 ++++++--- pkg/kwok/controllers/node_controller.go | 324 ++++------------ pkg/kwok/controllers/node_controller_test.go | 33 +- pkg/kwok/controllers/node_lease_controller.go | 173 +++------ .../controllers/node_lease_controller_test.go | 15 +- pkg/kwok/controllers/pod_controller.go | 359 +++++++----------- pkg/kwok/controllers/pod_controller_test.go | 149 +++++--- pkg/kwok/controllers/utils.go | 10 +- pkg/kwok/metrics/metrics.go | 134 ++++--- pkg/kwok/server/metrics.go | 2 +- pkg/kwok/server/service_discovery.go | 8 +- pkg/utils/informer/doc.go | 18 + pkg/utils/informer/event.go | 106 ++++++ pkg/utils/informer/informer.go | 231 +++++++++++ pkg/utils/maps/sync.go | 10 + pkg/utils/queue/delaying_queue.go | 186 +++++++++ pkg/utils/queue/doc.go | 18 + pkg/utils/queue/queue.go | 98 +++++ test/kwokctl/kwokctl_benchmark_test.sh | 15 +- test/kwokctl/suite.sh | 5 + 23 files changed, 1261 insertions(+), 813 deletions(-) create mode 100644 pkg/utils/informer/doc.go create mode 100644 pkg/utils/informer/event.go create mode 100644 pkg/utils/informer/informer.go create mode 100644 pkg/utils/queue/delaying_queue.go create mode 100644 pkg/utils/queue/doc.go create mode 100644 pkg/utils/queue/queue.go diff --git a/go.mod b/go.mod index 2d8cd5e40..3aed264df 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,6 @@ require ( github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 github.com/wzshiming/cmux v0.3.2 - github.com/wzshiming/cron v0.2.1 github.com/wzshiming/ctc v1.2.3 github.com/wzshiming/easycel v0.4.0 go.uber.org/atomic v1.11.0 @@ -95,7 +94,6 @@ require ( github.com/rivo/uniseg v0.4.4 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect - github.com/wzshiming/llrb v0.2.1 // indirect github.com/wzshiming/trie v0.1.1 // indirect github.com/wzshiming/winseq v0.0.0-20200112104235-db357dc107ae // indirect github.com/xlab/treeprint v1.1.0 // indirect diff --git a/go.sum b/go.sum index 2a2eb1fc4..623046e48 100644 --- a/go.sum +++ b/go.sum @@ -222,14 +222,10 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/wzshiming/cmux v0.3.2 h1:lBEWbfbRqUDdXB6Mro/g35kvCuUEmAgIdpGEuER3bis= github.com/wzshiming/cmux v0.3.2/go.mod h1:lPhqJN2E3frzkxrPdjesxL09z7nTcuZ6i8Is+2G/Xw4= -github.com/wzshiming/cron v0.2.1 h1:jtJlyGG4Od+xE0A36DtqTeyLhGssE95dsghqLdemXxI= -github.com/wzshiming/cron v0.2.1/go.mod h1:qXPkSl0AQLmjchtcqv7L6X8FSOLqy8hXYIXv9J1cKO4= github.com/wzshiming/ctc v1.2.3 h1:q+hW3IQNsjIlOFBTGZZZeIXTElFM4grF4spW/errh/c= github.com/wzshiming/ctc v1.2.3/go.mod h1:2tVAtIY7SUyraSk0JxvwmONNPFL4ARavPuEsg5+KA28= github.com/wzshiming/easycel v0.4.0 h1:flPpJYS2bXQj1Jgk2yOp5oB6FvflvPM0VCHXx/qOJDk= github.com/wzshiming/easycel v0.4.0/go.mod h1:qg3oAkmPOLJEUFlFsxBxYbp0cXHgbq2sZEHVq7SmUp8= -github.com/wzshiming/llrb v0.2.1 h1:+T/PE0lrrB2LE8487fYcRlkZct5E9L/Gi8mf6mUtEK8= -github.com/wzshiming/llrb v0.2.1/go.mod h1:xlAM5hpCBIT3dTy/6CbNPxHMR1JfIMSyk2lV68wz3Yw= github.com/wzshiming/trie v0.1.1 h1:02AaBSZGhs6Aqljp8fz4xq/Mg8omFBPIlrUS0pJ11ks= github.com/wzshiming/trie v0.1.1/go.mod h1:c9thxXTh4KcGkejt4sUsO4c5GUmWpxeWzOJ7AZJaI+8= github.com/wzshiming/winseq 
v0.0.0-20200112104235-db357dc107ae h1:tpXvBXC3hpQBDCc9OojJZCQMVRAbT3TTdUMP8WguXkY= diff --git a/pkg/kwok/cmd/root.go b/pkg/kwok/cmd/root.go index fa3272a47..d88d916a8 100644 --- a/pkg/kwok/cmd/root.go +++ b/pkg/kwok/cmd/root.go @@ -253,12 +253,14 @@ func runE(ctx context.Context, flags *flagpole) error { } ctx = log.NewContext(ctx, logger.With("id", id)) + enableMetrics := len(metrics) != 0 || slices.Contains(flags.Options.EnableCRDs, v1alpha1.MetricKind) ctr, err := controllers.NewController(controllers.Config{ Clock: clock.RealClock{}, TypedClient: typedClient, TypedKwokClient: typedKwokClient, EnableCNI: flags.Options.EnableCNI, - EnableMetrics: len(metrics) != 0 || slices.Contains(flags.Options.EnableCRDs, v1alpha1.MetricKind), + EnableMetrics: enableMetrics, + EnablePodCache: enableMetrics, ManageAllNodes: flags.Options.ManageAllNodes, ManageNodesWithAnnotationSelector: flags.Options.ManageNodesWithAnnotationSelector, ManageNodesWithLabelSelector: flags.Options.ManageNodesWithLabelSelector, diff --git a/pkg/kwok/controllers/controller.go b/pkg/kwok/controllers/controller.go index 5970af4ab..e98893891 100644 --- a/pkg/kwok/controllers/controller.go +++ b/pkg/kwok/controllers/controller.go @@ -25,9 +25,10 @@ import ( "strings" "time" + coordinationv1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -43,6 +44,8 @@ import ( "sigs.k8s.io/kwok/pkg/consts" "sigs.k8s.io/kwok/pkg/log" "sigs.k8s.io/kwok/pkg/utils/gotpl" + "sigs.k8s.io/kwok/pkg/utils/informer" + "sigs.k8s.io/kwok/pkg/utils/queue" "sigs.k8s.io/kwok/pkg/utils/slices" ) @@ -86,6 +89,8 @@ var ( return consts.Version }, } + + nodeKind = corev1.SchemeGroupVersion.WithKind("Node") ) // Controller is a fake kubelet implementation that can be used to test @@ -96,6 +101,9 @@ type Controller struct { nodeLeases *NodeLeaseController broadcaster record.EventBroadcaster typedClient kubernetes.Interface + + nodeCacheGetter informer.Getter[*corev1.Node] + podCacheGetter informer.Getter[*corev1.Pod] } // Config is the configuration for the controller @@ -121,6 +129,7 @@ type Config struct { NodeLeaseParallelism uint ID string EnableMetrics bool + EnablePodCache bool } // NewController creates a new fake kubelet controller @@ -155,49 +164,79 @@ func (c *Controller) Start(ctx context.Context) error { recorder := c.broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "kwok_controller"}) var ( + err error nodeLeases *NodeLeaseController - getNodeOwnerFunc func(nodeName string) []metav1.OwnerReference + nodeLeasesChan chan informer.Event[*coordinationv1.Lease] onLeaseNodeManageFunc func(nodeName string) onNodeManagedFunc func(nodeName string) readOnlyFunc func(nodeName string) bool - nodeSelectorFunc func(node *corev1.Node) bool ) - switch { - case conf.ManageAllNodes: - nodeSelectorFunc = func(node *corev1.Node) bool { - return true - } - case conf.ManageNodesWithAnnotationSelector != "": - selector, err := labels.Parse(conf.ManageNodesWithAnnotationSelector) - if err != nil { - return err - } - nodeSelectorFunc = func(node *corev1.Node) bool { - return selector.Matches(labels.Set(node.Annotations)) - } - case conf.ManageNodesWithLabelSelector != "": - // client-go supports label filtering, so return true is ok. 
- nodeSelectorFunc = func(node *corev1.Node) bool { - return true - } + + nodeChan := make(chan informer.Event[*corev1.Node], 1) + nodesCli := conf.TypedClient.CoreV1().Nodes() + nodesInformer := informer.NewInformer[*corev1.Node, *corev1.NodeList](nodesCli) + nodesCache, err := nodesInformer.WatchWithCache(ctx, informer.Option{ + LabelSelector: conf.ManageNodesWithLabelSelector, + AnnotationSelector: conf.ManageNodesWithAnnotationSelector, + }, nodeChan) + if err != nil { + return fmt.Errorf("failed to watch nodes: %w", err) + } + + podsChan := make(chan informer.Event[*corev1.Pod], 1) + podsCli := conf.TypedClient.CoreV1().Pods(corev1.NamespaceAll) + podsInformer := informer.NewInformer[*corev1.Pod, *corev1.PodList](podsCli) + + podWatchOption := informer.Option{ + FieldSelector: fields.OneTermNotEqualSelector("spec.nodeName", "").String(), + } + + var podsCache informer.Getter[*corev1.Pod] + if conf.EnablePodCache { + podsCache, err = podsInformer.WatchWithCache(ctx, podWatchOption, podsChan) + } else { + err = podsInformer.Watch(ctx, podWatchOption, podsChan) + } + if err != nil { + return fmt.Errorf("failed to watch pods: %w", err) } if conf.NodeLeaseDurationSeconds != 0 { + nodeLeasesChan = make(chan informer.Event[*coordinationv1.Lease], 1) + nodeLeasesCli := conf.TypedClient.CoordinationV1().Leases(corev1.NamespaceNodeLease) + nodeLeasesInformer := informer.NewInformer[*coordinationv1.Lease, *coordinationv1.LeaseList](nodeLeasesCli) + err = nodeLeasesInformer.Watch(ctx, informer.Option{}, nodeLeasesChan) + if err != nil { + return fmt.Errorf("failed to watch nodes: %w", err) + } + leaseDuration := time.Duration(conf.NodeLeaseDurationSeconds) * time.Second // https://github.com/kubernetes/kubernetes/blob/02f4d643eae2e225591702e1bbf432efea453a26/pkg/kubelet/kubelet.go#L199-L200 renewInterval := leaseDuration / 4 // https://github.com/kubernetes/component-helpers/blob/d17b6f1e84500ee7062a26f5327dc73cb3e9374a/apimachinery/lease/controller.go#L100 renewIntervalJitter := 0.04 - l, err := NewNodeLeaseController(NodeLeaseControllerConfig{ + nodeLeases, err = NewNodeLeaseController(NodeLeaseControllerConfig{ Clock: conf.Clock, TypedClient: conf.TypedClient, + NodeCacheGetter: nodesCache, LeaseDurationSeconds: conf.NodeLeaseDurationSeconds, LeaseParallelism: conf.NodeLeaseParallelism, RenewInterval: renewInterval, RenewIntervalJitter: renewIntervalJitter, - LeaseNamespace: corev1.NamespaceNodeLease, MutateLeaseFunc: setNodeOwnerFunc(func(nodeName string) []metav1.OwnerReference { - return getNodeOwnerFunc(nodeName) + node, ok := nodesCache.Get(nodeName) + if !ok { + return nil + } + ownerReferences := []metav1.OwnerReference{ + { + APIVersion: nodeKind.Version, + Kind: nodeKind.Kind, + Name: node.Name, + UID: node.UID, + }, + } + return ownerReferences }), HolderIdentity: conf.ID, OnNodeManagedFunc: func(nodeName string) { @@ -207,7 +246,6 @@ func (c *Controller) Start(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to create node leases controller: %w", err) } - nodeLeases = l // Not holding the lease means the node is not managed readOnlyFunc = func(nodeName string) bool { @@ -292,13 +330,12 @@ func (c *Controller) Start(ctx context.Context) error { nodes, err := NewNodeController(NodeControllerConfig{ Clock: conf.Clock, TypedClient: conf.TypedClient, + NodeCacheGetter: nodesCache, NodeIP: conf.NodeIP, NodeName: conf.NodeName, NodePort: conf.NodePort, DisregardStatusWithAnnotationSelector: conf.DisregardStatusWithAnnotationSelector, DisregardStatusWithLabelSelector: 
conf.DisregardStatusWithLabelSelector, - ManageNodesWithLabelSelector: conf.ManageNodesWithLabelSelector, - NodeSelectorFunc: nodeSelectorFunc, OnNodeManagedFunc: func(nodeName string) { onNodeManagedFunc(nodeName) }, @@ -317,13 +354,13 @@ func (c *Controller) Start(ctx context.Context) error { Clock: conf.Clock, EnableCNI: conf.EnableCNI, TypedClient: conf.TypedClient, + NodeCacheGetter: nodesCache, NodeIP: conf.NodeIP, CIDR: conf.CIDR, DisregardStatusWithAnnotationSelector: conf.DisregardStatusWithAnnotationSelector, DisregardStatusWithLabelSelector: conf.DisregardStatusWithLabelSelector, Lifecycle: podLifecycleGetter, PlayStageParallelism: conf.PodPlayStageParallelism, - Namespace: corev1.NamespaceAll, NodeGetFunc: nodes.Get, FuncMap: defaultFuncMap, Recorder: recorder, @@ -334,43 +371,68 @@ func (c *Controller) Start(ctx context.Context) error { return fmt.Errorf("failed to create pods controller: %w", err) } + podOnNodeManageQueue := queue.NewQueue[string]() if nodeLeases != nil { - getNodeOwnerFunc = func(nodeName string) []metav1.OwnerReference { - nodeInfo, ok := nodes.Get(nodeName) - if !ok || nodeInfo == nil { - return nil - } - return nodeInfo.OwnerReferences - } + nodeManageQueue := queue.NewQueue[string]() onLeaseNodeManageFunc = func(nodeName string) { - // Manage the node and play stage all pods on the node - nodes.Manage(nodeName) - pods.PlayStagePodsOnNode(nodeName) + nodeManageQueue.Add(nodeName) + podOnNodeManageQueue.Add(nodeName) } - onNodeManagedFunc = func(nodeName string) { // Try to hold the lease nodeLeases.TryHold(nodeName) } + + go func() { + for { + nodeName := nodeManageQueue.GetOrWait() + node, ok := nodesCache.Get(nodeName) + if !ok { + logger.Warn("node not found in cache", "node", nodeName) + err := nodesInformer.Sync(ctx, informer.Option{ + FieldSelector: fields.OneTermEqualSelector("metadata.name", nodeName).String(), + }, nodeChan) + if err != nil { + logger.Error("failed to update node", err, "node", nodeName) + } + continue + } + nodeChan <- informer.Event[*corev1.Node]{ + Type: informer.Sync, + Object: node, + } + } + }() } else { onNodeManagedFunc = func(nodeName string) { - // Play stage all pods on the node - pods.PlayStagePodsOnNode(nodeName) + podOnNodeManageQueue.Add(nodeName) } } + go func() { + for { + nodeName := podOnNodeManageQueue.GetOrWait() + err = podsInformer.Sync(ctx, informer.Option{ + FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(), + }, podsChan) + if err != nil { + logger.Error("failed to update pods on node", err, "node", nodeName) + } + } + }() + c.broadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{Interface: c.typedClient.CoreV1().Events("")}) if nodeLeases != nil { - err := nodeLeases.Start(ctx) + err := nodeLeases.Start(ctx, nodeLeasesChan) if err != nil { return fmt.Errorf("failed to start node leases controller: %w", err) } } - err = pods.Start(ctx) + err = pods.Start(ctx, podsChan) if err != nil { return fmt.Errorf("failed to start pods controller: %w", err) } - err = nodes.Start(ctx) + err = nodes.Start(ctx, nodeChan) if err != nil { return fmt.Errorf("failed to start nodes controller: %w", err) } @@ -378,24 +440,36 @@ func (c *Controller) Start(ctx context.Context) error { c.pods = pods c.nodes = nodes c.nodeLeases = nodeLeases + c.nodeCacheGetter = nodesCache + c.podCacheGetter = podsCache return nil } -// GetNode returns the node with the given name -func (c *Controller) GetNode(nodeName string) (*NodeInfo, bool) { +// GetNodeInfo returns the node info for the given node +func 
(c *Controller) GetNodeInfo(nodeName string) (*NodeInfo, bool) { return c.nodes.Get(nodeName) } // ListNodes returns all nodes -func (c *Controller) ListNodes() []*NodeInfo { +func (c *Controller) ListNodes() []string { return c.nodes.List() } // ListPods returns all pods on the given node -func (c *Controller) ListPods(nodeName string) ([]*PodInfo, bool) { +func (c *Controller) ListPods(nodeName string) ([]log.ObjectRef, bool) { return c.pods.List(nodeName) } +// GetPodCache returns the pod cache +func (c *Controller) GetPodCache() informer.Getter[*corev1.Pod] { + return c.podCacheGetter +} + +// GetNodeCache returns the node cache +func (c *Controller) GetNodeCache() informer.Getter[*corev1.Node] { + return c.nodeCacheGetter +} + // Identity returns a unique identifier for this controller func Identity() (string, error) { hostname, err := os.Hostname() diff --git a/pkg/kwok/controllers/node_controller.go b/pkg/kwok/controllers/node_controller.go index ffd40f842..1bffde665 100644 --- a/pkg/kwok/controllers/node_controller.go +++ b/pkg/kwok/controllers/node_controller.go @@ -23,19 +23,14 @@ import ( "fmt" "net" "sync/atomic" - "time" - "github.com/wzshiming/cron" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/pager" "k8s.io/client-go/tools/record" "k8s.io/utils/clock" netutils "k8s.io/utils/net" @@ -45,12 +40,12 @@ import ( "sigs.k8s.io/kwok/pkg/log" "sigs.k8s.io/kwok/pkg/utils/expression" "sigs.k8s.io/kwok/pkg/utils/gotpl" + "sigs.k8s.io/kwok/pkg/utils/informer" "sigs.k8s.io/kwok/pkg/utils/maps" - "sigs.k8s.io/kwok/pkg/utils/slices" + "sigs.k8s.io/kwok/pkg/utils/queue" ) var ( - // https://kubernetes.io/docs/concepts/architecture/nodes/#condition nodeConditions = []corev1.NodeCondition{ { @@ -85,32 +80,28 @@ var ( }, } nodeConditionsData, _ = expression.ToJSONStandard(nodeConditions) - nodeKind = corev1.SchemeGroupVersion.WithKind("Node") ) // NodeController is a fake nodes implementation that can be used to test type NodeController struct { clock clock.Clock typedClient kubernetes.Interface + nodeCacheGetter informer.Getter[*corev1.Node] nodeIP string nodeName string nodePort int disregardStatusWithAnnotationSelector labels.Selector disregardStatusWithLabelSelector labels.Selector - manageNodesWithLabelSelector string - nodeSelectorFunc func(node *corev1.Node) bool onNodeManagedFunc func(nodeName string) nodesSets maps.SyncMap[string, *NodeInfo] renderer gotpl.Renderer preprocessChan chan *corev1.Node - playStageChan chan resourceStageJob[*corev1.Node] playStageParallelism uint lifecycle resources.Getter[Lifecycle] - cronjob *cron.Cron - delayJobs jobInfoMap + delayQueue queue.DelayingQueue[resourceStageJob[*corev1.Node]] + delayQueueMapping maps.SyncMap[string, resourceStageJob[*corev1.Node]] recorder record.EventRecorder readOnlyFunc func(nodeName string) bool - triggerPreprocessChan chan string enableMetrics bool } @@ -118,11 +109,10 @@ type NodeController struct { type NodeControllerConfig struct { Clock clock.Clock TypedClient kubernetes.Interface - NodeSelectorFunc func(node *corev1.Node) bool + NodeCacheGetter informer.Getter[*corev1.Node] OnNodeManagedFunc func(nodeName string) DisregardStatusWithAnnotationSelector string DisregardStatusWithLabelSelector string - 
ManageNodesWithLabelSelector string NodeIP string NodeName string NodePort int @@ -136,10 +126,6 @@ type NodeControllerConfig struct { // NodeInfo is the collection of necessary node information type NodeInfo struct { - Node *corev1.Node - HostIPs []string - PodCIDRs []string - OwnerReferences []metav1.OwnerReference StartedContainer atomic.Int64 } @@ -166,20 +152,17 @@ func NewNodeController(conf NodeControllerConfig) (*NodeController, error) { c := &NodeController{ clock: conf.Clock, typedClient: conf.TypedClient, - nodeSelectorFunc: conf.NodeSelectorFunc, + nodeCacheGetter: conf.NodeCacheGetter, disregardStatusWithAnnotationSelector: disregardStatusWithAnnotationSelector, disregardStatusWithLabelSelector: disregardStatusWithLabelSelector, - manageNodesWithLabelSelector: conf.ManageNodesWithLabelSelector, onNodeManagedFunc: conf.OnNodeManagedFunc, nodeIP: conf.NodeIP, nodeName: conf.NodeName, nodePort: conf.NodePort, - cronjob: cron.NewCron(), + delayQueue: queue.NewDelayingQueue[resourceStageJob[*corev1.Node]](conf.Clock), lifecycle: conf.Lifecycle, playStageParallelism: conf.PlayStageParallelism, preprocessChan: make(chan *corev1.Node), - triggerPreprocessChan: make(chan string, 64), - playStageChan: make(chan resourceStageJob[*corev1.Node]), recorder: conf.Recorder, readOnlyFunc: conf.ReadOnlyFunc, enableMetrics: conf.EnableMetrics, @@ -201,35 +184,16 @@ func NewNodeController(conf NodeControllerConfig) (*NodeController, error) { // Start starts the fake nodes controller // if nodeSelectorFunc is not nil, it will use it to determine if the node should be managed -func (c *NodeController) Start(ctx context.Context) error { +func (c *NodeController) Start(ctx context.Context, events <-chan informer.Event[*corev1.Node]) error { go c.preprocessWorker(ctx) - go c.triggerPreprocessWorker(ctx) for i := uint(0); i < c.playStageParallelism; i++ { go c.playStageWorker(ctx) } - - opt := metav1.ListOptions{ - LabelSelector: c.manageNodesWithLabelSelector, - } - err := c.watchResources(ctx, opt) - if err != nil { - return fmt.Errorf("failed watch nodes: %w", err) - } - - logger := log.FromContext(ctx) - go func() { - err = c.listResources(ctx, opt) - if err != nil { - logger.Error("Failed list nodes", err) - } - }() + go c.watchResources(ctx, events) return nil } func (c *NodeController) need(node *corev1.Node) bool { - if !c.nodeSelectorFunc(node) { - return false - } if c.disregardStatusWithAnnotationSelector != nil && len(node.Annotations) != 0 && c.disregardStatusWithAnnotationSelector.Matches(labels.Set(node.Annotations)) { @@ -245,115 +209,52 @@ func (c *NodeController) need(node *corev1.Node) bool { } // watchResources watch resources and send to preprocessChan -func (c *NodeController) watchResources(ctx context.Context, opt metav1.ListOptions) error { - // Watch nodes in the cluster - watcher, err := c.typedClient.CoreV1().Nodes().Watch(ctx, opt) - if err != nil { - return err - } - +func (c *NodeController) watchResources(ctx context.Context, events <-chan informer.Event[*corev1.Node]) { logger := log.FromContext(ctx) - go func() { - rc := watcher.ResultChan() - loop: - for { - select { - case event, ok := <-rc: - if !ok { - logger.Warn("Watch channel has been stopped, retrying") - for { - watcher, err := c.typedClient.CoreV1().Nodes().Watch(ctx, opt) - if err == nil { - rc = watcher.ResultChan() - continue loop - } - - logger.Error("Failed to watch nodes", err) - select { - case <-ctx.Done(): - break loop - case <-c.clock.After(time.Second * 5): - } +loop: + for { + select { + case 
event, ok := <-events: + if !ok { + break loop + } + switch event.Type { + case informer.Added, informer.Modified, informer.Sync: + node := event.Object + if c.need(node) { + c.putNodeInfo(node) + if c.readOnly(node.Name) { + logger.Debug("Skip node", + "reason", "read only", + "event", event.Type, + "node", node.Name, + ) + } else { + c.preprocessChan <- node } } - switch event.Type { - case watch.Added: - node := event.Object.(*corev1.Node) - if c.need(node) { - c.putNodeInfo(node) - if c.onNodeManagedFunc != nil { - c.onNodeManagedFunc(node.Name) - } - if c.readOnly(node.Name) { - logger.Debug("Skip node", - "reason", "read only", - "event", event.Type, - "node", node.Name, - ) - } else { - c.preprocessChan <- node - } - } - case watch.Modified: - node := event.Object.(*corev1.Node) - if c.need(node) { - c.putNodeInfo(node) - if c.readOnly(node.Name) { - logger.Debug("Skip node", - "reason", "read only", - "event", event.Type, - "node", node.Name, - ) - } else { - c.preprocessChan <- node - } - } - case watch.Deleted: - node := event.Object.(*corev1.Node) - if _, has := c.nodesSets.Load(node.Name); has { - c.deleteNodeInfo(node) - - // Cancel delay job - key := node.Name - resourceJob, ok := c.delayJobs.LoadAndDelete(key) - if ok { - resourceJob.Cancel() - } + + if c.onNodeManagedFunc != nil && event.Type != informer.Modified { + c.onNodeManagedFunc(node.Name) + } + case informer.Deleted: + node := event.Object + if _, has := c.nodesSets.Load(node.Name); has { + c.deleteNodeInfo(node) + + // Cancel delay job + key := node.Name + resourceJob, ok := c.delayQueueMapping.LoadAndDelete(key) + if ok { + c.delayQueue.Cancel(resourceJob) } } - case <-ctx.Done(): - watcher.Stop() - break loop - } - } - logger.Info("Stop watch nodes") - }() - return nil -} - -// listResources lists all resources and sends to preprocessChan -func (c *NodeController) listResources(ctx context.Context, opt metav1.ListOptions) error { - listPager := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { - return c.typedClient.CoreV1().Nodes().List(ctx, opts) - }) - - logger := log.FromContext(ctx) - - return listPager.EachListItem(ctx, opt, func(obj runtime.Object) error { - node := obj.(*corev1.Node) - if c.need(node) { - c.putNodeInfo(node) - if c.readOnly(node.Name) { - logger.Debug("Skip node", - "node", node.Name, - "reason", "read only", - ) - } else { - c.preprocessChan <- node } + case <-ctx.Done(): + break loop } - return nil - }) + } + logger.Info("Stop watch nodes") } // finalizersModify modify finalizers of node @@ -427,40 +328,12 @@ func (c *NodeController) preprocessWorker(ctx context.Context) { } } -// triggerPreprocessWorker receives the resource from the triggerPreprocessChan and preprocess it -func (c *NodeController) triggerPreprocessWorker(ctx context.Context) { - logger := log.FromContext(ctx) - for { - select { - case <-ctx.Done(): - logger.Debug("Stop trigger preprocess worker") - return - case nodeName := <-c.triggerPreprocessChan: - nodeInfo, has := c.nodesSets.Load(nodeName) - if !has || nodeInfo.Node == nil { - logger.Warn("Node not found", - "node", nodeName, - ) - continue - } - if c.readOnly(nodeInfo.Node.Name) { - logger.Debug("Skip node", - "node", nodeInfo.Node.Name, - "reason", "read only", - ) - } else { - c.preprocessChan <- nodeInfo.Node - } - } - } -} - // preprocess the pod and send it to the playStageWorker func (c *NodeController) preprocess(ctx context.Context, node *corev1.Node) error { key := node.Name - resourceJob, ok := 
c.delayJobs.Load(key) - if ok && resourceJob.ResourceVersion == node.ResourceVersion { + resourceJob, ok := c.delayQueueMapping.Load(key) + if ok && resourceJob.Resource.ResourceVersion == node.ResourceVersion { return nil } @@ -497,39 +370,28 @@ func (c *NodeController) preprocess(ctx context.Context, node *corev1.Node) erro ) } - cancelFunc, ok := c.cronjob.AddWithCancel(cron.Order(now.Add(delay)), func() { - resourceJob, ok := c.delayJobs.LoadAndDelete(key) - if ok { - resourceJob.Cancel() - } - c.playStageChan <- resourceStageJob[*corev1.Node]{ - Resource: node, - Stage: stage, - } - }) - if ok { - resourceJob, ok := c.delayJobs.LoadOrStore(key, jobInfo{ - ResourceVersion: node.ResourceVersion, - Cancel: cancelFunc, - }) - if ok { - resourceJob.Cancel() - } + item := resourceStageJob[*corev1.Node]{ + Resource: node, + Stage: stage, + Key: key, + } + ok = c.delayQueue.AddAfter(item, delay) + if !ok { + logger.Debug("Skip node", + "reason", "delayed", + ) + } else { + c.delayQueueMapping.Store(key, item) } return nil } // playStageWorker receives the resource from the playStageChan and play the stage func (c *NodeController) playStageWorker(ctx context.Context) { - logger := log.FromContext(ctx) - for { - select { - case <-ctx.Done(): - logger.Debug("Stop play stage worker") - return - case node := <-c.playStageChan: - c.playStage(ctx, node.Resource, node.Stage) - } + for ctx.Err() == nil { + node := c.delayQueue.GetOrWait() + c.delayQueueMapping.Delete(node.Key) + c.playStage(ctx, node.Resource, node.Stage) } } @@ -652,30 +514,7 @@ func (c *NodeController) computePatch(node *corev1.Node, tpl string) ([]byte, er // putNodeInfo puts node info func (c *NodeController) putNodeInfo(node *corev1.Node) { - nodeIPs := getNodeHostIPs(node) - hostIps := slices.Map(nodeIPs, func(ip net.IP) string { - return ip.String() - }) - - podCIDRs := node.Spec.PodCIDRs - if len(podCIDRs) == 0 && node.Spec.PodCIDR != "" { - podCIDRs = []string{node.Spec.PodCIDR} - } - - nodeInfo := &NodeInfo{ - Node: node, - HostIPs: hostIps, - PodCIDRs: podCIDRs, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: nodeKind.Version, - Kind: nodeKind.Kind, - Name: node.Name, - UID: node.UID, - }, - }, - } - c.nodesSets.Store(node.Name, nodeInfo) + c.nodesSets.Store(node.Name, &NodeInfo{}) } // deleteNodeInfo deletes node info @@ -683,11 +522,6 @@ func (c *NodeController) deleteNodeInfo(node *corev1.Node) { c.nodesSets.Delete(node.Name) } -// Manage manages the node -func (c *NodeController) Manage(nodeName string) { - c.triggerPreprocessChan <- nodeName -} - // getNodeHostIPs returns the provided node's IP(s); either a single "primary IP" for the // node in a single-stack cluster, or a dual-stack pair of IPs in a dual-stack cluster // (for nodes that actually have dual-stack IPs). 
Among other things, the IPs returned @@ -730,17 +564,6 @@ func getNodeHostIPs(node *corev1.Node) []net.IP { return nodeIPs } -// Has returns true if the node is existed -func (c *NodeController) Has(nodeName string) bool { - _, has := c.nodesSets.Load(nodeName) - return has -} - -// Size returns the number of nodes -func (c *NodeController) Size() int { - return c.nodesSets.Size() -} - // Get returns Has bool and node info func (c *NodeController) Get(nodeName string) (*NodeInfo, bool) { nodeInfo, has := c.nodesSets.Load(nodeName) @@ -750,14 +573,9 @@ func (c *NodeController) Get(nodeName string) (*NodeInfo, bool) { return nil, has } -// List returns all nodes -func (c *NodeController) List() []*NodeInfo { - nodes := []*NodeInfo{} - c.nodesSets.Range(func(key string, value *NodeInfo) bool { - nodes = append(nodes, value) - return true - }) - return nodes +// List returns all name of nodes +func (c *NodeController) List() []string { + return c.nodesSets.Keys() } func (c *NodeController) funcNodeIP() string { diff --git a/pkg/kwok/controllers/node_controller_test.go b/pkg/kwok/controllers/node_controller_test.go index 8b86f785f..a0688fe11 100644 --- a/pkg/kwok/controllers/node_controller_test.go +++ b/pkg/kwok/controllers/node_controller_test.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "os" - "strings" "testing" "time" @@ -34,6 +33,7 @@ import ( "sigs.k8s.io/kwok/pkg/config" "sigs.k8s.io/kwok/pkg/config/resources" "sigs.k8s.io/kwok/pkg/log" + "sigs.k8s.io/kwok/pkg/utils/informer" "sigs.k8s.io/kwok/pkg/utils/wait" ) @@ -42,6 +42,9 @@ func TestNodeController(t *testing.T) { &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: "node0", + Annotations: map[string]string{ + "node": "true", + }, }, Status: corev1.NodeStatus{ Addresses: []corev1.NodeAddress{ @@ -69,7 +72,7 @@ func TestNodeController(t *testing.T) { ) nodeSelectorFunc := func(node *corev1.Node) bool { - return strings.HasPrefix(node.Name, "node") + return node.Annotations["node"] == "true" } nodeInit, _ := config.Unmarshal([]byte(nodefast.DefaultNodeInit)) @@ -79,7 +82,6 @@ func TestNodeController(t *testing.T) { nodes, err := NewNodeController(NodeControllerConfig{ TypedClient: clientset, NodeIP: "10.0.0.1", - NodeSelectorFunc: nodeSelectorFunc, Lifecycle: resources.NewStaticGetter(lifecycle), FuncMap: defaultFuncMap, PlayStageParallelism: 2, @@ -89,13 +91,23 @@ func TestNodeController(t *testing.T) { } ctx := context.Background() ctx = log.NewContext(ctx, log.NewLogger(os.Stderr, log.LevelDebug)) - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) t.Cleanup(func() { cancel() time.Sleep(time.Second) }) - err = nodes.Start(ctx) + nodeCh := make(chan informer.Event[*corev1.Node], 1) + nodesCli := clientset.CoreV1().Nodes() + nodesInformer := informer.NewInformer[*corev1.Node, *corev1.NodeList](nodesCli) + err = nodesInformer.Watch(ctx, informer.Option{ + AnnotationSelector: "node=true", + }, nodeCh) + if err != nil { + t.Fatal(fmt.Errorf("failed to watch nodes: %w", err)) + } + + err = nodes.Start(ctx, nodeCh) if err != nil { t.Fatal(fmt.Errorf("failed to start nodes controller: %w", err)) } @@ -123,17 +135,6 @@ func TestNodeController(t *testing.T) { t.Fatal(fmt.Errorf("failed to create node1: %w", err)) } - err = wait.Poll(ctx, func(ctx context.Context) (done bool, err error) { - nodeSize := nodes.Size() - if nodeSize != 2 { - return false, fmt.Errorf("want 2 nodes, got %d", nodeSize) - } - return true, nil - }, wait.WithContinueOnError(5)) - if err != nil { - 
t.Fatal(err) - } - node1, err = clientset.CoreV1().Nodes().Get(ctx, "node1", metav1.GetOptions{}) if err != nil { t.Fatal(fmt.Errorf("failed to get node1: %w", err)) diff --git a/pkg/kwok/controllers/node_lease_controller.go b/pkg/kwok/controllers/node_lease_controller.go index f108ef532..a7a3619cc 100644 --- a/pkg/kwok/controllers/node_lease_controller.go +++ b/pkg/kwok/controllers/node_lease_controller.go @@ -21,26 +21,25 @@ import ( "fmt" "time" - "github.com/wzshiming/cron" coordinationv1 "k8s.io/api/coordination/v1" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/pager" "k8s.io/utils/clock" "sigs.k8s.io/kwok/pkg/log" "sigs.k8s.io/kwok/pkg/utils/format" + "sigs.k8s.io/kwok/pkg/utils/informer" "sigs.k8s.io/kwok/pkg/utils/maps" + "sigs.k8s.io/kwok/pkg/utils/queue" "sigs.k8s.io/kwok/pkg/utils/wait" ) // NodeLeaseController is responsible for creating and renewing a lease object type NodeLeaseController struct { typedClient clientset.Interface - leaseNamespace string + nodeCacheGetter informer.Getter[*corev1.Node] leaseDurationSeconds uint leaseParallelism uint renewInterval time.Duration @@ -53,10 +52,7 @@ type NodeLeaseController struct { // mutateLeaseFunc allows customizing a lease object mutateLeaseFunc func(*coordinationv1.Lease) error - cronjob *cron.Cron - cancelJob maps.SyncMap[string, cron.DoFunc] - - leaseChan chan string + delayQueue queue.DelayingQueue[string] holderIdentity string onNodeManagedFunc func(nodeName string) @@ -67,11 +63,11 @@ type NodeLeaseControllerConfig struct { Clock clock.Clock HolderIdentity string TypedClient clientset.Interface + NodeCacheGetter informer.Getter[*corev1.Node] LeaseDurationSeconds uint LeaseParallelism uint RenewInterval time.Duration RenewIntervalJitter float64 - LeaseNamespace string MutateLeaseFunc func(*coordinationv1.Lease) error OnNodeManagedFunc func(nodeName string) } @@ -89,14 +85,13 @@ func NewNodeLeaseController(conf NodeLeaseControllerConfig) (*NodeLeaseControlle c := &NodeLeaseController{ clock: conf.Clock, typedClient: conf.TypedClient, - leaseNamespace: conf.LeaseNamespace, + nodeCacheGetter: conf.NodeCacheGetter, leaseDurationSeconds: conf.LeaseDurationSeconds, leaseParallelism: conf.LeaseParallelism, renewInterval: conf.RenewInterval, renewIntervalJitter: conf.RenewIntervalJitter, mutateLeaseFunc: conf.MutateLeaseFunc, - cronjob: cron.NewCron(), - leaseChan: make(chan string), + delayQueue: queue.NewDelayingQueue[string](conf.Clock), holderIdentity: conf.HolderIdentity, onNodeManagedFunc: conf.OnNodeManagedFunc, } @@ -105,99 +100,53 @@ func NewNodeLeaseController(conf NodeLeaseControllerConfig) (*NodeLeaseControlle } // Start starts the NodeLeaseController -func (c *NodeLeaseController) Start(ctx context.Context) error { +func (c *NodeLeaseController) Start(ctx context.Context, events <-chan informer.Event[*coordinationv1.Lease]) error { for i := uint(0); i < c.leaseParallelism; i++ { go c.syncWorker(ctx) } - - opt := metav1.ListOptions{} - err := c.watchResources(ctx, opt) - if err != nil { - return fmt.Errorf("failed watch node leases: %w", err) - } - - logger := log.FromContext(ctx) - go func() { - err = c.listResources(ctx, opt) - if err != nil { - logger.Error("Failed list node leases", err) - } - }() + go c.watchResources(ctx, events) return nil } // watchResources watch resources and send to preprocessChan 
-func (c *NodeLeaseController) watchResources(ctx context.Context, opt metav1.ListOptions) error { - // Watch node leases in the cluster - watcher, err := c.typedClient.CoordinationV1().Leases(c.leaseNamespace).Watch(ctx, opt) - if err != nil { - return err - } - +func (c *NodeLeaseController) watchResources(ctx context.Context, events <-chan informer.Event[*coordinationv1.Lease]) { logger := log.FromContext(ctx) - go func() { - rc := watcher.ResultChan() - loop: - for { - select { - case event, ok := <-rc: - if !ok { - for { - watcher, err := c.typedClient.CoordinationV1().Leases(c.leaseNamespace).Watch(ctx, opt) - if err == nil { - rc = watcher.ResultChan() - continue loop - } - - logger.Error("Failed to watch node leases", err) - select { - case <-ctx.Done(): - break loop - case <-c.clock.After(time.Second * 5): - } - } - } - switch event.Type { - case watch.Added, watch.Modified: - lease := event.Object.(*coordinationv1.Lease) - c.latestLease.Store(lease.Name, lease) - case watch.Deleted: - lease := event.Object.(*coordinationv1.Lease) - c.remove(lease.Name) - } - case <-ctx.Done(): - watcher.Stop() +loop: + for { + select { + case event, ok := <-events: + if !ok { break loop } + switch event.Type { + case informer.Added, informer.Modified, informer.Sync: + lease := event.Object.DeepCopy() + c.latestLease.Store(lease.Name, lease) + case informer.Deleted: + lease := event.Object + c.remove(lease.Name) + } + case <-ctx.Done(): + break loop } - logger.Info("Stop watch node leases") - }() - return nil -} - -// listResources lists all resources and sends to preprocessChan -func (c *NodeLeaseController) listResources(ctx context.Context, opt metav1.ListOptions) error { - listPager := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { - return c.typedClient.CoordinationV1().Leases(c.leaseNamespace).List(ctx, opts) - }) - - return listPager.EachListItem(ctx, opt, func(obj runtime.Object) error { - lease := obj.(*coordinationv1.Lease) - c.latestLease.Store(lease.Name, lease) - return nil - }) + } + logger.Info("Stop watch node leases") } func (c *NodeLeaseController) syncWorker(ctx context.Context) { - logger := log.FromContext(ctx) - for { - select { - case <-ctx.Done(): - logger.Debug("Stop sync worker") - return - case nodeName := <-c.leaseChan: - c.sync(ctx, nodeName) + for ctx.Err() == nil { + nodeName := c.delayQueue.GetOrWait() + if c.nodeCacheGetter != nil { + _, ok := c.nodeCacheGetter.Get(nodeName) + if !ok { + continue + } } + + now := c.clock.Now() + c.sync(ctx, nodeName) + nextTime := c.nextTryTime(nodeName, now) + _ = c.delayQueue.AddAfter(nodeName, nextTime.Sub(now)) } } @@ -215,39 +164,13 @@ func (c *NodeLeaseController) nextTryTime(name string, now time.Time) time.Time // TryHold tries to hold a lease for the NodeLeaseController func (c *NodeLeaseController) TryHold(name string) { - // if already has a cron job, return - _, ok := c.cancelJob.Load(name) - if ok { - return - } - - // add a cron job to sync the lease periodically - cancel, ok := c.cronjob.AddWithCancel( - func(now time.Time) (time.Time, bool) { - return c.nextTryTime(name, now), true - }, - func() { - c.leaseChan <- name - }, - ) - if ok { - old, ok := c.cancelJob.Swap(name, cancel) - if ok { - old() - } - } - - // trigger a sync immediately - c.leaseChan <- name + c.delayQueue.Add(name) } // remove removes a lease from the NodeLeaseController func (c *NodeLeaseController) remove(name string) { - cancel, ok := c.cancelJob.LoadAndDelete(name) - if ok { - cancel() - 
c.latestLease.Delete(name) - } + _ = c.delayQueue.Cancel(name) + c.latestLease.Delete(name) } // Held returns true if the NodeLeaseController holds the lease @@ -294,14 +217,14 @@ func (c *NodeLeaseController) sync(ctx context.Context, nodeName string) { logger.Info("Creating lease") latestLease, err := c.ensureLease(ctx, nodeName) if err != nil { - if !apierrors.IsNotFound(err) || c.latestLease.Size() != 0 { + if !apierrors.IsNotFound(err) || !c.latestLease.IsEmpty() { logger.Error("failed to create lease", err) return } // kube-apiserver will not have finished initializing the resources when the cluster has just been created. logger.Error("lease namespace not found, retrying in 1 second", err) - time.Sleep(1 * time.Second) + c.clock.Sleep(1 * time.Second) latestLease, err = c.ensureLease(ctx, nodeName) if err != nil { logger.Error("failed to create lease secondly", err) @@ -325,7 +248,7 @@ func (c *NodeLeaseController) ensureLease(ctx context.Context, leaseName string) lease := &coordinationv1.Lease{ ObjectMeta: metav1.ObjectMeta{ Name: leaseName, - Namespace: c.leaseNamespace, + Namespace: corev1.NamespaceNodeLease, }, Spec: coordinationv1.LeaseSpec{ HolderIdentity: &c.holderIdentity, @@ -340,7 +263,7 @@ func (c *NodeLeaseController) ensureLease(ctx context.Context, leaseName string) } } - lease, err := c.typedClient.CoordinationV1().Leases(c.leaseNamespace).Create(ctx, lease, metav1.CreateOptions{}) + lease, err := c.typedClient.CoordinationV1().Leases(corev1.NamespaceNodeLease).Create(ctx, lease, metav1.CreateOptions{}) if err != nil { return nil, err } @@ -366,7 +289,7 @@ func (c *NodeLeaseController) renewLease(ctx context.Context, base *coordination } } - lease, err := c.typedClient.CoordinationV1().Leases(c.leaseNamespace).Update(ctx, lease, metav1.UpdateOptions{}) + lease, err := c.typedClient.CoordinationV1().Leases(lease.Namespace).Update(ctx, lease, metav1.UpdateOptions{}) if err != nil { return nil, false, err } diff --git a/pkg/kwok/controllers/node_lease_controller_test.go b/pkg/kwok/controllers/node_lease_controller_test.go index 47b2f9bff..d93b8a1fd 100644 --- a/pkg/kwok/controllers/node_lease_controller_test.go +++ b/pkg/kwok/controllers/node_lease_controller_test.go @@ -31,6 +31,7 @@ import ( "sigs.k8s.io/kwok/pkg/log" "sigs.k8s.io/kwok/pkg/utils/format" + "sigs.k8s.io/kwok/pkg/utils/informer" ) func TestNodeLeaseController(t *testing.T) { @@ -77,7 +78,6 @@ func TestNodeLeaseController(t *testing.T) { LeaseParallelism: 2, RenewInterval: 10 * time.Second, RenewIntervalJitter: 0.04, - LeaseNamespace: corev1.NamespaceNodeLease, }) if err != nil { t.Fatal(fmt.Errorf("new node leases controller error: %w", err)) @@ -85,12 +85,21 @@ func TestNodeLeaseController(t *testing.T) { ctx := context.Background() ctx = log.NewContext(ctx, log.NewLogger(os.Stderr, log.LevelDebug)) - ctx, cancel := context.WithTimeout(ctx, 20*time.Second) + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) t.Cleanup(func() { cancel() time.Sleep(time.Second) }) - err = nodeLeases.Start(ctx) + + nodeLeasesCh := make(chan informer.Event[*coordinationv1.Lease], 1) + nodeLeasesCli := clientset.CoordinationV1().Leases(corev1.NamespaceNodeLease) + nodesInformer := informer.NewInformer[*coordinationv1.Lease, *coordinationv1.LeaseList](nodeLeasesCli) + err = nodesInformer.Watch(ctx, informer.Option{}, nodeLeasesCh) + if err != nil { + t.Fatal(fmt.Errorf("watch node leases error: %w", err)) + } + + err = nodeLeases.Start(ctx, nodeLeasesCh) if err != nil { t.Fatal(fmt.Errorf("start node leases controller 
error: %w", err)) } diff --git a/pkg/kwok/controllers/pod_controller.go b/pkg/kwok/controllers/pod_controller.go index 18041bfd0..4230e315b 100644 --- a/pkg/kwok/controllers/pod_controller.go +++ b/pkg/kwok/controllers/pod_controller.go @@ -21,20 +21,14 @@ import ( "context" "encoding/json" "fmt" - "time" - "github.com/wzshiming/cron" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/pager" "k8s.io/client-go/tools/record" "k8s.io/utils/clock" @@ -44,12 +38,13 @@ import ( "sigs.k8s.io/kwok/pkg/log" "sigs.k8s.io/kwok/pkg/utils/expression" "sigs.k8s.io/kwok/pkg/utils/gotpl" + "sigs.k8s.io/kwok/pkg/utils/informer" "sigs.k8s.io/kwok/pkg/utils/maps" + "sigs.k8s.io/kwok/pkg/utils/queue" ) var ( - deleteOpt = *metav1.NewDeleteOptions(0) - podFieldSelector = fields.OneTermNotEqualSelector("spec.nodeName", "").String() + deleteOpt = *metav1.NewDeleteOptions(0) ) // PodController is a fake pods implementation that can be used to test @@ -57,31 +52,28 @@ type PodController struct { clock clock.Clock enableCNI bool typedClient kubernetes.Interface + nodeCacheGetter informer.Getter[*corev1.Node] disregardStatusWithAnnotationSelector labels.Selector disregardStatusWithLabelSelector labels.Selector nodeIP string defaultCIDR string - namespace string nodeGetFunc func(nodeName string) (*NodeInfo, bool) ipPools maps.SyncMap[string, *ipPool] renderer gotpl.Renderer podsSets maps.SyncMap[log.ObjectRef, *PodInfo] podsOnNode maps.SyncMap[string, *maps.SyncMap[log.ObjectRef, *PodInfo]] preprocessChan chan *corev1.Pod - playStageChan chan resourceStageJob[*corev1.Pod] playStageParallelism uint lifecycle resources.Getter[Lifecycle] - cronjob *cron.Cron - delayJobs jobInfoMap + delayQueue queue.DelayingQueue[resourceStageJob[*corev1.Pod]] + delayQueueMapping maps.SyncMap[string, resourceStageJob[*corev1.Pod]] recorder record.EventRecorder readOnlyFunc func(nodeName string) bool - triggerPreprocessChan chan string enableMetrics bool } // PodInfo is the collection of necessary pod information type PodInfo struct { - Pod *corev1.Pod } // PodControllerConfig is the configuration for the PodController @@ -89,11 +81,11 @@ type PodControllerConfig struct { Clock clock.Clock EnableCNI bool TypedClient kubernetes.Interface + NodeCacheGetter informer.Getter[*corev1.Node] DisregardStatusWithAnnotationSelector string DisregardStatusWithLabelSelector string NodeIP string CIDR string - Namespace string NodeGetFunc func(nodeName string) (*NodeInfo, bool) NodeHasMetric func(nodeName string) bool Lifecycle resources.Getter[Lifecycle] @@ -128,18 +120,16 @@ func NewPodController(conf PodControllerConfig) (*PodController, error) { clock: conf.Clock, enableCNI: conf.EnableCNI, typedClient: conf.TypedClient, + nodeCacheGetter: conf.NodeCacheGetter, disregardStatusWithAnnotationSelector: disregardStatusWithAnnotationSelector, disregardStatusWithLabelSelector: disregardStatusWithLabelSelector, nodeIP: conf.NodeIP, defaultCIDR: conf.CIDR, - namespace: conf.Namespace, nodeGetFunc: conf.NodeGetFunc, - cronjob: cron.NewCron(), + delayQueue: queue.NewDelayingQueue[resourceStageJob[*corev1.Pod]](conf.Clock), lifecycle: conf.Lifecycle, playStageParallelism: conf.PlayStageParallelism, preprocessChan: make(chan 
*corev1.Pod), - triggerPreprocessChan: make(chan string, 64), - playStageChan: make(chan resourceStageJob[*corev1.Pod]), recorder: conf.Recorder, readOnlyFunc: conf.ReadOnlyFunc, enableMetrics: conf.EnableMetrics, @@ -159,28 +149,12 @@ func NewPodController(conf PodControllerConfig) (*PodController, error) { // Start starts the fake pod controller // It will modify the pods status to we want -func (c *PodController) Start(ctx context.Context) error { +func (c *PodController) Start(ctx context.Context, events <-chan informer.Event[*corev1.Pod]) error { go c.preprocessWorker(ctx) - go c.triggerPreprocessWorker(ctx) for i := uint(0); i < c.playStageParallelism; i++ { go c.playStageWorker(ctx) } - - opt := metav1.ListOptions{ - FieldSelector: podFieldSelector, - } - err := c.watchResources(ctx, opt) - if err != nil { - return fmt.Errorf("failed watch pods: %w", err) - } - - logger := log.FromContext(ctx) - go func() { - err = c.listResources(ctx, opt) - if err != nil { - logger.Error("Failed list pods", err) - } - }() + go c.watchResources(ctx, events) return nil } @@ -258,33 +232,12 @@ func (c *PodController) preprocessWorker(ctx context.Context) { } } -// triggerPreprocessWorker receives the resource from the triggerPreprocessChan and preprocess it -func (c *PodController) triggerPreprocessWorker(ctx context.Context) { - logger := log.FromContext(ctx) - for { - select { - case <-ctx.Done(): - logger.Debug("Stop trigger preprocess worker") - return - case nodeName := <-c.triggerPreprocessChan: - err := c.listResources(ctx, metav1.ListOptions{ - FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(), - }) - if err != nil { - logger.Error("Failed to preprocess node", err, - "node", nodeName, - ) - } - } - } -} - // preprocess the pod and send it to the playStageWorker func (c *PodController) preprocess(ctx context.Context, pod *corev1.Pod) error { key := log.KObj(pod).String() - resourceJob, ok := c.delayJobs.Load(key) - if ok && resourceJob.ResourceVersion == pod.ResourceVersion { + resourceJob, ok := c.delayQueueMapping.Load(key) + if ok && resourceJob.Resource.ResourceVersion == pod.ResourceVersion { return nil } @@ -322,24 +275,18 @@ func (c *PodController) preprocess(ctx context.Context, pod *corev1.Pod) error { ) } - cancelFunc, ok := c.cronjob.AddWithCancel(cron.Order(now.Add(delay)), func() { - resourceJob, ok := c.delayJobs.LoadAndDelete(key) - if ok { - resourceJob.Cancel() - } - c.playStageChan <- resourceStageJob[*corev1.Pod]{ - Resource: pod, - Stage: stage, - } - }) - if ok { - resourceJob, ok := c.delayJobs.LoadOrStore(key, jobInfo{ - ResourceVersion: pod.ResourceVersion, - Cancel: cancelFunc, - }) - if ok { - resourceJob.Cancel() - } + item := resourceStageJob[*corev1.Pod]{ + Resource: pod, + Stage: stage, + Key: key, + } + ok = c.delayQueue.AddAfter(item, delay) + if !ok { + logger.Debug("Skip pod", + "reason", "delayed", + ) + } else { + c.delayQueueMapping.Store(key, item) } return nil @@ -347,15 +294,10 @@ func (c *PodController) preprocess(ctx context.Context, pod *corev1.Pod) error { // playStageWorker receives the resource from the playStageChan and play the stage func (c *PodController) playStageWorker(ctx context.Context) { - logger := log.FromContext(ctx) - for { - select { - case <-ctx.Done(): - logger.Debug("Stop play stage worker") - return - case pod := <-c.playStageChan: - c.playStage(ctx, pod.Resource, pod.Stage) - } + for ctx.Err() == nil { + pod := c.delayQueue.GetOrWait() + c.delayQueueMapping.Delete(pod.Key) + c.playStage(ctx, 
pod.Resource, pod.Stage) } } @@ -462,129 +404,72 @@ func (c *PodController) need(pod *corev1.Pod) bool { } // watchResources watch resources and send to preprocessChan -func (c *PodController) watchResources(ctx context.Context, opt metav1.ListOptions) error { - watcher, err := c.typedClient.CoreV1().Pods(c.namespace).Watch(ctx, opt) - if err != nil { - return err - } - +func (c *PodController) watchResources(ctx context.Context, events <-chan informer.Event[*corev1.Pod]) { logger := log.FromContext(ctx) - go func() { - rc := watcher.ResultChan() - loop: - for { - select { - case event, ok := <-rc: - if !ok { - for { - watcher, err := c.typedClient.CoreV1().Pods(c.namespace).Watch(ctx, opt) - if err == nil { - rc = watcher.ResultChan() - continue loop - } - - logger.Error("Failed to watch pods", err) - select { - case <-ctx.Done(): - break loop - case <-c.clock.After(time.Second * 5): - } - } - } +loop: + for { + select { + case event, ok := <-events: + if !ok { + break loop + } - switch event.Type { - case watch.Added, watch.Modified: - pod := event.Object.(*corev1.Pod) - if c.enableMetrics { - c.putPodInfo(pod) - } - if c.need(pod) { - if c.readOnly(pod.Spec.NodeName) { - logger.Debug("Skip pod", - "reason", "read only", - "event", event.Type, - "pod", log.KObj(pod), - "node", pod.Spec.NodeName, - ) - } else { - c.preprocessChan <- pod.DeepCopy() - } - } else { + switch event.Type { + case informer.Added, informer.Modified, informer.Sync: + pod := event.Object + if c.enableMetrics { + c.putPodInfo(pod) + } + if c.need(pod) { + if c.readOnly(pod.Spec.NodeName) { logger.Debug("Skip pod", - "reason", "not managed", + "reason", "read only", "event", event.Type, "pod", log.KObj(pod), "node", pod.Spec.NodeName, ) + } else { + c.preprocessChan <- pod.DeepCopy() } + } else { + logger.Debug("Skip pod", + "reason", "not managed", + "event", event.Type, + "pod", log.KObj(pod), + "node", pod.Spec.NodeName, + ) + } - if c.enableMetrics && - event.Type == watch.Added && - c.nodeGetFunc != nil { - nodeInfo, ok := c.nodeGetFunc(pod.Spec.NodeName) - if ok { - nodeInfo.StartedContainer.Add(int64(len(pod.Spec.Containers))) - } - } - case watch.Deleted: - pod := event.Object.(*corev1.Pod) - if c.enableMetrics { - c.deletePodInfo(pod) + if c.enableMetrics && + event.Type == informer.Added && + c.nodeGetFunc != nil { + nodeInfo, ok := c.nodeGetFunc(pod.Spec.NodeName) + if ok { + nodeInfo.StartedContainer.Add(int64(len(pod.Spec.Containers))) } - if c.need(pod) { - // Recycling PodIP - c.recyclingPodIP(ctx, pod) - - // Cancel delay job - key := log.KObj(pod).String() - resourceJob, ok := c.delayJobs.LoadAndDelete(key) - if ok { - resourceJob.Cancel() - } + } + case informer.Deleted: + pod := event.Object + if c.enableMetrics { + c.deletePodInfo(pod) + } + if c.need(pod) { + // Recycling PodIP + c.recyclingPodIP(ctx, pod) + + // Cancel delay job + key := log.KObj(pod).String() + resourceJob, ok := c.delayQueueMapping.LoadAndDelete(key) + if ok { + c.delayQueue.Cancel(resourceJob) } } - case <-ctx.Done(): - watcher.Stop() - break loop - } - } - logger.Info("Stop watch pods") - }() - - return nil -} - -// listResources lists all resources and sends to preprocessChan -func (c *PodController) listResources(ctx context.Context, opt metav1.ListOptions) error { - listPager := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { - return c.typedClient.CoreV1().Pods(c.namespace).List(ctx, opts) - }) - - logger := log.FromContext(ctx) - - return listPager.EachListItem(ctx, opt, func(obj 
runtime.Object) error { - pod := obj.(*corev1.Pod) - if c.enableMetrics { - c.putPodInfo(pod) - } - if c.need(pod) { - if c.readOnly(pod.Spec.NodeName) { - logger.Debug("Skip pod", - "pod", log.KObj(pod), - "node", pod.Spec.NodeName, - "reason", "read only", - ) - } else { - c.preprocessChan <- pod.DeepCopy() } + case <-ctx.Done(): + break loop } - return nil - }) -} - -// PlayStagePodsOnNode plays stage pods on node -func (c *PodController) PlayStagePodsOnNode(nodeName string) { - c.triggerPreprocessChan <- nodeName + } + logger.Info("Stop watch pods") } // ipPool returns the ipPool for the given cidr @@ -611,26 +496,29 @@ func (c *PodController) recyclingPodIP(ctx context.Context, pod *corev1.Pod) { } // Skip not managed node - nodeInfo, ok := c.nodeGetFunc(pod.Spec.NodeName) - if !ok { + _, has := c.nodeGetFunc(pod.Spec.NodeName) + if !has { return } logger := log.FromContext(ctx) if !c.enableCNI { - if pod.Status.PodIP != "" { + if pod.Status.PodIP != "" && c.nodeCacheGetter != nil { cidr := c.defaultCIDR - if len(nodeInfo.PodCIDRs) > 0 { - cidr = nodeInfo.PodCIDRs[0] - } - pool, err := c.ipPool(cidr) - if err != nil { - logger.Error("Failed to get ip pool", err, - "pod", log.KObj(pod), - "node", pod.Spec.NodeName, - ) - } else { - pool.Put(pod.Status.PodIP) + node, ok := c.nodeCacheGetter.Get(pod.Spec.NodeName) + if ok { + if node.Spec.PodCIDR != "" { + cidr = node.Spec.PodCIDR + } + pool, err := c.ipPool(cidr) + if err != nil { + logger.Error("Failed to get ip pool", err, + "pod", log.KObj(pod), + "node", pod.Spec.NodeName, + ) + } else { + pool.Put(pod.Status.PodIP) + } } } } else { @@ -644,15 +532,18 @@ func (c *PodController) recyclingPodIP(ctx context.Context, pod *corev1.Pod) { func (c *PodController) configureResource(pod *corev1.Pod, template string) ([]byte, error) { if !c.enableCNI { // Mark the pod IP that existed before the kubelet was started - if nodeInfo, has := c.nodeGetFunc(pod.Spec.NodeName); has { - if pod.Status.PodIP != "" { + if _, has := c.nodeGetFunc(pod.Spec.NodeName); has { + if pod.Status.PodIP != "" && c.nodeCacheGetter != nil { cidr := c.defaultCIDR - if len(nodeInfo.PodCIDRs) > 0 { - cidr = nodeInfo.PodCIDRs[0] - } - pool, err := c.ipPool(cidr) - if err == nil { - pool.Use(pod.Status.PodIP) + node, ok := c.nodeCacheGetter.Get(pod.Spec.NodeName) + if ok { + if node.Spec.PodCIDR != "" { + cidr = node.Spec.PodCIDR + } + pool, err := c.ipPool(cidr) + if err == nil { + pool.Use(pod.Status.PodIP) + } } } } @@ -710,11 +601,14 @@ func (c *PodController) funcNodeIP() string { } func (c *PodController) funcNodeIPWith(nodeName string) string { - nodeInfo, has := c.nodeGetFunc(nodeName) - if has && len(nodeInfo.HostIPs) > 0 { - hostIP := nodeInfo.HostIPs[0] - if hostIP != "" { - return hostIP + _, has := c.nodeGetFunc(nodeName) + if has && c.nodeCacheGetter != nil { + node, ok := c.nodeCacheGetter.Get(nodeName) + if ok { + hostIPs := getNodeHostIPs(node) + if len(hostIPs) != 0 { + return hostIPs[0].String() + } } } return c.nodeIP @@ -743,9 +637,14 @@ func (c *PodController) funcPodIPWith(nodeName string, hostNetwork bool, uid, na } podCIDR := c.defaultCIDR - nodeInfo, has := c.nodeGetFunc(nodeName) - if has && len(nodeInfo.PodCIDRs) > 0 { - podCIDR = nodeInfo.PodCIDRs[0] + _, has := c.nodeGetFunc(nodeName) + if has && c.nodeCacheGetter != nil { + node, ok := c.nodeCacheGetter.Get(nodeName) + if ok { + if node.Spec.PodCIDR != "" { + podCIDR = node.Spec.PodCIDR + } + } } pool, err := c.ipPool(podCIDR) @@ -757,9 +656,7 @@ func (c *PodController) funcPodIPWith(nodeName 
string, hostNetwork bool, uid, na // putPodInfo puts pod info func (c *PodController) putPodInfo(pod *corev1.Pod) { - podInfo := &PodInfo{ - Pod: pod, - } + podInfo := &PodInfo{} key := log.KObj(pod) c.podsSets.Store(key, podInfo) m, ok := c.podsOnNode.Load(pod.Spec.NodeName) @@ -791,10 +688,10 @@ func (c *PodController) Get(namespace, name string) (*PodInfo, bool) { } // List lists pod info -func (c *PodController) List(nodeName string) ([]*PodInfo, bool) { +func (c *PodController) List(nodeName string) ([]log.ObjectRef, bool) { m, ok := c.podsOnNode.Load(nodeName) if !ok { return nil, false } - return m.Values(), true + return m.Keys(), true } diff --git a/pkg/kwok/controllers/pod_controller_test.go b/pkg/kwok/controllers/pod_controller_test.go index c0ccfceac..953d2830a 100644 --- a/pkg/kwok/controllers/pod_controller_test.go +++ b/pkg/kwok/controllers/pod_controller_test.go @@ -26,6 +26,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes/fake" @@ -34,6 +35,7 @@ import ( "sigs.k8s.io/kwok/pkg/config" "sigs.k8s.io/kwok/pkg/config/resources" "sigs.k8s.io/kwok/pkg/log" + "sigs.k8s.io/kwok/pkg/utils/informer" "sigs.k8s.io/kwok/pkg/utils/slices" "sigs.k8s.io/kwok/pkg/utils/wait" ) @@ -138,30 +140,31 @@ func TestPodController(t *testing.T) { }, ) - nodeGetFunc := func(nodeName string) (*NodeInfo, bool) { - node, err := clientset.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) - if err != nil { - return nil, false - } - + wantHostIPFunc := func(node corev1.Node) string { nodeIP := defaultNodeIP - podCIDR := defaultPodCIDR - for _, addr := range node.Status.Addresses { if addr.Type == corev1.NodeInternalIP { nodeIP = addr.Address break } } + return nodeIP + } + wantPodCIDRFunc := func(node corev1.Node) string { + podCIDR := defaultPodCIDR if node.Spec.PodCIDR != "" { podCIDR = node.Spec.PodCIDR } - - nodeInfo := &NodeInfo{ - HostIPs: []string{nodeIP}, - PodCIDRs: []string{podCIDR}, + return podCIDR + } + nodeGetFunc := func(nodeName string) (*NodeInfo, bool) { + _, err := clientset.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) + if err != nil { + return nil, false } + + nodeInfo := &NodeInfo{} return nodeInfo, true } @@ -177,10 +180,27 @@ func TestPodController(t *testing.T) { return iobj.(*internalversion.Stage), nil }) + ctx := context.Background() + ctx = log.NewContext(ctx, log.NewLogger(os.Stderr, log.LevelDebug)) + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + t.Cleanup(func() { + cancel() + time.Sleep(time.Second) + }) + + nodeCh := make(chan informer.Event[*corev1.Node], 1) + nodesCli := clientset.CoreV1().Nodes() + nodesInformer := informer.NewInformer[*corev1.Node, *corev1.NodeList](nodesCli) + nodeCache, err := nodesInformer.WatchWithCache(ctx, informer.Option{}, nodeCh) + if err != nil { + t.Fatal(fmt.Errorf("failed to watch nodes: %w", err)) + } + lifecycle, _ := NewLifecycle(podStages) annotationSelector, _ := labels.Parse("fake=custom") pods, err := NewPodController(PodControllerConfig{ TypedClient: clientset, + NodeCacheGetter: nodeCache, NodeIP: defaultNodeIP, CIDR: defaultPodCIDR, DisregardStatusWithAnnotationSelector: annotationSelector.String(), @@ -193,15 +213,17 @@ func TestPodController(t *testing.T) { t.Fatal(fmt.Errorf("new pods controller error: %w", err)) } - ctx := context.Background() - ctx = log.NewContext(ctx, log.NewLogger(os.Stderr, log.LevelDebug)) - ctx, cancel := 
context.WithTimeout(ctx, 10*time.Second)
- t.Cleanup(func() {
- cancel()
- time.Sleep(time.Second)
- })
+ podsCh := make(chan informer.Event[*corev1.Pod], 1)
+ podsCli := clientset.CoreV1().Pods(corev1.NamespaceAll)
+ podsInformer := informer.NewInformer[*corev1.Pod, *corev1.PodList](podsCli)
+ err = podsInformer.Watch(ctx, informer.Option{
+ FieldSelector: fields.OneTermNotEqualSelector("spec.nodeName", "").String(),
+ }, podsCh)
+ if err != nil {
+ t.Fatal(fmt.Errorf("watch pods error: %w", err))
+ }

- err = pods.Start(ctx)
+ err = pods.Start(ctx, podsCh)
 if err != nil {
 t.Fatal(fmt.Errorf("start pods controller error: %w", err))
 }
@@ -211,24 +233,27 @@ func TestPodController(t *testing.T) {
 t.Fatal(fmt.Errorf("list nodes error: %w", err))
 }
 for _, node := range listNodes.Items {
- if nodeInfo, ok := nodeGetFunc(node.Name); ok {
+ if _, ok := nodeGetFunc(node.Name); ok {
+ wantPodCIDR := wantPodCIDRFunc(node)
+ wantHostIP := wantHostIPFunc(node)
+
 if node.Spec.PodCIDR != "" {
- if node.Spec.PodCIDR != nodeInfo.PodCIDRs[0] {
- t.Fatal(fmt.Errorf("want node %s podCIDR=%s, got %s", node.Name, node.Spec.PodCIDR, nodeInfo.PodCIDRs[0]))
+ if node.Spec.PodCIDR != wantPodCIDR {
+ t.Fatal(fmt.Errorf("want node %s podCIDR=%s, got %s", node.Name, node.Spec.PodCIDR, wantPodCIDR))
 }
 } else {
- if defaultPodCIDR != nodeInfo.PodCIDRs[0] {
- t.Fatal(fmt.Errorf("want node %s podCIDR=%s, got %s", node.Name, defaultPodCIDR, nodeInfo.PodCIDRs[0]))
+ if defaultPodCIDR != wantPodCIDR {
+ t.Fatal(fmt.Errorf("want node %s podCIDR=%s, got %s", node.Name, defaultPodCIDR, wantPodCIDR))
 }
 }
 if len(node.Status.Addresses) != 0 {
- if node.Status.Addresses[0].Address != nodeInfo.HostIPs[0] {
- t.Fatal(fmt.Errorf("want node %s address=%s, got %s", node.Name, node.Status.Addresses[0].Address, nodeInfo.HostIPs[0]))
+ if node.Status.Addresses[0].Address != wantHostIP {
+ t.Fatal(fmt.Errorf("want node %s address=%s, got %s", node.Name, node.Status.Addresses[0].Address, wantHostIP))
 }
 } else {
- if defaultNodeIP != nodeInfo.HostIPs[0] {
- t.Fatal(fmt.Errorf("want node %s address=%s, got %s", node.Name, defaultNodeIP, nodeInfo.HostIPs[0]))
+ if defaultNodeIP != wantHostIP {
+ t.Fatal(fmt.Errorf("want node %s address=%s, got %s", node.Name, defaultNodeIP, wantHostIP))
 }
 }
 }
@@ -254,27 +279,6 @@ func TestPodController(t *testing.T) {
 t.Fatal(fmt.Errorf("create pod1 error: %w", err))
 }

- pod1, err := clientset.CoreV1().Pods("default").Get(ctx, "pod1", metav1.GetOptions{})
- if err != nil {
- t.Fatal(fmt.Errorf("get pod1 error: %w", err))
- }
- pod1.Annotations = map[string]string{
- "fake": "custom",
- }
- pod1.Status.Reason = "custom"
- _, err = clientset.CoreV1().Pods("default").Update(ctx, pod1, metav1.UpdateOptions{})
- if err != nil {
- t.Fatal(fmt.Errorf("update pod1 error: %w", err))
- }
-
- pod1, err = clientset.CoreV1().Pods("default").Get(ctx, "pod1", metav1.GetOptions{})
- if err != nil {
- t.Fatal(fmt.Errorf("get pod1 error: %w", err))
- }
- if pod1.Status.Reason != "custom" {
- t.Fatal(fmt.Errorf("pod1 status reason not custom"))
- }
-
 var list *corev1.PodList
 err = wait.Poll(ctx, func(ctx context.Context) (done bool, err error) {
 list, err = clientset.CoreV1().Pods("default").List(ctx, metav1.ListOptions{})
@@ -286,22 +290,30 @@ func TestPodController(t *testing.T) {
 }

 for index, pod := range list.Items {
- if nodeInfo, ok := nodeGetFunc(pod.Spec.NodeName); ok {
+ if _, ok := nodeGetFunc(pod.Spec.NodeName); ok {
 if pod.Status.Phase != corev1.PodRunning {
 return false, fmt.Errorf("want pod %s phase is running, got %s", pod.Name, pod.Status.Phase)
 }
+
+ node, err := clientset.CoreV1().Nodes().Get(ctx, pod.Spec.NodeName, metav1.GetOptions{})
+ if err != nil {
+ return false, fmt.Errorf("get node %s error: %w", pod.Spec.NodeName, err)
+ }
+
 if pods.need(&list.Items[index]) {
- if pod.Status.HostIP != nodeInfo.HostIPs[0] {
- return false, fmt.Errorf("want pod %s hostIP=%s, got %s", pod.Name, nodeInfo.HostIPs[0], pod.Status.HostIP)
+ wantHostIP := wantHostIPFunc(*node)
+ wantPodCIDR := wantPodCIDRFunc(*node)
+ if pod.Status.HostIP != wantHostIP {
+ return false, fmt.Errorf("want pod %s hostIP=%s, got %s", pod.Name, wantHostIP, pod.Status.HostIP)
 }
 if pod.Spec.HostNetwork {
- if pod.Status.PodIP != nodeInfo.HostIPs[0] {
- return false, fmt.Errorf("want pod %s podIP=%s, got %s", pod.Name, nodeInfo.HostIPs[0], pod.Status.PodIP)
+ if pod.Status.PodIP != wantHostIP {
+ return false, fmt.Errorf("want pod %s podIP=%s, got %s", pod.Name, wantHostIP, pod.Status.PodIP)
 }
 } else {
- cidr, _ := parseCIDR(nodeInfo.PodCIDRs[0])
+ cidr, _ := parseCIDR(wantPodCIDR)
 if !cidr.Contains(net.ParseIP(pod.Status.PodIP)) {
- return false, fmt.Errorf("want pod %s podIP=%s in %s, got not", pod.Name, pod.Status.PodIP, nodeInfo.PodCIDRs[0])
+ return false, fmt.Errorf("want pod %s podIP=%s in %s, got not", pod.Name, pod.Status.PodIP, wantPodCIDR)
 }
 }
 }
@@ -310,11 +322,32 @@ func TestPodController(t *testing.T) {
 }
 }
 return true, nil
- }, wait.WithContinueOnError(5))
+ }, wait.WithContinueOnError(10))
 if err != nil {
 t.Fatal(err)
 }

+ pod1, err := clientset.CoreV1().Pods("default").Get(ctx, "pod1", metav1.GetOptions{})
+ if err != nil {
+ t.Fatal(fmt.Errorf("get pod1 error: %w", err))
+ }
+ pod1.Annotations = map[string]string{
+ "fake": "custom",
+ }
+ pod1.Status.Reason = "custom"
+ _, err = clientset.CoreV1().Pods("default").Update(ctx, pod1, metav1.UpdateOptions{})
+ if err != nil {
+ t.Fatal(fmt.Errorf("update pod1 error: %w", err))
+ }
+
+ pod1, err = clientset.CoreV1().Pods("default").Get(ctx, "pod1", metav1.GetOptions{})
+ if err != nil {
+ t.Fatal(fmt.Errorf("get pod1 error: %w", err))
+ }
+ if pod1.Status.Reason != "custom" {
+ t.Fatal(fmt.Errorf("pod1 status reason not custom"))
+ }
+
 pod, ok := slices.Find(list.Items, func(pod corev1.Pod) bool {
 return pod.Name == "pod0"
 })
diff --git a/pkg/kwok/controllers/utils.go b/pkg/kwok/controllers/utils.go
index 7188d6291..ce3c797f1 100644
--- a/pkg/kwok/controllers/utils.go
+++ b/pkg/kwok/controllers/utils.go
@@ -20,10 +20,8 @@ import (
 "net"
 "sync"

- "github.com/wzshiming/cron"
 "k8s.io/apimachinery/pkg/labels"

- "sigs.k8s.io/kwok/pkg/utils/maps"
 utilsnet "sigs.k8s.io/kwok/pkg/utils/net"
 )

@@ -110,14 +108,8 @@ func labelsParse(selector string) (labels.Selector, error) {
 return labels.Parse(selector)
 }

-type jobInfoMap = maps.SyncMap[string, jobInfo]
-
-type jobInfo struct {
- ResourceVersion string
- Cancel cron.DoFunc
-}
-
 type resourceStageJob[T any] struct {
 Resource T
 Stage *LifecycleStage
+ Key string
 }
diff --git a/pkg/kwok/metrics/metrics.go b/pkg/kwok/metrics/metrics.go
index 572ee4963..d5f56529f 100644
--- a/pkg/kwok/metrics/metrics.go
+++ b/pkg/kwok/metrics/metrics.go
@@ -25,18 +25,22 @@ import (
 "github.com/prometheus/client_golang/prometheus"
 "github.com/prometheus/client_golang/prometheus/promhttp"
+ corev1 "k8s.io/api/core/v1"

 "sigs.k8s.io/kwok/pkg/apis/internalversion"
 "sigs.k8s.io/kwok/pkg/kwok/controllers"
 "sigs.k8s.io/kwok/pkg/kwok/metrics/cel"
 "sigs.k8s.io/kwok/pkg/log"
+ "sigs.k8s.io/kwok/pkg/utils/informer"
 "sigs.k8s.io/kwok/pkg/utils/maps"
 )

 // UpdateHandler
handles updating metrics on request type UpdateHandler struct { - controller *controllers.Controller - environment *cel.Environment + controller *controllers.Controller + environment *cel.Environment + nodeCacheGetter informer.Getter[*corev1.Node] + podCacheGetter informer.Getter[*corev1.Pod] handler http.Handler registry *prometheus.Registry @@ -60,20 +64,17 @@ func NewMetricsUpdateHandler(conf UpdateHandlerConfig) *UpdateHandler { ) h := &UpdateHandler{ - controller: conf.Controller, - environment: conf.Environment, - registry: registry, - handler: handler, + controller: conf.Controller, + environment: conf.Environment, + nodeCacheGetter: conf.Controller.GetNodeCache(), + podCacheGetter: conf.Controller.GetPodCache(), + registry: registry, + handler: handler, } return h } -func (h *UpdateHandler) getNodeInfo(nodeName string) (*controllers.NodeInfo, bool) { - nodeInfo, ok := h.controller.GetNode(nodeName) - return nodeInfo, ok -} - -func (h *UpdateHandler) getPodsInfo(nodeName string) ([]*controllers.PodInfo, bool) { +func (h *UpdateHandler) getPodsInfo(nodeName string) ([]log.ObjectRef, bool) { podsInfo, ok := h.controller.ListPods(nodeName) return podsInfo, ok } @@ -170,14 +171,15 @@ func (h *UpdateHandler) updateGauge(ctx context.Context, metricConfig *internalv return nil, fmt.Errorf("failed to compile metric value %s: %w", metricConfig.Value, err) } - nodeInfo, ok := h.getNodeInfo(nodeName) + logger := log.FromContext(ctx).With("node", nodeName) + + node, ok := h.nodeCacheGetter.Get(nodeName) if !ok { - logger := log.FromContext(ctx) - logger.Warn("node not found", "node", nodeName) + logger.Warn("node not found") return nil, nil } data := cel.Data{ - Node: nodeInfo.Node, + Node: node, } switch metricConfig.Dimension { @@ -196,14 +198,18 @@ func (h *UpdateHandler) updateGauge(ctx context.Context, metricConfig *internalv case internalversion.DimensionPod: pods, ok := h.getPodsInfo(nodeName) if !ok { - logger := log.FromContext(ctx) - logger.Warn("pods not found", "node", nodeName) + logger.Warn("pods not found") return nil, nil } keys := make([]string, 0, len(pods)) - for _, pod := range pods { - data.Pod = pod.Pod + for _, podInfo := range pods { + pod, ok := h.podCacheGetter.GetWithNamespace(podInfo.Name, podInfo.Namespace) + if !ok { + logger.Warn("pod not found", "pod", podInfo) + continue + } + data.Pod = pod gauge, key, err := h.getOrRegisterGauge(metricConfig, data) if err != nil { return nil, err @@ -220,15 +226,19 @@ func (h *UpdateHandler) updateGauge(ctx context.Context, metricConfig *internalv case internalversion.DimensionContainer: pods, ok := h.getPodsInfo(nodeName) if !ok { - logger := log.FromContext(ctx) - logger.Warn("pods not found", "node", nodeName) + logger.Warn("pods not found") return nil, nil } keys := make([]string, 0, len(pods)) - for _, pod := range pods { - data.Pod = pod.Pod - for _, container := range pod.Pod.Spec.Containers { + for _, podInfo := range pods { + pod, ok := h.podCacheGetter.GetWithNamespace(podInfo.Name, podInfo.Namespace) + if !ok { + logger.Warn("pod not found", "pod", podInfo) + continue + } + data.Pod = pod + for _, container := range pod.Spec.Containers { container := container data.Container = &container gauge, key, err := h.getOrRegisterGauge(metricConfig, data) @@ -255,15 +265,17 @@ func (h *UpdateHandler) updateCounter(ctx context.Context, metricConfig *interna return nil, fmt.Errorf("failed to compile metric value %s: %w", metricConfig.Value, err) } - nodeInfo, ok := h.getNodeInfo(nodeName) + logger := 
log.FromContext(ctx).With("node", nodeName) + + node, ok := h.nodeCacheGetter.Get(nodeName) if !ok { - logger := log.FromContext(ctx) - logger.Warn("node not found", "node", nodeName) + logger.Warn("node not found") return nil, nil } data := cel.Data{ - Node: nodeInfo.Node, + Node: node, } + switch metricConfig.Dimension { case internalversion.DimensionNode: counter, key, err := h.getOrRegisterCounter(metricConfig, data) @@ -280,14 +292,18 @@ func (h *UpdateHandler) updateCounter(ctx context.Context, metricConfig *interna case internalversion.DimensionPod: pods, ok := h.getPodsInfo(nodeName) if !ok { - logger := log.FromContext(ctx) - logger.Warn("pods not found", "node", nodeName) + logger.Warn("pods not found") return nil, nil } keys := make([]string, 0, len(pods)) - for _, pod := range pods { - data.Pod = pod.Pod + for _, podInfo := range pods { + pod, ok := h.podCacheGetter.GetWithNamespace(podInfo.Name, podInfo.Namespace) + if !ok { + logger.Warn("pod not found", "pod", podInfo) + continue + } + data.Pod = pod counter, key, err := h.getOrRegisterCounter(metricConfig, data) if err != nil { return nil, err @@ -304,15 +320,19 @@ func (h *UpdateHandler) updateCounter(ctx context.Context, metricConfig *interna case internalversion.DimensionContainer: pods, ok := h.getPodsInfo(nodeName) if !ok { - logger := log.FromContext(ctx) - logger.Warn("pods not found", "node", nodeName) + logger.Warn("pods not found") return nil, nil } keys := make([]string, 0, len(pods)) - for _, pod := range pods { - data.Pod = pod.Pod - for _, container := range pod.Pod.Spec.Containers { + for _, podInfo := range pods { + pod, ok := h.podCacheGetter.GetWithNamespace(podInfo.Name, podInfo.Namespace) + if !ok { + logger.Warn("pod not found", "pod", podInfo) + continue + } + data.Pod = pod + for _, container := range pod.Spec.Containers { container := container data.Container = &container counter, key, err := h.getOrRegisterCounter(metricConfig, data) @@ -334,15 +354,17 @@ func (h *UpdateHandler) updateCounter(ctx context.Context, metricConfig *interna } func (h *UpdateHandler) updateHistogram(ctx context.Context, metricConfig *internalversion.MetricConfig, nodeName string) ([]string, error) { - nodeInfo, ok := h.getNodeInfo(nodeName) + logger := log.FromContext(ctx).With("node", nodeName) + + node, ok := h.nodeCacheGetter.Get(nodeName) if !ok { - logger := log.FromContext(ctx) - logger.Warn("node not found", "node", nodeName) + logger.Warn("node not found") return nil, nil } data := cel.Data{ - Node: nodeInfo.Node, + Node: node, } + switch metricConfig.Dimension { case internalversion.DimensionNode: histogram, key, err := h.getOrRegisterHistogram(metricConfig, data) @@ -365,14 +387,18 @@ func (h *UpdateHandler) updateHistogram(ctx context.Context, metricConfig *inter case internalversion.DimensionPod: pods, ok := h.getPodsInfo(nodeName) if !ok { - logger := log.FromContext(ctx) - logger.Warn("pods not found", "node", nodeName) + logger.Warn("pods not found") return nil, nil } keys := make([]string, 0, len(pods)) - for _, pod := range pods { - data.Pod = pod.Pod + for _, podInfo := range pods { + pod, ok := h.podCacheGetter.GetWithNamespace(podInfo.Name, podInfo.Namespace) + if !ok { + logger.Warn("pod not found", "pod", podInfo) + continue + } + data.Pod = pod histogram, key, err := h.getOrRegisterHistogram(metricConfig, data) if err != nil { return nil, err @@ -395,15 +421,19 @@ func (h *UpdateHandler) updateHistogram(ctx context.Context, metricConfig *inter case internalversion.DimensionContainer: pods, ok := 
h.getPodsInfo(nodeName) if !ok { - logger := log.FromContext(ctx) - logger.Warn("pods not found", "node", nodeName) + logger.Warn("pods not found") return nil, nil } keys := make([]string, 0, len(pods)) - for _, pod := range pods { - data.Pod = pod.Pod - for _, container := range pod.Pod.Spec.Containers { + for _, podInfo := range pods { + pod, ok := h.podCacheGetter.GetWithNamespace(podInfo.Name, podInfo.Namespace) + if !ok { + logger.Warn("pod not found", "pod", podInfo) + continue + } + data.Pod = pod + for _, container := range pod.Spec.Containers { container := container data.Container = &container histogram, key, err := h.getOrRegisterHistogram(metricConfig, data) @@ -495,7 +525,7 @@ func uniqueKey(name string, kind internalversion.Kind, labels map[string]string) func (h *UpdateHandler) Update(ctx context.Context, nodeName string, metrics []internalversion.MetricConfig) { logger := log.FromContext(ctx) has := map[string]struct{}{} - // Update metrics + // Sync metrics h.environment.ClearResultCache() for _, metric := range metrics { metric := metric diff --git a/pkg/kwok/server/metrics.go b/pkg/kwok/server/metrics.go index c316ddeb8..9c2ade345 100644 --- a/pkg/kwok/server/metrics.go +++ b/pkg/kwok/server/metrics.go @@ -44,7 +44,7 @@ func (s *Server) InstallMetrics(ctx context.Context) error { EnableEvaluatorCache: true, EnableResultCache: true, StartedContainersTotal: func(nodeName string) int64 { - nodeInfo, ok := controller.GetNode(nodeName) + nodeInfo, ok := controller.GetNodeInfo(nodeName) if !ok { return 0 } diff --git a/pkg/kwok/server/service_discovery.go b/pkg/kwok/server/service_discovery.go index 3a7794ab9..a76d647bf 100644 --- a/pkg/kwok/server/service_discovery.go +++ b/pkg/kwok/server/service_discovery.go @@ -20,8 +20,6 @@ import ( "encoding/json" "net/http" "strings" - - "sigs.k8s.io/kwok/pkg/kwok/controllers" ) // InstallServiceDiscovery installs the service discovery handler. @@ -39,7 +37,7 @@ func (s *Server) prometheusDiscovery(rw http.ResponseWriter, req *http.Request) hosts := []string{req.Host} - var listNode []*controllers.NodeInfo + var listNode []string metrics := s.metrics.Get() for _, m := range metrics { @@ -47,13 +45,13 @@ func (s *Server) prometheusDiscovery(rw http.ResponseWriter, req *http.Request) if listNode == nil { listNode = s.controller.ListNodes() } - for _, node := range listNode { + for _, nodeName := range listNode { targets = append(targets, prometheusStaticConfig{ Targets: hosts, Labels: map[string]string{ "metrics_name": m.Name, "__scheme__": scheme, - "__metrics_path__": strings.ReplaceAll(m.Spec.Path, "{nodeName}", node.Node.Name), + "__metrics_path__": strings.ReplaceAll(m.Spec.Path, "{nodeName}", nodeName), }, }) } diff --git a/pkg/utils/informer/doc.go b/pkg/utils/informer/doc.go new file mode 100644 index 000000000..707e5fdf5 --- /dev/null +++ b/pkg/utils/informer/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +// Package informer provides a generic mechanism for listening for changes to resources. +package informer diff --git a/pkg/utils/informer/event.go b/pkg/utils/informer/event.go new file mode 100644 index 000000000..6a14a9fc0 --- /dev/null +++ b/pkg/utils/informer/event.go @@ -0,0 +1,106 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package informer + +import ( + "context" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" +) + +// EventType defines the possible types of events. +type EventType string + +// Event types. +const ( + Added EventType = "ADDED" + Modified EventType = "MODIFIED" + Deleted EventType = "DELETED" + Sync EventType = "SYNC" +) + +// Event represents a single event to a watched resource. +type Event[T runtime.Object] struct { + // Type is Added, Modified, Deleted, or Sync. + Type EventType + + // Object is: + // * If Type is Added, Modified or Sync: the new state of the object. + // * If Type is Deleted: the state of the object immediately before deletion. + Object T +} + +// Watcher is an interface for objects that know how to watch resources. +type Watcher[T runtime.Object, L runtime.Object] interface { + // List returns an object containing a list of the resources matching the provided options. + List(ctx context.Context, opts metav1.ListOptions) (L, error) + // Watch returns an object that watches the resources matching the provided options. + Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) +} + +// Option is used to filter events. +type Option struct { + LabelSelector string + FieldSelector string + AnnotationSelector string + annotationSelector labels.Selector +} + +func (o *Option) setup(opts *metav1.ListOptions) { + if o.LabelSelector != "" { + opts.LabelSelector = o.LabelSelector + } + if o.FieldSelector != "" { + opts.FieldSelector = o.FieldSelector + } +} + +func (o *Option) toListOptions() metav1.ListOptions { + opts := metav1.ListOptions{} + o.setup(&opts) + return opts +} + +func (o *Option) filter(obj any) (bool, error) { + if o.AnnotationSelector == "" { + return true, nil + } + + if o.annotationSelector == nil { + var err error + o.annotationSelector, err = labels.Parse(o.AnnotationSelector) + if err != nil { + return false, err + } + } + + accessor, err := meta.Accessor(obj) + if err != nil { + return false, err + } + + annotations := accessor.GetAnnotations() + if len(annotations) == 0 { + return false, nil + } + + return o.annotationSelector.Matches(labels.Set(annotations)), nil +} diff --git a/pkg/utils/informer/informer.go b/pkg/utils/informer/informer.go new file mode 100644 index 000000000..afd76c434 --- /dev/null +++ b/pkg/utils/informer/informer.go @@ -0,0 +1,231 @@ +/* +Copyright 2023 The Kubernetes Authors. 
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package informer
+
+import (
+ "context"
+
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/tools/pager"
+
+ "sigs.k8s.io/kwok/pkg/log"
+)
+
+// Informer is a wrapper around List and Watch functions for a resource.
+type Informer[T runtime.Object, L runtime.Object] struct {
+ ListFunc func(ctx context.Context, opts metav1.ListOptions) (L, error)
+ WatchFunc func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
+}
+
+// NewInformer returns a new Informer.
+func NewInformer[T runtime.Object, L runtime.Object](lw Watcher[T, L]) *Informer[T, L] {
+ return &Informer[T, L]{
+ ListFunc: lw.List,
+ WatchFunc: lw.Watch,
+ }
+}
+
+// Sync sends a sync event for each resource returned by the ListFunc.
+func (i *Informer[T, L]) Sync(ctx context.Context, opt Option, events chan<- Event[T]) error {
+ listPager := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
+ return i.ListFunc(ctx, opts)
+ })
+
+ err := listPager.EachListItem(ctx, opt.toListOptions(), func(obj runtime.Object) error {
+ if ok, err := opt.filter(obj); err != nil {
+ return err
+ } else if !ok {
+ return nil
+ }
+ events <- Event[T]{Type: Sync, Object: obj.(T)}
+ return nil
+ })
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+// WatchWithCache starts an informer that watches the resource, sends events to the events channel,
+// and maintains a local cache exposed by the returned Getter.
+func (i *Informer[T, L]) WatchWithCache(ctx context.Context, opt Option, events chan<- Event[T]) (Getter[T], error) {
+ var t T
+ logger := log.FromContext(ctx)
+ store, controller := cache.NewInformer(
+ &cache.ListWatch{
+ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
+ opt.setup(&opts)
+ return i.ListFunc(ctx, opts)
+ },
+ WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
+ opt.setup(&opts)
+ return i.WatchFunc(ctx, opts)
+ },
+ },
+ t,
+ 0,
+ cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj any) {
+ if ok, err := opt.filter(obj); err != nil {
+ logger.Error("filtering object", err)
+ return
+ } else if !ok {
+ return
+ }
+ events <- Event[T]{Type: Added, Object: obj.(T)}
+ },
+ UpdateFunc: func(oldObj, newObj any) {
+ if ok, err := opt.filter(newObj); err != nil {
+ logger.Error("filtering object", err)
+ return
+ } else if !ok {
+ return
+ }
+ events <- Event[T]{Type: Modified, Object: newObj.(T)}
+ },
+ DeleteFunc: func(obj any) {
+ if ok, err := opt.filter(obj); err != nil {
+ logger.Error("filtering object", err)
+ return
+ } else if !ok {
+ return
+ }
+ events <- Event[T]{Type: Deleted, Object: obj.(T)}
+ },
+ },
+ )
+
+ go controller.Run(ctx.Done())
+
+ g := &getter[T]{store: store}
+ return g, nil
+}
+
+// Watch starts a goroutine that watches the resource and sends events to the events channel.
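+// Unlike WatchWithCache, it keeps no local store; events that pass the filter are only forwarded to the channel.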
+func (i *Informer[T, L]) Watch(ctx context.Context, opt Option, events chan<- Event[T]) error { + var t T + informer := cache.NewReflectorWithOptions( + &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + opt.setup(&opts) + return i.ListFunc(ctx, opts) + }, + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + opt.setup(&opts) + return i.WatchFunc(ctx, opts) + }, + }, + t, + dummyCache(events, opt), + cache.ReflectorOptions{}, + ) + go informer.Run(ctx.Done()) + return nil +} + +func dummyCache[T runtime.Object](ch chan<- Event[T], opt Option) cache.Store { + return &cache.FakeCustomStore{ + AddFunc: func(obj any) error { + if ok, err := opt.filter(obj); err != nil { + return err + } else if !ok { + return nil + } + ch <- Event[T]{Type: Added, Object: obj.(T)} + return nil + }, + UpdateFunc: func(obj any) error { + if ok, err := opt.filter(obj); err != nil { + return err + } else if !ok { + return nil + } + ch <- Event[T]{Type: Modified, Object: obj.(T)} + return nil + }, + DeleteFunc: func(obj any) error { + if ok, err := opt.filter(obj); err != nil { + return err + } else if !ok { + return nil + } + ch <- Event[T]{Type: Deleted, Object: obj.(T)} + return nil + }, + ReplaceFunc: func(list []any, resourceVersion string) error { + for _, obj := range list { + if ok, err := opt.filter(obj); err != nil { + return err + } else if !ok { + continue + } + ch <- Event[T]{Type: Sync, Object: obj.(T)} + } + return nil + }, + ListFunc: func() []any { + panic("unreachable") + }, + ListKeysFunc: func() []string { + panic("unreachable") + }, + GetFunc: func(obj any) (item any, exists bool, err error) { + panic("unreachable") + }, + GetByKeyFunc: func(key string) (item any, exists bool, err error) { + panic("unreachable") + }, + ResyncFunc: func() error { + return nil + }, + } +} + +// Getter is a wrapper around a cache.Store that provides Get and List methods. +type Getter[T runtime.Object] interface { + Get(name string) (T, bool) + GetWithNamespace(name, namespace string) (T, bool) + List() []T +} + +type getter[T runtime.Object] struct { + store cache.Store +} + +func (g *getter[T]) Get(name string) (t T, exists bool) { + obj, exists, err := g.store.GetByKey(name) + if err != nil { + return t, false + } + if !exists { + return t, false + } + return obj.(T), true +} + +func (g *getter[T]) GetWithNamespace(name, namespace string) (t T, exists bool) { + return g.Get(namespace + "/" + name) +} + +func (g *getter[T]) List() (list []T) { + for _, obj := range g.store.List() { + list = append(list, obj.(T)) + } + return list +} diff --git a/pkg/utils/maps/sync.go b/pkg/utils/maps/sync.go index bddc9c326..c6f753ceb 100644 --- a/pkg/utils/maps/sync.go +++ b/pkg/utils/maps/sync.go @@ -108,3 +108,13 @@ func (m *SyncMap[K, V]) Values() []V { }) return values } + +// IsEmpty returns true if the map is empty. +func (m *SyncMap[K, V]) IsEmpty() bool { + empty := true + m.m.Range(func(key, value interface{}) bool { + empty = false + return false + }) + return empty +} diff --git a/pkg/utils/queue/delaying_queue.go b/pkg/utils/queue/delaying_queue.go new file mode 100644 index 000000000..12f850587 --- /dev/null +++ b/pkg/utils/queue/delaying_queue.go @@ -0,0 +1,186 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package queue
+
+import (
+ "container/heap"
+ "sync"
+ "time"
+)
+
+// Clock is an interface that provides the current time and the ability to wait for a duration.
+type Clock interface {
+ Now() time.Time
+ After(d time.Duration) <-chan time.Time
+ Sleep(d time.Duration)
+}
+
+// DelayingQueue is a generic queue interface that supports adding items after a delay.
+type DelayingQueue[T comparable] interface {
+ Queue[T]
+ // AddAfter adds an item to the queue after the indicated duration has passed.
+ AddAfter(item T, duration time.Duration) bool
+ // Cancel removes an item from the queue if it has not yet been processed.
+ Cancel(item T) bool
+}
+
+// delayingQueue is a generic DelayingQueue implementation.
+type delayingQueue[T comparable] struct {
+ Queue[T]
+
+ clock Clock
+
+ mut sync.Mutex
+
+ heap waitEntries[T]
+ entries map[T]*waitEntry[T]
+
+ signal chan struct{}
+}
+
+// NewDelayingQueue returns a new DelayingQueue.
+func NewDelayingQueue[T comparable](clock Clock) DelayingQueue[T] {
+ q := &delayingQueue[T]{
+ Queue: NewQueue[T](),
+ clock: clock,
+ heap: waitEntries[T]{},
+ entries: make(map[T]*waitEntry[T]),
+ signal: make(chan struct{}, 1),
+ }
+ go q.loopWorker()
+ return q
+}
+
+func (q *delayingQueue[T]) AddAfter(item T, duration time.Duration) bool {
+ if duration <= 0 {
+ q.Queue.Add(item)
+ return true
+ }
+
+ q.mut.Lock()
+ defer q.mut.Unlock()
+
+ _, ok := q.entries[item]
+ if ok {
+ return false
+ }
+
+ entry := &waitEntry[T]{data: item, readyAt: q.clock.Now().Add(duration)}
+ heap.Push(&q.heap, entry)
+ q.entries[item] = entry
+
+ select {
+ case q.signal <- struct{}{}:
+ default:
+ }
+ return true
+}
+
+func (q *delayingQueue[T]) loopWorker() {
+ for {
+ t, ok, next := q.next()
+ if ok {
+ q.Queue.Add(t)
+ continue
+ }
+
+ delay := 10 * time.Second
+ if next != nil && *next < delay {
+ delay = *next
+ }
+ select {
+ case <-q.clock.After(delay):
+ case <-q.signal:
+ }
+ }
+}
+
+func (q *delayingQueue[T]) next() (t T, ok bool, wait *time.Duration) {
+ q.mut.Lock()
+ defer q.mut.Unlock()
+
+ if len(q.heap) == 0 {
+ return t, false, nil
+ }
+ entry := q.heap[0]
+ waitDuration := entry.readyAt.Sub(q.clock.Now())
+ if waitDuration > 0 {
+ return t, false, &waitDuration
+ }
+
+ entry = heap.Pop(&q.heap).(*waitEntry[T])
+ delete(q.entries, entry.data)
+ return entry.data, true, nil
+}
+
+func (q *delayingQueue[T]) Cancel(item T) bool {
+ q.mut.Lock()
+ defer q.mut.Unlock()
+ return q.cancel(item)
+}
+
+func (q *delayingQueue[T]) cancel(item T) bool {
+ entry, ok := q.entries[item]
+ if !ok {
+ return false
+ }
+
+ heap.Remove(&q.heap, entry.index)
+ delete(q.entries, item)
+ return true
+}
+
+// waitEntry is an entry in the delayingQueue heap.
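+// Entries are ordered by readyAt, so the entry that becomes ready soonest is always at the top of the heap.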
+type waitEntry[T any] struct { + data T + readyAt time.Time + // index in the heap + index int +} + +type waitEntries[T any] []*waitEntry[T] + +func (w waitEntries[T]) Len() int { + return len(w) +} + +func (w waitEntries[T]) Less(i, j int) bool { + return w[i].readyAt.Before(w[j].readyAt) +} + +func (w waitEntries[T]) Swap(i, j int) { + w[i], w[j] = w[j], w[i] + w[i].index = i + w[j].index = j +} + +// Push adds an item to the queue, should not be called directly instead use `heap.Push`. +func (w *waitEntries[T]) Push(x any) { + n := len(*w) + item := x.(*waitEntry[T]) + item.index = n + *w = append(*w, item) +} + +// Pop removes an item from the queue, should not be called directly instead use `heap.Pop`. +func (w *waitEntries[T]) Pop() any { + n := len(*w) + item := (*w)[n-1] + item.index = -1 + *w = (*w)[0:(n - 1)] + return item +} diff --git a/pkg/utils/queue/doc.go b/pkg/utils/queue/doc.go new file mode 100644 index 000000000..d03642d91 --- /dev/null +++ b/pkg/utils/queue/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package queue provides a generic queue implementation. +package queue diff --git a/pkg/utils/queue/queue.go b/pkg/utils/queue/queue.go new file mode 100644 index 000000000..853df6cd5 --- /dev/null +++ b/pkg/utils/queue/queue.go @@ -0,0 +1,98 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "container/list" + "sync" +) + +// Queue is a generic queue interface. +type Queue[T any] interface { + // Add adds an item to the queue. + Add(item T) + // Get returns an item from the queue. + Get() (T, bool) + // GetOrWait returns an item from the queue or waits until an item is added. + GetOrWait() T + // Len returns the number of items in the queue. + Len() int +} + +// queue is a generic Queue implementation. +type queue[T any] struct { + base *list.List + + signal chan struct{} + mut sync.Mutex +} + +// NewQueue returns a new Queue. +func NewQueue[T any]() Queue[T] { + return &queue[T]{ + base: list.New(), + signal: make(chan struct{}, 1), + } +} + +func (q *queue[T]) Add(item T) { + q.mut.Lock() + defer q.mut.Unlock() + + q.base.PushBack(item) + + // Signal that an item was added. 
+ if len(q.signal) == 0 { + select { + case q.signal <- struct{}{}: + default: + } + } +} + +func (q *queue[T]) Get() (t T, ok bool) { + q.mut.Lock() + defer q.mut.Unlock() + item := q.base.Front() + if item == nil { + return t, false + } + q.base.Remove(item) + return item.Value.(T), true +} + +func (q *queue[T]) GetOrWait() T { + t, ok := q.Get() + if ok { + return t + } + + // Wait for an item to be added. + for range q.signal { + t, ok = q.Get() + if ok { + return t + } + } + panic("unreachable") +} + +func (q *queue[T]) Len() int { + q.mut.Lock() + defer q.mut.Unlock() + return q.base.Len() +} diff --git a/test/kwokctl/kwokctl_benchmark_test.sh b/test/kwokctl/kwokctl_benchmark_test.sh index 7f1801d4c..0a4fb1604 100755 --- a/test/kwokctl/kwokctl_benchmark_test.sh +++ b/test/kwokctl/kwokctl_benchmark_test.sh @@ -40,6 +40,7 @@ function wait_resource() { local resource="${2}" local reason="${3}" local want="${4}" + local gap="${5}" local raw local got local all @@ -52,6 +53,10 @@ function wait_resource() { else all=$(echo "${raw}" | wc -l) echo "${resource} ${got}/${all} => ${want}" + if [[ "${gap}" != "" && "${got}" -ne 0 && "$((all - got))" -gt "${gap}" ]]; then + echo "Error ${resource} gap too large, actual: $((all - got)), expected: ${got}" + return 1 + fi fi sleep 1 done @@ -63,7 +68,7 @@ function scale_create_pod() { local node_name node_name="$(kwokctl --name "${name}" kubectl get node -o jsonpath='{.items.*.metadata.name}' | tr ' ' '\n' | grep fake- | head -n 1)" kwokctl --name "${name}" scale pod fake-pod --replicas "${size}" --param ".nodeName=\"${node_name}\"" >/dev/null & - wait_resource "${name}" Pod Running "${size}" + wait_resource "${name}" Pod Running "${size}" 10 } function scale_delete_pod() { @@ -77,7 +82,7 @@ function scale_create_node() { local name="${1}" local size="${2}" kwokctl --name "${name}" scale node fake-node --replicas "${size}" >/dev/null & - wait_resource "${name}" Node Ready "${size}" + wait_resource "${name}" Node Ready "${size}" 10 } function main() { @@ -90,9 +95,9 @@ function main() { name="benchmark-${KWOK_RUNTIME}" create_cluster "${name}" "${release}" --disable-qps-limits - child_timeout 120 scale_create_node "${name}" 1000 || failed+=("scale_create_node_timeout_${name}") - child_timeout 120 scale_create_pod "${name}" 1000 || failed+=("scale_create_pod_timeout_${name}") - child_timeout 120 scale_delete_pod "${name}" 0 || failed+=("scale_delete_pod_timeout_${name}") + child_timeout 60 scale_create_node "${name}" 1000 || failed+=("scale_create_node_timeout_${name}") + child_timeout 30 scale_create_pod "${name}" 1000 || failed+=("scale_create_pod_timeout_${name}") + child_timeout 30 scale_delete_pod "${name}" 0 || failed+=("scale_delete_pod_timeout_${name}") delete_cluster "${name}" if [[ "${#failed[@]}" -ne 0 ]]; then diff --git a/test/kwokctl/suite.sh b/test/kwokctl/suite.sh index e54fd16e4..a63cf4768 100644 --- a/test/kwokctl/suite.sh +++ b/test/kwokctl/suite.sh @@ -67,6 +67,11 @@ function child_timeout() { ((start++)) sleep 1 done + + if ! wait "${wp}"; then + echo "Error: Child process failed" >&2 + return 1 + fi echo "Took ${start}s" >&2 }
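
Note: a minimal sketch of how the new queue package is intended to be used, assuming k8s.io/utils/clock's RealClock is used to satisfy the Clock interface; the import path of the queue package is taken from this patch, and the example is illustrative only:

	package main

	import (
		"fmt"
		"time"

		"k8s.io/utils/clock"

		"sigs.k8s.io/kwok/pkg/utils/queue"
	)

	func main() {
		// Plain FIFO queue: Add never blocks, GetOrWait blocks until an item is available.
		q := queue.NewQueue[string]()
		q.Add("pod-a")
		fmt.Println(q.GetOrWait()) // pod-a

		// Delaying queue: the item reaches the inner queue only after the delay elapses.
		dq := queue.NewDelayingQueue[string](clock.RealClock{})
		dq.AddAfter("pod-b", 100*time.Millisecond)
		fmt.Println(dq.GetOrWait()) // pod-b, roughly 100ms later
	}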