Skip to content

Commit

Permalink
Add node lease controller
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed May 18, 2023
1 parent 9f9da67 commit eaf3df3
Show file tree
Hide file tree
Showing 32 changed files with 1,110 additions and 139 deletions.
11 changes: 11 additions & 0 deletions kustomize/kwok/kwok-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,14 @@ rules:
verbs:
- update
- patch
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- create
- update
- patch
- watch
- list
- get
1 change: 1 addition & 0 deletions kustomize/kwok/kwok-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ spec:
- --node-ip=$(POD_IP)
- --node-port=10247
- --cidr=10.0.0.1/24
- --node-lease-duration-seconds=40
env:
- name: POD_IP
valueFrom:
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/config/v1alpha1/kwok_configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,11 @@ type KwokConfigurationOptions struct {
// NodePlayStageParallelism is the number of NodePlayStages that are allowed to run in parallel.
// +default=4
NodePlayStageParallelism uint `json:"nodePlayStageParallelism,omitempty"`

// NodeLeaseDurationSeconds is the duration the Kubelet will set on its corresponding Lease.
NodeLeaseDurationSeconds uint `json:"nodeLeaseDurationSeconds,omitempty"`

// NodeLeaseParallelism is the number of NodeLeases that are allowed to be processed in parallel.
// +default=4
NodeLeaseParallelism uint `json:"nodeLeaseParallelism,omitempty"`
}
4 changes: 4 additions & 0 deletions pkg/apis/config/v1alpha1/kwokctl_configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,10 @@ type KwokctlConfigurationOptions struct {
// +default=1200000
NodeStatusUpdateFrequencyMilliseconds int64 `json:"nodeStatusUpdateFrequencyMilliseconds,omitempty"`

// NodeLeaseDurationSeconds is the duration the Kubelet will set on its corresponding Lease.
// +default=1200
NodeLeaseDurationSeconds uint `json:"nodeLeaseDurationSeconds,omitempty"`

// BindAddress is the address to bind to.
// +default="0.0.0.0"
BindAddress string `json:"bindAddress,omitempty"`
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/config/v1alpha1/zz_generated.defaults.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions pkg/apis/internalversion/kwok_configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,10 @@ type KwokConfigurationOptions struct {

// NodePlayStageParallelism is the number of NodePlayStages that are allowed to run in parallel.
NodePlayStageParallelism uint

// NodeLeaseDurationSeconds is the duration the Kubelet will set on its corresponding Lease.
NodeLeaseDurationSeconds uint

// NodeLeaseParallelism is the number of NodeLeases that are allowed to be processed in parallel.
NodeLeaseParallelism uint
}
3 changes: 3 additions & 0 deletions pkg/apis/internalversion/kwokctl_configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ type KwokctlConfigurationOptions struct {
// NodeStatusUpdateFrequencyMilliseconds is --node-status-update-frequency for kwok like kubelet.
NodeStatusUpdateFrequencyMilliseconds int64

// NodeLeaseDurationSeconds is the duration the Kubelet will set on its corresponding Lease.
NodeLeaseDurationSeconds uint

// BindAddress is the address to bind to.
BindAddress string
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/internalversion/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions pkg/config/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,14 @@ func setKwokctlConfigurationDefaults(config *configv1alpha1.KwokctlConfiguration
}
}

// Disable node lease duration seconds for kubernetes < 1.14
if conf.NodeLeaseDurationSeconds != 0 {
minor := parseRelease(conf.KubeVersion)
if minor < 14 && minor != -1 {
conf.NodeLeaseDurationSeconds = 0
}
}

setKwokctlKubernetesConfig(conf)

setKwokctlKwokConfig(conf)
Expand Down
19 changes: 19 additions & 0 deletions pkg/kwok/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/clock"

"sigs.k8s.io/kwok/pkg/apis/internalversion"
"sigs.k8s.io/kwok/pkg/config"
Expand Down Expand Up @@ -81,6 +82,7 @@ func NewCommand(ctx context.Context) *cobra.Command {
cmd.Flags().StringVar(&flags.Kubeconfig, "kubeconfig", flags.Kubeconfig, "Path to the kubeconfig file to use")
cmd.Flags().StringVar(&flags.Master, "master", flags.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig).")
cmd.Flags().StringVar(&flags.Options.ServerAddress, "server-address", flags.Options.ServerAddress, "Address to expose the server on")
cmd.Flags().UintVar(&flags.Options.NodeLeaseDurationSeconds, "node-lease-duration-seconds", flags.Options.NodeLeaseDurationSeconds, "Duration of node lease seconds")

cmd.Flags().BoolVar(&flags.Options.EnableCNI, "experimental-enable-cni", flags.Options.EnableCNI, "Experimental support for getting pod ip from CNI, for CNI-related components, Only works with Linux")
if config.GOOS != "linux" {
Expand Down Expand Up @@ -145,6 +147,13 @@ func runE(ctx context.Context, flags *flagpole) error {
if err != nil {
return err
}
if flags.Options.NodeLeaseDurationSeconds == 0 {
nodeHeartbeatStages, err := controllers.NewStagesFromYaml([]byte(stages.DefaultNodeHeartbeatStages))
if err != nil {
return err
}
nodeStages = append(nodeStages, nodeHeartbeatStages...)
}
}
podStages := filterStages(stagesData, "v1", "Pod")
if len(podStages) == 0 {
Expand All @@ -154,7 +163,14 @@ func runE(ctx context.Context, flags *flagpole) error {
}
}

id, err := controllers.Identity()
if err != nil {
return err
}
ctx = log.NewContext(ctx, logger.With("id", id))

ctr, err := controllers.NewController(controllers.Config{
Clock: clock.RealClock{},
ClientSet: typedClient,
EnableCNI: flags.Options.EnableCNI,
ManageAllNodes: flags.Options.ManageAllNodes,
Expand All @@ -170,6 +186,9 @@ func runE(ctx context.Context, flags *flagpole) error {
NodePlayStageParallelism: flags.Options.NodePlayStageParallelism,
NodeStages: nodeStages,
PodStages: podStages,
NodeLeaseParallelism: flags.Options.NodeLeaseParallelism,
NodeLeaseDurationSeconds: flags.Options.NodeLeaseDurationSeconds,
ID: id,
})
if err != nil {
return err
Expand Down
102 changes: 97 additions & 5 deletions pkg/kwok/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@ package controllers
import (
"context"
"fmt"
"os"
"strings"
"text/template"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/utils/clock"
"sigs.k8s.io/yaml"

"sigs.k8s.io/kwok/pkg/apis/internalversion"
Expand Down Expand Up @@ -68,12 +72,14 @@ var (
type Controller struct {
nodes *NodeController
pods *PodController
nodeLeases *NodeLeaseController
broadcaster record.EventBroadcaster
clientSet kubernetes.Interface
}

// Config is the configuration for the controller
type Config struct {
Clock clock.Clock
EnableCNI bool
ClientSet kubernetes.Interface
ManageAllNodes bool
Expand All @@ -89,6 +95,9 @@ type Config struct {
NodeStages []*internalversion.Stage
PodPlayStageParallelism uint
NodePlayStageParallelism uint
NodeLeaseDurationSeconds uint
NodeLeaseParallelism uint
ID string
}

// NewController creates a new fake kubelet controller
Expand Down Expand Up @@ -121,9 +130,49 @@ func NewController(conf Config) (*Controller, error) {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "kwok_controller"})

var onNodeManagedFunc func(ctx context.Context, nodeName string) error
var (
nodeLeases *NodeLeaseController
getNodeOwnerFunc func(nodeName string) []metav1.OwnerReference
onLeaseNodeManageFunc func(nodeName string)
onNodeManagedFunc func(nodeName string)
readOnlyFunc func(nodeName string) bool
)

if conf.NodeLeaseDurationSeconds != 0 {
leaseDuration := time.Duration(conf.NodeLeaseDurationSeconds) * time.Second
// https://github.com/kubernetes/kubernetes/blob/02f4d643eae2e225591702e1bbf432efea453a26/pkg/kubelet/kubelet.go#L199-L200
renewInterval := leaseDuration / 4
// https://github.com/kubernetes/component-helpers/blob/d17b6f1e84500ee7062a26f5327dc73cb3e9374a/apimachinery/lease/controller.go#L100
renewIntervalJitter := 0.04
l, err := NewNodeLeaseController(NodeLeaseControllerConfig{
Clock: conf.Clock,
ClientSet: conf.ClientSet,
LeaseDurationSeconds: conf.NodeLeaseDurationSeconds,
LeaseParallelism: conf.NodeLeaseParallelism,
RenewInterval: renewInterval,
RenewIntervalJitter: renewIntervalJitter,
LeaseNamespace: corev1.NamespaceNodeLease,
MutateLeaseFunc: setNodeOwnerFunc(func(nodeName string) []metav1.OwnerReference {
return getNodeOwnerFunc(nodeName)
}),
HolderIdentity: conf.ID,
OnNodeManagedFunc: func(nodeName string) {
onLeaseNodeManageFunc(nodeName)
},
})
if err != nil {
return nil, fmt.Errorf("failed to create node leases controller: %w", err)
}
nodeLeases = l

// Not holding the lease means the node is not managed
readOnlyFunc = func(nodeName string) bool {
return !nodeLeases.Held(nodeName)
}
}

nodes, err := NewNodeController(NodeControllerConfig{
Clock: conf.Clock,
ClientSet: conf.ClientSet,
NodeIP: conf.NodeIP,
NodeName: conf.NodeName,
Expand All @@ -132,19 +181,21 @@ func NewController(conf Config) (*Controller, error) {
DisregardStatusWithLabelSelector: conf.DisregardStatusWithLabelSelector,
ManageNodesWithLabelSelector: conf.ManageNodesWithLabelSelector,
NodeSelectorFunc: nodeSelectorFunc,
OnNodeManagedFunc: func(ctx context.Context, nodeName string) error {
return onNodeManagedFunc(ctx, nodeName)
OnNodeManagedFunc: func(nodeName string) {
onNodeManagedFunc(nodeName)
},
Stages: conf.NodeStages,
PlayStageParallelism: conf.NodePlayStageParallelism,
FuncMap: defaultFuncMap,
Recorder: recorder,
ReadOnlyFunc: readOnlyFunc,
})
if err != nil {
return nil, fmt.Errorf("failed to create nodes controller: %w", err)
}

pods, err := NewPodController(PodControllerConfig{
Clock: conf.Clock,
EnableCNI: conf.EnableCNI,
ClientSet: conf.ClientSet,
NodeIP: conf.NodeIP,
Expand All @@ -153,19 +204,45 @@ func NewController(conf Config) (*Controller, error) {
DisregardStatusWithLabelSelector: conf.DisregardStatusWithLabelSelector,
Stages: conf.PodStages,
PlayStageParallelism: conf.PodPlayStageParallelism,
Namespace: corev1.NamespaceAll,
NodeGetFunc: nodes.Get,
FuncMap: defaultFuncMap,
Recorder: recorder,
ReadOnlyFunc: readOnlyFunc,
})
if err != nil {
return nil, fmt.Errorf("failed to create pods controller: %w", err)
}

onNodeManagedFunc = pods.PlayStagePodsOnNode
if nodeLeases != nil {
getNodeOwnerFunc = func(nodeName string) []metav1.OwnerReference {
nodeInfo, ok := nodes.Get(nodeName)
if !ok || nodeInfo == nil {
return nil
}
return nodeInfo.OwnerReferences
}
onLeaseNodeManageFunc = func(nodeName string) {
// Manage the node and play stage all pods on the node
nodes.Manage(nodeName)
pods.PlayStagePodsOnNode(nodeName)
}

onNodeManagedFunc = func(nodeName string) {
// Try to hold the lease
nodeLeases.TryHold(nodeName)
}
} else {
onNodeManagedFunc = func(nodeName string) {
// Play stage all pods on the node
pods.PlayStagePodsOnNode(nodeName)
}
}

n := &Controller{
pods: pods,
nodes: nodes,
nodeLeases: nodeLeases,
broadcaster: eventBroadcaster,
clientSet: conf.ClientSet,
}
Expand All @@ -176,7 +253,12 @@ func NewController(conf Config) (*Controller, error) {
// Start starts the controller
func (c *Controller) Start(ctx context.Context) error {
c.broadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{Interface: c.clientSet.CoreV1().Events("")})

if c.nodeLeases != nil {
err := c.nodeLeases.Start(ctx)
if err != nil {
return fmt.Errorf("failed to start node leases controller: %w", err)
}
}
err := c.pods.Start(ctx)
if err != nil {
return fmt.Errorf("failed to start pods controller: %w", err)
Expand All @@ -187,3 +269,13 @@ func (c *Controller) Start(ctx context.Context) error {
}
return nil
}

// Identity returns a unique identifier for this controller
func Identity() (string, error) {
hostname, err := os.Hostname()
if err != nil {
return "", fmt.Errorf("unable to get hostname: %w", err)
}
// add a uniquifier so that two processes on the same host don't accidentally both become active
return hostname + "_" + string(uuid.NewUUID()), nil
}
2 changes: 2 additions & 0 deletions pkg/kwok/controllers/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func TestController(t *testing.T) {
}

nodeStages, _ := NewStagesFromYaml([]byte(stages.DefaultNodeStages))
nodeHeartbeatStage, _ := NewStagesFromYaml([]byte(stages.DefaultNodeHeartbeatStages))
nodeStages = append(nodeStages, nodeHeartbeatStage...)
podStages, _ := NewStagesFromYaml([]byte(stages.DefaultPodStages))

tests := []struct {
Expand Down
Loading

0 comments on commit eaf3df3

Please sign in to comment.