Skip to content

Commit

Permalink
Add lease controller
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed May 11, 2023
1 parent 19a5c56 commit df5bae2
Show file tree
Hide file tree
Showing 25 changed files with 602 additions and 108 deletions.
11 changes: 11 additions & 0 deletions kustomize/kwok/kwok-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,14 @@ rules:
verbs:
- update
- patch
# Grants access to Lease objects. RBAC apiGroups takes the bare API group
# name (no "/v1" version suffix); with a version appended the rule matches
# no real group and the permissions below are never granted.
- apiGroups:
  - coordination.k8s.io
  resources:
  - leases
  verbs:
  - create
  - update
  - patch
  - watch
  - list
  - get
1 change: 1 addition & 0 deletions kustomize/kwok/kwok-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ spec:
- --node-ip=$(POD_IP)
- --node-port=10247
- --cidr=10.0.0.1/24
- --node-lease-duration-seconds=40
env:
- name: POD_IP
valueFrom:
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/config/v1alpha1/kwok_configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,11 @@ type KwokConfigurationOptions struct {
// NodePlayStageParallelism is the number of NodePlayStages that are allowed to run in parallel.
// +default=4
NodePlayStageParallelism uint `json:"nodePlayStageParallelism,omitempty"`

// NodeLeaseDurationSeconds is the duration the Kubelet will set on its corresponding Lease.
NodeLeaseDurationSeconds uint `json:"nodeLeaseDurationSeconds,omitempty"`

// NodeLeaseParallelism is the number of NodeLeases that are allowed to be processed in parallel.
// +default=4
NodeLeaseParallelism uint `json:"nodeLeaseParallelism,omitempty"`
}
4 changes: 4 additions & 0 deletions pkg/apis/config/v1alpha1/kwokctl_configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,10 @@ type KwokctlConfigurationOptions struct {
// +default=1200000
NodeStatusUpdateFrequencyMilliseconds int64 `json:"nodeStatusUpdateFrequencyMilliseconds,omitempty"`

// NodeLeaseDurationSeconds is the duration the Kubelet will set on its corresponding Lease.
// +default=1200
NodeLeaseDurationSeconds uint `json:"nodeLeaseDurationSeconds,omitempty"`

// BindAddress is the address to bind to.
// +default="0.0.0.0"
BindAddress string `json:"bindAddress,omitempty"`
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/config/v1alpha1/zz_generated.defaults.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions pkg/apis/internalversion/kwok_configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,10 @@ type KwokConfigurationOptions struct {

// NodePlayStageParallelism is the number of NodePlayStages that are allowed to run in parallel.
NodePlayStageParallelism uint

// NodeLeaseDurationSeconds is the duration the Kubelet will set on its corresponding Lease.
NodeLeaseDurationSeconds uint

// NodeLeaseParallelism is the number of NodeLeases that are allowed to be processed in parallel.
NodeLeaseParallelism uint
}
3 changes: 3 additions & 0 deletions pkg/apis/internalversion/kwokctl_configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ type KwokctlConfigurationOptions struct {
// NodeStatusUpdateFrequencyMilliseconds is --node-status-update-frequency for kwok like kubelet.
NodeStatusUpdateFrequencyMilliseconds int64

// NodeLeaseDurationSeconds is the duration the Kubelet will set on its corresponding Lease.
NodeLeaseDurationSeconds uint

// BindAddress is the address to bind to.
BindAddress string
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/internalversion/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions pkg/config/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,14 @@ func setKwokctlConfigurationDefaults(config *configv1alpha1.KwokctlConfiguration
}
}

// Disable node lease duration seconds for kubernetes < 1.14
if conf.NodeLeaseDurationSeconds != 0 {
minor := parseRelease(conf.KubeVersion)
if minor < 14 && minor != -1 {
conf.NodeLeaseDurationSeconds = 0
}
}

setKwokctlKubernetesConfig(conf)

setKwokctlKwokConfig(conf)
Expand Down
10 changes: 10 additions & 0 deletions pkg/kwok/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func NewCommand(ctx context.Context) *cobra.Command {
cmd.Flags().StringVar(&flags.Kubeconfig, "kubeconfig", flags.Kubeconfig, "Path to the kubeconfig file to use")
cmd.Flags().StringVar(&flags.Master, "master", flags.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig).")
cmd.Flags().StringVar(&flags.Options.ServerAddress, "server-address", flags.Options.ServerAddress, "Address to expose the server on")
cmd.Flags().UintVar(&flags.Options.NodeLeaseDurationSeconds, "node-lease-duration-seconds", flags.Options.NodeLeaseDurationSeconds, "Duration of node lease seconds")

cmd.Flags().BoolVar(&flags.Options.EnableCNI, "experimental-enable-cni", flags.Options.EnableCNI, "Experimental support for getting pod ip from CNI, for CNI-related components, Only works with Linux")
if config.GOOS != "linux" {
Expand Down Expand Up @@ -145,6 +146,13 @@ func runE(ctx context.Context, flags *flagpole) error {
if err != nil {
return err
}
if flags.Options.NodeLeaseDurationSeconds == 0 {
nodeHeartbeatStages, err := controllers.NewStagesFromYaml([]byte(stages.DefaultNodeHeartbeatStages))
if err != nil {
return err
}
nodeStages = append(nodeStages, nodeHeartbeatStages...)
}
}
podStages := filterStages(stagesData, "v1", "Pod")
if len(podStages) == 0 {
Expand All @@ -170,6 +178,8 @@ func runE(ctx context.Context, flags *flagpole) error {
NodePlayStageParallelism: flags.Options.NodePlayStageParallelism,
NodeStages: nodeStages,
PodStages: podStages,
NodeLeaseParallelism: flags.Options.NodeLeaseParallelism,
NodeLeaseDurationSeconds: flags.Options.NodeLeaseDurationSeconds,
})
if err != nil {
return err
Expand Down
62 changes: 61 additions & 1 deletion pkg/kwok/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import (
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/utils/clock"
"sigs.k8s.io/yaml"

"sigs.k8s.io/kwok/pkg/apis/internalversion"
Expand Down Expand Up @@ -68,6 +70,7 @@ var (
type Controller struct {
// nodes is the node controller created by NewController.
nodes *NodeController
// pods is the pod controller created by NewController; Start starts it.
pods *PodController
// leases is the node lease controller. NewController only creates it when
// Config.NodeLeaseDurationSeconds is non-zero, so it may be nil; Start
// checks for nil before starting it.
leases *LeaseController
// broadcaster is the event broadcaster; Start begins recording its events
// to the events sink obtained from clientSet.
broadcaster record.EventBroadcaster
// clientSet is the Kubernetes client taken from Config.ClientSet.
clientSet kubernetes.Interface
}
Expand All @@ -89,8 +92,15 @@ type Config struct {
NodeStages []*internalversion.Stage
PodPlayStageParallelism uint
NodePlayStageParallelism uint
NodeLeaseDurationSeconds uint
NodeLeaseParallelism uint
}

// nodeLeaseRenewIntervalFraction scales the node lease duration down to the
// interval at which the lease is renewed:
// renewInterval = leaseDuration * nodeLeaseRenewIntervalFraction.
const nodeLeaseRenewIntervalFraction = 0.25

// NewController creates a new fake kubelet controller
func NewController(conf Config) (*Controller, error) {
var nodeSelectorFunc func(node *corev1.Node) bool
Expand Down Expand Up @@ -121,6 +131,31 @@ func NewController(conf Config) (*Controller, error) {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "kwok_controller"})

var leases *LeaseController
var setNodeOwner func(name string) []metav1.OwnerReference

if conf.NodeLeaseDurationSeconds != 0 {
leaseDuration := time.Duration(conf.NodeLeaseDurationSeconds) * time.Second
renewInterval := time.Duration(float64(leaseDuration) * nodeLeaseRenewIntervalFraction)

l, err := NewLeaseController(LeaseControllerConfig{
Clock: clock.RealClock{},
ClientSet: conf.ClientSet,
LeaseDurationSeconds: conf.NodeLeaseDurationSeconds,
NodeLeaseParallelism: conf.NodeLeaseParallelism,
RenewInterval: renewInterval,
RenewIntervalJitter: 0.04,
LeaseNamespace: corev1.NamespaceNodeLease,
NewLeasePostProcessFunc: setNodeOwnerFunc(func(name string) []metav1.OwnerReference {
return setNodeOwner(name)
}),
})
if err != nil {
return nil, err
}
leases = l
}

var onNodeManagedFunc func(ctx context.Context, nodeName string) error

nodes, err := NewNodeController(NodeControllerConfig{
Expand All @@ -133,8 +168,17 @@ func NewController(conf Config) (*Controller, error) {
ManageNodesWithLabelSelector: conf.ManageNodesWithLabelSelector,
NodeSelectorFunc: nodeSelectorFunc,
OnNodeManagedFunc: func(ctx context.Context, nodeName string) error {
if leases != nil {
leases.AddLease(nodeName)
}
return onNodeManagedFunc(ctx, nodeName)
},
OnNodeTeardownFunc: func(ctx context.Context, nodeName string) error {
if leases != nil {
leases.RemoveLease(nodeName)
}
return nil
},
Stages: conf.NodeStages,
PlayStageParallelism: conf.NodePlayStageParallelism,
FuncMap: defaultFuncMap,
Expand All @@ -144,6 +188,16 @@ func NewController(conf Config) (*Controller, error) {
return nil, fmt.Errorf("failed to create nodes controller: %w", err)
}

if leases != nil {
setNodeOwner = func(name string) []metav1.OwnerReference {
nodeInfo, ok := nodes.Get(name)
if !ok {
return nil
}
return nodeInfo.OwnerReferences
}
}

pods, err := NewPodController(PodControllerConfig{
EnableCNI: conf.EnableCNI,
ClientSet: conf.ClientSet,
Expand All @@ -166,6 +220,7 @@ func NewController(conf Config) (*Controller, error) {
n := &Controller{
pods: pods,
nodes: nodes,
leases: leases,
broadcaster: eventBroadcaster,
clientSet: conf.ClientSet,
}
Expand All @@ -176,7 +231,12 @@ func NewController(conf Config) (*Controller, error) {
// Start starts the controller
func (c *Controller) Start(ctx context.Context) error {
c.broadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{Interface: c.clientSet.CoreV1().Events("")})

if c.leases != nil {
err := c.leases.Start(ctx)
if err != nil {
return fmt.Errorf("failed to start leases controller: %w", err)
}
}
err := c.pods.Start(ctx)
if err != nil {
return fmt.Errorf("failed to start pods controller: %w", err)
Expand Down
Loading

0 comments on commit df5bae2

Please sign in to comment.