Skip to content

Commit

Permalink
Merge pull request #703 from wzshiming/feat/config-and-crd
Browse files Browse the repository at this point in the history
Adjusting the config and CRD checks
  • Loading branch information
wzshiming authored Jul 5, 2023
2 parents 8e79793 + 3148782 commit 662c195
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 56 deletions.
10 changes: 9 additions & 1 deletion kustomize/kwok-with-cni/deployment-patch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,15 @@ spec:
- --node-ip=$(POD_IP)
- --node-port=10247
- --node-lease-duration-seconds=40
- --enable-crd=Stage,Attach,Exec,PortForward,Logs,ClusterAttach,ClusterExec,ClusterPortForward,ClusterLogs
- --enable-crd=Stage
- --enable-crd=Attach
- --enable-crd=ClusterAttach
- --enable-crd=Exec
- --enable-crd=ClusterExec
- --enable-crd=Logs
- --enable-crd=ClusterLogs
- --enable-crd=PortForward
- --enable-crd=ClusterPortForward
- --experimental-enable-cni=true
volumeMounts:
- name: etc-cni
Expand Down
10 changes: 9 additions & 1 deletion kustomize/kwok/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,15 @@ spec:
- --node-port=10247
- --cidr=10.0.0.1/24
- --node-lease-duration-seconds=40
- --enable-crd=Stage,Attach,Exec,PortForward,Logs,ClusterAttach,ClusterExec,ClusterPortForward,ClusterLogs
- --enable-crd=Stage
- --enable-crd=Attach
- --enable-crd=ClusterAttach
- --enable-crd=Exec
- --enable-crd=ClusterExec
- --enable-crd=Logs
- --enable-crd=ClusterLogs
- --enable-crd=PortForward
- --enable-crd=ClusterPortForward
env:
- name: POD_IP
valueFrom:
Expand Down
156 changes: 114 additions & 42 deletions pkg/kwok/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,19 @@ func NewCommand(ctx context.Context) *cobra.Command {
return cmd
}

var crdDefines = map[string]struct{}{
v1alpha1.StageKind: {},
v1alpha1.AttachKind: {},
v1alpha1.ClusterAttachKind: {},
v1alpha1.ExecKind: {},
v1alpha1.ClusterExecKind: {},
v1alpha1.PortForwardKind: {},
v1alpha1.ClusterPortForwardKind: {},
v1alpha1.LogsKind: {},
v1alpha1.ClusterLogsKind: {},
v1alpha1.MetricKind: {},
}

func runE(ctx context.Context, flags *flagpole) error {
logger := log.FromContext(ctx)

Expand All @@ -110,6 +123,99 @@ func runE(ctx context.Context, flags *flagpole) error {
}
}

for _, crd := range flags.Options.EnableCRDs {
if _, ok := crdDefines[crd]; !ok {
return fmt.Errorf("invalid crd: %s", crd)
}
}

stagesData := config.FilterWithTypeFromContext[*internalversion.Stage](ctx)
err := checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.StageKind, stagesData)
if err != nil {
return err
}

nodeStages := filterStages(stagesData, "v1", "Node")
podStages := filterStages(stagesData, "v1", "Pod")
if !slices.Contains(flags.Options.EnableCRDs, v1alpha1.StageKind) {
if len(nodeStages) == 0 {
logger.Warn("No node stages found, using default node stages")
nodeStages, err = controllers.NewStagesFromYaml([]byte(stages.DefaultNodeStages))
if err != nil {
return err
}
if flags.Options.NodeLeaseDurationSeconds == 0 {
nodeHeartbeatStages, err := controllers.NewStagesFromYaml([]byte(stages.DefaultNodeHeartbeatStages))
if err != nil {
return err
}
nodeStages = append(nodeStages, nodeHeartbeatStages...)
}
}

if len(podStages) == 0 {
logger.Warn("No pod stages found, using default pod stages")
podStages, err = controllers.NewStagesFromYaml([]byte(stages.DefaultPodStages))
if err != nil {
return err
}
}
}

clusterPortForwards := config.FilterWithTypeFromContext[*internalversion.ClusterPortForward](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterPortForwardKind, clusterPortForwards)
if err != nil {
return err
}

portForwards := config.FilterWithTypeFromContext[*internalversion.PortForward](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.PortForwardKind, portForwards)
if err != nil {
return err
}

clusterExecs := config.FilterWithTypeFromContext[*internalversion.ClusterExec](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterExecKind, clusterExecs)
if err != nil {
return err
}

execs := config.FilterWithTypeFromContext[*internalversion.Exec](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ExecKind, execs)
if err != nil {
return err
}

clusterLogs := config.FilterWithTypeFromContext[*internalversion.ClusterLogs](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterLogsKind, clusterLogs)
if err != nil {
return err
}

logs := config.FilterWithTypeFromContext[*internalversion.Logs](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.LogsKind, logs)
if err != nil {
return err
}

clusterAttaches := config.FilterWithTypeFromContext[*internalversion.ClusterAttach](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterAttachKind, clusterAttaches)
if err != nil {
return err
}

attaches := config.FilterWithTypeFromContext[*internalversion.Attach](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.AttachKind, attaches)
if err != nil {
return err
}

metrics := config.FilterWithTypeFromContext[*internalversion.Metric](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.MetricKind, metrics)
if err != nil {
return err
}

if flags.Kubeconfig == "" && flags.Master == "" {
logger.Warn("Neither --kubeconfig nor --master was specified")
logger.Info("Using the inClusterConfig")
Expand Down Expand Up @@ -146,46 +252,12 @@ func runE(ctx context.Context, flags *flagpole) error {
return err
}

stagesData := config.FilterWithTypeFromContext[*internalversion.Stage](ctx)
var nodeStages []*internalversion.Stage
var podStages []*internalversion.Stage

if slices.Contains(flags.Options.EnableCRDs, v1alpha1.StageKind) {
if len(stagesData) != 0 {
return fmt.Errorf("stage already exists, cannot watch CRD")
}
} else {
nodeStages = filterStages(stagesData, "v1", "Node")
if len(nodeStages) == 0 {
nodeStages, err = controllers.NewStagesFromYaml([]byte(stages.DefaultNodeStages))
if err != nil {
return err
}
if flags.Options.NodeLeaseDurationSeconds == 0 {
nodeHeartbeatStages, err := controllers.NewStagesFromYaml([]byte(stages.DefaultNodeHeartbeatStages))
if err != nil {
return err
}
nodeStages = append(nodeStages, nodeHeartbeatStages...)
}
}
podStages = filterStages(stagesData, "v1", "Pod")
if len(podStages) == 0 {
podStages, err = controllers.NewStagesFromYaml([]byte(stages.DefaultPodStages))
if err != nil {
return err
}
}
}

id, err := controllers.Identity()
if err != nil {
return err
}
ctx = log.NewContext(ctx, logger.With("id", id))

metrics := config.FilterWithTypeFromContext[*internalversion.Metric](ctx)

ctr, err := controllers.NewController(controllers.Config{
Clock: clock.RealClock{},
TypedClient: typedClient,
Expand Down Expand Up @@ -218,14 +290,6 @@ func runE(ctx context.Context, flags *flagpole) error {
}

if serverAddress != "" {
clusterPortForwards := config.FilterWithTypeFromContext[*internalversion.ClusterPortForward](ctx)
portForwards := config.FilterWithTypeFromContext[*internalversion.PortForward](ctx)
clusterExecs := config.FilterWithTypeFromContext[*internalversion.ClusterExec](ctx)
execs := config.FilterWithTypeFromContext[*internalversion.Exec](ctx)
clusterLogs := config.FilterWithTypeFromContext[*internalversion.ClusterLogs](ctx)
logs := config.FilterWithTypeFromContext[*internalversion.Logs](ctx)
clusterAttaches := config.FilterWithTypeFromContext[*internalversion.ClusterAttach](ctx)
attaches := config.FilterWithTypeFromContext[*internalversion.Attach](ctx)
config := server.Config{
TypedKwokClient: typedKwokClient,
EnableCRDs: flags.Options.EnableCRDs,
Expand Down Expand Up @@ -275,6 +339,14 @@ func runE(ctx context.Context, flags *flagpole) error {
return nil
}

func checkConfigOrCRD[T metav1.Object](crds []string, kind string, crs []T) error {
if slices.Contains(crds, kind) && len(crs) != 0 {
return fmt.Errorf("%s already exists in --config, so please remove it, or remove %s from --enable-crd", kind, kind)
}

return nil
}

func filterStages(stages []*internalversion.Stage, apiGroup, kind string) []*internalversion.Stage {
return slices.Filter(stages, func(stage *internalversion.Stage) bool {
return stage.Spec.ResourceRef.APIGroup == apiGroup && stage.Spec.ResourceRef.Kind == kind
Expand Down
5 changes: 5 additions & 0 deletions pkg/kwok/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,11 @@ func (c *Controller) Start(ctx context.Context) error {
})
return lifecycle
})

err := getter.Start(ctx)
if err != nil {
return err
}
} else {
lifecycle, err := NewLifecycle(conf.PodStages)
if err != nil {
Expand Down
25 changes: 13 additions & 12 deletions pkg/kwokctl/runtime/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,6 @@ func (c *Cluster) Save(ctx context.Context) error {

kwokConfigs := config.FilterWithTypeFromContext[*internalversion.KwokConfiguration](ctx)
if (len(kwokConfigs) == 0 || !slices.Contains(kwokConfigs[0].Options.EnableCRDs, v1alpha1.StageKind)) &&
conf.Options.NodeLeaseDurationSeconds == 0 &&
conf.Options.NodeStatusUpdateFrequencyMilliseconds > 0 &&
conf.Options.Runtime != consts.RuntimeTypeKind &&
conf.Options.Runtime != consts.RuntimeTypeKindPodman &&
len(config.FilterWithTypeFromContext[*internalversion.Stage](ctx)) == 0 {
Expand Down Expand Up @@ -213,17 +211,20 @@ func (c *Cluster) getDefaultStages(updateFrequency int64, nodeHeartbeat bool) ([
if err != nil {
return nil, err
}
hasUpdate := false
for _, stage := range nodeHeartbeatStages {
if stage.Name == "node-heartbeat" {
stage.Spec.Delay.DurationMilliseconds = format.Ptr(updateFrequency)
stage.Spec.Delay.JitterDurationMilliseconds = format.Ptr(updateFrequency + updateFrequency/10)
hasUpdate = true

if updateFrequency > 0 {
hasUpdate := false
for _, stage := range nodeHeartbeatStages {
if stage.Name == "node-heartbeat" {
stage.Spec.Delay.DurationMilliseconds = format.Ptr(updateFrequency)
stage.Spec.Delay.JitterDurationMilliseconds = format.Ptr(updateFrequency + updateFrequency/10)
hasUpdate = true
}
objs = append(objs, stage)
}
if !hasUpdate {
return nil, fmt.Errorf("failed to update node heartbeat stage")
}
objs = append(objs, stage)
}
if !hasUpdate {
return nil, fmt.Errorf("failed to update node heartbeat stage")
}
}
return objs, nil
Expand Down

0 comments on commit 662c195

Please sign in to comment.