diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TelemetryApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/telemetry/MasterWorkerClusterSeaTunnelWithTelemetryIT.java similarity index 53% rename from seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TelemetryApiIT.java rename to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/telemetry/MasterWorkerClusterSeaTunnelWithTelemetryIT.java index 3f85fbbe35f..5acfad55921 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TelemetryApiIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/telemetry/MasterWorkerClusterSeaTunnelWithTelemetryIT.java @@ -15,194 +15,241 @@ * limitations under the License. */ -package org.apache.seatunnel.engine.e2e; - -import org.apache.seatunnel.common.config.Common; -import org.apache.seatunnel.common.config.DeployMode; -import org.apache.seatunnel.engine.client.SeaTunnelClient; -import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment; -import org.apache.seatunnel.engine.client.job.ClientJobProxy; -import org.apache.seatunnel.engine.common.config.ConfigProvider; -import org.apache.seatunnel.engine.common.config.JobConfig; -import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; -import org.apache.seatunnel.engine.common.config.server.TelemetryConfig; -import org.apache.seatunnel.engine.common.config.server.TelemetryMetricConfig; -import org.apache.seatunnel.engine.core.job.JobStatus; -import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; +package org.apache.seatunnel.engine.e2e.telemetry; + +import org.apache.seatunnel.e2e.common.container.seatunnel.SeaTunnelContainer; +import org.apache.seatunnel.e2e.common.util.ContainerUtil; +import org.apache.seatunnel.engine.server.rest.RestConstant; import org.awaitility.Awaitility; -import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerLoggerFactory; +import org.testcontainers.utility.MountableFile; -import com.hazelcast.client.config.ClientConfig; -import com.hazelcast.instance.impl.HazelcastInstanceImpl; -import lombok.extern.slf4j.Slf4j; +import io.restassured.response.Response; +import io.restassured.response.ValidatableResponse; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Arrays; import java.util.concurrent.TimeUnit; import static io.restassured.RestAssured.given; +import static org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH; +import static org.apache.seatunnel.engine.server.rest.RestConstant.CONTEXT_PATH; import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.matchesRegex; -@Slf4j -public class TelemetryApiIT { +public class MasterWorkerClusterSeaTunnelWithTelemetryIT extends SeaTunnelContainer { + + private GenericContainer secondServer; + + private final Network NETWORK = Network.newNetwork(); - private static final String HOST = "http://localhost:"; + private static final String jobName = "test测试"; + private static final String paramJobName = "param_test测试"; - private static ClientJobProxy clientJobProxy; + private static final String http = "http://"; - private static HazelcastInstanceImpl hazelcastInstance; + private static final String colon = ":"; - private static String testClusterName; + private static final String confFile = "/fakesource_to_console.conf"; - @BeforeAll - static void beforeClass() throws Exception { - testClusterName = TestUtils.getClusterName("TelemetryApiIT"); - SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); - seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName); - TelemetryMetricConfig telemetryMetricConfig = new TelemetryMetricConfig(); - telemetryMetricConfig.setEnabled(true); - TelemetryConfig telemetryConfig = new TelemetryConfig(); - telemetryConfig.setMetric(telemetryMetricConfig); - seaTunnelConfig.getEngineConfig().setTelemetryConfig(telemetryConfig); - hazelcastInstance = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig); - Common.setDeployMode(DeployMode.CLIENT); - String filePath = TestUtils.getResource("stream_fakesource_to_file.conf"); - JobConfig jobConfig = new JobConfig(); - jobConfig.setName("fake_to_file"); + private static final Path binPath = Paths.get(SEATUNNEL_HOME, "bin", SERVER_SHELL); + private static final Path config = Paths.get(SEATUNNEL_HOME, "config"); + private static final Path hadoopJar = + Paths.get(SEATUNNEL_HOME, "lib/seatunnel-hadoop3-3.1.4-uber.jar"); - ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); - clientConfig.setClusterName(testClusterName); - SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig); - ClientJobExecutionEnvironment jobExecutionEnv = - engineClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig); + @Test + public void testSubmitJobs() throws InterruptedException { + testGetMetrics(server, "seatunnel", true); + testGetMetrics(secondServer, "seatunnel", false); + } - clientJobProxy = jobExecutionEnv.execute(); + @Override + @BeforeEach + public void startUp() throws Exception { + + server = createServer("server", "master"); + secondServer = createServer("secondServer", "worker"); + + // check cluster + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> { + Response response = + given().get( + http + + server.getHost() + + colon + + server.getFirstMappedPort() + + "/hazelcast/rest/cluster"); + response.then().statusCode(200); + Assertions.assertEquals( + 2, response.jsonPath().getList("members").size()); + }); + String JobId = + submitJob( + server, + server.getMappedPort(5801), + RestConstant.CONTEXT_PATH, + "STREAMING", + jobName, + paramJobName) + .getBody() + .jsonPath() + .getString("jobId"); Awaitility.await() .atMost(2, TimeUnit.MINUTES) .untilAsserted( - () -> - Assertions.assertEquals( - JobStatus.RUNNING, clientJobProxy.getJobStatus())); + () -> { + Assertions.assertNotNull(JobId); + given().get( + http + + server.getHost() + + colon + + server.getFirstMappedPort() + + CONTEXT_PATH + + RestConstant.JOB_INFO_URL + + "/" + + JobId) + .then() + .statusCode(200) + .body("jobStatus", equalTo("RUNNING")); + }); } - @Test - public void testGetMetrics() throws InterruptedException { - given().get( - HOST - + hazelcastInstance - .getCluster() - .getLocalMember() - .getAddress() - .getPort() - + "/hazelcast/rest/instance/metrics") - .then() - .statusCode(200) - // Use regular expressions to verify whether the response body is the indicator data - // of Prometheus - // Metric data is usually multi-line, use newlines for validation - .body(matchesRegex("(?s)^.*# HELP.*# TYPE.*$")) - // Verify that the response body contains a specific metric - // JVM metrics - .body(containsString("jvm_threads")) - .body(containsString("jvm_memory_pool")) - .body(containsString("jvm_gc")) - .body(containsString("jvm_info")) - .body(containsString("jvm_memory_bytes")) - .body(containsString("jvm_classes")) - .body(containsString("jvm_buffer_pool")) - .body(containsString("process_start")) - // cluster_info - .body(containsString("cluster_info{cluster=\"" + testClusterName)) - // cluster_time - .body(containsString("cluster_time{cluster=\"" + testClusterName)) - // Job thread pool metrics - .body( - matchesRegex( - "(?s)^.*job_thread_pool_activeCount\\{cluster=\"" - + testClusterName - + "\",address=.*$")) - .body( - matchesRegex( - "(?s)^.*job_thread_pool_completedTask_total\\{cluster=\"" - + testClusterName - + "\",address=.*$")) - .body( - matchesRegex( - "(?s)^.*job_thread_pool_corePoolSize\\{cluster=\"" - + testClusterName - + "\",address=.*$")) - .body( - matchesRegex( - "(?s)^.*job_thread_pool_maximumPoolSize\\{cluster=\"" - + testClusterName - + "\",address=.*$")) - .body( - matchesRegex( - "(?s)^.*job_thread_pool_poolSize\\{cluster=\"" - + testClusterName - + "\",address=.*$")) - .body( - matchesRegex( - "(?s)^.*job_thread_pool_task_total\\{cluster=\"" - + testClusterName - + "\",address=.*$")) - .body( - matchesRegex( - "(?s)^.*job_thread_pool_queueTaskCount\\{cluster=\"" - + testClusterName - + "\",address=.*$")) - .body( - matchesRegex( - "(?s)^.*job_thread_pool_rejection_total\\{cluster=\"" - + testClusterName - + "\",address=.*$")) - // Job count metrics - .body( - containsString( - "job_count{cluster=\"" - + testClusterName - + "\",type=\"canceled\",} 0.0")) - .body( - containsString( - "job_count{cluster=\"" - + testClusterName - + "\",type=\"cancelling\",} 0.0")) - .body( - containsString( - "job_count{cluster=\"" - + testClusterName - + "\",type=\"created\",} 0.0")) - .body( - containsString( - "job_count{cluster=\"" - + testClusterName - + "\",type=\"failed\",} 0.0")) - .body( - containsString( - "job_count{cluster=\"" - + testClusterName - + "\",type=\"failing\",} 0.0")) - .body( - containsString( - "job_count{cluster=\"" - + testClusterName - + "\",type=\"finished\",} 0.0")) - // Running job count is 1 - .body( - containsString( - "job_count{cluster=\"" - + testClusterName - + "\",type=\"running\",} 1.0")) - .body( - containsString( - "job_count{cluster=\"" - + testClusterName - + "\",type=\"scheduled\",} 0.0")) - // Node + public void testGetMetrics(GenericContainer server, String testClusterName, boolean isMaster) + throws InterruptedException { + Response response = + given().get( + http + + server.getHost() + + colon + + server.getFirstMappedPort() + + "/hazelcast/rest/instance/metrics"); + ValidatableResponse validatableResponse = + response.then() + .statusCode(200) + // Use regular expressions to verify whether the response body is the + // indicator data + // of Prometheus + // Metric data is usually multi-line, use newlines for validation + .body(matchesRegex("(?s)^.*# HELP.*# TYPE.*$")) + // Verify that the response body contains a specific metric + // JVM metrics + .body(containsString("jvm_threads")) + .body(containsString("jvm_memory_pool")) + .body(containsString("jvm_gc")) + .body(containsString("jvm_info")) + .body(containsString("jvm_memory_bytes")) + .body(containsString("jvm_classes")) + .body(containsString("jvm_buffer_pool")) + .body(containsString("process_start")) + // cluster_info + .body(containsString("cluster_info{cluster=\"" + testClusterName)) + // cluster_time + .body(containsString("cluster_time{cluster=\"" + testClusterName)); + + if (isMaster) { + validatableResponse + // Job thread pool metrics + .body( + matchesRegex( + "(?s)^.*job_thread_pool_activeCount\\{cluster=\"" + + testClusterName + + "\",address=.*$")) + .body( + matchesRegex( + "(?s)^.*job_thread_pool_completedTask_total\\{cluster=\"" + + testClusterName + + "\",address=.*$")) + .body( + matchesRegex( + "(?s)^.*job_thread_pool_corePoolSize\\{cluster=\"" + + testClusterName + + "\",address=.*$")) + .body( + matchesRegex( + "(?s)^.*job_thread_pool_maximumPoolSize\\{cluster=\"" + + testClusterName + + "\",address=.*$")) + .body( + matchesRegex( + "(?s)^.*job_thread_pool_poolSize\\{cluster=\"" + + testClusterName + + "\",address=.*$")) + .body( + matchesRegex( + "(?s)^.*job_thread_pool_task_total\\{cluster=\"" + + testClusterName + + "\",address=.*$")) + .body( + matchesRegex( + "(?s)^.*job_thread_pool_queueTaskCount\\{cluster=\"" + + testClusterName + + "\",address=.*$")) + .body( + matchesRegex( + "(?s)^.*job_thread_pool_rejection_total\\{cluster=\"" + + testClusterName + + "\",address=.*$")) + // Job count metrics + .body( + containsString( + "job_count{cluster=\"" + + testClusterName + + "\",type=\"canceled\",} 0.0")) + .body( + containsString( + "job_count{cluster=\"" + + testClusterName + + "\",type=\"cancelling\",} 0.0")) + .body( + containsString( + "job_count{cluster=\"" + + testClusterName + + "\",type=\"created\",} 0.0")) + .body( + containsString( + "job_count{cluster=\"" + + testClusterName + + "\",type=\"failed\",} 0.0")) + .body( + containsString( + "job_count{cluster=\"" + + testClusterName + + "\",type=\"failing\",} 0.0")) + .body( + containsString( + "job_count{cluster=\"" + + testClusterName + + "\",type=\"finished\",} 0.0")) + // Running job count is 1 + .body( + containsString( + "job_count{cluster=\"" + + testClusterName + + "\",type=\"running\",} 1.0")) + .body( + containsString( + "job_count{cluster=\"" + + testClusterName + + "\",type=\"scheduled\",} 0.0")); + } + // Node + validatableResponse .body( matchesRegex( "(?s)^.*node_state\\{cluster=\"" @@ -528,10 +575,139 @@ public void testGetMetrics() throws InterruptedException { + "\",address=.*$")); } - @AfterAll - static void afterClass() { - if (hazelcastInstance != null) { - hazelcastInstance.shutdown(); + @Override + @AfterEach + public void tearDown() throws Exception { + super.tearDown(); + if (secondServer != null) { + secondServer.close(); } } + + private Response submitJob( + GenericContainer container, + int port, + String contextPath, + String jobMode, + String jobName, + String paramJobName) { + return submitJob(jobMode, container, port, contextPath, false, jobName, paramJobName); + } + + private Response submitJob( + String jobMode, + GenericContainer container, + int port, + String contextPath, + boolean isStartWithSavePoint, + String jobName, + String paramJobName) { + String requestBody = + "{\n" + + " \"env\": {\n" + + " \"job.name\": \"" + + jobName + + "\",\n" + + " \"job.mode\": \"" + + jobMode + + "\"\n" + + " },\n" + + " \"source\": [\n" + + " {\n" + + " \"plugin_name\": \"FakeSource\",\n" + + " \"result_table_name\": \"fake\",\n" + + " \"row.num\": 100,\n" + + " \"schema\": {\n" + + " \"fields\": {\n" + + " \"name\": \"string\",\n" + + " \"age\": \"int\",\n" + + " \"card\": \"int\"\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"transform\": [\n" + + " ],\n" + + " \"sink\": [\n" + + " {\n" + + " \"plugin_name\": \"Console\",\n" + + " \"source_table_name\": [\"fake\"]\n" + + " }\n" + + " ]\n" + + "}"; + String parameters = null; + if (paramJobName != null) { + parameters = "jobName=" + paramJobName; + } + if (isStartWithSavePoint) { + parameters = parameters + "&isStartWithSavePoint=true"; + } + Response response = + given().body(requestBody) + .header("Content-Type", "application/json; charset=utf-8") + .post( + parameters == null + ? http + + container.getHost() + + colon + + port + + contextPath + + RestConstant.SUBMIT_JOB_URL + : http + + container.getHost() + + colon + + port + + contextPath + + RestConstant.SUBMIT_JOB_URL + + "?" + + parameters); + return response; + } + + private GenericContainer createServer(String networkAlias, String role) + throws IOException, InterruptedException { + + GenericContainer server = + new GenericContainer<>(getDockerImage()) + .withNetwork(NETWORK) + .withEnv("TZ", "UTC") + .withCommand( + ContainerUtil.adaptPathForWin(binPath.toString()) + " -r " + role) + .withNetworkAliases(networkAlias) + .withExposedPorts() + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger( + "seatunnel-engine:" + JDK_DOCKER_IMAGE))) + .waitingFor(Wait.forListeningPort()); + copySeaTunnelStarterToContainer(server); + server.setExposedPorts(Arrays.asList(5801)); + server.withCopyFileToContainer( + MountableFile.forHostPath( + PROJECT_ROOT_PATH + + "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"), + config.toString()); + server.withCopyFileToContainer( + MountableFile.forHostPath( + PROJECT_ROOT_PATH + + "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/"), + config.toString()); + server.withCopyFileToContainer( + MountableFile.forHostPath( + PROJECT_ROOT_PATH + + "/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"), + hadoopJar.toString()); + server.start(); + // execute extra commands + executeExtraCommands(server); + ContainerUtil.copyConnectorJarToContainer( + server, + confFile, + getConnectorModulePath(), + getConnectorNamePrefix(), + getConnectorType(), + SEATUNNEL_HOME); + + return server; + } } diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/hazelcast-master.yaml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/hazelcast-master.yaml new file mode 100644 index 00000000000..0aeade2fc1d --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/hazelcast-master.yaml @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +hazelcast: + cluster-name: seatunnel + network: + rest-api: + enabled: true + endpoint-groups: + CLUSTER_WRITE: + enabled: true + DATA: + enabled: true + join: + tcp-ip: + enabled: true + member-list: + - secondServer + - server + port: + auto-increment: true + port-count: 100 + port: 5801 + properties: + hazelcast.invocation.max.retry.count: 20 + hazelcast.tcp.join.port.try.count: 30 + hazelcast.logging.type: log4j2 + hazelcast.operation.generic.thread.count: 50 + hazelcast.heartbeat.failuredetector.type: phi-accrual + hazelcast.heartbeat.interval.seconds: 2 + hazelcast.max.no.heartbeat.seconds: 180 + hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10 + hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200 + hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100 + diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/hazelcast-worker.yaml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/hazelcast-worker.yaml new file mode 100644 index 00000000000..0aeade2fc1d --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/hazelcast-worker.yaml @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +hazelcast: + cluster-name: seatunnel + network: + rest-api: + enabled: true + endpoint-groups: + CLUSTER_WRITE: + enabled: true + DATA: + enabled: true + join: + tcp-ip: + enabled: true + member-list: + - secondServer + - server + port: + auto-increment: true + port-count: 100 + port: 5801 + properties: + hazelcast.invocation.max.retry.count: 20 + hazelcast.tcp.join.port.try.count: 30 + hazelcast.logging.type: log4j2 + hazelcast.operation.generic.thread.count: 50 + hazelcast.heartbeat.failuredetector.type: phi-accrual + hazelcast.heartbeat.interval.seconds: 2 + hazelcast.max.no.heartbeat.seconds: 180 + hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10 + hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200 + hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100 + diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/jvm_master_options b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/jvm_master_options new file mode 100644 index 00000000000..f7d00c6eaf5 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/jvm_master_options @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# JVM Heap +-Xms2g +-Xmx2g + +# JVM Dump +-XX:+HeapDumpOnOutOfMemoryError +-XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server + +# Only used for test!!! We should make sure soft reference be collected ASAP +-XX:SoftRefLRUPolicyMSPerMB=1 diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/jvm_worker_options b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/jvm_worker_options new file mode 100644 index 00000000000..f7d00c6eaf5 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/jvm_worker_options @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# JVM Heap +-Xms2g +-Xmx2g + +# JVM Dump +-XX:+HeapDumpOnOutOfMemoryError +-XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server + +# Only used for test!!! We should make sure soft reference be collected ASAP +-XX:SoftRefLRUPolicyMSPerMB=1 diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/seatunnel.yaml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/seatunnel.yaml new file mode 100644 index 00000000000..24aab7762c7 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/seatunnel.yaml @@ -0,0 +1,39 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +seatunnel: + engine: + history-job-expire-minutes: 1440 + backup-count: 2 + queue-type: blockingqueue + print-execution-info-interval: 10 + slot-service: + dynamic-slot: true + checkpoint: + interval: 300000 + timeout: 100000 + storage: + type: localfile + max-retained: 3 + plugin-config: + namespace: /tmp/seatunnel/checkpoint_snapshot/ + http: + enable-http: true + port: 8080 + telemetry: + metric: + enabled: true diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fakesource_to_console.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fakesource_to_console.conf new file mode 100644 index 00000000000..6a7bcd46540 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fakesource_to_console.conf @@ -0,0 +1,82 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + row.num = 10 + map.size = 10 + array.size = 10 + bytes.length = 10 + string.length = 10 + parallelism = 1 + result_table_name = "fake" + schema = { + fields { + c_map = "map>" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(30, 8)" + c_null = "null" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + c_row = { + c_map = "map>" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(30, 8)" + c_null = "null" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + } + } + } +} + +transform { +} + +sink { + console { + + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java index 36f500a2790..8e69abd5280 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java @@ -79,25 +79,13 @@ public static HazelcastInstanceImpl createMasterAndWorkerHazelcastInstance( seaTunnelConfig .getEngineConfig() .setClusterRole(EngineConfig.ClusterRole.MASTER_AND_WORKER); - return ((HazelcastInstanceProxy) - HazelcastInstanceFactory.newHazelcastInstance( - seaTunnelConfig.getHazelcastConfig(), - HazelcastInstanceFactory.createInstanceName( - seaTunnelConfig.getHazelcastConfig()), - new SeaTunnelNodeContext(seaTunnelConfig))) - .getOriginal(); + return initializeHazelcastInstance(seaTunnelConfig, null); } public static HazelcastInstanceImpl createMasterHazelcastInstance( @NonNull SeaTunnelConfig seaTunnelConfig) { seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.MASTER); - return ((HazelcastInstanceProxy) - HazelcastInstanceFactory.newHazelcastInstance( - seaTunnelConfig.getHazelcastConfig(), - HazelcastInstanceFactory.createInstanceName( - seaTunnelConfig.getHazelcastConfig()), - new SeaTunnelNodeContext(seaTunnelConfig))) - .getOriginal(); + return initializeHazelcastInstance(seaTunnelConfig, null); } public static HazelcastInstanceImpl createWorkerHazelcastInstance( @@ -105,13 +93,7 @@ public static HazelcastInstanceImpl createWorkerHazelcastInstance( seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.WORKER); // in hazelcast lite node will not store IMap data. seaTunnelConfig.getHazelcastConfig().setLiteMember(true); - return ((HazelcastInstanceProxy) - HazelcastInstanceFactory.newHazelcastInstance( - seaTunnelConfig.getHazelcastConfig(), - HazelcastInstanceFactory.createInstanceName( - seaTunnelConfig.getHazelcastConfig()), - new SeaTunnelNodeContext(seaTunnelConfig))) - .getOriginal(); + return initializeHazelcastInstance(seaTunnelConfig, null); } public static HazelcastInstanceImpl createHazelcastInstance() { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/JobThreadPoolStatusExports.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/JobThreadPoolStatusExports.java index e50bffd5471..c75a3324c8f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/JobThreadPoolStatusExports.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/JobThreadPoolStatusExports.java @@ -39,84 +39,87 @@ public JobThreadPoolStatusExports(Node node) { @Override public List collect() { List mfs = new ArrayList(); - - ThreadPoolStatus threadPoolStatusMetrics = getServer().getThreadPoolStatusMetrics(); - List labelNames = clusterLabelNames(ADDRESS, "type"); - - GaugeMetricFamily activeCount = - new GaugeMetricFamily( - "job_thread_pool_activeCount", - String.format(HELP, "activeCount"), - labelNames); - activeCount.addMetric( - labelValues(localAddress(), "activeCount"), - threadPoolStatusMetrics.getActiveCount()); - mfs.add(activeCount); - - CounterMetricFamily completedTask = - new CounterMetricFamily( - "job_thread_pool_completedTask", - String.format(HELP, "completedTask"), - labelNames); - completedTask.addMetric( - labelValues(localAddress(), "completedTask"), - threadPoolStatusMetrics.getCompletedTaskCount()); - mfs.add(completedTask); - - GaugeMetricFamily corePoolSize = - new GaugeMetricFamily( - "job_thread_pool_corePoolSize", - String.format(HELP, "corePoolSize"), - labelNames); - corePoolSize.addMetric( - labelValues(localAddress(), "corePoolSize"), - threadPoolStatusMetrics.getCorePoolSize()); - mfs.add(corePoolSize); - - GaugeMetricFamily maximumPoolSize = - new GaugeMetricFamily( - "job_thread_pool_maximumPoolSize", - String.format(HELP, "maximumPoolSize"), - labelNames); - maximumPoolSize.addMetric( - labelValues(localAddress(), "maximumPoolSize"), - threadPoolStatusMetrics.getMaximumPoolSize()); - mfs.add(maximumPoolSize); - - GaugeMetricFamily poolSize = - new GaugeMetricFamily( - "job_thread_pool_poolSize", String.format(HELP, "poolSize"), labelNames); - poolSize.addMetric( - labelValues(localAddress(), "poolSize"), threadPoolStatusMetrics.getPoolSize()); - mfs.add(poolSize); - - CounterMetricFamily taskCount = - new CounterMetricFamily( - "job_thread_pool_task", String.format(HELP, "taskCount"), labelNames); - taskCount.addMetric( - labelValues(localAddress(), "taskCount"), threadPoolStatusMetrics.getTaskCount()); - mfs.add(taskCount); - - GaugeMetricFamily queueTaskCount = - new GaugeMetricFamily( - "job_thread_pool_queueTaskCount", - String.format(HELP, "queueTaskCount"), - labelNames); - queueTaskCount.addMetric( - labelValues(localAddress(), "queueTaskCount"), - threadPoolStatusMetrics.getQueueTaskCount()); - mfs.add(queueTaskCount); - - CounterMetricFamily rejectedTaskCount = - new CounterMetricFamily( - "job_thread_pool_rejection", - String.format(HELP, "rejectionCount"), - labelNames); - rejectedTaskCount.addMetric( - labelValues(localAddress(), "rejectionCount"), - threadPoolStatusMetrics.getRejectionCount()); - mfs.add(rejectedTaskCount); - + if (isMaster()) { + ThreadPoolStatus threadPoolStatusMetrics = getServer().getThreadPoolStatusMetrics(); + List labelNames = clusterLabelNames(ADDRESS, "type"); + + GaugeMetricFamily activeCount = + new GaugeMetricFamily( + "job_thread_pool_activeCount", + String.format(HELP, "activeCount"), + labelNames); + activeCount.addMetric( + labelValues(localAddress(), "activeCount"), + threadPoolStatusMetrics.getActiveCount()); + mfs.add(activeCount); + + CounterMetricFamily completedTask = + new CounterMetricFamily( + "job_thread_pool_completedTask", + String.format(HELP, "completedTask"), + labelNames); + completedTask.addMetric( + labelValues(localAddress(), "completedTask"), + threadPoolStatusMetrics.getCompletedTaskCount()); + mfs.add(completedTask); + + GaugeMetricFamily corePoolSize = + new GaugeMetricFamily( + "job_thread_pool_corePoolSize", + String.format(HELP, "corePoolSize"), + labelNames); + corePoolSize.addMetric( + labelValues(localAddress(), "corePoolSize"), + threadPoolStatusMetrics.getCorePoolSize()); + mfs.add(corePoolSize); + + GaugeMetricFamily maximumPoolSize = + new GaugeMetricFamily( + "job_thread_pool_maximumPoolSize", + String.format(HELP, "maximumPoolSize"), + labelNames); + maximumPoolSize.addMetric( + labelValues(localAddress(), "maximumPoolSize"), + threadPoolStatusMetrics.getMaximumPoolSize()); + mfs.add(maximumPoolSize); + + GaugeMetricFamily poolSize = + new GaugeMetricFamily( + "job_thread_pool_poolSize", + String.format(HELP, "poolSize"), + labelNames); + poolSize.addMetric( + labelValues(localAddress(), "poolSize"), threadPoolStatusMetrics.getPoolSize()); + mfs.add(poolSize); + + CounterMetricFamily taskCount = + new CounterMetricFamily( + "job_thread_pool_task", String.format(HELP, "taskCount"), labelNames); + taskCount.addMetric( + labelValues(localAddress(), "taskCount"), + threadPoolStatusMetrics.getTaskCount()); + mfs.add(taskCount); + + GaugeMetricFamily queueTaskCount = + new GaugeMetricFamily( + "job_thread_pool_queueTaskCount", + String.format(HELP, "queueTaskCount"), + labelNames); + queueTaskCount.addMetric( + labelValues(localAddress(), "queueTaskCount"), + threadPoolStatusMetrics.getQueueTaskCount()); + mfs.add(queueTaskCount); + + CounterMetricFamily rejectedTaskCount = + new CounterMetricFamily( + "job_thread_pool_rejection", + String.format(HELP, "rejectionCount"), + labelNames); + rejectedTaskCount.addMetric( + labelValues(localAddress(), "rejectionCount"), + threadPoolStatusMetrics.getRejectionCount()); + mfs.add(rejectedTaskCount); + } return mfs; } }