Skip to content

Commit

Permalink
Add scheduler predictor (#64)
Browse files Browse the repository at this point in the history
* A scheduler plugin imitator that tries to verify if proposed pods could be schedulable in the cluster.

Limit the amount of pods to 1 per node to be schedulable. This does not accurately take into account interpod anti-affinities, but also does not leak information from other namespaces.

Add some smoke tests to verify we actually cover some of our base ideas, but do not replicate the actual kube-scheduler tests for every verification.

* Add new command, "tools estimate" which gets the parameters amount of pods and how much memory and cpu each pod needs
  • Loading branch information
burmanm authored Dec 10, 2024
1 parent baa199b commit 82ff69d
Show file tree
Hide file tree
Showing 8 changed files with 786 additions and 254 deletions.
6 changes: 2 additions & 4 deletions cmd/kubectl-k8ssandra/k8ssandra/k8ssandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/k8ssandra/k8ssandra-client/cmd/kubectl-k8ssandra/nodetool"
"github.com/k8ssandra/k8ssandra-client/cmd/kubectl-k8ssandra/operate"
"github.com/k8ssandra/k8ssandra-client/cmd/kubectl-k8ssandra/register"
"github.com/k8ssandra/k8ssandra-client/cmd/kubectl-k8ssandra/tools"
"github.com/k8ssandra/k8ssandra-client/cmd/kubectl-k8ssandra/users"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -40,21 +41,18 @@ func NewCmd(streams genericclioptions.IOStreams) *cobra.Command {
}

// Add subcommands
// cmd.AddCommand(nodetool.NewCmd(streams))
// cmd.AddCommand(cqlsh.NewCmd(streams))
// cmd.AddCommand(cleaner.NewCmd(streams))
// cmd.AddCommand(crds.NewCmd(streams))
// cmd.AddCommand(edit.NewCmd(streams))
cmd.AddCommand(operate.NewStartCmd(streams))
cmd.AddCommand(operate.NewRestartCmd(streams))
cmd.AddCommand(operate.NewStopCmd(streams))
// cmd.AddCommand(list.NewCmd(streams))
// cmd.AddCommand(migrate.NewCmd(streams))
cmd.AddCommand(users.NewCmd(streams))
// cmd.AddCommand(migrate.NewInstallCmd(streams))
cmd.AddCommand(config.NewCmd(streams))
cmd.AddCommand(helm.NewHelmCmd(streams))
cmd.AddCommand(nodetool.NewCmd(streams))
cmd.AddCommand(tools.NewToolsCmd(streams))
register.SetupRegisterClusterCmd(cmd, streams)

// cmd.Flags().BoolVar(&o.listNamespaces, "list", o.listNamespaces, "if true, print the list of all namespaces in the current KUBECONFIG")
Expand Down
183 changes: 183 additions & 0 deletions cmd/kubectl-k8ssandra/tools/estimate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package tools

import (
"context"
"fmt"

"github.com/charmbracelet/log"

"github.com/pkg/errors"

"github.com/k8ssandra/k8ssandra-client/pkg/kubernetes"
"github.com/k8ssandra/k8ssandra-client/pkg/scheduler"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"

"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/cli-runtime/pkg/genericclioptions"
)

var (
estimateExample = `
# Estimate if pods will fit the cluster
%[1]s estimate [<args>]
# Estimate if 4 pods each with 2Gi of memory and 2 vCPUs will be able to run on the cluster.
# All CPU values and memory use Kubernetes notation
%[1]s estimate --count 4 --memory 2Gi --cpu 2000m
`
errInvalidCount = errors.New("Count of pods must be higher than 0")
)

type estimateOptions struct {
configFlags *genericclioptions.ConfigFlags
genericclioptions.IOStreams
rawConfig clientcmdapi.Config

count int
memory string
cpu string

cpuQuantity resource.Quantity
memoryQuantity resource.Quantity
}

func newEstimateOptions(streams genericclioptions.IOStreams) *estimateOptions {
return &estimateOptions{
configFlags: genericclioptions.NewConfigFlags(true),
IOStreams: streams,
}
}

// NewCmd provides a cobra command wrapping newAddOptions
func NewEstimateCmd(streams genericclioptions.IOStreams) *cobra.Command {
o := newEstimateOptions(streams)

cmd := &cobra.Command{
Use: "estimate [flags]",
Short: "Estimate if datacenter can be expanded",
Example: fmt.Sprintf(estimateExample, "kubectl k8ssandra tools estimate"),
RunE: func(c *cobra.Command, args []string) error {
if err := o.Complete(c, args); err != nil {
return err
}
if err := o.Validate(); err != nil {
return err
}
if err := o.Run(); err != nil {
log.Printf("Error: %v", err)
} else {
log.Printf("Pods can be scheduled to current cluster %s", o.rawConfig.CurrentContext)
}

return nil
},
}

fl := cmd.Flags()
fl.IntVar(&o.count, "count", 0, "new nodes to create")
fl.StringVar(&o.cpu, "cpu", "0", "how many cores per node")
fl.StringVar(&o.memory, "memory", "0", "how much memory per node")

if err := cmd.MarkFlagRequired("memory"); err != nil {
panic(err)
}

if err := cmd.MarkFlagRequired("cpu"); err != nil {
panic(err)
}

if err := cmd.MarkFlagRequired("count"); err != nil {
panic(err)
}

o.configFlags.AddFlags(fl)
return cmd
}

// Complete parses the arguments and necessary flags to options
func (c *estimateOptions) Complete(cmd *cobra.Command, args []string) error {
return nil
}

// Validate ensures that all required arguments and flag values are provided
func (c *estimateOptions) Validate() error {
if c.count <= 0 {
return errInvalidCount
}

var err error
cpuQuantity, err := resource.ParseQuantity(c.cpu)
if err != nil {
return err
}

memoryQuantity, err := resource.ParseQuantity(c.memory)
if err != nil {
return err
}

c.memoryQuantity = memoryQuantity
c.cpuQuantity = cpuQuantity

f, err := c.configFlags.ToRawKubeConfigLoader().RawConfig()
if err != nil {
return err
}
c.rawConfig = f

return nil
}

// Run processes the input, creates a connection to Kubernetes and processes a secret to add the users
func (c *estimateOptions) Run() error {
restConfig, err := c.configFlags.ToRESTConfig()
if err != nil {
return err
}

kubeClient, err := kubernetes.GetClient(restConfig)
if err != nil {
return err
}
ctx := context.Background()

proposedPods := makePods(c.count, makeResources(c.cpuQuantity.MilliValue(), c.memoryQuantity.Value()))

if err := scheduler.TryScheduling(ctx, kubeClient, proposedPods); err != nil {
return errors.Wrap(err, fmt.Sprintf("Unable to schedule the pods to current cluster %s", c.rawConfig.CurrentContext))
}

return nil
}

func makePods(count int, resources corev1.ResourceList) []*corev1.Pod {
pods := make([]*corev1.Pod, count)
for i := 0; i < count; i++ {
pods[i] = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "a",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Resources: corev1.ResourceRequirements{
Requests: resources,
},
},
},
},
}
}

return pods
}

func makeResources(milliCPU, memory int64) corev1.ResourceList {
return corev1.ResourceList{
corev1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
corev1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
}
}
35 changes: 35 additions & 0 deletions cmd/kubectl-k8ssandra/tools/tools.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package tools

import (
"github.com/spf13/cobra"
"k8s.io/cli-runtime/pkg/genericclioptions"
)

type ClientOptions struct {
configFlags *genericclioptions.ConfigFlags
genericclioptions.IOStreams
}

// NewToolsOptions provides an instance of NamespaceOptions with default values
func NewToolsOptions(streams genericclioptions.IOStreams) *ClientOptions {
return &ClientOptions{
configFlags: genericclioptions.NewConfigFlags(true),
IOStreams: streams,
}
}

// NewToolsCmd provides a cobra command wrapping ClientOptions
func NewToolsCmd(streams genericclioptions.IOStreams) *cobra.Command {
o := NewToolsOptions(streams)

cmd := &cobra.Command{
Use: "tools [subcommand] [flags]",
}

// Add subcommands
cmd.AddCommand(NewEstimateCmd(streams))

o.configFlags.AddFlags(cmd.Flags())

return cmd
}
Loading

0 comments on commit 82ff69d

Please sign in to comment.