From a0224b65f98ab66b0fed8a9598ea75df2623591d Mon Sep 17 00:00:00 2001 From: Mohammad Arshad Date: Wed, 14 Aug 2024 00:46:54 +0530 Subject: [PATCH] [Improvement][Seatunnel-web] Add support to execute seatunnel-web REST API e2e --- .gitignore | 1 + pom.xml | 3 +- .../datasource/AbstractDataSourceClient.java | 5 +- seatunnel-web-it/README.md | 13 ++ seatunnel-web-it/pom.xml | 99 +++++++++ .../app/common/SeaTunnelWebCluster.java | 95 +++++++++ .../app/common/SeatunnelWebTestingBase.java | 116 +++++++++++ .../seatunnel/app/common/TokenProvider.java | 40 ++++ .../ConnectorControllerWrapper.java | 64 ++++++ .../JobConfigControllerWrapper.java | 63 ++++++ .../JobDefinitionControllerWrapper.java | 75 +++++++ .../JobExecutorControllerWrapper.java | 50 +++++ .../JobMetricsControllerWrapper.java | 49 +++++ .../controller/JobTaskControllerWrapper.java | 189 ++++++++++++++++++ .../SeatunnelDatasourceControllerWrapper.java | 107 ++++++++++ .../app/controller/UserControllerWrapper.java | 58 ++++++ .../app/domain/ConnectorInfoDeserializer.java | 48 +++++ .../domain/PluginIdentifierDeserializer.java | 42 ++++ .../app/test/ConnectorControllerTest.java | 78 ++++++++ .../app/test/JobConfigControllerTest.java | 74 +++++++ .../app/test/JobDefinitionControllerTest.java | 82 ++++++++ .../app/test/JobExecutorControllerTest.java | 74 +++++++ .../app/test/JobMetricsControllerTest.java | 88 ++++++++ .../app/test/JobTaskControllerTest.java | 124 ++++++++++++ .../SeatunnelDatasourceControllerTest.java | 131 ++++++++++++ .../app/test/UserControllerTest.java | 104 ++++++++++ .../seatunnel/app/utils/JSONTestUtils.java | 151 ++++++++++++++ .../apache/seatunnel/app/utils/JobUtils.java | 141 +++++++++++++ .../src/test/resources/application.yml | 60 ++++++ .../src/test/resources/hazelcast-client.yaml | 27 +++ .../src/test/resources/hazelcast.yaml | 47 +++++ .../src/test/resources/logback-spring.xml | 48 +++++ .../src/test/resources/seatunnel.yaml | 36 ++++ tools/dependencies/known-dependencies.txt | 7 +- 34 files changed, 2386 insertions(+), 3 deletions(-) create mode 100644 seatunnel-web-it/README.md create mode 100644 seatunnel-web-it/pom.xml create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/SeaTunnelWebCluster.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/SeatunnelWebTestingBase.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/TokenProvider.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/ConnectorControllerWrapper.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobConfigControllerWrapper.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobDefinitionControllerWrapper.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobMetricsControllerWrapper.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobTaskControllerWrapper.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/UserControllerWrapper.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/domain/ConnectorInfoDeserializer.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/domain/PluginIdentifierDeserializer.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/ConnectorControllerTest.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobConfigControllerTest.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobDefinitionControllerTest.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobMetricsControllerTest.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobTaskControllerTest.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/SeatunnelDatasourceControllerTest.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/UserControllerTest.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JSONTestUtils.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java create mode 100644 seatunnel-web-it/src/test/resources/application.yml create mode 100644 seatunnel-web-it/src/test/resources/hazelcast-client.yaml create mode 100644 seatunnel-web-it/src/test/resources/hazelcast.yaml create mode 100644 seatunnel-web-it/src/test/resources/logback-spring.xml create mode 100644 seatunnel-web-it/src/test/resources/seatunnel.yaml diff --git a/.gitignore b/.gitignore index 9bf8b904b..fb8bd5d72 100644 --- a/.gitignore +++ b/.gitignore @@ -52,3 +52,4 @@ spark-warehouse /seatunnel-ui/node/* /seatunnel-ui/node_modules /seatunnel-ui/node_modules/* +/seatunnel-web-it/profile/* diff --git a/pom.xml b/pom.xml index 520087420..880c3162b 100644 --- a/pom.xml +++ b/pom.xml @@ -36,6 +36,7 @@ seatunnel-server seatunnel-datasource seatunnel-web-dist + seatunnel-web-it @@ -68,7 +69,7 @@ 5.9.0 4.4 3.4 - 19.0 + 33.2.1-jre 3.10.0 4.2.0 2.3.6 diff --git a/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/AbstractDataSourceClient.java b/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/AbstractDataSourceClient.java index 34fb2850e..53ec0f061 100644 --- a/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/AbstractDataSourceClient.java +++ b/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/AbstractDataSourceClient.java @@ -221,7 +221,10 @@ public Pair getTableSyncMaxValue( } private ClassLoader getCustomClassloader(String pluginName) { - String getenv = System.getenv(ST_WEB_BASEDIR_PATH); + String getenv = + System.getenv(ST_WEB_BASEDIR_PATH) == null + ? System.getProperty(ST_WEB_BASEDIR_PATH) + : System.getenv(ST_WEB_BASEDIR_PATH); log.info("ST_WEB_BASEDIR_PATH is : " + getenv); String libPath = StringUtils.isEmpty(getenv) ? "/datasource" : (getenv + "/datasource"); diff --git a/seatunnel-web-it/README.md b/seatunnel-web-it/README.md new file mode 100644 index 000000000..9e6028f15 --- /dev/null +++ b/seatunnel-web-it/README.md @@ -0,0 +1,13 @@ +Build seatunnel-web +./mvnw clean install -DskipTests + +Update mysql database details in src/test/resources/application.yml and Run the seatunnel-web-it integration tests +./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -DSEATUNNEL_HOME=/some/path/apache-seatunnel-2.3.6 -DST_WEB_BASEDIR_PATH=seatunnel-web-dist/target/apache-seatunnel-web-1.0.0-SNAPSHOT/apache-seatunnel-web-1.0.0-SNAPSHOT +NOTE: Please remember to update the versions according to the latest supported versions. + +If you're using a version of Java higher than Java 8 for running the tests, add the following VM options: +-DitJvmArgs="--add-opens java.base/java.lang.invoke=ALL-UNNAMED". + +While running integrations tests from IDE, ensure following VM options are set +SEATUNNEL_HOME=/some/path/apache-seatunnel-2.3.6 +ST_WEB_BASEDIR_PATH=/some/path/seatunnel-web-dist/target/apache-seatunnel-web-1.0.0-SNAPSHOT/apache-seatunnel-web-1.0.0-SNAPSHOT diff --git a/seatunnel-web-it/pom.xml b/seatunnel-web-it/pom.xml new file mode 100644 index 000000000..328dffdda --- /dev/null +++ b/seatunnel-web-it/pom.xml @@ -0,0 +1,99 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-web + ${revision} + + + seatunnel-web-it + + true + -Xmx1024m + + + + + org.apache.seatunnel + seatunnel-engine-server + ${seatunnel-framework.version} + test + + + org.apache.logging.log4j + log4j-slf4j-impl + + + + + org.apache.seatunnel + seatunnel-server + ${project.version} + pom + test + + + org.apache.seatunnel + seatunnel-app + ${project.version} + test + + + mysql + mysql-connector-java + ${mysql-connector.version} + test + + + org.apache.seatunnel + seatunnel-hadoop3-3.1.4-uber + ${seatunnel-framework.version} + test + + + org.apache.avro + avro + + + org.slf4j + slf4j-reload4j + + + org.apache.logging.log4j + log4j-slf4j-impl + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + ${skipIT} + true + 1 + false + ${itJvmArgs} + + + + + diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/SeaTunnelWebCluster.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/SeaTunnelWebCluster.java new file mode 100644 index 000000000..a5b630a90 --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/SeaTunnelWebCluster.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.common; + +import org.apache.seatunnel.app.SeatunnelApplication; +import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.engine.common.config.ConfigProvider; +import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; +import org.apache.seatunnel.engine.server.SeaTunnelServer; +import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; + +import org.springframework.boot.SpringApplication; +import org.springframework.context.ConfigurableApplicationContext; + +import com.hazelcast.config.Config; +import com.hazelcast.instance.impl.HazelcastInstanceImpl; +import com.hazelcast.logging.ILogger; +import lombok.extern.slf4j.Slf4j; + +import java.io.File; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Slf4j +public class SeaTunnelWebCluster { + private SeaTunnelServer server; + private HazelcastInstanceImpl instance; + private ConfigurableApplicationContext applicationContext; + + public void start() { + String seatunnelHome = System.getProperty("SEATUNNEL_HOME"); + if (seatunnelHome == null) { + throw new RuntimeException( + "SEATUNNEL_HOME is not set. Please set it before running the tests."); + } + if (!new File(seatunnelHome).exists()) { + throw new RuntimeException( + seatunnelHome + + " does not exist. Please make sure it exists before running the tests"); + } + Config hazelcastConfig = Config.loadDefault(); + SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); + seaTunnelConfig.setHazelcastConfig(hazelcastConfig); + instance = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig); + server = instance.node.nodeEngine.getService(SeaTunnelServer.SERVICE_NAME); + ILogger LOGGER = instance.node.nodeEngine.getLogger(SeaTunnelWebCluster.class); + + // String[] args = {"--spring.profiles.active=h2"}; + String[] args = {}; + applicationContext = SpringApplication.run(SeatunnelApplication.class, args); + LOGGER.info("SeaTunnel-web server started."); + assertTrue(isRunning()); + } + + public boolean isRunning() { + return server.isMasterNode(); + } + + public void stop() { + try { + if (applicationContext != null) { + int exit = SpringApplication.exit(applicationContext); + log.info("Sea tunnel application exited with code: {}", exit); + } + } catch (Throwable throwable) { + log.error("Error stopping application context", throwable); + } + + try { + if (server != null) { + server.shutdown(true); + } + + if (instance != null) { + instance.shutdown(); + } + } catch (Exception e) { + log.error(ExceptionUtils.getMessage(e)); + } + } +} diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/SeatunnelWebTestingBase.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/SeatunnelWebTestingBase.java new file mode 100644 index 000000000..19bd07467 --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/SeatunnelWebTestingBase.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.common; + +import org.apache.seatunnel.app.domain.request.user.UserLoginReq; +import org.apache.seatunnel.app.domain.response.user.UserSimpleInfoRes; +import org.apache.seatunnel.app.utils.JSONTestUtils; +import org.apache.seatunnel.app.utils.JSONUtils; + +import com.fasterxml.jackson.core.type.TypeReference; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.lang.reflect.Field; +import java.net.HttpURLConnection; +import java.net.URL; + +public class SeatunnelWebTestingBase { + protected final String baseUrl = "http://localhost:8802/seatunnel/api/v1"; + + protected Result login(UserLoginReq userLoginReq) { + String requestBody = JSONUtils.toPrettyJsonString(userLoginReq); + String response = sendRequest(url("user/login"), requestBody, "POST"); + return JSONTestUtils.parseObject( + response, new TypeReference>() {}); + } + + protected String url(String path) { + return String.format("%s/%s?", baseUrl, path); + } + + protected String urlWithParam(String pathAndParam) { + return String.format("%s/%s", baseUrl, pathAndParam); + } + + protected String sendRequest(String url) { + return sendRequest(url, null, "GET"); + } + + protected String sendRequest(String url, String requestBody, String httpMethod) { + HttpURLConnection connection = null; + try { + URL urlObject = new URL(url); + connection = (HttpURLConnection) urlObject.openConnection(); + if ("PATCH".equalsIgnoreCase(httpMethod)) { + setRequestMethodUsingReflection(connection, "PATCH"); + } else { + connection.setRequestMethod(httpMethod); + } + + connection.setRequestProperty("Content-Type", "application/json"); + if (!url.endsWith("user/login?")) { + connection.setRequestProperty("token", TokenProvider.getToken()); + } + connection.setDoOutput(true); + if (requestBody != null) { + try (OutputStream os = connection.getOutputStream()) { + byte[] input = requestBody.getBytes("utf-8"); + os.write(input, 0, input.length); + } + } + int responseCode = connection.getResponseCode(); + if (responseCode == HttpURLConnection.HTTP_OK) { + return readResponse(connection); + } else { + String message = "API Request failed with status code: " + responseCode; + throw new RuntimeException(message); + } + + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } finally { + if (connection != null) { + connection.disconnect(); + } + } + } + + private static void setRequestMethodUsingReflection( + HttpURLConnection httpURLConnection, String method) throws Exception { + try { + Field methodField = HttpURLConnection.class.getDeclaredField("method"); + methodField.setAccessible(true); + methodField.set(httpURLConnection, method); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new Exception("Failed to set HTTP method to PATCH", e); + } + } + + private String readResponse(HttpURLConnection connection) throws IOException { + BufferedReader rd = new BufferedReader(new InputStreamReader(connection.getInputStream())); + String line; + StringBuilder result = new StringBuilder(); + while ((line = rd.readLine()) != null) { + result.append(line); + } + rd.close(); + return result.toString(); + } +} diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/TokenProvider.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/TokenProvider.java new file mode 100644 index 000000000..7f58495a3 --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/TokenProvider.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.common; + +import org.apache.seatunnel.app.domain.request.user.UserLoginReq; +import org.apache.seatunnel.app.domain.response.user.UserSimpleInfoRes; + +public class TokenProvider { + private static String token; + + public static String getToken() { + if (token == null) { + initToken(); + } + return token; + } + + private static void initToken() { + SeatunnelWebTestingBase seatunnelWebTestingBase = new SeatunnelWebTestingBase(); + UserLoginReq userLoginReq = new UserLoginReq(); + userLoginReq.setUsername("admin"); + userLoginReq.setPassword("admin"); + Result loginResponse = seatunnelWebTestingBase.login(userLoginReq); + TokenProvider.token = loginResponse.getData().getToken(); + } +} diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/ConnectorControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/ConnectorControllerWrapper.java new file mode 100644 index 000000000..8a431febf --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/ConnectorControllerWrapper.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.controller; + +import org.apache.seatunnel.app.common.Result; +import org.apache.seatunnel.app.common.SeatunnelWebTestingBase; +import org.apache.seatunnel.app.domain.response.connector.ConnectorInfo; +import org.apache.seatunnel.app.utils.JSONTestUtils; +import org.apache.seatunnel.app.utils.JSONUtils; + +import com.fasterxml.jackson.databind.JsonNode; + +import java.util.List; + +public class ConnectorControllerWrapper extends SeatunnelWebTestingBase { + + public List listAllTransform() { + String response = sendRequest(url("connector/transforms")); + JsonNode data = JSONUtils.parseObject(response).findValue("data"); + return JSONTestUtils.toList(data.toString(), ConnectorInfo.class); + } + + public List listSource(String status) { + String response = sendRequest(urlWithParam("connector/sources?status=" + status)); + JsonNode data = JSONUtils.parseObject(response).findValue("data"); + return JSONTestUtils.toList(data.toString(), ConnectorInfo.class); + } + + public List listSink(String status) { + String response = sendRequest(urlWithParam("connector/sinks?status=" + status)); + JsonNode data = JSONUtils.parseObject(response).findValue("data"); + return JSONTestUtils.toList(data.toString(), ConnectorInfo.class); + } + + public Result sync() { + String response = sendRequest(url("connector/sync")); + return JSONTestUtils.parseObject(response, Result.class); + } + + public Result getConnectorFormStructure(String connectorType, String connectorName) { + String response = + sendRequest( + urlWithParam( + "connector/form?connectorType=" + + connectorType + + "&connectorName=" + + connectorName)); + return JSONTestUtils.parseObject(response, Result.class); + } +} diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobConfigControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobConfigControllerWrapper.java new file mode 100644 index 000000000..55c5627cb --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobConfigControllerWrapper.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.controller; + +import org.apache.seatunnel.app.common.EngineType; +import org.apache.seatunnel.app.common.Result; +import org.apache.seatunnel.app.common.SeatunnelWebTestingBase; +import org.apache.seatunnel.app.domain.request.job.JobConfig; +import org.apache.seatunnel.app.domain.response.job.JobConfigRes; +import org.apache.seatunnel.app.utils.JSONTestUtils; +import org.apache.seatunnel.app.utils.JSONUtils; + +import org.apache.commons.collections.map.HashedMap; + +import com.fasterxml.jackson.core.type.TypeReference; + +import java.util.Map; + +public class JobConfigControllerWrapper extends SeatunnelWebTestingBase { + + public Result updateJobConfig(long jobVersionId, JobConfig jobConfig) { + String requestBody = JSONUtils.toPrettyJsonString(jobConfig); + String response = sendRequest(url("job/config/" + jobVersionId), requestBody, "PUT"); + return JSONTestUtils.parseObject(response, Result.class); + } + + public Result getJobConfig(long jobVersionId) { + String response = sendRequest(url("job/config/" + jobVersionId)); + return JSONTestUtils.parseObject(response, new TypeReference>() {}); + } + + public JobConfig populateJobConfigObject(String jobName) { + JobConfig jobConfig = new JobConfig(); + jobConfig.setName(jobName); + jobConfig.setDescription(jobName + " description from config"); + jobConfig.setEngine(EngineType.SeaTunnel); + Map env = new HashedMap(); + env.put("job.mode", "BATCH"); + env.put("job.name", "SeaTunnel_Job"); + env.put("jars", ""); + env.put("checkpoint.interval", "30"); + env.put("checkpoint.timeout", ""); + env.put("read_limit.rows_per_second", ""); + env.put("read_limit.bytes_per_second", ""); + env.put("custom_parameters", ""); + jobConfig.setEnv(env); + return jobConfig; + } +} diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobDefinitionControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobDefinitionControllerWrapper.java new file mode 100644 index 000000000..de10511f8 --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobDefinitionControllerWrapper.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.controller; + +import org.apache.seatunnel.app.common.Result; +import org.apache.seatunnel.app.common.SeatunnelWebTestingBase; +import org.apache.seatunnel.app.domain.request.connector.BusinessMode; +import org.apache.seatunnel.app.domain.request.job.JobReq; +import org.apache.seatunnel.app.domain.response.PageInfo; +import org.apache.seatunnel.app.domain.response.job.JobDefinitionRes; +import org.apache.seatunnel.app.utils.JSONTestUtils; +import org.apache.seatunnel.app.utils.JSONUtils; + +import com.fasterxml.jackson.core.type.TypeReference; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class JobDefinitionControllerWrapper extends SeatunnelWebTestingBase { + + public Result createJobDefinition(JobReq jobReq) { + String requestBody = JSONUtils.toPrettyJsonString(jobReq); + String response = sendRequest(url("job/definition"), requestBody, "POST"); + return JSONTestUtils.parseObject(response, new TypeReference>() {}); + } + + public Long createJobDefinition(String jobName) { + JobReq jobReq = new JobReq(); + jobReq.setName(jobName); + jobReq.setDescription(jobName + " description"); + jobReq.setJobType(BusinessMode.DATA_INTEGRATION); + Result result = createJobDefinition(jobReq); + assertTrue(result.isSuccess()); + return result.getData(); + } + + public Result> getJobDefinition( + String searchName, Integer pageNo, Integer pageSize) { + String response = + sendRequest( + urlWithParam("job/definition?") + + "searchName=" + + searchName + + "&pageNo=" + + pageNo + + "&pageSize=" + + pageSize); + return JSONTestUtils.parseObject( + response, new TypeReference>>() {}); + } + + public Result getJobDefinitionById(long jobId) { + String response = sendRequest(url("job/definition/" + jobId)); + return JSONTestUtils.parseObject( + response, new TypeReference>() {}); + } + + public Result deleteJobDefinition(long id) { + String response = sendRequest(urlWithParam("job/definition?id=" + id), null, "DELETE"); + return JSONTestUtils.parseObject(response, Result.class); + } +} diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java new file mode 100644 index 000000000..760d0831d --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.controller; + +import org.apache.seatunnel.app.common.Result; +import org.apache.seatunnel.app.common.SeatunnelWebTestingBase; +import org.apache.seatunnel.app.utils.JSONTestUtils; + +import com.fasterxml.jackson.core.type.TypeReference; + +public class JobExecutorControllerWrapper extends SeatunnelWebTestingBase { + + public Result jobExecutor(Long jobDefineId) { + String response = + sendRequest(urlWithParam("job/executor/execute?jobDefineId=" + jobDefineId)); + return JSONTestUtils.parseObject(response, new TypeReference>() {}); + } + + public Result resource(Long jobDefineId) { + String response = + sendRequest(urlWithParam("job/executor/resource?jobDefineId=" + jobDefineId)); + return JSONTestUtils.parseObject(response, Result.class); + } + + public Result jobPause(Long jobInstanceId) { + String response = + sendRequest(urlWithParam("job/executor/pause?jobInstanceId=" + jobInstanceId)); + return JSONTestUtils.parseObject(response, Result.class); + } + + public Result jobRestore(Long jobInstanceId) { + String response = + sendRequest(urlWithParam("job/executor/restore?jobInstanceId=" + jobInstanceId)); + return JSONTestUtils.parseObject(response, Result.class); + } +} diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobMetricsControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobMetricsControllerWrapper.java new file mode 100644 index 000000000..0204eded3 --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobMetricsControllerWrapper.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.controller; + +import org.apache.seatunnel.app.common.Result; +import org.apache.seatunnel.app.common.SeatunnelWebTestingBase; +import org.apache.seatunnel.app.domain.response.metrics.JobDAG; +import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes; +import org.apache.seatunnel.app.utils.JSONTestUtils; + +import com.fasterxml.jackson.core.type.TypeReference; + +import java.util.List; + +public class JobMetricsControllerWrapper extends SeatunnelWebTestingBase { + + public Result> detail(Long jobInstanceId) { + String response = + sendRequest(urlWithParam("job/metrics/detail?jobInstanceId=" + jobInstanceId)); + return JSONTestUtils.parseObject( + response, new TypeReference>>() {}); + } + + public Result getJobDAG(Long jobInstanceId) { + String response = + sendRequest(urlWithParam("job/metrics/dag?jobInstanceId=" + jobInstanceId)); + return JSONTestUtils.parseObject(response, new TypeReference>() {}); + } + + public Result summary(Long jobInstanceId) { + String response = + sendRequest(urlWithParam("job/metrics/summary?jobInstanceId=" + jobInstanceId)); + return JSONTestUtils.parseObject(response, Result.class); + } +} diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobTaskControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobTaskControllerWrapper.java new file mode 100644 index 000000000..a89581ca7 --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobTaskControllerWrapper.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.controller; + +import org.apache.seatunnel.app.common.Result; +import org.apache.seatunnel.app.common.SeatunnelWebTestingBase; +import org.apache.seatunnel.app.domain.request.connector.SceneMode; +import org.apache.seatunnel.app.domain.request.job.DataSourceOption; +import org.apache.seatunnel.app.domain.request.job.DatabaseTableSchemaReq; +import org.apache.seatunnel.app.domain.request.job.JobDAG; +import org.apache.seatunnel.app.domain.request.job.JobTaskInfo; +import org.apache.seatunnel.app.domain.request.job.PluginConfig; +import org.apache.seatunnel.app.domain.request.job.SelectTableFields; +import org.apache.seatunnel.app.domain.response.job.JobTaskCheckRes; +import org.apache.seatunnel.app.utils.JSONTestUtils; +import org.apache.seatunnel.app.utils.JSONUtils; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.datasource.plugin.api.model.TableField; + +import com.fasterxml.jackson.core.type.TypeReference; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class JobTaskControllerWrapper extends SeatunnelWebTestingBase { + + public Result saveJobDAG(long jobVersionId, JobDAG jobDAG) { + String requestBody = JSONUtils.toPrettyJsonString(jobDAG); + String response = sendRequest(url("job/dag/" + jobVersionId), requestBody, "POST"); + return JSONTestUtils.parseObject(response, new TypeReference>() {}); + } + + public Result getJob(long jobVersionId) { + String response = sendRequest(url("job/" + jobVersionId)); + return JSONTestUtils.parseObject(response, new TypeReference>() {}); + } + + public Result saveSingleTask(long jobVersionId, PluginConfig pluginConfig) { + String requestBody = JSONUtils.toPrettyJsonString(pluginConfig); + String response = sendRequest(url("job/task/" + jobVersionId), requestBody, "POST"); + return JSONTestUtils.parseObject(response, Result.class); + } + + public Result getSingleTask(long jobVersionId, String pluginId) { + String response = sendRequest(url("job/task/" + jobVersionId) + "pluginId=" + pluginId); + return JSONTestUtils.parseObject(response, new TypeReference>() {}); + } + + public Result deleteSingleTask(long jobVersionId, String pluginId) { + String response = + sendRequest( + url("job/task/" + jobVersionId) + "pluginId=" + pluginId, null, "DELETE"); + return JSONTestUtils.parseObject(response, Result.class); + } + + public String createFakeSourcePlugin(String datasourceId, long jobVersionId) { + DataSourceOption tableOption = new DataSourceOption(); + tableOption.setDatabases(Arrays.asList("fake_database")); + tableOption.setTables(Arrays.asList("fake_table")); + String sourcePluginId = "src_" + System.currentTimeMillis(); + PluginConfig sourcePluginConfig = + PluginConfig.builder() + .pluginId(sourcePluginId) + .name("source-fakesource") + .type(PluginType.SOURCE) + .tableOption(tableOption) + .selectTableFields(getSelectTableFields()) + .transformOptions(null) + .outputSchema(getOutputSchema()) + .dataSourceId(Long.parseLong(datasourceId)) + .sceneMode(SceneMode.SINGLE_TABLE) + .config( + "{\"query\":\"\",\"tables_configs\":\"\",\"schema\":\"fields {\\n name = \\\"string\\\"\\n age = \\\"int\\\"\\n }\",\"string.fake.mode\":\"RANGE\",\"string.template\":\"\",\"tinyint.fake.mode\":\"RANGE\",\"tinyint.template\":\"\",\"smallint.fake.mode\":\"RANGE\",\"smallint.template\":\"\",\"int.fake.mode\":\"RANGE\",\"int.template\":\"\",\"bigint.fake.mode\":\"RANGE\",\"bigint.template\":\"\",\"float.fake.mode\":\"RANGE\",\"float.template\":\"\",\"double.fake.mode\":\"RANGE\",\"double.template\":\"\",\"rows\":\"\",\"row.num\":5,\"split.num\":1,\"split.read-interval\":1,\"map.size\":5,\"array.size\":5,\"bytes.length\":5,\"date.year.template\":\"\",\"date.month.template\":\"\",\"date.day.template\":\"\",\"time.hour.template\":\"\",\"time.minute.template\":\"\",\"time.second.template\":\"\",\"parallelism\":1}") + .build(); + + Result srcResult = saveSingleTask(jobVersionId, sourcePluginConfig); + assertTrue(srcResult.isSuccess()); + return sourcePluginId; + } + + public String createConsoleSinkPlugin(String datasourceId, long jobVersionId) { + DataSourceOption sinkTableOption = new DataSourceOption(); + sinkTableOption.setDatabases(Arrays.asList("console_fake_database")); + sinkTableOption.setTables(Arrays.asList("console_fake_table")); + + String sinkPluginId = "sink_" + System.currentTimeMillis(); + PluginConfig sinkPluginConfig = + PluginConfig.builder() + .pluginId(sinkPluginId) + .name("sink-console") + .type(PluginType.SINK) + .tableOption(sinkTableOption) + .selectTableFields(getSelectTableFields()) + .transformOptions(null) + .outputSchema(null) + .dataSourceId(Long.parseLong(datasourceId)) + .sceneMode(SceneMode.SINGLE_TABLE) + .config("{\"query\":\"\"}") + .build(); + + Result sinkResult = saveSingleTask(jobVersionId, sinkPluginConfig); + assertTrue(sinkResult.isSuccess()); + return sinkPluginId; + } + + public String createReplaceTransformPlugin(long jobVersionId) { + String transPluginId = "trans_" + System.currentTimeMillis(); + PluginConfig transformPluginConfig = + PluginConfig.builder() + .pluginId(transPluginId) + .name("transform-replace") + .type(PluginType.TRANSFORM) + .connectorType("Replace") + .transformOptions(null) + .outputSchema(null) + .sceneMode(SceneMode.SINGLE_TABLE) + .config( + "{\"query\":\"\",\"replace_field\":\"name\",\"pattern\":\"OK\",\"replacement\":\"ITS OK.\",\"is_regex\":\"false\",\"replace_first\":null}") + .build(); + Result transResult = saveSingleTask(jobVersionId, transformPluginConfig); + assertTrue(transResult.isSuccess()); + return transPluginId; + } + + private List getOutputSchema() { + DatabaseTableSchemaReq databaseTableSchemaReq = new DatabaseTableSchemaReq(); + databaseTableSchemaReq.setDatabase("fake_database"); + databaseTableSchemaReq.setTableName("fake_table"); + databaseTableSchemaReq.setFields(createFields()); + return Arrays.asList(databaseTableSchemaReq); + } + + private List createFields() { + List fields = new ArrayList<>(); + fields.add( + createTableField("string", "name", "", true, null, false, null, false, "STRING")); + fields.add(createTableField("int", "age", "", false, null, false, null, false, "INT")); + return fields; + } + + private TableField createTableField( + String type, + String name, + String comment, + Boolean primaryKey, + String defaultValue, + Boolean nullable, + Map properties, + Boolean unSupport, + String outputDataType) { + TableField field = new TableField(); + field.setType(type); + field.setName(name); + field.setComment(comment); + field.setPrimaryKey(primaryKey); + field.setDefaultValue(defaultValue); + field.setNullable(nullable); + field.setProperties(properties); + field.setUnSupport(unSupport); + field.setOutputDataType(outputDataType); + return field; + } + + private SelectTableFields getSelectTableFields() { + SelectTableFields selectTableFields = new SelectTableFields(); + selectTableFields.setAll(true); + List tableFields = Arrays.asList("name", "age"); + selectTableFields.setTableFields(tableFields); + return selectTableFields; + } +} diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java new file mode 100644 index 000000000..effca68d0 --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.controller; + +import org.apache.seatunnel.app.common.Result; +import org.apache.seatunnel.app.common.SeatunnelWebTestingBase; +import org.apache.seatunnel.app.domain.request.datasource.DatasourceCheckReq; +import org.apache.seatunnel.app.domain.request.datasource.DatasourceReq; +import org.apache.seatunnel.app.domain.response.PageInfo; +import org.apache.seatunnel.app.domain.response.datasource.DatasourceDetailRes; +import org.apache.seatunnel.app.domain.response.datasource.DatasourceRes; +import org.apache.seatunnel.app.utils.JSONTestUtils; +import org.apache.seatunnel.app.utils.JSONUtils; + +import com.fasterxml.jackson.core.type.TypeReference; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class SeatunnelDatasourceControllerWrapper extends SeatunnelWebTestingBase { + + public String createFakeSourceDatasource(String datasourceName) { + DatasourceReq req = getFakeSourceDatasourceReq(datasourceName); + Result result = createDatasource(req); + assertTrue(result.isSuccess()); + return result.getData(); + } + + public String createConsoleDatasource(String datasourceName) { + DatasourceReq req = getConsoleDatasourceReq(datasourceName); + Result result = createDatasource(req); + assertTrue(result.isSuccess()); + return result.getData(); + } + + public DatasourceReq getFakeSourceDatasourceReq(String datasourceName) { + DatasourceReq req = new DatasourceReq(); + req.setDatasourceName(datasourceName); + req.setPluginName("FakeSource"); + req.setDescription(datasourceName + " desc"); + req.setDatasourceConfig( + "{\"fields\":\"{\\n \\\"name\\\": \\\"string\\\",\\n \\\"age\\\": \\\"int\\\"\\n }\"}"); + return req; + } + + private DatasourceReq getConsoleDatasourceReq(String datasourceName) { + DatasourceReq req = new DatasourceReq(); + req.setDatasourceName(datasourceName); + req.setPluginName("Console"); + req.setDescription(datasourceName + " description"); + req.setDatasourceConfig("{\"log.print.data\":\"true\",\"log.print.delay.ms\":\"100\"}"); + return req; + } + + public Result createDatasource(DatasourceReq req) { + String requestBody = JSONUtils.toPrettyJsonString(req); + String response = sendRequest(url("datasource/create"), requestBody, "POST"); + return JSONTestUtils.parseObject(response, new TypeReference>() {}); + } + + public Result testConnect(DatasourceCheckReq req) { + String requestBody = JSONUtils.toPrettyJsonString(req); + String response = sendRequest(url("datasource/check/connect"), requestBody, "POST"); + return JSONTestUtils.parseObject(response, new TypeReference>() {}); + } + + public Result updateDatasource(String id, DatasourceReq req) { + String requestBody = JSONUtils.toPrettyJsonString(req); + String response = sendRequest(url("datasource/" + id), requestBody, "PUT"); + return JSONTestUtils.parseObject(response, new TypeReference>() {}); + } + + public Result deleteDatasource(String id) { + String response = sendRequest(url("datasource/" + id), null, "DELETE"); + return JSONTestUtils.parseObject(response, new TypeReference>() {}); + } + + public Result getDatasource(String id) { + String response = sendRequest(url("datasource/" + id)); + return JSONTestUtils.parseObject( + response, new TypeReference>() {}); + } + + public Result> getDatasourceList( + String searchVal, String pluginName, Integer pageNo, Integer pageSize) { + String response = + sendRequest( + String.format( + "%s/datasource/list?searchVal=%s&pluginName=%s&pageNo=%d&pageSize=%d", + baseUrl, searchVal, pluginName, pageNo, pageSize)); + return JSONTestUtils.parseObject( + response, new TypeReference>>() {}); + } +} diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/UserControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/UserControllerWrapper.java new file mode 100644 index 000000000..af0cb5563 --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/UserControllerWrapper.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.controller; + +import org.apache.seatunnel.app.common.Result; +import org.apache.seatunnel.app.common.SeatunnelWebTestingBase; +import org.apache.seatunnel.app.domain.request.user.AddUserReq; +import org.apache.seatunnel.app.domain.request.user.UpdateUserReq; +import org.apache.seatunnel.app.domain.response.user.AddUserRes; +import org.apache.seatunnel.app.utils.JSONTestUtils; +import org.apache.seatunnel.app.utils.JSONUtils; + +import com.fasterxml.jackson.core.type.TypeReference; + +public class UserControllerWrapper extends SeatunnelWebTestingBase { + + public Result addUser(AddUserReq addUserReq) { + String requestBody = JSONUtils.toPrettyJsonString(addUserReq); + String response = sendRequest(url("user"), requestBody, "POST"); + return JSONTestUtils.parseObject(response, new TypeReference>() {}); + } + + public Result updateUser(String userId, UpdateUserReq updateUserReq) { + String requestBody = JSONUtils.toPrettyJsonString(updateUserReq); + String response = sendRequest(url("user/" + userId), requestBody, "PUT"); + return JSONTestUtils.parseObject(response, Result.class); + } + + public Result deleteUser(String userId) { + String response = sendRequest(url("user/" + userId), null, "DELETE"); + return JSONTestUtils.parseObject(response, Result.class); + } + + public Result listUsers(Integer pageNo, Integer pageSize) { + String response = + sendRequest(urlWithParam("user?pageNo=" + pageNo + "&pageSize=" + pageSize)); + return JSONTestUtils.parseObject(response, Result.class); + } + + public Result logout() { + String response = sendRequest(url("user/logout"), null, "PATCH"); + return JSONTestUtils.parseObject(response, Result.class); + } +} diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/domain/ConnectorInfoDeserializer.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/domain/ConnectorInfoDeserializer.java new file mode 100644 index 000000000..fb164e419 --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/domain/ConnectorInfoDeserializer.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.domain; + +import org.apache.seatunnel.app.domain.response.connector.ConnectorInfo; +import org.apache.seatunnel.plugin.discovery.PluginIdentifier; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; + +import java.io.IOException; + +public class ConnectorInfoDeserializer extends JsonDeserializer { + + @Override + public ConnectorInfo deserialize( + JsonParser jsonParser, DeserializationContext deserializationContext) + throws IOException, JsonProcessingException { + JsonNode node = jsonParser.getCodec().readTree(jsonParser); + JsonNode pluginIdentifierNode = node.get("pluginIdentifier"); + String artifactId = node.get("artifactId").asText(); + + PluginIdentifier pluginIdentifier = + new PluginIdentifierDeserializer() + .deserialize( + pluginIdentifierNode.traverse(jsonParser.getCodec()), + deserializationContext); + + return new ConnectorInfo(pluginIdentifier, artifactId); + } +} diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/domain/PluginIdentifierDeserializer.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/domain/PluginIdentifierDeserializer.java new file mode 100644 index 000000000..3d097aa48 --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/domain/PluginIdentifierDeserializer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.domain; + +import org.apache.seatunnel.plugin.discovery.PluginIdentifier; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; + +import java.io.IOException; + +public class PluginIdentifierDeserializer extends JsonDeserializer { + + @Override + public PluginIdentifier deserialize( + JsonParser jsonParser, DeserializationContext deserializationContext) + throws IOException, JsonProcessingException { + JsonNode node = jsonParser.getCodec().readTree(jsonParser); + String engineType = node.get("engineType").asText(); + String pluginType = node.get("pluginType").asText(); + String pluginName = node.get("pluginName").asText(); + + return PluginIdentifier.of(engineType, pluginType, pluginName); + } +} diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/ConnectorControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/ConnectorControllerTest.java new file mode 100644 index 000000000..222f48631 --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/ConnectorControllerTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.test; + +import org.apache.seatunnel.app.common.Result; +import org.apache.seatunnel.app.common.SeaTunnelWebCluster; +import org.apache.seatunnel.app.controller.ConnectorControllerWrapper; +import org.apache.seatunnel.app.domain.response.connector.ConnectorInfo; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ConnectorControllerTest { + private static final SeaTunnelWebCluster seaTunnelWebCluster = new SeaTunnelWebCluster(); + private static ConnectorControllerWrapper connectorControllerWrapper; + + @BeforeAll + public static void setUp() { + seaTunnelWebCluster.start(); + connectorControllerWrapper = new ConnectorControllerWrapper(); + } + + @Test + public void testListAllTransform() { + List listResult = connectorControllerWrapper.listAllTransform(); + assertFalse(listResult.isEmpty()); + } + + @Test + public void testListSource() { + List result = connectorControllerWrapper.listSource("ALL"); + assertFalse(result.isEmpty()); + } + + @Test + public void testListSink() { + List result = connectorControllerWrapper.listSink("ALL"); + assertFalse(result.isEmpty()); + } + + @Test + void testSync() { + Result sync = connectorControllerWrapper.sync(); + assertTrue(sync.isSuccess()); + } + + @Test + void testGetConnectorFormStructure() { + Result connectorFormStructure = + connectorControllerWrapper.getConnectorFormStructure("source", "FakeSource"); + assertTrue(connectorFormStructure.isSuccess()); + } + + @AfterAll + public static void tearDown() { + seaTunnelWebCluster.stop(); + } +} diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobConfigControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobConfigControllerTest.java new file mode 100644 index 000000000..215d7a1d7 --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobConfigControllerTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.test; + +import org.apache.seatunnel.app.common.Result; +import org.apache.seatunnel.app.common.SeaTunnelWebCluster; +import org.apache.seatunnel.app.controller.JobConfigControllerWrapper; +import org.apache.seatunnel.app.controller.JobDefinitionControllerWrapper; +import org.apache.seatunnel.app.domain.request.job.JobConfig; +import org.apache.seatunnel.app.domain.response.job.JobConfigRes; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class JobConfigControllerTest { + private static final SeaTunnelWebCluster seaTunnelWebCluster = new SeaTunnelWebCluster(); + private static JobConfigControllerWrapper jobConfigControllerWrapper; + private static JobDefinitionControllerWrapper jobDefinitionControllerWrapper; + private static String uniqueId = "_" + System.currentTimeMillis(); + + @BeforeAll + public static void setUp() { + seaTunnelWebCluster.start(); + jobConfigControllerWrapper = new JobConfigControllerWrapper(); + jobDefinitionControllerWrapper = new JobDefinitionControllerWrapper(); + } + + @Test + public void updateJobConfig_shouldReturnSuccess_whenValidRequest() { + String jobName = "config_job1" + uniqueId; + updateConfig(jobName); + } + + private static void updateConfig(String jobName) { + long jobId = jobDefinitionControllerWrapper.createJobDefinition(jobName); + JobConfig jobConfig = jobConfigControllerWrapper.populateJobConfigObject(jobName); + Result result = jobConfigControllerWrapper.updateJobConfig(jobId, jobConfig); + assertTrue(result.isSuccess()); + } + + @Test + public void getJobConfig_shouldReturnData_whenValidRequest() { + String jobName = "config_job2" + uniqueId; + long jobId = jobDefinitionControllerWrapper.createJobDefinition(jobName); + Result result = jobConfigControllerWrapper.getJobConfig(jobId); + assertTrue(result.isSuccess()); + assertNotNull(result.getData()); + assertEquals(jobName, result.getData().getName()); + } + + @AfterAll + public static void tearDown() { + seaTunnelWebCluster.stop(); + } +} diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobDefinitionControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobDefinitionControllerTest.java new file mode 100644 index 000000000..734041857 --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobDefinitionControllerTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.test; + +import org.apache.seatunnel.app.common.Result; +import org.apache.seatunnel.app.common.SeaTunnelWebCluster; +import org.apache.seatunnel.app.controller.JobDefinitionControllerWrapper; +import org.apache.seatunnel.app.domain.response.PageInfo; +import org.apache.seatunnel.app.domain.response.job.JobDefinitionRes; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class JobDefinitionControllerTest { + private static final SeaTunnelWebCluster seaTunnelWebCluster = new SeaTunnelWebCluster(); + private static JobDefinitionControllerWrapper jobDefinitionControllerWrapper; + private static String uniqueId = "_" + System.currentTimeMillis(); + + @BeforeAll + public static void setUp() { + seaTunnelWebCluster.start(); + jobDefinitionControllerWrapper = new JobDefinitionControllerWrapper(); + } + + @Test + public void createJobDefinition_shouldReturnSuccess_whenValidRequest() { + long jobId = jobDefinitionControllerWrapper.createJobDefinition("job1" + uniqueId); + assertTrue(jobId > 0); + } + + @Test + public void getJobDefinitionById_shouldReturnData_whenValidRequest() { + String job2 = "job2" + uniqueId; + long jobId = jobDefinitionControllerWrapper.createJobDefinition(job2); + Result result = + jobDefinitionControllerWrapper.getJobDefinitionById(jobId); + assertTrue(result.isSuccess()); + assertEquals(job2, result.getData().getName()); + } + + @Test + public void getJobDefinition_shouldReturnData_whenValidRequest() { + String job3 = "job3" + uniqueId; + long jobId = jobDefinitionControllerWrapper.createJobDefinition(job3); + Result> result = + jobDefinitionControllerWrapper.getJobDefinition(job3, 1, 10); + assertTrue(result.isSuccess()); + assertEquals(1, result.getData().getData().size()); + assertEquals(jobId, result.getData().getData().get(0).getId()); + } + + @Test + public void deleteJobDefinition_shouldReturnSuccess_whenValidId() { + String job7 = "job7" + uniqueId; + long jobId = jobDefinitionControllerWrapper.createJobDefinition(job7); + Result result = jobDefinitionControllerWrapper.deleteJobDefinition(jobId); + assertTrue(result.isSuccess()); + } + + @AfterAll + public static void tearDown() { + seaTunnelWebCluster.stop(); + } +} diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java new file mode 100644 index 000000000..4a01df280 --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.test; + +import org.apache.seatunnel.app.common.Result; +import org.apache.seatunnel.app.common.SeaTunnelWebCluster; +import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper; +import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes; +import org.apache.seatunnel.app.utils.JobUtils; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class JobExecutorControllerTest { + private static final SeaTunnelWebCluster seaTunnelWebCluster = new SeaTunnelWebCluster(); + private static JobExecutorControllerWrapper jobExecutorControllerWrapper; + private static final String uniqueId = "_" + System.currentTimeMillis(); + + @BeforeAll + public static void setUp() { + seaTunnelWebCluster.start(); + jobExecutorControllerWrapper = new JobExecutorControllerWrapper(); + } + + @Test + public void executeJob_shouldReturnSuccess_whenValidRequest() { + String jobName = "execJob" + uniqueId; + long jobVersionId = JobUtils.createJob(jobName); + Result result = jobExecutorControllerWrapper.jobExecutor(jobVersionId); + assertTrue(result.isSuccess()); + assertTrue(result.getData() > 0); + Result> listResult = + JobUtils.waitForJobCompletion(result.getData()); + assertEquals(1, listResult.getData().size()); + assertEquals("FINISHED", listResult.getData().get(0).getStatus()); + assertEquals(5, listResult.getData().get(0).getReadRowCount()); + assertEquals(5, listResult.getData().get(0).getWriteRowCount()); + } + + @Test + public void restoreJob_shouldReturnSuccess_whenValidRequest() { + String jobName = "jobRestore" + uniqueId; + long jobVersionId = JobUtils.createJob(jobName); + Result executorResult = jobExecutorControllerWrapper.jobExecutor(jobVersionId); + assertTrue(executorResult.isSuccess()); + Result result = jobExecutorControllerWrapper.jobRestore(executorResult.getData()); + assertTrue(result.isSuccess()); + } + + @AfterAll + public static void tearDown() { + seaTunnelWebCluster.stop(); + } +} diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobMetricsControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobMetricsControllerTest.java new file mode 100644 index 000000000..5086ba74b --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobMetricsControllerTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.test; + +import org.apache.seatunnel.app.common.Result; +import org.apache.seatunnel.app.common.SeaTunnelWebCluster; +import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper; +import org.apache.seatunnel.app.controller.JobMetricsControllerWrapper; +import org.apache.seatunnel.app.domain.response.metrics.JobDAG; +import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes; +import org.apache.seatunnel.app.utils.JobUtils; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class JobMetricsControllerTest { + private static final SeaTunnelWebCluster seaTunnelWebCluster = new SeaTunnelWebCluster(); + private static JobMetricsControllerWrapper jobMetricsControllerWrapper; + private static JobExecutorControllerWrapper jobExecutorControllerWrapper; + private static final String uniqueId = "_" + System.currentTimeMillis(); + + @BeforeAll + public static void setUp() { + seaTunnelWebCluster.start(); + jobMetricsControllerWrapper = new JobMetricsControllerWrapper(); + jobExecutorControllerWrapper = new JobExecutorControllerWrapper(); + } + + @Test + public void detailMetrics_shouldReturnData_whenValidRequest() { + String jobName = "jobDetail" + uniqueId; + long jobInstanceId = executeJob(jobName); + Result> result = + jobMetricsControllerWrapper.detail(jobInstanceId); + assertTrue(result.isSuccess()); + assertFalse(result.getData().isEmpty()); + } + + private static Long executeJob(String jobName) { + Long jobVersionId = JobUtils.createJob(jobName); + Result jobExecutionResult = jobExecutorControllerWrapper.jobExecutor(jobVersionId); + assertTrue(jobExecutionResult.isSuccess()); + return jobExecutionResult.getData(); + } + + @Test + public void getJobDAG_shouldReturnData_whenValidRequest() { + String jobName = "jobDAG" + uniqueId; + long jobInstanceId = executeJob(jobName); + Result result = jobMetricsControllerWrapper.getJobDAG(jobInstanceId); + assertTrue(result.isSuccess()); + assertNotNull(result.getData()); + } + + @Test + public void summaryMetrics_shouldReturnData_whenValidRequest() { + String jobName = "jobSummary" + uniqueId; + long jobInstanceId = executeJob(jobName); + Result result = jobMetricsControllerWrapper.summary(jobInstanceId); + assertTrue(result.isSuccess()); + } + + @AfterAll + public static void tearDown() { + seaTunnelWebCluster.stop(); + } +} diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobTaskControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobTaskControllerTest.java new file mode 100644 index 000000000..c183f7860 --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobTaskControllerTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.test; + +import org.apache.seatunnel.app.common.Result; +import org.apache.seatunnel.app.common.SeaTunnelWebCluster; +import org.apache.seatunnel.app.controller.JobConfigControllerWrapper; +import org.apache.seatunnel.app.controller.JobDefinitionControllerWrapper; +import org.apache.seatunnel.app.controller.JobTaskControllerWrapper; +import org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper; +import org.apache.seatunnel.app.domain.request.job.Edge; +import org.apache.seatunnel.app.domain.request.job.JobConfig; +import org.apache.seatunnel.app.domain.request.job.JobDAG; +import org.apache.seatunnel.app.domain.request.job.JobTaskInfo; +import org.apache.seatunnel.app.domain.request.job.PluginConfig; +import org.apache.seatunnel.app.domain.response.job.JobTaskCheckRes; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class JobTaskControllerTest { + private static final SeaTunnelWebCluster seaTunnelWebCluster = new SeaTunnelWebCluster(); + private static JobTaskControllerWrapper jobTaskControllerWrapper; + private static JobDefinitionControllerWrapper jobDefinitionControllerWrapper; + private static SeatunnelDatasourceControllerWrapper seatunnelDatasourceControllerWrapper; + private static JobConfigControllerWrapper jobConfigControllerWrapper; + private static String uniqueId = "_" + System.currentTimeMillis(); + + @BeforeAll + public static void setUp() { + seaTunnelWebCluster.start(); + jobTaskControllerWrapper = new JobTaskControllerWrapper(); + jobDefinitionControllerWrapper = new JobDefinitionControllerWrapper(); + seatunnelDatasourceControllerWrapper = new SeatunnelDatasourceControllerWrapper(); + jobConfigControllerWrapper = new JobConfigControllerWrapper(); + } + + @Test + public void getJob_shouldReturnData_whenValidRequest() { + long jobId = jobDefinitionControllerWrapper.createJobDefinition("task_job1" + uniqueId); + Result result = jobTaskControllerWrapper.getJob(jobId); + assertTrue(result.isSuccess()); + } + + @Test + public void saveSingleTask_shouldReturnSuccess_whenValidRequest() { + String jobName = "task_job2" + uniqueId; + long jobId = jobDefinitionControllerWrapper.createJobDefinition(jobName); + String sourceDatasourceId = + seatunnelDatasourceControllerWrapper.createFakeSourceDatasource( + "task_db2_source" + uniqueId); + String sinkDataSourceId = + seatunnelDatasourceControllerWrapper.createConsoleDatasource( + "task_db2_sink" + uniqueId); + String sourcePluginId = + jobTaskControllerWrapper.createFakeSourcePlugin(sourceDatasourceId, jobId); + String sinkPluginId = + jobTaskControllerWrapper.createConsoleSinkPlugin(sinkDataSourceId, jobId); + String transPluginId = jobTaskControllerWrapper.createReplaceTransformPlugin(jobId); + + JobConfig jobConfig = jobConfigControllerWrapper.populateJobConfigObject(jobName); + jobConfigControllerWrapper.updateJobConfig(jobId, jobConfig); + + JobDAG jobDAG = new JobDAG(); + List edges = new ArrayList<>(); + edges.add(new Edge(sourcePluginId, transPluginId)); + edges.add(new Edge(transPluginId, sinkPluginId)); + jobDAG.setEdges(edges); + + Result dagResult = jobTaskControllerWrapper.saveJobDAG(jobId, jobDAG); + assertTrue(dagResult.isSuccess()); + } + + @Test + public void getSingleTask_shouldReturnData_whenValidRequest() { + String jobName = "task_job3" + uniqueId; + long jobId = jobDefinitionControllerWrapper.createJobDefinition(jobName); + String datasourceName = "task_job1_db3" + uniqueId; + String datasourceId = + seatunnelDatasourceControllerWrapper.createFakeSourceDatasource(datasourceName); + String sourcePluginId = + jobTaskControllerWrapper.createFakeSourcePlugin(datasourceId, jobId); + Result result = jobTaskControllerWrapper.getSingleTask(jobId, sourcePluginId); + assertTrue(result.isSuccess()); + } + + @Test + public void deleteSingleTask_shouldReturnSuccess_whenValidRequest() { + String jobName = "task_job7" + uniqueId; + long jobId = jobDefinitionControllerWrapper.createJobDefinition(jobName); + String datasourceName = "task_job1_db4" + uniqueId; + String datasourceId = + seatunnelDatasourceControllerWrapper.createFakeSourceDatasource(datasourceName); + String sourcePluginId = + jobTaskControllerWrapper.createFakeSourcePlugin(datasourceId, jobId); + Result result = jobTaskControllerWrapper.deleteSingleTask(jobId, sourcePluginId); + assertTrue(result.isSuccess()); + } + + @AfterAll + public static void tearDown() { + seaTunnelWebCluster.stop(); + } +} diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/SeatunnelDatasourceControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/SeatunnelDatasourceControllerTest.java new file mode 100644 index 000000000..b928e0cc3 --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/SeatunnelDatasourceControllerTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.test; + +import org.apache.seatunnel.app.common.Result; +import org.apache.seatunnel.app.common.SeaTunnelWebCluster; +import org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper; +import org.apache.seatunnel.app.domain.request.datasource.DatasourceCheckReq; +import org.apache.seatunnel.app.domain.request.datasource.DatasourceReq; +import org.apache.seatunnel.app.domain.response.PageInfo; +import org.apache.seatunnel.app.domain.response.datasource.DatasourceDetailRes; +import org.apache.seatunnel.app.domain.response.datasource.DatasourceRes; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class SeatunnelDatasourceControllerTest { + private static final SeaTunnelWebCluster seaTunnelWebCluster = new SeaTunnelWebCluster(); + private static SeatunnelDatasourceControllerWrapper seatunnelDatasourceControllerWrapper; + private static String uniqueId = "_" + System.currentTimeMillis(); + + @BeforeAll + public static void setUp() { + seaTunnelWebCluster.start(); + seatunnelDatasourceControllerWrapper = new SeatunnelDatasourceControllerWrapper(); + } + + @Test + public void createDatasource_shouldReturnSuccess() { + String datasourceId = + seatunnelDatasourceControllerWrapper.createFakeSourceDatasource("ds1" + uniqueId); + assertTrue(!datasourceId.isEmpty()); + } + + @Test + public void testConnect_shouldReturnSuccess() { + DatasourceCheckReq req = new DatasourceCheckReq(); + req.setPluginName("FakeSource"); + Map datasourceConfig = new HashMap<>(); + datasourceConfig.put("fields", "{\"name\" : \"string\", \"age\" : \"int\"}"); + req.setDatasourceConfig(datasourceConfig); + Result result = seatunnelDatasourceControllerWrapper.testConnect(req); + assertTrue(result.isSuccess()); + } + + @Test + public void updateDatasource_shouldReturnSuccess() { + String datasourceId = + seatunnelDatasourceControllerWrapper.createFakeSourceDatasource("ds2" + uniqueId); + Result result = + seatunnelDatasourceControllerWrapper.getDatasource(datasourceId); + assertNotNull(result.getData()); + DatasourceReq req = new DatasourceReq(); + req.setDescription("new Description"); + // Populate req with valid data + Result updateResult = + seatunnelDatasourceControllerWrapper.updateDatasource(datasourceId, req); + assertTrue(updateResult.isSuccess()); + result = seatunnelDatasourceControllerWrapper.getDatasource(datasourceId); + assertEquals(req.getDescription(), result.getData().getDescription()); + } + + @Test + public void deleteDatasource_shouldReturnSuccess() { + String id = + seatunnelDatasourceControllerWrapper.createFakeSourceDatasource("ds3" + uniqueId); + Result result = seatunnelDatasourceControllerWrapper.deleteDatasource(id); + assertTrue(result.isSuccess()); + } + + @Test + public void getDatasourceDetail_shouldReturnSuccess() { + String id = seatunnelDatasourceControllerWrapper.createConsoleDatasource("ds4" + uniqueId); + Result result = seatunnelDatasourceControllerWrapper.getDatasource(id); + assertTrue(result.isSuccess()); + } + + @Test + public void getDatasourceDetailByName_shouldReturnSuccess() { + String datasourceName = "ds5" + uniqueId; + String id = seatunnelDatasourceControllerWrapper.createFakeSourceDatasource(datasourceName); + Result> datasourceList = + seatunnelDatasourceControllerWrapper.getDatasourceList( + datasourceName, "FakeSource", 1, 10); + assertTrue(datasourceList.isSuccess()); + assertNotNull(datasourceList.getData()); + assertEquals(1, datasourceList.getData().getData().size()); + assertEquals(id, datasourceList.getData().getData().get(0).getId()); + } + + @Test + public void createDatasource_shouldFailIfDuplicate() { + String datasourceName = "ds6" + uniqueId; + String datasourceId = + seatunnelDatasourceControllerWrapper.createFakeSourceDatasource(datasourceName); + assertTrue(!datasourceId.isEmpty()); + + DatasourceReq req = + seatunnelDatasourceControllerWrapper.getFakeSourceDatasourceReq(datasourceName); + Result result = seatunnelDatasourceControllerWrapper.createDatasource(req); + assertTrue(result.isFailed()); + assertEquals(60004, result.getCode()); + } + + @AfterAll + public static void tearDown() { + seaTunnelWebCluster.stop(); + } +} diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/UserControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/UserControllerTest.java new file mode 100644 index 000000000..f6e728576 --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/UserControllerTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.test; + +import org.apache.seatunnel.app.common.Result; +import org.apache.seatunnel.app.common.SeaTunnelWebCluster; +import org.apache.seatunnel.app.controller.UserControllerWrapper; +import org.apache.seatunnel.app.domain.request.user.AddUserReq; +import org.apache.seatunnel.app.domain.request.user.UpdateUserReq; +import org.apache.seatunnel.app.domain.response.user.AddUserRes; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class UserControllerTest { + private static final SeaTunnelWebCluster seaTunnelWebCluster = new SeaTunnelWebCluster(); + private static UserControllerWrapper userControllerWrapper; + private static String uniqueId = "_" + System.currentTimeMillis(); + + @BeforeAll + public static void setUp() { + seaTunnelWebCluster.start(); + userControllerWrapper = new UserControllerWrapper(); + } + + @Test + public void addUser_shouldReturnSuccess_whenValidRequest() { + String user = "addUser" + uniqueId; + AddUserReq addUserReq = getAddUserReq(user, "pass1"); + Result result = userControllerWrapper.addUser(addUserReq); + assertTrue(result.isSuccess()); + assertTrue(result.getData().getId() > 0); + } + + private static AddUserReq getAddUserReq(String user, String pass) { + AddUserReq addUserReq = new AddUserReq(); + addUserReq.setUsername(user); + addUserReq.setPassword(pass); + addUserReq.setStatus((byte) 0); + addUserReq.setType((byte) 0); + return addUserReq; + } + + @Test + public void updateUser_shouldReturnSuccess_whenValidRequest() { + String user = "updateUser" + uniqueId; + AddUserReq addUserReq = getAddUserReq(user, "pass2"); + Result result = userControllerWrapper.addUser(addUserReq); + assertTrue(result.isSuccess()); + UpdateUserReq updateUserReq = new UpdateUserReq(); + updateUserReq.setUsername(user); + updateUserReq.setUserId(result.getData().getId()); + updateUserReq.setPassword("pass3"); + updateUserReq.setStatus((byte) 0); + updateUserReq.setType((byte) 0); + Result updateUserResult = + userControllerWrapper.updateUser( + String.valueOf(updateUserReq.getUserId()), updateUserReq); + assertTrue(updateUserResult.isSuccess()); + } + + @Test + public void deleteUser_shouldReturnSuccess_whenValidUserId() { + String user = "deleteUser" + uniqueId; + AddUserReq addUserReq = getAddUserReq(user, "pass3"); + Result result = userControllerWrapper.addUser(addUserReq); + assertTrue(result.isSuccess()); + Result deleteUserResult = + userControllerWrapper.deleteUser(String.valueOf(result.getData().getId())); + assertTrue(deleteUserResult.isSuccess()); + } + + @Test + public void listUsers_shouldReturnUsers_whenUsersExist() { + Result result = userControllerWrapper.listUsers(1, 10); + assertTrue(result.isSuccess()); + assertNotNull(result.getData()); + } + + @AfterAll + public static void tearDown() { + Result logout = userControllerWrapper.logout(); + assertTrue(logout.isSuccess()); + seaTunnelWebCluster.stop(); + } +} diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JSONTestUtils.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JSONTestUtils.java new file mode 100644 index 000000000..cfd29d029 --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JSONTestUtils.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.app.utils; + +import org.apache.seatunnel.app.common.Constants; +import org.apache.seatunnel.app.domain.ConnectorInfoDeserializer; +import org.apache.seatunnel.app.domain.PluginIdentifierDeserializer; +import org.apache.seatunnel.app.domain.response.connector.ConnectorInfo; +import org.apache.seatunnel.plugin.discovery.PluginIdentifier; + +import org.apache.commons.lang3.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.databind.type.CollectionType; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +import javax.annotation.Nullable; + +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.TimeZone; + +import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT; +import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; +import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL; +import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS; + +public class JSONTestUtils { + + private static final Logger logger = LoggerFactory.getLogger(JSONTestUtils.class); + + static { + logger.info("init timezone: {}", TimeZone.getDefault()); + } + + private static final ObjectMapper objectMapper = + JsonMapper.builder() + .configure(FAIL_ON_UNKNOWN_PROPERTIES, false) + .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true) + .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true) + .configure(REQUIRE_SETTERS_FOR_GETTERS, true) + .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS) + .addModule(new JavaTimeModule()) + .defaultTimeZone(TimeZone.getDefault()) + .defaultDateFormat(new SimpleDateFormat(Constants.YYYY_MM_DD_HH_MM_SS)) + .defaultPrettyPrinter(new DefaultPrettyPrinter()) + .build(); + /* can use static singleton, inject: just make sure to reuse! */ + static { + SimpleModule module = new SimpleModule(); + module.addDeserializer(PluginIdentifier.class, new PluginIdentifierDeserializer()); + module.addDeserializer(ConnectorInfo.class, new ConnectorInfoDeserializer()); + objectMapper.registerModule(module); + } + + /** + * This method deserializes the specified Json into an object of the specified class. It is not + * suitable to use if the specified class is a generic type since it will not have the generic + * type information because of the Type Erasure feature of Java. Therefore, this method should + * not be used if the desired type is a generic type. Note that this method works fine if any of + * the fields of the specified object are generics, just the object itself should not be a + * generic type. + * + * @param json the string from which the object is to be deserialized + * @param clazz the class of T + * @param T + * @return an object of type T from the string classOfT + */ + public static @Nullable T parseObject(String json, Class clazz) { + if (StringUtils.isEmpty(json)) { + return null; + } + + try { + return objectMapper.readValue(json, clazz); + } catch (Exception e) { + logger.error("parse object exception! json: {}", json, e); + } + return null; + } + + /** + * json to list + * + * @param json json string + * @param clazz class + * @param T + * @return list + */ + public static List toList(String json, Class clazz) { + if (StringUtils.isEmpty(json)) { + return Collections.emptyList(); + } + try { + CollectionType listType = + objectMapper.getTypeFactory().constructCollectionType(ArrayList.class, clazz); + return objectMapper.readValue(json, listType); + } catch (Exception e) { + logger.error("parse list exception! json: {}", json, e); + } + + return Collections.emptyList(); + } + + /** + * json to object + * + * @param json json string + * @param type type reference + * @param + * @return return parse object + */ + public static @Nullable T parseObject(String json, TypeReference type) { + if (StringUtils.isEmpty(json)) { + return null; + } + + try { + return objectMapper.readValue(json, type); + } catch (Exception e) { + logger.error("json to map exception!, json: {}", json, e); + } + + return null; + } +} diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java new file mode 100644 index 000000000..0a530d3b7 --- /dev/null +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.utils; + +import org.apache.seatunnel.app.common.Result; +import org.apache.seatunnel.app.controller.JobConfigControllerWrapper; +import org.apache.seatunnel.app.controller.JobDefinitionControllerWrapper; +import org.apache.seatunnel.app.controller.JobMetricsControllerWrapper; +import org.apache.seatunnel.app.controller.JobTaskControllerWrapper; +import org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper; +import org.apache.seatunnel.app.domain.request.job.Edge; +import org.apache.seatunnel.app.domain.request.job.JobConfig; +import org.apache.seatunnel.app.domain.request.job.JobDAG; +import org.apache.seatunnel.app.domain.response.job.JobTaskCheckRes; +import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class JobUtils { + private static JobMetricsControllerWrapper jobMetricsControllerWrapper = + new JobMetricsControllerWrapper(); + private static JobConfigControllerWrapper jobConfigControllerWrapper = + new JobConfigControllerWrapper(); + private static JobDefinitionControllerWrapper jobDefinitionControllerWrapper = + new JobDefinitionControllerWrapper(); + private static JobTaskControllerWrapper jobTaskControllerWrapper = + new JobTaskControllerWrapper(); + private static SeatunnelDatasourceControllerWrapper seatunnelDatasourceControllerWrapper = + new SeatunnelDatasourceControllerWrapper(); + private static final long TIMEOUT = 60; // 1 minute + private static final long INTERVAL = 2; // 1 second + + public static Result> waitForJobCompletion( + long jobInstanceId) { + return waitForJobCompletion(jobInstanceId, TIMEOUT, INTERVAL); + } + + public static Result> waitForJobCompletion( + long jobInstanceId, long timeout, long interval) { + long startTime = System.currentTimeMillis(); + while (true) { + Result> detail = + jobMetricsControllerWrapper.detail(jobInstanceId); + if (!detail.isSuccess()) { + throw new RuntimeException("Failed to get job detail metrics"); + } + if (isAllFinished(detail.getData())) { + return detail; + } + if (System.currentTimeMillis() - startTime > TimeUnit.SECONDS.toMillis(timeout)) { + throw new RuntimeException( + "Timeout waiting for job to complete. Job not completed in " + + timeout + + " seconds."); + } + try { + TimeUnit.SECONDS.sleep(interval); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Thread was interrupted while waiting for job completion", e); + } + } + } + + private static boolean isAllFinished(List details) { + for (JobPipelineDetailMetricsRes metrics : details) { + if (!isFinished(metrics)) { + return false; + } + } + return true; + } + + private static boolean isFinished(JobPipelineDetailMetricsRes metrics) { + if (metrics == null || metrics.getStatus() == null) { + return false; + } + switch (metrics.getStatus()) { + case "FINISHED": + case "CANCELED": + case "FAILED": + return true; + default: + return false; + } + } + + public static Long createJob(String jobName) { + Long jobId = jobDefinitionControllerWrapper.createJobDefinition(jobName); + JobConfig jobConfig = jobConfigControllerWrapper.populateJobConfigObject(jobName); + // jobVersionId is same as jobId + long jobVersionId = jobId; + + Result jobConfigResult = + jobConfigControllerWrapper.updateJobConfig(jobVersionId, jobConfig); + assertTrue(jobConfigResult.isSuccess()); + + String fakeSourceDatasourceId = + seatunnelDatasourceControllerWrapper.createFakeSourceDatasource( + "source_" + jobName); + String consoleDatasourceId = + seatunnelDatasourceControllerWrapper.createConsoleDatasource("console_" + jobName); + + String sourcePluginId = + jobTaskControllerWrapper.createFakeSourcePlugin( + fakeSourceDatasourceId, jobVersionId); + String transPluginId = jobTaskControllerWrapper.createReplaceTransformPlugin(jobVersionId); + String sinkPluginId = + jobTaskControllerWrapper.createConsoleSinkPlugin(consoleDatasourceId, jobVersionId); + + JobDAG jobDAG = new JobDAG(); + List edges = new ArrayList<>(); + edges.add(new Edge(sourcePluginId, transPluginId)); + edges.add(new Edge(transPluginId, sinkPluginId)); + jobDAG.setEdges(edges); + + Result jobTaskCheckResResult = + jobTaskControllerWrapper.saveJobDAG(jobVersionId, jobDAG); + assertTrue(jobTaskCheckResResult.isSuccess()); + return jobVersionId; + } +} diff --git a/seatunnel-web-it/src/test/resources/application.yml b/seatunnel-web-it/src/test/resources/application.yml new file mode 100644 index 000000000..3b49d2a32 --- /dev/null +++ b/seatunnel-web-it/src/test/resources/application.yml @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +server: + port: 8802 + +spring: + application: + name: seatunnel + jackson: + date-format: yyyy-MM-dd HH:mm:ss + datasource: + driver-class-name: com.mysql.cj.jdbc.Driver + url: jdbc:mysql://localhost:3306/seatunnel?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true + username: seatunnel_user + password: seaTunnel_1234 + mvc: + pathmatch: + matching-strategy: ant_path_matcher + +jwt: + expireTime: 86400 + # please add key when deploy + secretKey: https://github.com/apache/seatunnel + algorithm: HS256 + +--- +spring: + config: + activate: + on-profile: h2 + sql: + init: + schema-locations: classpath*:script/seatunnel_server_h2.sql + datasource: + driver-class-name: org.h2.Driver + url: jdbc:h2:mem:seatunnel;MODE=MySQL;DB_CLOSE_DELAY=-1;DATABASE_TO_LOWER=true + username: sa + password: sa + h2: + console: + enabled: true + path: /h2 + settings: + trace: false + web-allow-others: false \ No newline at end of file diff --git a/seatunnel-web-it/src/test/resources/hazelcast-client.yaml b/seatunnel-web-it/src/test/resources/hazelcast-client.yaml new file mode 100644 index 000000000..d3b7f3d7e --- /dev/null +++ b/seatunnel-web-it/src/test/resources/hazelcast-client.yaml @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +hazelcast-client: + cluster-name: seatunnel-test + properties: + hazelcast.logging.type: log4j2 + connection-strategy: + connection-retry: + cluster-connect-timeout-millis: 3000 + network: + cluster-members: + - localhost:5901 \ No newline at end of file diff --git a/seatunnel-web-it/src/test/resources/hazelcast.yaml b/seatunnel-web-it/src/test/resources/hazelcast.yaml new file mode 100644 index 000000000..7ac730e01 --- /dev/null +++ b/seatunnel-web-it/src/test/resources/hazelcast.yaml @@ -0,0 +1,47 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +hazelcast: + cluster-name: seatunnel-test + network: + rest-api: + enabled: true + endpoint-groups: + CLUSTER_WRITE: + enabled: true + DATA: + enabled: true + join: + tcp-ip: + enabled: true + member-list: + - localhost + port: + auto-increment: true + port: 5901 + properties: + hazelcast.invocation.max.retry.count: 20 + hazelcast.tcp.join.port.try.count: 30 + hazelcast.logging.type: log4j2 + hazelcast.operation.generic.thread.count: 50 + hazelcast.heartbeat.failuredetector.type: phi-accrual + hazelcast.heartbeat.interval.seconds: 2 + hazelcast.max.no.heartbeat.seconds: 180 + hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10 + hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200 + hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100 + diff --git a/seatunnel-web-it/src/test/resources/logback-spring.xml b/seatunnel-web-it/src/test/resources/logback-spring.xml new file mode 100644 index 000000000..145a239d2 --- /dev/null +++ b/seatunnel-web-it/src/test/resources/logback-spring.xml @@ -0,0 +1,48 @@ + + + + + + + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} ${APP_NAME} ${HOST_NAME} %p ${TRACE} [%thread] [%C{0}.%M\(\):%L] - %m%n + UTF-8 + + + + + ${APP_LOG_PATH}/seatunnel-web.log + true + + ${APP_LOG_PATH}/seatunnel-web.log.%d{yyyy-MM-dd}.%i.log + 30 + 100MB + + + %d{yyyy-MM-dd HH:mm:ss.SSS} ${APP_NAME} ${HOST_NAME} %p [%thread] [%C{0}.%M\(\):%L] - %m%n + UTF-8 + + + + + + + + diff --git a/seatunnel-web-it/src/test/resources/seatunnel.yaml b/seatunnel-web-it/src/test/resources/seatunnel.yaml new file mode 100644 index 000000000..5961c8392 --- /dev/null +++ b/seatunnel-web-it/src/test/resources/seatunnel.yaml @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +seatunnel: + engine: + history-job-expire-minutes: 1440 + backup-count: 1 + queue-type: blockingqueue + print-execution-info-interval: 60 + print-job-metrics-info-interval: 60 + slot-service: + dynamic-slot: true + checkpoint: + interval: 10000 + timeout: 60000 + storage: + type: hdfs + max-retained: 3 + plugin-config: + namespace: /tmp/seatunnel/checkpoint_snapshot + storage.type: hdfs + fs.defaultFS: file:///tmp/ # Ensure that the directory has written permission \ No newline at end of file diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index 0a42b54b7..0a8dfa022 100644 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -7,7 +7,12 @@ commons-io-2.11.0.jar config-1.3.3.jar db2jcc-db2jcc4.jar gson-2.8.6.jar -guava-19.0.jar +guava-33.2.1-jre.jar +checker-qual-3.10.0.jar +error_prone_annotations-2.26.1.jar +failureaccess-1.0.2.jar +j2objc-annotations-3.0.0.jar +listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar hibernate-validator-6.2.2.Final.jar jackson-annotations-2.12.6.jar jackson-core-2.13.3.jar