diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml
index cb9c78f9d36e..d1f39e1ad784 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml
@@ -148,5 +148,10 @@
dolphinscheduler-datasource-sagemaker${project.version}
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-datasource-k8s
+ ${project.version}
+
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/pom.xml b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/pom.xml
new file mode 100644
index 000000000000..2d51b85fed62
--- /dev/null
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/pom.xml
@@ -0,0 +1,50 @@
+
+
+
+ 4.0.0
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-datasource-plugin
+ dev-SNAPSHOT
+
+
+ dolphinscheduler-datasource-k8s
+ jar
+ ${project.artifactId}
+
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-spi
+ provided
+
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-datasource-api
+ ${project.version}
+
+
+
+ io.fabric8
+ kubernetes-client
+
+
+
+
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sClientWrapper.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sClientWrapper.java
new file mode 100644
index 000000000000..292eaf2f3d32
--- /dev/null
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sClientWrapper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.k8s;
+
+import lombok.extern.slf4j.Slf4j;
+import io.fabric8.kubernetes.api.model.NamespaceList;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientBuilder;
+
+@Slf4j
+public class K8sClientWrapper implements AutoCloseable {
+
+ private KubernetesClient client;
+
+ public K8sClientWrapper() {
+ }
+
+ public boolean checkConnect(String kubeConfigYaml, String namespace) {
+ try {
+ Config config = Config.fromKubeconfig(kubeConfigYaml);
+ client = new KubernetesClientBuilder().withConfig(config).build();
+ NamespaceList namespaceList = client.namespaces().list();
+ if (!namespaceList.getItems().stream().anyMatch(ns -> ns.getMetadata().getName().equals(namespace))) {
+ log.info("failed to connect to the K8S cluster, namespace not found\n");
+ return false;
+ }
+ log.info("successfully connected to the K8S cluster");
+ return true;
+ } catch (Exception e) {
+ log.info("failed to connect to the K8S cluster\n");
+ return false;
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+}
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceChannel.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceChannel.java
new file mode 100644
index 000000000000..3dac63925921
--- /dev/null
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceChannel.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.k8s;
+
+import org.apache.dolphinscheduler.spi.datasource.AdHocDataSourceClient;
+import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
+import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
+import org.apache.dolphinscheduler.spi.datasource.PooledDataSourceClient;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+public class K8sDataSourceChannel implements DataSourceChannel {
+
+ @Override
+ public AdHocDataSourceClient createAdHocDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
+ throw new UnsupportedOperationException("K8S AdHocDataSourceClient is not supported");
+ }
+
+ @Override
+ public PooledDataSourceClient createPooledDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
+ throw new UnsupportedOperationException("K8S AdHocDataSourceClient is not supported");
+ }
+}
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceChannelFactory.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceChannelFactory.java
new file mode 100644
index 000000000000..03ec046de84e
--- /dev/null
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceChannelFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.k8s;
+
+import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
+import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(DataSourceChannelFactory.class)
+public class K8sDataSourceChannelFactory implements DataSourceChannelFactory {
+
+ @Override
+ public DataSourceChannel create() {
+ return new K8sDataSourceChannel();
+ }
+
+ @Override
+ public String getName() {
+ return "k8s";
+ }
+
+}
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sConnectionParam.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sConnectionParam.java
new file mode 100644
index 000000000000..09e3e706a14a
--- /dev/null
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sConnectionParam.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.k8s.param;
+
+import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
+
+import lombok.Data;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+
+@Data
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class K8sConnectionParam implements ConnectionParam {
+
+ protected String kubeConfig;
+ protected String namespace;
+}
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sDataSourceParamDTO.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sDataSourceParamDTO.java
new file mode 100644
index 000000000000..f2ea50a40bfe
--- /dev/null
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sDataSourceParamDTO.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.k8s.param;
+
+import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+import lombok.Data;
+
+@Data
+public class K8sDataSourceParamDTO extends BaseDataSourceParamDTO {
+
+ protected String kubeConfig;
+ protected String namespace;
+
+ @Override
+ public DbType getType() {
+ return DbType.K8S;
+ }
+}
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sDataSourceProcessor.java
new file mode 100644
index 000000000000..8f61e8bb45ad
--- /dev/null
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sDataSourceProcessor.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.k8s.param;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
+import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
+import org.apache.dolphinscheduler.plugin.datasource.k8s.K8sClientWrapper;
+import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.text.MessageFormat;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(DataSourceProcessor.class)
+@Slf4j
+public class K8sDataSourceProcessor implements DataSourceProcessor {
+
+ @Override
+ public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
+ return JSONUtils.parseObject(paramJson, K8sDataSourceParamDTO.class);
+
+ }
+
+ @Override
+ public void checkDatasourceParam(BaseDataSourceParamDTO datasourceParam) {
+ K8sDataSourceParamDTO k8sDataSourceParamDTO = (K8sDataSourceParamDTO) datasourceParam;
+ if (StringUtils.isEmpty(k8sDataSourceParamDTO.getKubeConfig())) {
+ throw new IllegalArgumentException("sagemaker datasource param is not valid");
+ }
+ }
+
+ @Override
+ public String getDatasourceUniqueId(ConnectionParam connectionParam, DbType dbType) {
+ K8sConnectionParam baseConnectionParam = (K8sConnectionParam) connectionParam;
+ return MessageFormat.format("{0}@{1}@{2}", dbType.getDescp(),
+ PasswordUtils.encodePassword(baseConnectionParam.getKubeConfig()), baseConnectionParam.getNamespace());
+ }
+
+ @Override
+ public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
+ K8sConnectionParam connectionParams = (K8sConnectionParam) createConnectionParams(connectionJson);
+ K8sDataSourceParamDTO k8sDataSourceParamDTO = new K8sDataSourceParamDTO();
+ k8sDataSourceParamDTO.setKubeConfig(connectionParams.getKubeConfig());
+ k8sDataSourceParamDTO.setNamespace(connectionParams.getNamespace());
+ return k8sDataSourceParamDTO;
+ }
+
+ @Override
+ public K8sConnectionParam createConnectionParams(BaseDataSourceParamDTO datasourceParam) {
+ K8sDataSourceParamDTO k8sDataSourceParam = (K8sDataSourceParamDTO) datasourceParam;
+ K8sConnectionParam k8sConnectionParam = new K8sConnectionParam();
+ k8sConnectionParam.setKubeConfig(k8sDataSourceParam.getKubeConfig());
+ k8sConnectionParam.setNamespace(k8sDataSourceParam.getNamespace());
+ return k8sConnectionParam;
+ }
+
+ @Override
+ public ConnectionParam createConnectionParams(String connectionJson) {
+ return JSONUtils.parseObject(connectionJson, K8sConnectionParam.class);
+ }
+
+ @Override
+ public String getDatasourceDriver() {
+ return "";
+ }
+
+ @Override
+ public String getValidationQuery() {
+ return "";
+ }
+
+ @Override
+ public String getJdbcUrl(ConnectionParam connectionParam) {
+ return "";
+ }
+
+ @Override
+ public Connection getConnection(ConnectionParam connectionParam) throws ClassNotFoundException, SQLException, IOException {
+ return null;
+ }
+
+ @Override
+ public boolean checkDataSourceConnectivity(ConnectionParam connectionParam) {
+ K8sConnectionParam baseConnectionParam = (K8sConnectionParam) connectionParam;
+ try (
+ K8sClientWrapper k8sClientWrapper = new K8sClientWrapper()) {
+ return k8sClientWrapper.checkConnect(baseConnectionParam.kubeConfig, baseConnectionParam.namespace);
+ } catch (Exception e) {
+ log.error("failed to connect to the K8S cluster", e);
+ return false;
+ }
+ }
+
+ @Override
+ public DbType getDbType() {
+ return DbType.K8S;
+ }
+
+ @Override
+ public DataSourceProcessor create() {
+ return new K8sDataSourceProcessor();
+ }
+}
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/test/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceProcessorTest.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/test/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceProcessorTest.java
new file mode 100644
index 000000000000..fe41f97686fd
--- /dev/null
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/test/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceProcessorTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.k8s;
+
+import org.apache.dolphinscheduler.plugin.datasource.k8s.param.K8sConnectionParam;
+import org.apache.dolphinscheduler.plugin.datasource.k8s.param.K8sDataSourceParamDTO;
+import org.apache.dolphinscheduler.plugin.datasource.k8s.param.K8sDataSourceProcessor;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedConstruction;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class K8sDataSourceProcessorTest {
+
+ private K8sDataSourceProcessor k8sDataSourceProcessor;
+
+ private String connectJson =
+ "{\"namespace\":\"namespace\",\"kubeConfig\":\"kubeConfig\"}";
+
+ @BeforeEach
+ public void init() {
+ k8sDataSourceProcessor = new K8sDataSourceProcessor();
+ }
+
+ @Test
+ void testCheckDatasourceParam() {
+ K8sDataSourceParamDTO k8sDataSourceParamDTO = new K8sDataSourceParamDTO();
+ Assertions.assertThrows(IllegalArgumentException.class,
+ () -> k8sDataSourceProcessor.checkDatasourceParam(k8sDataSourceParamDTO));
+ k8sDataSourceParamDTO.setNamespace("namespace");
+ Assertions.assertThrows(IllegalArgumentException.class,
+ () -> k8sDataSourceProcessor.checkDatasourceParam(k8sDataSourceParamDTO));
+ k8sDataSourceParamDTO.setKubeConfig("kubeConfig");
+ Assertions
+ .assertDoesNotThrow(
+ () -> k8sDataSourceProcessor.checkDatasourceParam(k8sDataSourceParamDTO));
+ }
+
+ @Test
+ void testGetDatasourceUniqueId() {
+ K8sConnectionParam k8sConnectionParam = new K8sConnectionParam();
+ k8sConnectionParam.setNamespace("namespace");
+ k8sConnectionParam.setKubeConfig("kubeConfig");
+ Assertions.assertEquals("k8s@kubeConfig@namespace",
+ k8sDataSourceProcessor.getDatasourceUniqueId(k8sConnectionParam, DbType.K8S));
+
+ }
+
+ @Test
+ void testCreateDatasourceParamDTO() {
+ K8sDataSourceParamDTO k8sDataSourceParamDTO =
+ (K8sDataSourceParamDTO) k8sDataSourceProcessor.createDatasourceParamDTO(connectJson);
+ Assertions.assertEquals("namespace", k8sDataSourceParamDTO.getNamespace());
+ Assertions.assertEquals("kubeConfig", k8sDataSourceParamDTO.getKubeConfig());
+ }
+
+ @Test
+ void testCreateConnectionParams() {
+ K8sDataSourceParamDTO k8sDataSourceParamDTO =
+ (K8sDataSourceParamDTO) k8sDataSourceProcessor.createDatasourceParamDTO(connectJson);
+ K8sConnectionParam k8sConnectionParam =
+ k8sDataSourceProcessor.createConnectionParams(k8sDataSourceParamDTO);
+ Assertions.assertEquals("namespace", k8sConnectionParam.getNamespace());
+ Assertions.assertEquals("kubeConfig", k8sConnectionParam.getKubeConfig());
+ }
+
+ @Test
+ void testTestConnection() {
+ K8sDataSourceParamDTO k8sDataSourceParamDTO =
+ (K8sDataSourceParamDTO) k8sDataSourceProcessor.createDatasourceParamDTO(connectJson);
+ K8sConnectionParam connectionParam =
+ k8sDataSourceProcessor.createConnectionParams(k8sDataSourceParamDTO);
+ Assertions.assertFalse(k8sDataSourceProcessor.checkDataSourceConnectivity(connectionParam));
+
+ try (
+ MockedConstruction k8sClientWrapperMockedConstruction =
+ Mockito.mockConstruction(K8sClientWrapper.class, (mock, context) -> {
+ Mockito.when(
+ mock.checkConnect(connectionParam.getKubeConfig(), connectionParam.getNamespace()))
+ .thenReturn(true);
+ })) {
+ Assertions.assertTrue(k8sDataSourceProcessor.checkDataSourceConnectivity(connectionParam));
+ }
+
+ }
+}
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceProcessorTest.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceProcessorTest.java
index 424f4786997e..395e7e5d3ffc 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceProcessorTest.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceProcessorTest.java
@@ -31,7 +31,6 @@
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
-
public class SagemakerDataSourceProcessorTest {
private SagemakerDataSourceProcessor sagemakerDataSourceProcessor;
diff --git a/dolphinscheduler-datasource-plugin/pom.xml b/dolphinscheduler-datasource-plugin/pom.xml
index fb08aa7f6dd4..f5c59cfab5c5 100644
--- a/dolphinscheduler-datasource-plugin/pom.xml
+++ b/dolphinscheduler-datasource-plugin/pom.xml
@@ -54,6 +54,7 @@
dolphinscheduler-datasource-verticadolphinscheduler-datasource-dorisdolphinscheduler-datasource-sagemaker
+ dolphinscheduler-datasource-k8s
diff --git a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java
index 3b8633443ddb..bd2f7e795bd7 100644
--- a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java
+++ b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java
@@ -193,5 +193,11 @@ public class CreateDataSourceForm {
})
private WebElement inputZeppelinRestEndpoint;
+ @FindBys({
+ @FindBy(className = "input-kubeConfig"),
+ @FindBy(tagName = "textarea"),
+ })
+ private WebElement inputKubeConfig;
+
}
}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
index 2dd895892401..e7ebbeee0a2a 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
@@ -53,7 +53,9 @@ public enum DbType {
HANA(22, "hana"),
DORIS(23, "doris"),
ZEPPELIN(24, "zeppelin"),
- SAGEMAKER(25, "sagemaker");
+ SAGEMAKER(25, "sagemaker"),
+
+ K8S(26, "k8s");
private static final Map DB_TYPE_MAP =
Arrays.stream(DbType.values()).collect(toMap(DbType::getCode, Functions.identity()));
@EnumValue
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
index aa5bf62fe2b8..4138e5465fd7 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
@@ -19,7 +19,7 @@
import java.io.Serializable;
-import lombok.Value;
+import lombok.Data;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -27,13 +27,18 @@
/**
* k8s Task ExecutionContext
*/
-@Value
+@Data
public class K8sTaskExecutionContext implements Serializable {
private String configYaml;
private String namespace;
+ private String connectionParams;
+
+ public K8sTaskExecutionContext() {
+ }
+
@JsonCreator(mode = JsonCreator.Mode.PROPERTIES)
public K8sTaskExecutionContext(
@JsonProperty("configYaml") String configYaml,
@@ -47,6 +52,7 @@ public String toString() {
return "K8sTaskExecutionContext{"
+ "namespace=" + namespace
+ ", configYaml='" + configYaml + '\''
+ + ", connectionParams='" + connectionParams + '\''
+ '}';
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java
index ac1ce69f7608..a18637a4ffb0 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java
@@ -22,6 +22,7 @@
import org.apache.dolphinscheduler.common.enums.ResourceManagerType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -131,6 +132,8 @@ private FilterWatchListDeletable getListenPod(Kuberne
private KubernetesClient getClient(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) {
K8sTaskExecutionContext k8sTaskExecutionContext =
kubernetesApplicationManagerContext.getK8sTaskExecutionContext();
+ k8sTaskExecutionContext
+ .setConfigYaml(JSONUtils.getNodeString(k8sTaskExecutionContext.getConnectionParams(), "kubeConfig"));
return cacheClientMap.computeIfAbsent(kubernetesApplicationManagerContext.getLabelValue(),
key -> new KubernetesClientBuilder()
.withConfig(Config.fromKubeconfig(k8sTaskExecutionContext.getConfigYaml())).build());
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java
index a6c4703f10c9..8adc9121612f 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java
@@ -34,7 +34,6 @@ public class K8sTaskMainParameters {
private String args;
private String pullSecret;
private String namespaceName;
- private String clusterName;
private String imagePullPolicy;
private double minCpuCores;
private double minMemorySpace;
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
index 2a30e4e537f6..66665512b650 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
@@ -295,7 +295,9 @@ public TaskResponse run(String k8sParameterStr) throws Exception {
return result;
}
K8sTaskExecutionContext k8sTaskExecutionContext = taskRequest.getK8sTaskExecutionContext();
- String configYaml = k8sTaskExecutionContext.getConfigYaml();
+ String connectionParams = k8sTaskExecutionContext.getConnectionParams();
+ String kubeConfig = JSONUtils.getNodeString(connectionParams, "kubeConfig");
+ String configYaml = kubeConfig;
k8sUtils.buildClient(configYaml);
submitJob2k8s(k8sParameterStr);
parsePodLogOutput();
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/K8sTaskParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/K8sTaskParameters.java
index ba9f6498af53..d3d5e4963b65 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/K8sTaskParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/K8sTaskParameters.java
@@ -17,21 +17,28 @@
package org.apache.dolphinscheduler.plugin.task.api.parameters;
+import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
import org.apache.dolphinscheduler.plugin.task.api.model.Label;
import org.apache.dolphinscheduler.plugin.task.api.model.NodeSelectorExpression;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
/**
* k8s task parameters
*/
@Data
+@Slf4j
public class K8sTaskParameters extends AbstractParameters {
private String image;
@@ -44,14 +51,29 @@ public class K8sTaskParameters extends AbstractParameters {
private double minMemorySpace;
private List