Skip to content

Commit

Permalink
Support pod template
Browse files Browse the repository at this point in the history
Signed-off-by: Yi Chen <github@chenyicn.net>
  • Loading branch information
ChenYi015 committed Oct 23, 2024
1 parent 0a9c591 commit acae606
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 26 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/stretchr/testify v1.9.0
go.uber.org/zap v1.27.0
gocloud.dev v0.40.0
golang.org/x/mod v0.20.0
golang.org/x/net v0.30.0
golang.org/x/time v0.7.0
helm.sh/helm/v3 v3.16.2
Expand All @@ -30,6 +31,7 @@ require (
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8
sigs.k8s.io/controller-runtime v0.17.5
sigs.k8s.io/scheduler-plugins v0.29.8
sigs.k8s.io/yaml v1.4.0
volcano.sh/apis v1.9.0
)

Expand Down Expand Up @@ -229,7 +231,6 @@ require (
sigs.k8s.io/kustomize/api v0.17.2 // indirect
sigs.k8s.io/kustomize/kyaml v0.17.1 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)

replace (
Expand Down
19 changes: 19 additions & 0 deletions internal/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package sparkapplication
import (
"context"
"fmt"
"os"
"strconv"
"time"

Expand Down Expand Up @@ -718,6 +719,9 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) error
// Try submitting the application by running spark-submit.
logger.Info("Running spark-submit for SparkApplication", "name", app.Name, "namespace", app.Namespace, "arguments", sparkSubmitArgs)
submitted, err := runSparkSubmit(newSubmission(sparkSubmitArgs, app))
if err := r.cleanUpPodTemplateFiles(app); err != nil {
return fmt.Errorf("failed to clean up pod template files: %v", err)
}
if err != nil {
r.recordSparkApplicationEvent(app)
return fmt.Errorf("failed to run spark-submit: %v", err)
Expand Down Expand Up @@ -1228,3 +1232,18 @@ func (r *Reconciler) cleanUpOnTermination(_, newApp *v1beta2.SparkApplication) e
}
return nil
}

// cleanUpPodTemplateFiles removes the per-submission directory holding the
// generated driver and executor pod template files.
// It is a no-op when neither the driver nor the executor defines a template.
func (r *Reconciler) cleanUpPodTemplateFiles(app *v1beta2.SparkApplication) error {
	if app.Spec.Driver.Template == nil && app.Spec.Executor.Template == nil {
		return nil
	}

	// The template files are written under /tmp/spark/<submission-id>
	// by driverPodTemplateOption/executorPodTemplateOption.
	path := fmt.Sprintf("/tmp/spark/%s", app.Status.SubmissionID)
	// os.RemoveAll returns nil when the path does not exist, so no
	// explicit os.IsNotExist check is needed.
	if err := os.RemoveAll(path); err != nil {
		return err
	}
	logger.V(1).Info("Deleted pod template files", "path", path)
	return nil
}
60 changes: 58 additions & 2 deletions internal/controller/sparkapplication/submission.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,17 @@ func buildSparkSubmitArgs(app *v1beta2.SparkApplication) ([]string, error) {
submissionWaitAppCompletionOption,
sparkConfOption,
hadoopConfOption,
driverPodTemplateOption,
driverPodNameOption,
driverConfOption,
driverSecretOption,
driverEnvOption,
driverSecretOption,
driverVolumeMountsOption,
executorPodTemplateOption,
executorConfOption,
executorEnvOption,
executorSecretOption,
executorVolumeMountsOption,
executorEnvOption,
nodeSelectorOption,
dynamicAllocationOption,
proxyUserOption,
Expand Down Expand Up @@ -303,6 +305,12 @@ func driverConfOption(app *v1beta2.SparkApplication) ([]string, error) {
property = fmt.Sprintf(common.SparkKubernetesDriverLabelTemplate, common.LabelLaunchedBySparkOperator)
args = append(args, "--conf", fmt.Sprintf("%s=%s", property, "true"))

// If Spark version is less than 3.0.0 or driver pod template is not defined, then the driver pod needs to be mutated by the webhook.
if util.CompareSemanticVersion(app.Spec.SparkVersion, "3.0.0") < 0 || app.Spec.Driver.Template == nil {
property = fmt.Sprintf(common.SparkKubernetesDriverLabelTemplate, common.LabelMutatedBySparkOperator)
args = append(args, "--conf", fmt.Sprintf("%s=%s", property, "true"))
}

property = fmt.Sprintf(common.SparkKubernetesDriverLabelTemplate, common.LabelSubmissionID)
args = append(args, "--conf", fmt.Sprintf("%s=%s", property, app.Status.SubmissionID))

Expand Down Expand Up @@ -646,6 +654,12 @@ func executorConfOption(app *v1beta2.SparkApplication) ([]string, error) {
property = fmt.Sprintf(common.SparkKubernetesExecutorLabelTemplate, common.LabelLaunchedBySparkOperator)
args = append(args, "--conf", fmt.Sprintf("%s=%s", property, "true"))

// If Spark version is less than 3.0.0 or executor pod template is not defined, then the executor pods need to be mutated by the webhook.
if util.CompareSemanticVersion(app.Spec.SparkVersion, "3.0.0") < 0 || app.Spec.Executor.Template == nil {
property = fmt.Sprintf(common.SparkKubernetesExecutorLabelTemplate, common.LabelMutatedBySparkOperator)
args = append(args, "--conf", fmt.Sprintf("%s=%s", property, "true"))
}

property = fmt.Sprintf(common.SparkKubernetesExecutorLabelTemplate, common.LabelSubmissionID)
args = append(args, "--conf", fmt.Sprintf("%s=%s", property, app.Status.SubmissionID))

Expand Down Expand Up @@ -1022,3 +1036,45 @@ func mainApplicationFileOption(app *v1beta2.SparkApplication) ([]string, error)
// applicationOption returns the application arguments to be passed to spark-submit.
func applicationOption(app *v1beta2.SparkApplication) ([]string, error) {
	arguments := app.Spec.Arguments
	return arguments, nil
}

// driverPodTemplateOption writes the driver pod template to a per-submission
// file and returns the spark-submit arguments that reference it.
// It returns no arguments when the driver does not define a pod template.
func driverPodTemplateOption(app *v1beta2.SparkApplication) ([]string, error) {
	if app.Spec.Driver.Template == nil {
		return []string{}, nil
	}

	// The file lives under /tmp/spark/<submission-id> and is removed by
	// the reconciler after spark-submit finishes.
	podTemplateFile := fmt.Sprintf("/tmp/spark/%s/driver-pod-template.yaml", app.Status.SubmissionID)
	if err := util.WriteObjectToFile(app.Spec.Driver.Template, podTemplateFile); err != nil {
		// Add context so the caller can tell which template file failed.
		return nil, fmt.Errorf("failed to write driver pod template file %s: %v", podTemplateFile, err)
	}
	logger.V(1).Info("Created driver pod template file for SparkApplication", "name", app.Name, "namespace", app.Namespace, "file", podTemplateFile)

	args := []string{
		"--conf",
		fmt.Sprintf("%s=%s", common.SparkKubernetesDriverPodTemplateFile, podTemplateFile),
		"--conf",
		fmt.Sprintf("%s=%s", common.SparkKubernetesDriverPodTemplateContainerName, common.SparkDriverContainerName),
	}
	return args, nil
}

// executorPodTemplateOption writes the executor pod template to a
// per-submission file and returns the spark-submit arguments that reference it.
// It returns no arguments when the executor does not define a pod template.
func executorPodTemplateOption(app *v1beta2.SparkApplication) ([]string, error) {
	if app.Spec.Executor.Template == nil {
		return []string{}, nil
	}

	// The file lives under /tmp/spark/<submission-id> and is removed by
	// the reconciler after spark-submit finishes.
	podTemplateFile := fmt.Sprintf("/tmp/spark/%s/executor-pod-template.yaml", app.Status.SubmissionID)
	if err := util.WriteObjectToFile(app.Spec.Executor.Template, podTemplateFile); err != nil {
		// Add context so the caller can tell which template file failed.
		return nil, fmt.Errorf("failed to write executor pod template file %s: %v", podTemplateFile, err)
	}
	logger.V(1).Info("Created executor pod template file for SparkApplication", "name", app.Name, "namespace", app.Namespace, "file", podTemplateFile)

	args := []string{
		"--conf",
		fmt.Sprintf("%s=%s", common.SparkKubernetesExecutorPodTemplateFile, podTemplateFile),
		"--conf",
		fmt.Sprintf("%s=%s", common.SparkKubernetesExecutorPodTemplateContainerName, common.Spark3DefaultExecutorContainerName),
	}
	return args, nil
}
23 changes: 0 additions & 23 deletions internal/webhook/sparkapplication_defaulter.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,32 +83,9 @@ func defaultSparkApplication(app *v1beta2.SparkApplication) {
}

func defaultDriverSpec(app *v1beta2.SparkApplication) {
if app.Spec.Driver.Cores == nil {
if app.Spec.SparkConf == nil || app.Spec.SparkConf[common.SparkDriverCores] == "" {
app.Spec.Driver.Cores = util.Int32Ptr(1)
}
}

if app.Spec.Driver.Memory == nil {
if app.Spec.SparkConf == nil || app.Spec.SparkConf[common.SparkDriverMemory] == "" {
app.Spec.Driver.Memory = util.StringPtr("1g")
}
}
}

func defaultExecutorSpec(app *v1beta2.SparkApplication) {
if app.Spec.Executor.Cores == nil {
if app.Spec.SparkConf == nil || app.Spec.SparkConf[common.SparkExecutorCores] == "" {
app.Spec.Executor.Cores = util.Int32Ptr(1)
}
}

if app.Spec.Executor.Memory == nil {
if app.Spec.SparkConf == nil || app.Spec.SparkConf[common.SparkExecutorMemory] == "" {
app.Spec.Executor.Memory = util.StringPtr("1g")
}
}

if app.Spec.Executor.Instances == nil {
// Check whether dynamic allocation is enabled in application spec.
enableDynamicAllocation := app.Spec.DynamicAllocation != nil && app.Spec.DynamicAllocation.Enabled
Expand Down
14 changes: 14 additions & 0 deletions internal/webhook/sparkapplication_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ func (v *SparkApplicationValidator) ValidateDelete(ctx context.Context, obj runt
func (v *SparkApplicationValidator) validateSpec(_ context.Context, app *v1beta2.SparkApplication) error {
logger.V(1).Info("Validating SparkApplication spec", "name", app.Name, "namespace", app.Namespace, "state", util.GetApplicationState(app))

if err := v.validateSparkVersion(app); err != nil {
return err
}

if app.Spec.NodeSelector != nil && (app.Spec.Driver.NodeSelector != nil || app.Spec.Executor.NodeSelector != nil) {
return fmt.Errorf("node selector cannot be defined at both SparkApplication and Driver/Executor")
}
Expand Down Expand Up @@ -144,6 +148,16 @@ func (v *SparkApplicationValidator) validateSpec(_ context.Context, app *v1beta2
return nil
}

// validateSparkVersion rejects applications that request features not
// supported by their declared Spark version.
// The pod template feature requires Spark 3.0.0 or higher.
func (v *SparkApplicationValidator) validateSparkVersion(app *v1beta2.SparkApplication) error {
	usesPodTemplate := app.Spec.Driver.Template != nil || app.Spec.Executor.Template != nil
	if usesPodTemplate && util.CompareSemanticVersion(app.Spec.SparkVersion, "3.0.0") < 0 {
		return fmt.Errorf("pod template feature requires Spark version 3.0.0 or higher")
	}
	return nil
}

func (v *SparkApplicationValidator) validateResourceUsage(ctx context.Context, app *v1beta2.SparkApplication) error {
logger.V(1).Info("Validating SparkApplication resource usage", "name", app.Name, "namespace", app.Namespace, "state", util.GetApplicationState(app))

Expand Down
3 changes: 3 additions & 0 deletions pkg/common/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,9 @@ const (
// LabelLaunchedBySparkOperator is a label on Spark pods launched through the Spark Operator.
LabelLaunchedBySparkOperator = LabelAnnotationPrefix + "launched-by-spark-operator"

// LabelMutatedBySparkOperator is a label on Spark pods that need to be mutated by webhook.
LabelMutatedBySparkOperator = LabelAnnotationPrefix + "mutated-by-spark-operator"

// LabelSubmissionID is the label that records the submission ID of the current run of an application.
LabelSubmissionID = LabelAnnotationPrefix + "submission-id"

Expand Down
41 changes: 41 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ package util
import (
"fmt"
"os"
"path/filepath"
"strings"

"golang.org/x/mod/semver"
"sigs.k8s.io/yaml"

"github.com/kubeflow/spark-operator/pkg/common"
)

Expand Down Expand Up @@ -77,3 +81,40 @@ func Int64Ptr(n int64) *int64 {
func StringPtr(s string) *string {
return &s
}

// CompareSemanticVersion compares two semantic versions.
// It returns -1, 0, or +1 when v1 is respectively lower than, equal to,
// or higher than v2, following semver.Compare semantics.
func CompareSemanticVersion(v1, v2 string) int {
	// semver.Compare requires the canonical "v" prefix; add it when absent.
	if !strings.HasPrefix(v1, "v") {
		v1 = "v" + v1
	}
	if !strings.HasPrefix(v2, "v") {
		v2 = "v" + v2
	}
	return semver.Compare(v1, v2)
}

// WriteObjectToFile marshals the given object into a YAML document and writes
// it to the given file, creating parent directories as needed.
// If marshalling fails, no file is created or truncated.
func WriteObjectToFile(obj interface{}, filePath string) error {
	// Marshal first so a marshalling error does not leave behind an
	// empty or truncated file.
	data, err := yaml.Marshal(obj)
	if err != nil {
		return err
	}

	if err := os.MkdirAll(filepath.Dir(filePath), 0755); err != nil {
		return err
	}

	// os.WriteFile also surfaces Close errors, which the previous
	// Create/Write/deferred-Close sequence silently dropped.
	// 0666 (before umask) matches the mode os.Create used.
	return os.WriteFile(filePath, data, 0666)
}
72 changes: 72 additions & 0 deletions pkg/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/kubeflow/spark-operator/pkg/common"
"github.com/kubeflow/spark-operator/pkg/util"
Expand Down Expand Up @@ -129,3 +131,73 @@ var _ = Describe("StringPtr", func() {
Expect(util.StringPtr(s)).To(Equal(&s))
})
})

var _ = Describe("CompareSemanticVersion", func() {
	It("Should return 0 if the two versions are equal", func() {
		// The first expectation previously had no matcher and asserted nothing.
		Expect(util.CompareSemanticVersion("1.2.3", "1.2.3")).To(Equal(0))
		Expect(util.CompareSemanticVersion("1.2.3", "v1.2.3")).To(Equal(0))
	})

	It("Should return -1 if the first version is less than the second version", func() {
		Expect(util.CompareSemanticVersion("2.3.4", "2.4.5")).To(Equal(-1))
		Expect(util.CompareSemanticVersion("2.4.5", "2.4.8")).To(Equal(-1))
		Expect(util.CompareSemanticVersion("2.4.8", "3.5.2")).To(Equal(-1))
	})

	It("Should return +1 if the first version is greater than the second version", func() {
		Expect(util.CompareSemanticVersion("2.4.5", "2.3.4")).To(Equal(1))
		Expect(util.CompareSemanticVersion("2.4.8", "2.4.5")).To(Equal(1))
		Expect(util.CompareSemanticVersion("3.5.2", "2.4.8")).To(Equal(1))
	})
})

var _ = Describe("WriteObjectToFile", func() {
	It("Should write the object to the file", func() {
		podTemplate := &corev1.PodTemplateSpec{
			ObjectMeta: metav1.ObjectMeta{
				Name: "test-pod",
				Labels: map[string]string{
					"key1": "value1",
					"key2": "value2",
				},
				Annotations: map[string]string{
					"key3": "value3",
					"key4": "value4",
				},
			},
			Spec: corev1.PodSpec{
				Containers: []corev1.Container{
					{
						Name:  "test-container",
						Image: "test-image",
					},
				},
			},
		}

		expected := `metadata:
  annotations:
    key3: value3
    key4: value4
  creationTimestamp: null
  labels:
    key1: value1
    key2: value2
  name: test-pod
spec:
  containers:
  - image: test-image
    name: test-container
    resources: {}
`
		file := "pod-template.yaml"
		Expect(util.WriteObjectToFile(podTemplate, file)).To(Succeed())
		// Remove the file even when a later assertion fails, so failed
		// runs do not leave artifacts in the working directory.
		defer os.Remove(file)

		data, err := os.ReadFile(file)
		Expect(err).NotTo(HaveOccurred())
		Expect(string(data)).To(Equal(expected))
	})
})

0 comments on commit acae606

Please sign in to comment.