From de8c7413403f2c4219e76a6c080c1fc22475ac87 Mon Sep 17 00:00:00 2001 From: xdu-chenrj Date: Tue, 31 Oct 2023 15:46:50 +0800 Subject: [PATCH] support k8s connection --- .../dolphinscheduler-datasource-all/pom.xml | 5 + .../dolphinscheduler-datasource-k8s/pom.xml | 50 +++++++ .../datasource/k8s/K8sClientWrapper.java | 55 ++++++++ .../datasource/k8s/K8sDataSourceChannel.java | 37 +++++ .../k8s/K8sDataSourceChannelFactory.java | 38 ++++++ .../k8s/param/K8sConnectionParam.java | 32 +++++ .../k8s/param/K8sDataSourceParamDTO.java | 35 +++++ .../k8s/param/K8sDataSourceProcessor.java | 128 ++++++++++++++++++ .../k8s/K8sDataSourceProcessorTest.java | 107 +++++++++++++++ .../SagemakerDataSourceProcessorTest.java | 1 - dolphinscheduler-datasource-plugin/pom.xml | 1 + .../e2e/pages/datasource/DataSourcePage.java | 6 + .../dolphinscheduler/spi/enums/DbType.java | 4 +- .../task/api/K8sTaskExecutionContext.java | 10 +- .../api/am/KubernetesApplicationManager.java | 3 + .../task/api/k8s/K8sTaskMainParameters.java | 1 - .../task/api/k8s/impl/K8sTaskExecutor.java | 4 +- .../api/parameters/K8sTaskParameters.java | 28 +++- .../task/api/k8s/K8sTaskExecutorTest.java | 11 +- .../plugin/task/k8s/K8sTask.java | 36 +++-- .../plugin/task/k8s/K8sTaskChannel.java | 2 +- .../plugin/task/k8s/K8sParametersTest.java | 2 +- .../plugin/task/k8s/K8sTaskTest.java | 82 +++++++---- .../task/sagemaker/SagemakerTaskTest.java | 1 - .../plugin/task/zeppelin/ZeppelinTask.java | 2 +- .../src/service/modules/data-source/types.ts | 4 + .../src/views/datasource/list/detail.tsx | 41 +++++- .../src/views/datasource/list/use-form.ts | 23 +++- .../task/components/node/fields/use-k8s.ts | 8 +- .../task/components/node/format-data.ts | 3 + .../task/components/node/tasks/use-k8s.ts | 7 +- 31 files changed, 697 insertions(+), 70 deletions(-) create mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/pom.xml create mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sClientWrapper.java create mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceChannel.java create mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceChannelFactory.java create mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sConnectionParam.java create mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sDataSourceParamDTO.java create mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sDataSourceProcessor.java create mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/test/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceProcessorTest.java diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml index cb9c78f9d36e..d1f39e1ad784 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml @@ -148,5 +148,10 @@ dolphinscheduler-datasource-sagemaker ${project.version} + + org.apache.dolphinscheduler + dolphinscheduler-datasource-k8s + ${project.version} + diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/pom.xml b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/pom.xml new file mode 100644 index 000000000000..2d51b85fed62 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/pom.xml @@ -0,0 +1,50 @@ + + + + 4.0.0 + + org.apache.dolphinscheduler + dolphinscheduler-datasource-plugin + dev-SNAPSHOT + + + dolphinscheduler-datasource-k8s + jar + ${project.artifactId} + + + + org.apache.dolphinscheduler + dolphinscheduler-spi + provided + + + + org.apache.dolphinscheduler + dolphinscheduler-datasource-api + ${project.version} + + + + io.fabric8 + kubernetes-client + + + + diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sClientWrapper.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sClientWrapper.java new file mode 100644 index 000000000000..292eaf2f3d32 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sClientWrapper.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.k8s; + +import lombok.extern.slf4j.Slf4j; +import io.fabric8.kubernetes.api.model.NamespaceList; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; + +@Slf4j +public class K8sClientWrapper implements AutoCloseable { + + private KubernetesClient client; + + public K8sClientWrapper() { + } + + public boolean checkConnect(String kubeConfigYaml, String namespace) { + try { + Config config = Config.fromKubeconfig(kubeConfigYaml); + client = new KubernetesClientBuilder().withConfig(config).build(); + NamespaceList namespaceList = client.namespaces().list(); + if (!namespaceList.getItems().stream().anyMatch(ns -> ns.getMetadata().getName().equals(namespace))) { + log.info("failed to connect to the K8S cluster, namespace not found\n"); + return false; + } + log.info("successfully connected to the K8S cluster"); + return true; + } catch (Exception e) { + log.info("failed to connect to the K8S cluster\n"); + return false; + } + } + + @Override + public void close() throws Exception { + + } +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceChannel.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceChannel.java new file mode 100644 index 000000000000..3dac63925921 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceChannel.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.k8s; + +import org.apache.dolphinscheduler.spi.datasource.AdHocDataSourceClient; +import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; +import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel; +import org.apache.dolphinscheduler.spi.datasource.PooledDataSourceClient; +import org.apache.dolphinscheduler.spi.enums.DbType; + +public class K8sDataSourceChannel implements DataSourceChannel { + + @Override + public AdHocDataSourceClient createAdHocDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) { + throw new UnsupportedOperationException("K8S AdHocDataSourceClient is not supported"); + } + + @Override + public PooledDataSourceClient createPooledDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) { + throw new UnsupportedOperationException("K8S AdHocDataSourceClient is not supported"); + } +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceChannelFactory.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceChannelFactory.java new file mode 100644 index 000000000000..03ec046de84e --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceChannelFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.k8s; + +import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel; +import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory; + +import com.google.auto.service.AutoService; + +@AutoService(DataSourceChannelFactory.class) +public class K8sDataSourceChannelFactory implements DataSourceChannelFactory { + + @Override + public DataSourceChannel create() { + return new K8sDataSourceChannel(); + } + + @Override + public String getName() { + return "k8s"; + } + +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sConnectionParam.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sConnectionParam.java new file mode 100644 index 000000000000..09e3e706a14a --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sConnectionParam.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.k8s.param; + +import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; + +import lombok.Data; + +import com.fasterxml.jackson.annotation.JsonInclude; + +@Data +@JsonInclude(JsonInclude.Include.NON_NULL) +public class K8sConnectionParam implements ConnectionParam { + + protected String kubeConfig; + protected String namespace; +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sDataSourceParamDTO.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sDataSourceParamDTO.java new file mode 100644 index 000000000000..f2ea50a40bfe --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sDataSourceParamDTO.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.k8s.param; + +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; +import org.apache.dolphinscheduler.spi.enums.DbType; + +import lombok.Data; + +@Data +public class K8sDataSourceParamDTO extends BaseDataSourceParamDTO { + + protected String kubeConfig; + protected String namespace; + + @Override + public DbType getType() { + return DbType.K8S; + } +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sDataSourceProcessor.java new file mode 100644 index 000000000000..8f61e8bb45ad --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sDataSourceProcessor.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.k8s.param; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; +import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils; +import org.apache.dolphinscheduler.plugin.datasource.k8s.K8sClientWrapper; +import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; +import org.apache.dolphinscheduler.spi.enums.DbType; + +import org.apache.commons.lang3.StringUtils; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.text.MessageFormat; + +import lombok.extern.slf4j.Slf4j; + +import com.google.auto.service.AutoService; + +@AutoService(DataSourceProcessor.class) +@Slf4j +public class K8sDataSourceProcessor implements DataSourceProcessor { + + @Override + public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) { + return JSONUtils.parseObject(paramJson, K8sDataSourceParamDTO.class); + + } + + @Override + public void checkDatasourceParam(BaseDataSourceParamDTO datasourceParam) { + K8sDataSourceParamDTO k8sDataSourceParamDTO = (K8sDataSourceParamDTO) datasourceParam; + if (StringUtils.isEmpty(k8sDataSourceParamDTO.getKubeConfig())) { + throw new IllegalArgumentException("sagemaker datasource param is not valid"); + } + } + + @Override + public String getDatasourceUniqueId(ConnectionParam connectionParam, DbType dbType) { + K8sConnectionParam baseConnectionParam = (K8sConnectionParam) connectionParam; + return MessageFormat.format("{0}@{1}@{2}", dbType.getDescp(), + PasswordUtils.encodePassword(baseConnectionParam.getKubeConfig()), baseConnectionParam.getNamespace()); + } + + @Override + public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) { + K8sConnectionParam connectionParams = (K8sConnectionParam) createConnectionParams(connectionJson); + K8sDataSourceParamDTO k8sDataSourceParamDTO = new K8sDataSourceParamDTO(); + k8sDataSourceParamDTO.setKubeConfig(connectionParams.getKubeConfig()); + k8sDataSourceParamDTO.setNamespace(connectionParams.getNamespace()); + return k8sDataSourceParamDTO; + } + + @Override + public K8sConnectionParam createConnectionParams(BaseDataSourceParamDTO datasourceParam) { + K8sDataSourceParamDTO k8sDataSourceParam = (K8sDataSourceParamDTO) datasourceParam; + K8sConnectionParam k8sConnectionParam = new K8sConnectionParam(); + k8sConnectionParam.setKubeConfig(k8sDataSourceParam.getKubeConfig()); + k8sConnectionParam.setNamespace(k8sDataSourceParam.getNamespace()); + return k8sConnectionParam; + } + + @Override + public ConnectionParam createConnectionParams(String connectionJson) { + return JSONUtils.parseObject(connectionJson, K8sConnectionParam.class); + } + + @Override + public String getDatasourceDriver() { + return ""; + } + + @Override + public String getValidationQuery() { + return ""; + } + + @Override + public String getJdbcUrl(ConnectionParam connectionParam) { + return ""; + } + + @Override + public Connection getConnection(ConnectionParam connectionParam) throws ClassNotFoundException, SQLException, IOException { + return null; + } + + @Override + public boolean checkDataSourceConnectivity(ConnectionParam connectionParam) { + K8sConnectionParam baseConnectionParam = (K8sConnectionParam) connectionParam; + try ( + K8sClientWrapper k8sClientWrapper = new K8sClientWrapper()) { + return k8sClientWrapper.checkConnect(baseConnectionParam.kubeConfig, baseConnectionParam.namespace); + } catch (Exception e) { + log.error("failed to connect to the K8S cluster", e); + return false; + } + } + + @Override + public DbType getDbType() { + return DbType.K8S; + } + + @Override + public DataSourceProcessor create() { + return new K8sDataSourceProcessor(); + } +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/test/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceProcessorTest.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/test/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceProcessorTest.java new file mode 100644 index 000000000000..fe41f97686fd --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/test/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceProcessorTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.k8s; + +import org.apache.dolphinscheduler.plugin.datasource.k8s.param.K8sConnectionParam; +import org.apache.dolphinscheduler.plugin.datasource.k8s.param.K8sDataSourceParamDTO; +import org.apache.dolphinscheduler.plugin.datasource.k8s.param.K8sDataSourceProcessor; +import org.apache.dolphinscheduler.spi.enums.DbType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.MockedConstruction; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class K8sDataSourceProcessorTest { + + private K8sDataSourceProcessor k8sDataSourceProcessor; + + private String connectJson = + "{\"namespace\":\"namespace\",\"kubeConfig\":\"kubeConfig\"}"; + + @BeforeEach + public void init() { + k8sDataSourceProcessor = new K8sDataSourceProcessor(); + } + + @Test + void testCheckDatasourceParam() { + K8sDataSourceParamDTO k8sDataSourceParamDTO = new K8sDataSourceParamDTO(); + Assertions.assertThrows(IllegalArgumentException.class, + () -> k8sDataSourceProcessor.checkDatasourceParam(k8sDataSourceParamDTO)); + k8sDataSourceParamDTO.setNamespace("namespace"); + Assertions.assertThrows(IllegalArgumentException.class, + () -> k8sDataSourceProcessor.checkDatasourceParam(k8sDataSourceParamDTO)); + k8sDataSourceParamDTO.setKubeConfig("kubeConfig"); + Assertions + .assertDoesNotThrow( + () -> k8sDataSourceProcessor.checkDatasourceParam(k8sDataSourceParamDTO)); + } + + @Test + void testGetDatasourceUniqueId() { + K8sConnectionParam k8sConnectionParam = new K8sConnectionParam(); + k8sConnectionParam.setNamespace("namespace"); + k8sConnectionParam.setKubeConfig("kubeConfig"); + Assertions.assertEquals("k8s@kubeConfig@namespace", + k8sDataSourceProcessor.getDatasourceUniqueId(k8sConnectionParam, DbType.K8S)); + + } + + @Test + void testCreateDatasourceParamDTO() { + K8sDataSourceParamDTO k8sDataSourceParamDTO = + (K8sDataSourceParamDTO) k8sDataSourceProcessor.createDatasourceParamDTO(connectJson); + Assertions.assertEquals("namespace", k8sDataSourceParamDTO.getNamespace()); + Assertions.assertEquals("kubeConfig", k8sDataSourceParamDTO.getKubeConfig()); + } + + @Test + void testCreateConnectionParams() { + K8sDataSourceParamDTO k8sDataSourceParamDTO = + (K8sDataSourceParamDTO) k8sDataSourceProcessor.createDatasourceParamDTO(connectJson); + K8sConnectionParam k8sConnectionParam = + k8sDataSourceProcessor.createConnectionParams(k8sDataSourceParamDTO); + Assertions.assertEquals("namespace", k8sConnectionParam.getNamespace()); + Assertions.assertEquals("kubeConfig", k8sConnectionParam.getKubeConfig()); + } + + @Test + void testTestConnection() { + K8sDataSourceParamDTO k8sDataSourceParamDTO = + (K8sDataSourceParamDTO) k8sDataSourceProcessor.createDatasourceParamDTO(connectJson); + K8sConnectionParam connectionParam = + k8sDataSourceProcessor.createConnectionParams(k8sDataSourceParamDTO); + Assertions.assertFalse(k8sDataSourceProcessor.checkDataSourceConnectivity(connectionParam)); + + try ( + MockedConstruction k8sClientWrapperMockedConstruction = + Mockito.mockConstruction(K8sClientWrapper.class, (mock, context) -> { + Mockito.when( + mock.checkConnect(connectionParam.getKubeConfig(), connectionParam.getNamespace())) + .thenReturn(true); + })) { + Assertions.assertTrue(k8sDataSourceProcessor.checkDataSourceConnectivity(connectionParam)); + } + + } +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceProcessorTest.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceProcessorTest.java index 424f4786997e..395e7e5d3ffc 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceProcessorTest.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceProcessorTest.java @@ -31,7 +31,6 @@ import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) - public class SagemakerDataSourceProcessorTest { private SagemakerDataSourceProcessor sagemakerDataSourceProcessor; diff --git a/dolphinscheduler-datasource-plugin/pom.xml b/dolphinscheduler-datasource-plugin/pom.xml index fb08aa7f6dd4..f5c59cfab5c5 100644 --- a/dolphinscheduler-datasource-plugin/pom.xml +++ b/dolphinscheduler-datasource-plugin/pom.xml @@ -54,6 +54,7 @@ dolphinscheduler-datasource-vertica dolphinscheduler-datasource-doris dolphinscheduler-datasource-sagemaker + dolphinscheduler-datasource-k8s diff --git a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java index 3b8633443ddb..bd2f7e795bd7 100644 --- a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java +++ b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java @@ -193,5 +193,11 @@ public class CreateDataSourceForm { }) private WebElement inputZeppelinRestEndpoint; + @FindBys({ + @FindBy(className = "input-kubeConfig"), + @FindBy(tagName = "textarea"), + }) + private WebElement inputKubeConfig; + } } diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java index 2dd895892401..e7ebbeee0a2a 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java @@ -53,7 +53,9 @@ public enum DbType { HANA(22, "hana"), DORIS(23, "doris"), ZEPPELIN(24, "zeppelin"), - SAGEMAKER(25, "sagemaker"); + SAGEMAKER(25, "sagemaker"), + + K8S(26, "k8s"); private static final Map DB_TYPE_MAP = Arrays.stream(DbType.values()).collect(toMap(DbType::getCode, Functions.identity())); @EnumValue diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java index aa5bf62fe2b8..4138e5465fd7 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java @@ -19,7 +19,7 @@ import java.io.Serializable; -import lombok.Value; +import lombok.Data; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -27,13 +27,18 @@ /** * k8s Task ExecutionContext */ -@Value +@Data public class K8sTaskExecutionContext implements Serializable { private String configYaml; private String namespace; + private String connectionParams; + + public K8sTaskExecutionContext() { + } + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) public K8sTaskExecutionContext( @JsonProperty("configYaml") String configYaml, @@ -47,6 +52,7 @@ public String toString() { return "K8sTaskExecutionContext{" + "namespace=" + namespace + ", configYaml='" + configYaml + '\'' + + ", connectionParams='" + connectionParams + '\'' + '}'; } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java index ac1ce69f7608..a18637a4ffb0 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.ResourceManagerType; import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; @@ -131,6 +132,8 @@ private FilterWatchListDeletable getListenPod(Kuberne private KubernetesClient getClient(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { K8sTaskExecutionContext k8sTaskExecutionContext = kubernetesApplicationManagerContext.getK8sTaskExecutionContext(); + k8sTaskExecutionContext + .setConfigYaml(JSONUtils.getNodeString(k8sTaskExecutionContext.getConnectionParams(), "kubeConfig")); return cacheClientMap.computeIfAbsent(kubernetesApplicationManagerContext.getLabelValue(), key -> new KubernetesClientBuilder() .withConfig(Config.fromKubeconfig(k8sTaskExecutionContext.getConfigYaml())).build()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java index a6c4703f10c9..8adc9121612f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java @@ -34,7 +34,6 @@ public class K8sTaskMainParameters { private String args; private String pullSecret; private String namespaceName; - private String clusterName; private String imagePullPolicy; private double minCpuCores; private double minMemorySpace; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java index 2a30e4e537f6..66665512b650 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java @@ -295,7 +295,9 @@ public TaskResponse run(String k8sParameterStr) throws Exception { return result; } K8sTaskExecutionContext k8sTaskExecutionContext = taskRequest.getK8sTaskExecutionContext(); - String configYaml = k8sTaskExecutionContext.getConfigYaml(); + String connectionParams = k8sTaskExecutionContext.getConnectionParams(); + String kubeConfig = JSONUtils.getNodeString(connectionParams, "kubeConfig"); + String configYaml = kubeConfig; k8sUtils.buildClient(configYaml); submitJob2k8s(k8sParameterStr); parsePodLogOutput(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/K8sTaskParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/K8sTaskParameters.java index ba9f6498af53..d3d5e4963b65 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/K8sTaskParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/K8sTaskParameters.java @@ -17,21 +17,28 @@ package org.apache.dolphinscheduler.plugin.task.api.parameters; +import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType; import org.apache.dolphinscheduler.plugin.task.api.model.Label; import org.apache.dolphinscheduler.plugin.task.api.model.NodeSelectorExpression; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; +import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters; +import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import lombok.Data; +import lombok.extern.slf4j.Slf4j; /** * k8s task parameters */ @Data +@Slf4j public class K8sTaskParameters extends AbstractParameters { private String image; @@ -44,14 +51,29 @@ public class K8sTaskParameters extends AbstractParameters { private double minMemorySpace; private List