Skip to content

Commit

Permalink
Adjusting the config and CRD checks
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Jul 5, 2023
1 parent 8e79793 commit baab3be
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 54 deletions.
137 changes: 95 additions & 42 deletions pkg/kwok/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,93 @@ func runE(ctx context.Context, flags *flagpole) error {
}
}

stagesData := config.FilterWithTypeFromContext[*internalversion.Stage](ctx)
err := checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.StageKind, stagesData)
if err != nil {
return err
}

nodeStages := filterStages(stagesData, "v1", "Node")
podStages := filterStages(stagesData, "v1", "Pod")
if !slices.Contains(flags.Options.EnableCRDs, v1alpha1.StageKind) {
if len(nodeStages) == 0 {
logger.Warn("No node stages found, using default node stages")
nodeStages, err = controllers.NewStagesFromYaml([]byte(stages.DefaultNodeStages))
if err != nil {
return err
}
if flags.Options.NodeLeaseDurationSeconds == 0 {
nodeHeartbeatStages, err := controllers.NewStagesFromYaml([]byte(stages.DefaultNodeHeartbeatStages))
if err != nil {
return err
}
nodeStages = append(nodeStages, nodeHeartbeatStages...)
}
}

if len(podStages) == 0 {
logger.Warn("No pod stages found, using default pod stages")
podStages, err = controllers.NewStagesFromYaml([]byte(stages.DefaultPodStages))
if err != nil {
return err
}
}
}

clusterPortForwards := config.FilterWithTypeFromContext[*internalversion.ClusterPortForward](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterPortForwardKind, clusterPortForwards)
if err != nil {
return err
}

portForwards := config.FilterWithTypeFromContext[*internalversion.PortForward](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.PortForwardKind, portForwards)
if err != nil {
return err
}

clusterExecs := config.FilterWithTypeFromContext[*internalversion.ClusterExec](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterExecKind, clusterExecs)
if err != nil {
return err
}

execs := config.FilterWithTypeFromContext[*internalversion.Exec](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ExecKind, execs)
if err != nil {
return err
}

clusterLogs := config.FilterWithTypeFromContext[*internalversion.ClusterLogs](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterLogsKind, clusterLogs)
if err != nil {
return err
}

logs := config.FilterWithTypeFromContext[*internalversion.Logs](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.LogsKind, logs)
if err != nil {
return err
}

clusterAttaches := config.FilterWithTypeFromContext[*internalversion.ClusterAttach](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterAttachKind, clusterAttaches)
if err != nil {
return err
}

attaches := config.FilterWithTypeFromContext[*internalversion.Attach](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.AttachKind, attaches)
if err != nil {
return err
}

metrics := config.FilterWithTypeFromContext[*internalversion.Metric](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.MetricKind, metrics)
if err != nil {
return err
}

if flags.Kubeconfig == "" && flags.Master == "" {
logger.Warn("Neither --kubeconfig nor --master was specified")
logger.Info("Using the inClusterConfig")
Expand Down Expand Up @@ -146,46 +233,12 @@ func runE(ctx context.Context, flags *flagpole) error {
return err
}

stagesData := config.FilterWithTypeFromContext[*internalversion.Stage](ctx)
var nodeStages []*internalversion.Stage
var podStages []*internalversion.Stage

if slices.Contains(flags.Options.EnableCRDs, v1alpha1.StageKind) {
if len(stagesData) != 0 {
return fmt.Errorf("stage already exists, cannot watch CRD")
}
} else {
nodeStages = filterStages(stagesData, "v1", "Node")
if len(nodeStages) == 0 {
nodeStages, err = controllers.NewStagesFromYaml([]byte(stages.DefaultNodeStages))
if err != nil {
return err
}
if flags.Options.NodeLeaseDurationSeconds == 0 {
nodeHeartbeatStages, err := controllers.NewStagesFromYaml([]byte(stages.DefaultNodeHeartbeatStages))
if err != nil {
return err
}
nodeStages = append(nodeStages, nodeHeartbeatStages...)
}
}
podStages = filterStages(stagesData, "v1", "Pod")
if len(podStages) == 0 {
podStages, err = controllers.NewStagesFromYaml([]byte(stages.DefaultPodStages))
if err != nil {
return err
}
}
}

id, err := controllers.Identity()
if err != nil {
return err
}
ctx = log.NewContext(ctx, logger.With("id", id))

metrics := config.FilterWithTypeFromContext[*internalversion.Metric](ctx)

ctr, err := controllers.NewController(controllers.Config{
Clock: clock.RealClock{},
TypedClient: typedClient,
Expand Down Expand Up @@ -218,14 +271,6 @@ func runE(ctx context.Context, flags *flagpole) error {
}

if serverAddress != "" {
clusterPortForwards := config.FilterWithTypeFromContext[*internalversion.ClusterPortForward](ctx)
portForwards := config.FilterWithTypeFromContext[*internalversion.PortForward](ctx)
clusterExecs := config.FilterWithTypeFromContext[*internalversion.ClusterExec](ctx)
execs := config.FilterWithTypeFromContext[*internalversion.Exec](ctx)
clusterLogs := config.FilterWithTypeFromContext[*internalversion.ClusterLogs](ctx)
logs := config.FilterWithTypeFromContext[*internalversion.Logs](ctx)
clusterAttaches := config.FilterWithTypeFromContext[*internalversion.ClusterAttach](ctx)
attaches := config.FilterWithTypeFromContext[*internalversion.Attach](ctx)
config := server.Config{
TypedKwokClient: typedKwokClient,
EnableCRDs: flags.Options.EnableCRDs,
Expand Down Expand Up @@ -275,6 +320,14 @@ func runE(ctx context.Context, flags *flagpole) error {
return nil
}

func checkConfigOrCRD[T metav1.Object](crds []string, kind string, crs []T) error {
if slices.Contains(crds, kind) && len(crs) != 0 {
return fmt.Errorf("%s already exists in --config, so please remove it, or remove %s from --enable-crd", kind, kind)
}

return nil
}

func filterStages(stages []*internalversion.Stage, apiGroup, kind string) []*internalversion.Stage {
return slices.Filter(stages, func(stage *internalversion.Stage) bool {
return stage.Spec.ResourceRef.APIGroup == apiGroup && stage.Spec.ResourceRef.Kind == kind
Expand Down
5 changes: 5 additions & 0 deletions pkg/kwok/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,11 @@ func (c *Controller) Start(ctx context.Context) error {
})
return lifecycle
})

err := getter.Start(ctx)
if err != nil {
return err
}
} else {
lifecycle, err := NewLifecycle(conf.PodStages)
if err != nil {
Expand Down
25 changes: 13 additions & 12 deletions pkg/kwokctl/runtime/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,6 @@ func (c *Cluster) Save(ctx context.Context) error {

kwokConfigs := config.FilterWithTypeFromContext[*internalversion.KwokConfiguration](ctx)
if (len(kwokConfigs) == 0 || !slices.Contains(kwokConfigs[0].Options.EnableCRDs, v1alpha1.StageKind)) &&
conf.Options.NodeLeaseDurationSeconds == 0 &&
conf.Options.NodeStatusUpdateFrequencyMilliseconds > 0 &&
conf.Options.Runtime != consts.RuntimeTypeKind &&
conf.Options.Runtime != consts.RuntimeTypeKindPodman &&
len(config.FilterWithTypeFromContext[*internalversion.Stage](ctx)) == 0 {
Expand Down Expand Up @@ -213,17 +211,20 @@ func (c *Cluster) getDefaultStages(updateFrequency int64, nodeHeartbeat bool) ([
if err != nil {
return nil, err
}
hasUpdate := false
for _, stage := range nodeHeartbeatStages {
if stage.Name == "node-heartbeat" {
stage.Spec.Delay.DurationMilliseconds = format.Ptr(updateFrequency)
stage.Spec.Delay.JitterDurationMilliseconds = format.Ptr(updateFrequency + updateFrequency/10)
hasUpdate = true

if updateFrequency > 0 {
hasUpdate := false
for _, stage := range nodeHeartbeatStages {
if stage.Name == "node-heartbeat" {
stage.Spec.Delay.DurationMilliseconds = format.Ptr(updateFrequency)
stage.Spec.Delay.JitterDurationMilliseconds = format.Ptr(updateFrequency + updateFrequency/10)
hasUpdate = true
}
objs = append(objs, stage)
}
if !hasUpdate {
return nil, fmt.Errorf("failed to update node heartbeat stage")
}
objs = append(objs, stage)
}
if !hasUpdate {
return nil, fmt.Errorf("failed to update node heartbeat stage")
}
}
return objs, nil
Expand Down

0 comments on commit baab3be

Please sign in to comment.