Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the events #758

Merged
merged 1 commit into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ require (
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/wzshiming/cmux v0.3.2
github.com/wzshiming/cron v0.2.1
github.com/wzshiming/ctc v1.2.3
github.com/wzshiming/easycel v0.4.0
go.uber.org/atomic v1.11.0
Expand Down Expand Up @@ -95,7 +94,6 @@ require (
github.com/rivo/uniseg v0.4.4 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect
github.com/wzshiming/llrb v0.2.1 // indirect
github.com/wzshiming/trie v0.1.1 // indirect
github.com/wzshiming/winseq v0.0.0-20200112104235-db357dc107ae // indirect
github.com/xlab/treeprint v1.1.0 // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,10 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/wzshiming/cmux v0.3.2 h1:lBEWbfbRqUDdXB6Mro/g35kvCuUEmAgIdpGEuER3bis=
github.com/wzshiming/cmux v0.3.2/go.mod h1:lPhqJN2E3frzkxrPdjesxL09z7nTcuZ6i8Is+2G/Xw4=
github.com/wzshiming/cron v0.2.1 h1:jtJlyGG4Od+xE0A36DtqTeyLhGssE95dsghqLdemXxI=
github.com/wzshiming/cron v0.2.1/go.mod h1:qXPkSl0AQLmjchtcqv7L6X8FSOLqy8hXYIXv9J1cKO4=
github.com/wzshiming/ctc v1.2.3 h1:q+hW3IQNsjIlOFBTGZZZeIXTElFM4grF4spW/errh/c=
github.com/wzshiming/ctc v1.2.3/go.mod h1:2tVAtIY7SUyraSk0JxvwmONNPFL4ARavPuEsg5+KA28=
github.com/wzshiming/easycel v0.4.0 h1:flPpJYS2bXQj1Jgk2yOp5oB6FvflvPM0VCHXx/qOJDk=
github.com/wzshiming/easycel v0.4.0/go.mod h1:qg3oAkmPOLJEUFlFsxBxYbp0cXHgbq2sZEHVq7SmUp8=
github.com/wzshiming/llrb v0.2.1 h1:+T/PE0lrrB2LE8487fYcRlkZct5E9L/Gi8mf6mUtEK8=
github.com/wzshiming/llrb v0.2.1/go.mod h1:xlAM5hpCBIT3dTy/6CbNPxHMR1JfIMSyk2lV68wz3Yw=
github.com/wzshiming/trie v0.1.1 h1:02AaBSZGhs6Aqljp8fz4xq/Mg8omFBPIlrUS0pJ11ks=
github.com/wzshiming/trie v0.1.1/go.mod h1:c9thxXTh4KcGkejt4sUsO4c5GUmWpxeWzOJ7AZJaI+8=
github.com/wzshiming/winseq v0.0.0-20200112104235-db357dc107ae h1:tpXvBXC3hpQBDCc9OojJZCQMVRAbT3TTdUMP8WguXkY=
Expand Down
4 changes: 3 additions & 1 deletion pkg/kwok/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,14 @@ func runE(ctx context.Context, flags *flagpole) error {
}
ctx = log.NewContext(ctx, logger.With("id", id))

enableMetrics := len(metrics) != 0 || slices.Contains(flags.Options.EnableCRDs, v1alpha1.MetricKind)
ctr, err := controllers.NewController(controllers.Config{
Clock: clock.RealClock{},
TypedClient: typedClient,
TypedKwokClient: typedKwokClient,
EnableCNI: flags.Options.EnableCNI,
EnableMetrics: len(metrics) != 0 || slices.Contains(flags.Options.EnableCRDs, v1alpha1.MetricKind),
EnableMetrics: enableMetrics,
EnablePodCache: enableMetrics,
ManageAllNodes: flags.Options.ManageAllNodes,
ManageNodesWithAnnotationSelector: flags.Options.ManageNodesWithAnnotationSelector,
ManageNodesWithLabelSelector: flags.Options.ManageNodesWithLabelSelector,
Expand Down
170 changes: 122 additions & 48 deletions pkg/kwok/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import (
"strings"
"time"

coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
Expand All @@ -43,6 +44,8 @@ import (
"sigs.k8s.io/kwok/pkg/consts"
"sigs.k8s.io/kwok/pkg/log"
"sigs.k8s.io/kwok/pkg/utils/gotpl"
"sigs.k8s.io/kwok/pkg/utils/informer"
"sigs.k8s.io/kwok/pkg/utils/queue"
"sigs.k8s.io/kwok/pkg/utils/slices"
)

Expand Down Expand Up @@ -86,6 +89,8 @@ var (
return consts.Version
},
}

nodeKind = corev1.SchemeGroupVersion.WithKind("Node")
)

// Controller is a fake kubelet implementation that can be used to test
Expand All @@ -96,6 +101,9 @@ type Controller struct {
nodeLeases *NodeLeaseController
broadcaster record.EventBroadcaster
typedClient kubernetes.Interface

nodeCacheGetter informer.Getter[*corev1.Node]
podCacheGetter informer.Getter[*corev1.Pod]
}

// Config is the configuration for the controller
Expand All @@ -121,6 +129,7 @@ type Config struct {
NodeLeaseParallelism uint
ID string
EnableMetrics bool
EnablePodCache bool
}

// NewController creates a new fake kubelet controller
Expand Down Expand Up @@ -155,49 +164,79 @@ func (c *Controller) Start(ctx context.Context) error {
recorder := c.broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "kwok_controller"})

var (
err error
nodeLeases *NodeLeaseController
getNodeOwnerFunc func(nodeName string) []metav1.OwnerReference
nodeLeasesChan chan informer.Event[*coordinationv1.Lease]
onLeaseNodeManageFunc func(nodeName string)
onNodeManagedFunc func(nodeName string)
readOnlyFunc func(nodeName string) bool
nodeSelectorFunc func(node *corev1.Node) bool
)
switch {
case conf.ManageAllNodes:
nodeSelectorFunc = func(node *corev1.Node) bool {
return true
}
case conf.ManageNodesWithAnnotationSelector != "":
selector, err := labels.Parse(conf.ManageNodesWithAnnotationSelector)
if err != nil {
return err
}
nodeSelectorFunc = func(node *corev1.Node) bool {
return selector.Matches(labels.Set(node.Annotations))
}
case conf.ManageNodesWithLabelSelector != "":
// client-go supports label filtering, so return true is ok.
nodeSelectorFunc = func(node *corev1.Node) bool {
return true
}

nodeChan := make(chan informer.Event[*corev1.Node], 1)
nodesCli := conf.TypedClient.CoreV1().Nodes()
nodesInformer := informer.NewInformer[*corev1.Node, *corev1.NodeList](nodesCli)
nodesCache, err := nodesInformer.WatchWithCache(ctx, informer.Option{
LabelSelector: conf.ManageNodesWithLabelSelector,
AnnotationSelector: conf.ManageNodesWithAnnotationSelector,
}, nodeChan)
if err != nil {
return fmt.Errorf("failed to watch nodes: %w", err)
}

podsChan := make(chan informer.Event[*corev1.Pod], 1)
podsCli := conf.TypedClient.CoreV1().Pods(corev1.NamespaceAll)
podsInformer := informer.NewInformer[*corev1.Pod, *corev1.PodList](podsCli)

podWatchOption := informer.Option{
FieldSelector: fields.OneTermNotEqualSelector("spec.nodeName", "").String(),
}

var podsCache informer.Getter[*corev1.Pod]
if conf.EnablePodCache {
podsCache, err = podsInformer.WatchWithCache(ctx, podWatchOption, podsChan)
} else {
err = podsInformer.Watch(ctx, podWatchOption, podsChan)
}
if err != nil {
return fmt.Errorf("failed to watch pods: %w", err)
}

if conf.NodeLeaseDurationSeconds != 0 {
nodeLeasesChan = make(chan informer.Event[*coordinationv1.Lease], 1)
nodeLeasesCli := conf.TypedClient.CoordinationV1().Leases(corev1.NamespaceNodeLease)
nodeLeasesInformer := informer.NewInformer[*coordinationv1.Lease, *coordinationv1.LeaseList](nodeLeasesCli)
// Watch Lease objects in the node-lease namespace. The error names the
// resource actually being watched so this failure is distinguishable from
// the node watch set up earlier in Start.
err = nodeLeasesInformer.Watch(ctx, informer.Option{}, nodeLeasesChan)
if err != nil {
	return fmt.Errorf("failed to watch node leases: %w", err)
}

leaseDuration := time.Duration(conf.NodeLeaseDurationSeconds) * time.Second
// https://github.com/kubernetes/kubernetes/blob/02f4d643eae2e225591702e1bbf432efea453a26/pkg/kubelet/kubelet.go#L199-L200
renewInterval := leaseDuration / 4
// https://github.com/kubernetes/component-helpers/blob/d17b6f1e84500ee7062a26f5327dc73cb3e9374a/apimachinery/lease/controller.go#L100
renewIntervalJitter := 0.04
l, err := NewNodeLeaseController(NodeLeaseControllerConfig{
nodeLeases, err = NewNodeLeaseController(NodeLeaseControllerConfig{
Clock: conf.Clock,
TypedClient: conf.TypedClient,
NodeCacheGetter: nodesCache,
LeaseDurationSeconds: conf.NodeLeaseDurationSeconds,
LeaseParallelism: conf.NodeLeaseParallelism,
RenewInterval: renewInterval,
RenewIntervalJitter: renewIntervalJitter,
LeaseNamespace: corev1.NamespaceNodeLease,
MutateLeaseFunc: setNodeOwnerFunc(func(nodeName string) []metav1.OwnerReference {
return getNodeOwnerFunc(nodeName)
node, ok := nodesCache.Get(nodeName)
if !ok {
return nil
}
ownerReferences := []metav1.OwnerReference{
{
APIVersion: nodeKind.Version,
Kind: nodeKind.Kind,
Name: node.Name,
UID: node.UID,
},
}
return ownerReferences
}),
HolderIdentity: conf.ID,
OnNodeManagedFunc: func(nodeName string) {
Expand All @@ -207,7 +246,6 @@ func (c *Controller) Start(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to create node leases controller: %w", err)
}
nodeLeases = l

// Not holding the lease means the node is not managed
readOnlyFunc = func(nodeName string) bool {
Expand Down Expand Up @@ -292,13 +330,12 @@ func (c *Controller) Start(ctx context.Context) error {
nodes, err := NewNodeController(NodeControllerConfig{
Clock: conf.Clock,
TypedClient: conf.TypedClient,
NodeCacheGetter: nodesCache,
NodeIP: conf.NodeIP,
NodeName: conf.NodeName,
NodePort: conf.NodePort,
DisregardStatusWithAnnotationSelector: conf.DisregardStatusWithAnnotationSelector,
DisregardStatusWithLabelSelector: conf.DisregardStatusWithLabelSelector,
ManageNodesWithLabelSelector: conf.ManageNodesWithLabelSelector,
NodeSelectorFunc: nodeSelectorFunc,
OnNodeManagedFunc: func(nodeName string) {
onNodeManagedFunc(nodeName)
},
Expand All @@ -317,13 +354,13 @@ func (c *Controller) Start(ctx context.Context) error {
Clock: conf.Clock,
EnableCNI: conf.EnableCNI,
TypedClient: conf.TypedClient,
NodeCacheGetter: nodesCache,
NodeIP: conf.NodeIP,
CIDR: conf.CIDR,
DisregardStatusWithAnnotationSelector: conf.DisregardStatusWithAnnotationSelector,
DisregardStatusWithLabelSelector: conf.DisregardStatusWithLabelSelector,
Lifecycle: podLifecycleGetter,
PlayStageParallelism: conf.PodPlayStageParallelism,
Namespace: corev1.NamespaceAll,
NodeGetFunc: nodes.Get,
FuncMap: defaultFuncMap,
Recorder: recorder,
Expand All @@ -334,68 +371,105 @@ func (c *Controller) Start(ctx context.Context) error {
return fmt.Errorf("failed to create pods controller: %w", err)
}

podOnNodeManageQueue := queue.NewQueue[string]()
if nodeLeases != nil {
getNodeOwnerFunc = func(nodeName string) []metav1.OwnerReference {
nodeInfo, ok := nodes.Get(nodeName)
if !ok || nodeInfo == nil {
return nil
}
return nodeInfo.OwnerReferences
}
nodeManageQueue := queue.NewQueue[string]()
onLeaseNodeManageFunc = func(nodeName string) {
// Manage the node and play stage all pods on the node
nodes.Manage(nodeName)
pods.PlayStagePodsOnNode(nodeName)
nodeManageQueue.Add(nodeName)
podOnNodeManageQueue.Add(nodeName)
}

onNodeManagedFunc = func(nodeName string) {
// Try to hold the lease
nodeLeases.TryHold(nodeName)
}

go func() {
for {
nodeName := nodeManageQueue.GetOrWait()
node, ok := nodesCache.Get(nodeName)
if !ok {
logger.Warn("node not found in cache", "node", nodeName)
err := nodesInformer.Sync(ctx, informer.Option{
FieldSelector: fields.OneTermEqualSelector("metadata.name", nodeName).String(),
}, nodeChan)
if err != nil {
logger.Error("failed to update node", err, "node", nodeName)
}
continue
}
nodeChan <- informer.Event[*corev1.Node]{
Type: informer.Sync,
Object: node,
}
}
}()
} else {
onNodeManagedFunc = func(nodeName string) {
// Play stage all pods on the node
pods.PlayStagePodsOnNode(nodeName)
podOnNodeManageQueue.Add(nodeName)
}
}

// Background worker: for every node name pushed onto podOnNodeManageQueue,
// re-sync the pods scheduled on that node into podsChan.
go func() {
	for {
		nodeName := podOnNodeManageQueue.GetOrWait()
		// Keep the error local to this goroutine. Assigning to the enclosing
		// function's err would race with the code that continues to run in
		// Start (pods.Start / nodes.Start also write to it).
		err := podsInformer.Sync(ctx, informer.Option{
			FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(),
		}, podsChan)
		if err != nil {
			logger.Error("failed to update pods on node", err, "node", nodeName)
		}
	}
}()

c.broadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{Interface: c.typedClient.CoreV1().Events("")})
if nodeLeases != nil {
err := nodeLeases.Start(ctx)
err := nodeLeases.Start(ctx, nodeLeasesChan)
if err != nil {
return fmt.Errorf("failed to start node leases controller: %w", err)
}
}
err = pods.Start(ctx)
err = pods.Start(ctx, podsChan)
if err != nil {
return fmt.Errorf("failed to start pods controller: %w", err)
}
err = nodes.Start(ctx)
err = nodes.Start(ctx, nodeChan)
if err != nil {
return fmt.Errorf("failed to start nodes controller: %w", err)
}

c.pods = pods
c.nodes = nodes
c.nodeLeases = nodeLeases
c.nodeCacheGetter = nodesCache
c.podCacheGetter = podsCache
return nil
}

// GetNode returns the node with the given name
func (c *Controller) GetNode(nodeName string) (*NodeInfo, bool) {
// GetNodeInfo returns the node info for the given node
func (c *Controller) GetNodeInfo(nodeName string) (*NodeInfo, bool) {
return c.nodes.Get(nodeName)
}

// ListNodes returns all nodes
func (c *Controller) ListNodes() []*NodeInfo {
func (c *Controller) ListNodes() []string {
return c.nodes.List()
}

// ListPods returns all pods on the given node
func (c *Controller) ListPods(nodeName string) ([]*PodInfo, bool) {
func (c *Controller) ListPods(nodeName string) ([]log.ObjectRef, bool) {
return c.pods.List(nodeName)
}

// GetPodCache returns the pod cache getter recorded by Start.
// In the Start shown in this file, a pod cache is only built when
// Config.EnablePodCache is set, and the getter is stored as the last step
// before Start returns nil. The result is therefore nil if Start has not
// completed successfully or if the pod cache is disabled.
func (c *Controller) GetPodCache() informer.Getter[*corev1.Pod] {
return c.podCacheGetter
}

// GetNodeCache returns the node cache getter recorded by Start.
// Start obtains it from the node informer's WatchWithCache call and stores it
// as one of its final steps, so the result is nil if Start has not completed
// successfully.
func (c *Controller) GetNodeCache() informer.Getter[*corev1.Node] {
return c.nodeCacheGetter
}

// Identity returns a unique identifier for this controller
func Identity() (string, error) {
hostname, err := os.Hostname()
Expand Down
Loading