generated from kubernetes/kubernetes-template-project
-
Notifications
You must be signed in to change notification settings - Fork 243
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* [pod] Export `FromObject()` * [cmd/importer] Initial implementation. * Review Remarks * Add extra labels * Review Remarks * Review Remarks * Review Remarks * Review Remarks
- Loading branch information
Showing
14 changed files
with
1,403 additions
and
16 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
# Kueue Importer Tool | ||
|
||
A tool able to import existing pods into kueue. | ||
|
||
## Cluster setup. | ||
|
||
The importer should run in a cluster having the Kueue CRDs defined and in which the `kueue-controller-manager` is not running or has the `pod` integration framework disabled. Check Kueue's [installation guide](https://kueue.sigs.k8s.io/docs/installation/) and [Run Plain Pods](https://kueue.sigs.k8s.io/docs/tasks/run_plain_pods/#before-you-begin) for details. | ||
|
||
For an import to succeed, all the involved Kueue objects (LocalQueues, ClusterQueues and ResourceFlavors) need to be created in the cluster, the check stage of the importer will check this and enumerate the missing objects. | ||
|
||
## Build | ||
|
||
From kueue source root run: | ||
```bash | ||
go build -C cmd/importer/ -o $(pwd)/bin/importer | ||
|
||
``` | ||
|
||
## Usage | ||
|
||
The command runs against the systems default kubectl configuration. Check the [kubectl documentation](https://kubernetes.io/docs/tasks/access-application-cluster/configure-access-multiple-clusters/) to learn more about how to Configure Access to Multiple Clusters. | ||
|
||
The importer will perform following checks: | ||
|
||
- At least one `namespace` is provided. | ||
- The label key (`queuelabel`) providing the queue mapping is provided. | ||
- A mapping from one of the encountered `queuelabel` values to an existing LocalQueue exists. | ||
- The LocalQueues involved in the import are using an existing ClusterQueue. | ||
- The ClusterQueues involved have at least one ResourceGroup using an existing ResourceFlavor. This ResourceFlavor is used when the importer creates the admission for the created workloads. | ||
|
||
After which, if `--dry-run=false` was specified, for each selected Pod the importer will: | ||
|
||
- Update the Pod's Kueue related labels. | ||
- Create a Workload associated with the Pod. | ||
- Admit the Workload. | ||
### Example | ||
```bash | ||
./bin/importer import -n ns1,ns2 --queuelabel=src.lbl --queuemapping=src-val=user-queue,src-val2=user-queue2 --dry-run=false | ||
``` | ||
Will import all the pods in namespace `ns1` or `ns2` having the label `src.lbl` set in LocalQueues `user-queue` or `user-queue2` depending on `src.lbl` value. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,221 @@ | ||
/* | ||
Copyright 2024 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package main | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"maps" | ||
"os" | ||
|
||
"github.com/spf13/cobra" | ||
"go.uber.org/zap/zapcore" | ||
"gopkg.in/yaml.v2" | ||
"k8s.io/apimachinery/pkg/util/validation" | ||
"k8s.io/client-go/kubernetes/scheme" | ||
ctrl "sigs.k8s.io/controller-runtime" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
"sigs.k8s.io/controller-runtime/pkg/log/zap" | ||
|
||
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" | ||
"sigs.k8s.io/kueue/cmd/importer/pod" | ||
"sigs.k8s.io/kueue/cmd/importer/util" | ||
"sigs.k8s.io/kueue/pkg/util/useragent" | ||
) | ||
|
||
const ( | ||
NamespaceFlag = "namespace" | ||
NamespaceFlagShort = "n" | ||
QueueMappingFlag = "queuemapping" | ||
QueueMappingFileFlag = "queuemapping-file" | ||
QueueLabelFlag = "queuelabel" | ||
QPSFlag = "qps" | ||
BurstFlag = "burst" | ||
VerbosityFlag = "verbose" | ||
VerboseFlagShort = "v" | ||
ConcurrencyFlag = "concurrent-workers" | ||
ConcurrencyFlagShort = "c" | ||
DryRunFlag = "dry-run" | ||
AddLabelsFlag = "add-labels" | ||
) | ||
|
||
var ( | ||
rootCmd = &cobra.Command{ | ||
Use: "importer", | ||
Short: "Import existing (running) objects into Kueue", | ||
PersistentPreRunE: func(cmd *cobra.Command, _ []string) error { | ||
v, _ := cmd.Flags().GetCount(VerbosityFlag) | ||
level := (v + 1) * -1 | ||
ctrl.SetLogger(zap.New( | ||
zap.UseDevMode(true), | ||
zap.ConsoleEncoder(), | ||
zap.Level(zapcore.Level(level)), | ||
)) | ||
return nil | ||
}, | ||
} | ||
) | ||
|
||
func setFlags(cmd *cobra.Command) { | ||
cmd.Flags().StringSliceP(NamespaceFlag, NamespaceFlagShort, nil, "target namespaces (at least one should be provided)") | ||
cmd.Flags().String(QueueLabelFlag, "", "label used to identify the target local queue") | ||
cmd.Flags().StringToString(QueueMappingFlag, nil, "mapping from \""+QueueLabelFlag+"\" label values to local queue names") | ||
cmd.Flags().StringToString(AddLabelsFlag, nil, "additional label=value pairs to be added to the imported pods and created workloads") | ||
cmd.Flags().String(QueueMappingFileFlag, "", "yaml file containing extra mappings from \""+QueueLabelFlag+"\" label values to local queue names") | ||
cmd.Flags().Float32(QPSFlag, 50, "client QPS, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit") | ||
cmd.Flags().Int(BurstFlag, 50, "client Burst, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit") | ||
cmd.Flags().UintP(ConcurrencyFlag, ConcurrencyFlagShort, 8, "number of concurrent import workers") | ||
cmd.Flags().Bool(DryRunFlag, true, "don't import, check the config only") | ||
|
||
_ = cmd.MarkFlagRequired(QueueLabelFlag) | ||
_ = cmd.MarkFlagRequired(NamespaceFlag) | ||
} | ||
|
||
func init() { | ||
rootCmd.AddGroup(&cobra.Group{ | ||
ID: "pod", | ||
Title: "Pods import", | ||
}) | ||
rootCmd.PersistentFlags().CountP(VerbosityFlag, VerboseFlagShort, "verbosity (specify multiple times to increase the log level)") | ||
|
||
importCmd := &cobra.Command{ | ||
Use: "import", | ||
GroupID: "pod", | ||
Short: "Checks the prerequisites and import pods.", | ||
RunE: importCmd, | ||
} | ||
setFlags(importCmd) | ||
rootCmd.AddCommand(importCmd) | ||
} | ||
|
||
func main() { | ||
err := rootCmd.Execute() | ||
if err != nil { | ||
os.Exit(1) | ||
} | ||
} | ||
|
||
func loadMappingCache(ctx context.Context, c client.Client, cmd *cobra.Command) (*util.ImportCache, error) { | ||
flags := cmd.Flags() | ||
namespaces, err := flags.GetStringSlice(NamespaceFlag) | ||
if err != nil { | ||
return nil, err | ||
} | ||
queueLabel, err := flags.GetString(QueueLabelFlag) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
mapping, err := flags.GetStringToString(QueueMappingFlag) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
mappingFile, err := flags.GetString(QueueMappingFileFlag) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if mappingFile != "" { | ||
yamlFile, err := os.ReadFile(mappingFile) | ||
if err != nil { | ||
return nil, err | ||
} | ||
extraMapping := map[string]string{} | ||
err = yaml.Unmarshal(yamlFile, extraMapping) | ||
if err != nil { | ||
return nil, fmt.Errorf("decoding %q: %w", mappingFile, err) | ||
} | ||
maps.Copy(mapping, extraMapping) | ||
} | ||
|
||
addLabels, err := flags.GetStringToString(AddLabelsFlag) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
var validationErrors []error | ||
for name, value := range addLabels { | ||
for _, err := range validation.IsQualifiedName(name) { | ||
validationErrors = append(validationErrors, fmt.Errorf("name %q: %s", name, err)) | ||
} | ||
for _, err := range validation.IsValidLabelValue(value) { | ||
validationErrors = append(validationErrors, fmt.Errorf("label %q value %q: %s", name, value, err)) | ||
} | ||
} | ||
if len(validationErrors) > 0 { | ||
return nil, fmt.Errorf("%s: %w", AddLabelsFlag, errors.Join(validationErrors...)) | ||
} | ||
|
||
return util.LoadImportCache(ctx, c, namespaces, queueLabel, mapping, addLabels) | ||
} | ||
|
||
func getKubeClient(cmd *cobra.Command) (client.Client, error) { | ||
kubeConfig, err := ctrl.GetConfig() | ||
if err != nil { | ||
return nil, err | ||
} | ||
if kubeConfig.UserAgent == "" { | ||
kubeConfig.UserAgent = useragent.Default() | ||
} | ||
qps, err := cmd.Flags().GetFloat32(QPSFlag) | ||
if err != nil { | ||
return nil, err | ||
} | ||
kubeConfig.QPS = qps | ||
bust, err := cmd.Flags().GetInt(BurstFlag) | ||
if err != nil { | ||
return nil, err | ||
} | ||
kubeConfig.Burst = bust | ||
|
||
if err := kueue.AddToScheme(scheme.Scheme); err != nil { | ||
return nil, err | ||
} | ||
|
||
c, err := client.New(kubeConfig, client.Options{Scheme: scheme.Scheme}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return c, nil | ||
} | ||
|
||
func importCmd(cmd *cobra.Command, _ []string) error { | ||
log := ctrl.Log.WithName("import") | ||
ctx := ctrl.LoggerInto(context.Background(), log) | ||
cWorkers, _ := cmd.Flags().GetUint(ConcurrencyFlag) | ||
c, err := getKubeClient(cmd) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
cache, err := loadMappingCache(ctx, c, cmd) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if err = pod.Check(ctx, c, cache, cWorkers); err != nil { | ||
return err | ||
} | ||
|
||
if dr, _ := cmd.Flags().GetBool(DryRunFlag); dr { | ||
fmt.Printf("%q is enabled by default, use \"--%s=false\" to continue with the import\n", DryRunFlag, DryRunFlag) | ||
return nil | ||
} | ||
return pod.Import(ctx, c, cache, cWorkers) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
/* | ||
Copyright 2024 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package pod | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
|
||
corev1 "k8s.io/api/core/v1" | ||
"k8s.io/klog/v2" | ||
ctrl "sigs.k8s.io/controller-runtime" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
|
||
"sigs.k8s.io/kueue/cmd/importer/util" | ||
) | ||
|
||
func Check(ctx context.Context, c client.Client, cache *util.ImportCache, jobs uint) error { | ||
ch := make(chan corev1.Pod) | ||
go func() { | ||
err := util.PushPods(ctx, c, cache.Namespaces, cache.QueueLabel, ch) | ||
if err != nil { | ||
ctrl.LoggerFrom(ctx).Error(err, "Listing pods") | ||
} | ||
}() | ||
summary := util.ConcurrentProcessPod(ch, jobs, func(p *corev1.Pod) error { | ||
log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p)) | ||
log.V(3).Info("Checking") | ||
|
||
cq, err := cache.ClusterQueue(p) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if len(cq.Spec.ResourceGroups) == 0 { | ||
return fmt.Errorf("%q has no resource groups: %w", cq.Name, util.ErrCQInvalid) | ||
} | ||
|
||
if len(cq.Spec.ResourceGroups[0].Flavors) == 0 { | ||
return fmt.Errorf("%q has no resource groups flavors: %w", cq.Name, util.ErrCQInvalid) | ||
} | ||
|
||
rfName := string(cq.Spec.ResourceGroups[0].Flavors[0].Name) | ||
rf, rfFound := cache.ResourceFalvors[rfName] | ||
if !rfFound { | ||
return fmt.Errorf("%q flavor %q: %w", cq.Name, rfName, util.ErrCQInvalid) | ||
} | ||
|
||
log.V(2).Info("Successfully checked", "pod", klog.KObj(p), "clusterQueue", klog.KObj(cq), "resourceFalvor", klog.KObj(rf)) | ||
return nil | ||
}) | ||
|
||
log := ctrl.LoggerFrom(ctx) | ||
log.Info("Check done", "checked", summary.TotalPods, "failed", summary.FailedPods) | ||
for e, pods := range summary.ErrorsForPods { | ||
log.Info("Validation failed for Pods", "err", e, "occurrences", len(pods), "obsevedFirstIn", pods[0]) | ||
} | ||
return errors.Join(summary.Errors...) | ||
} |
Oops, something went wrong.