Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][K8S Task] support node selector #14126

Merged
merged 2 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions docs/docs/en/guide/task/kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@ K8S task type used to execute a batch task. In this task, the worker submits the

- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md) `Default Task Parameters` section for default parameters.

| **Parameter** | **Description** |
|------------------|------------------------------------------------------------------------------------------------------------------|
| Namespace | The namespace for running k8s task. |
| Min CPU | Minimum CPU requirement for running k8s task. |
| Min Memory | Minimum memory requirement for running k8s task. |
| Image | The registry url for image. |
| Command | The container execution command (yaml-style array), for example: ["printenv"] |
| Args | The args of execution command (yaml-style array), for example: ["HOSTNAME", "KUBERNETES_PORT"] |
| Custom label | The customized labels for k8s Job. |
| Custom parameter | It is a local user-defined parameter for K8S task, these params will pass to container as environment variables. |
| **Parameter** | **Description** |
|------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Namespace | The namespace for running k8s task. |
| Min CPU | Minimum CPU requirement for running k8s task. |
| Min Memory | Minimum memory requirement for running k8s task. |
| Image | The registry url for image. |
| Command | The container execution command (yaml-style array), for example: ["printenv"] |
| Args | The args of execution command (yaml-style array), for example: ["HOSTNAME", "KUBERNETES_PORT"] |
| Custom label | The customized labels for k8s Job. |
| Node selector | The label selectors for running k8s pod. Different value in value set should be seperated by command, for example: `value1,value2`. You can refer to https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/node-selector-requirement/ for configuration of different operators. |
| Custom parameter | It is a local user-defined parameter for K8S task, these params will pass to container as environment variables. |

## Task Example

Expand Down
21 changes: 11 additions & 10 deletions docs/docs/zh/guide/task/kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@ kubernetes任务类型,用于在kubernetes上执行一个短时和批处理的

- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md)`默认任务参数`一栏。

| **任务参数** | **描述** |
|----------|-----------------------------------------------------------------|
| 命名空间 | 选择kubernetes集群上存在的命名空间 |
| 最小CPU | 任务在kubernetes上运行所需的最小CPU |
| 最小内存 | 任务在kubernetes上运行所需的最小内存 |
| 镜像 | 镜像地址 |
| 容器执行命令 | 容器执行命令(yaml格式数组),例如:["printenv"] |
| 执行命令参数 | 执行命令参数(yaml格式数组),例如:["HOSTNAME", "KUBERNETES_PORT"] |
| 自定义标签 | 作业自定义标签 |
| 自定义参数 | kubernetes任务局部的用户自定义参数,自定义参数最终会通过环境变量形式存在于容器中,提供给kubernetes任务使用 |
| **任务参数** | **描述** |
|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 命名空间 | 选择kubernetes集群上存在的命名空间 |
| 最小CPU | 任务在kubernetes上运行所需的最小CPU |
| 最小内存 | 任务在kubernetes上运行所需的最小内存 |
| 镜像 | 镜像地址 |
| 容器执行命令 | 容器执行命令(yaml格式数组),例如:["printenv"] |
| 执行命令参数 | 执行命令参数(yaml格式数组),例如:["HOSTNAME", "KUBERNETES_PORT"] |
| 自定义标签 | 作业自定义标签 |
| 节点选择器 | 定义Pod在kubernetes集群上运行的标签选择器,值集中不同表达式值使用逗号分割,例如:`value1,value2`,不同操作符配置方式可参考:https://kubernetes.io/zh-cn/docs/reference/kubernetes-api/common-definitions/node-selector-requirement/ |
| 自定义参数 | kubernetes任务局部的用户自定义参数,自定义参数最终会通过环境变量形式存在于容器中,提供给kubernetes任务使用 |

## 任务样例

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.dolphinscheduler.plugin.task.api.k8s;

import java.util.List;
import java.util.Map;

import lombok.Data;
import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;

/**
* k8s task parameters
Expand All @@ -36,4 +38,5 @@ public class K8sTaskMainParameters {
private double minMemorySpace;
private Map<String, String> paramsMap;
private Map<String, String> labelMap;
private List<NodeSelectorRequirement> nodeSelectorRequirements;
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.slf4j.Logger;

import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.NodeSelectorTerm;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
Expand Down Expand Up @@ -125,6 +126,9 @@ public Job buildK8sJob(K8sTaskMainParameters k8STaskMainParameters) {
throw new TaskException("Parse yaml-like commands and args failed", e);
}

NodeSelectorTerm nodeSelectorTerm = new NodeSelectorTerm();
nodeSelectorTerm.setMatchExpressions(k8STaskMainParameters.getNodeSelectorRequirements());

return new JobBuilder()
.withApiVersion(API_VERSION)
.withNewMetadata()
Expand All @@ -146,6 +150,14 @@ public Job buildK8sJob(K8sTaskMainParameters k8STaskMainParameters) {
.withEnv(envVars)
.endContainer()
.withRestartPolicy(RESTART_POLICY)
.withNewAffinity()
.withNewNodeAffinity()
.withNewRequiredDuringSchedulingIgnoredDuringExecution()
.addNewNodeSelectorTermLike(nodeSelectorTerm)
.endNodeSelectorTerm()
.endRequiredDuringSchedulingIgnoredDuringExecution()
.endNodeAffinity()
.endAffinity()
.endSpec()
.endTemplate()
.withBackoffLimit(retryNum)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.task.api.model;

import java.io.Serializable;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class NodeSelectorExpression implements Serializable {

/**
* selector key
*/
private String key;

/**
* selector operator
*/
private String operator;

/**
* selector value
*/
private String values;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.plugin.task.api.parameters;

import org.apache.dolphinscheduler.plugin.task.api.model.Label;
import org.apache.dolphinscheduler.plugin.task.api.model.NodeSelectorExpression;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;

import org.apache.commons.lang3.StringUtils;
Expand All @@ -37,6 +38,7 @@ public class K8sTaskParameters extends AbstractParameters {
private String namespace;
private String command;
private List<Label> customizedLabels;
private List<NodeSelectorExpression> nodeSelectors;
private String args;
private double minCpuCores;
private double minMemorySpace;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@
import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobStatus;

Expand All @@ -58,6 +60,11 @@ public void before() {
String clusterName = namespace.get(CLUSTER);
Map<String, String> labelMap = new HashMap<>();
labelMap.put("test", "1234");

NodeSelectorRequirement requirement = new NodeSelectorRequirement();
requirement.setKey("node-label");
requirement.setOperator("In");
requirement.setValues(Arrays.asList("1234", "123456"));
k8sTaskExecutor = new K8sTaskExecutor(null, taskRequest);
k8sTaskMainParameters = new K8sTaskMainParameters();
k8sTaskMainParameters.setImage(image);
Expand All @@ -67,6 +74,7 @@ public void before() {
k8sTaskMainParameters.setMinMemorySpace(minMemorySpace);
k8sTaskMainParameters.setCommand("[\"perl\" ,\"-Mbignum=bpi\", \"-wle\", \"print bpi(2000)\"]");
k8sTaskMainParameters.setLabelMap(labelMap);
k8sTaskMainParameters.setNodeSelectorRequirements(Arrays.asList(requirement));
job = k8sTaskExecutor.buildK8sJob(k8sTaskMainParameters);
}
@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.plugin.task.k8s;

import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.COMMA;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAMESPACE_NAME;

import org.apache.dolphinscheduler.common.utils.JSONUtils;
Expand All @@ -26,17 +27,23 @@
import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTask;
import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters;
import org.apache.dolphinscheduler.plugin.task.api.model.Label;
import org.apache.dolphinscheduler.plugin.task.api.model.NodeSelectorExpression;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.K8sTaskParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;

public class K8sTask extends AbstractK8sTask {

Expand Down Expand Up @@ -87,18 +94,33 @@ protected String buildCommand() {
k8sTaskMainParameters.setMinMemorySpace(k8sTaskParameters.getMinMemorySpace());
k8sTaskMainParameters.setParamsMap(ParamUtils.convert(paramsMap));
k8sTaskMainParameters.setLabelMap(convertToLabelMap(k8sTaskParameters.getCustomizedLabels()));
k8sTaskMainParameters
.setNodeSelectorRequirements(convertToNodeSelectorRequirements(k8sTaskParameters.getNodeSelectors()));
k8sTaskMainParameters.setCommand(k8sTaskParameters.getCommand());
k8sTaskMainParameters.setArgs(k8sTaskParameters.getArgs());
return JSONUtils.toJsonString(k8sTaskMainParameters);
}

public Map<String, String> convertToLabelMap(List<Label> customizedLabels) {
if (CollectionUtils.isEmpty(customizedLabels)) {
public List<NodeSelectorRequirement> convertToNodeSelectorRequirements(List<NodeSelectorExpression> expressions) {
if (CollectionUtils.isEmpty(expressions)) {
return Collections.emptyList();
}

return expressions.stream().map(expression -> new NodeSelectorRequirement(
expression.getKey(),
expression.getOperator(),
StringUtils.isEmpty(expression.getValues()) ? Collections.emptyList()
: Arrays.asList(expression.getValues().trim().split(COMMA))))
.collect(Collectors.toList());
}

public Map<String, String> convertToLabelMap(List<Label> labels) {
if (CollectionUtils.isEmpty(labels)) {
return Collections.emptyMap();
}

HashMap<String, String> labelMap = new HashMap<>();
customizedLabels.forEach(label -> {
labels.forEach(label -> {
labelMap.put(label.getLabel(), label.getValue());
});
return labelMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.plugin.task.k8s;

import org.apache.dolphinscheduler.plugin.task.api.model.Label;
import org.apache.dolphinscheduler.plugin.task.api.model.NodeSelectorExpression;
import org.apache.dolphinscheduler.plugin.task.api.parameters.K8sTaskParameters;

import java.util.Arrays;
Expand All @@ -37,6 +38,8 @@ public class K8sParametersTest {
private final String command = "[\"/bin/bash\", \"-c\"]";
private final String args = "[\"echo hello world\"]";
private final List<Label> labels = Arrays.asList(new Label("test", "1234"));
private final List<NodeSelectorExpression> nodeSelectorExpressions =
Arrays.asList(new NodeSelectorExpression("node-label", "In", "1234,12345"));

@BeforeEach
public void before() {
Expand All @@ -48,6 +51,7 @@ public void before() {
k8sTaskParameters.setCommand(command);
k8sTaskParameters.setArgs(args);
k8sTaskParameters.setCustomizedLabels(labels);
k8sTaskParameters.setNodeSelectors(nodeSelectorExpressions);
}

@Test
Expand All @@ -70,6 +74,7 @@ public void testK8sParameters() {
Assertions.assertEquals(command, k8sTaskParameters.getCommand());
Assertions.assertEquals(args, k8sTaskParameters.getArgs());
Assertions.assertEquals(labels, k8sTaskParameters.getCustomizedLabels());
Assertions.assertEquals(nodeSelectorExpressions, k8sTaskParameters.getNodeSelectors());
}

}
Loading