Skip to content

Commit

Permalink
Add configuration spark.kubernetes.file.upload.path (#4829)
Browse files Browse the repository at this point in the history
* Set the default value for SPARK_K8S_CONFIG_FILE

* Add configuration spark.kubernetes.file.upload.path

* Delete useless parameters
  • Loading branch information
ChengJie1053 authored Jul 27, 2023
1 parent 5c4e779 commit 3dd9b0c
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class SparkConfig {
private String k8sSparkVersion;

private String k8sNamespace;
private String k8sFileUploadPath;
private String deployMode = "client"; // ("client") // todo cluster
private String appResource; // ("")
private String appName; // ("")
Expand All @@ -73,6 +74,14 @@ public class SparkConfig {
private String keytab; // ("--keytab", "")
private String queue; // ("--queue", "")

public String getK8sFileUploadPath() {
return k8sFileUploadPath;
}

public void setK8sFileUploadPath(String k8sFileUploadPath) {
this.k8sFileUploadPath = k8sFileUploadPath;
}

public String getK8sImagePullPolicy() {
return k8sImagePullPolicy;
}
Expand Down Expand Up @@ -421,6 +430,9 @@ public String toString() {
+ ", k8sSparkVersion='"
+ k8sSparkVersion
+ '\''
+ ", k8sFileUploadPath='"
+ k8sFileUploadPath
+ '\''
+ ", k8sNamespace='"
+ k8sNamespace
+ '\''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import org.apache.linkis.engineplugin.spark.client.context.SparkConfig;
import org.apache.linkis.engineplugin.spark.client.deployment.crds.*;
import org.apache.linkis.engineplugin.spark.client.deployment.util.KubernetesHelper;
import org.apache.linkis.engineplugin.spark.config.SparkConfiguration;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.launcher.SparkAppHandle;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -79,6 +81,7 @@ public void deployCluster(String mainClass, String args, Map<String, String> con

NonNamespaceOperation<SparkApplication, SparkApplicationList, Resource<SparkApplication>>
sparkApplicationClient = getSparkApplicationClient(client);

SparkApplication sparkApplication =
getSparkApplication(sparkConfig.getAppName(), sparkConfig.getK8sNamespace());

Expand All @@ -88,12 +91,19 @@ public void deployCluster(String mainClass, String args, Map<String, String> con
.memory(sparkConfig.getDriverMemory())
.serviceAccount(sparkConfig.getK8sServiceAccount())
.build();

SparkPodSpec executor =
SparkPodSpec.Builder()
.cores(sparkConfig.getExecutorCores())
.instances(sparkConfig.getNumExecutors())
.memory(sparkConfig.getExecutorMemory())
.build();

Map<String, String> sparkConfMap = new HashMap<>();
sparkConfMap.put(
SparkConfiguration.SPARK_KUBERNETES_FILE_UPLOAD_PATH().key(),
sparkConfig.getK8sFileUploadPath());

SparkApplicationSpec sparkApplicationSpec =
SparkApplicationSpec.Builder()
.type(sparkConfig.getK8sLanguageType())
Expand All @@ -107,10 +117,12 @@ public void deployCluster(String mainClass, String args, Map<String, String> con
.restartPolicy(new RestartPolicy(sparkConfig.getK8sRestartPolicy()))
.driver(driver)
.executor(executor)
.sparkConf(sparkConfMap)
.build();

logger.info("Spark k8s operator task parameters: {}", sparkApplicationSpec);
sparkApplication.setSpec(sparkApplicationSpec);

SparkApplication created = sparkApplicationClient.createOrReplace(sparkApplication);
logger.info("Preparing to submit the Spark k8s operator Task: {}", created);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.linkis.engineplugin.spark.client.deployment.crds;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.fabric8.kubernetes.api.model.KubernetesResource;

Expand Down Expand Up @@ -45,6 +47,16 @@ public class SparkApplicationSpec implements KubernetesResource {

private SparkPodSpec executor;

private Map<String, String> sparkConf;

public Map<String, String> getSparkConf() {
return sparkConf;
}

public void setSparkConf(Map<String, String> sparkConf) {
this.sparkConf = sparkConf;
}

public String getType() {
return type;
}
Expand Down Expand Up @@ -165,6 +177,8 @@ public String toString() {
+ driver
+ ", executor="
+ executor
+ ", sparkConf="
+ sparkConf
+ '}';
}

Expand All @@ -185,6 +199,8 @@ public static class SparkApplicationSpecBuilder {
private SparkPodSpec driver;
private SparkPodSpec executor;

private Map<String, String> sparkConf;

private SparkApplicationSpecBuilder() {}

public SparkApplicationSpecBuilder type(String type) {
Expand Down Expand Up @@ -242,6 +258,22 @@ public SparkApplicationSpecBuilder executor(SparkPodSpec executor) {
return this;
}

public SparkApplicationSpecBuilder sparkConf(Map<String, String> sparkConf) {
if (sparkConf == null || sparkConf.size() == 0) {
return this;
}

if (this.sparkConf == null) {
this.sparkConf = new HashMap<>();
}

for (Map.Entry<String, String> entry : sparkConf.entrySet()) {
this.sparkConf.put(entry.getKey(), entry.getValue());
}

return this;
}

public SparkApplicationSpec build() {
SparkApplicationSpec sparkApplicationSpec = new SparkApplicationSpec();
sparkApplicationSpec.type = this.type;
Expand All @@ -255,6 +287,7 @@ public SparkApplicationSpec build() {
sparkApplicationSpec.executor = this.executor;
sparkApplicationSpec.image = this.image;
sparkApplicationSpec.restartPolicy = this.restartPolicy;
sparkApplicationSpec.sparkConf = this.sparkConf;
return sparkApplicationSpec;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ object SparkConfiguration extends Logging {
val SPARK_K8S_SPARK_VERSION = CommonVars[String]("linkis.spark.k8s.sparkVersion", "3.2.1")
val SPARK_K8S_NAMESPACE = CommonVars[String]("linkis.spark.k8s.namespace", "default")

val SPARK_KUBERNETES_FILE_UPLOAD_PATH =
CommonVars[String]("spark.kubernetes.file.upload.path", "local:///opt/spark/tmp")

val SPARK_PYTHON_VERSION = CommonVars[String]("spark.python.version", "python")

val SPARK_PYTHON_TEST_MODE_ENABLE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
sparkConfig.setK8sPassword(SPARK_K8S_PASSWORD.getValue(options))
sparkConfig.setK8sImage(SPARK_K8S_IMAGE.getValue(options))
sparkConfig.setK8sNamespace(SPARK_K8S_NAMESPACE.getValue(options))
sparkConfig.setK8sFileUploadPath(SPARK_KUBERNETES_FILE_UPLOAD_PATH.getValue(options))
sparkConfig.setK8sSparkVersion(SPARK_K8S_SPARK_VERSION.getValue(options))
sparkConfig.setK8sRestartPolicy(SPARK_K8S_RESTART_POLICY.getValue(options))
sparkConfig.setK8sLanguageType(SPARK_K8S_LANGUAGE_TYPE.getValue(options))
Expand Down

0 comments on commit 3dd9b0c

Please sign in to comment.