[Feature][Zeta] Submit job scheduling support pending (#7693)
Co-authored-by: Jia Fan <fanjiaeminem@qq.com>
zhangshenghang and Hisoka-X authored Oct 18, 2024
1 parent ed90a7c commit ebd1609
Showing 25 changed files with 974 additions and 71 deletions.
18 changes: 18 additions & 0 deletions docs/en/seatunnel-engine/hybrid-cluster-deployment.md
@@ -136,6 +136,24 @@ seatunnel:
classloader-cache-mode: true
```

### 4.6 Job Scheduling Strategy

When resources are insufficient, the job scheduling strategy can be configured in the following two modes:

1. `WAIT`: Wait for resources to be available.

2. `REJECT`: Reject the job. This is the default.

Example

```yaml
seatunnel:
  engine:
    job-schedule-strategy: WAIT
```

When `dynamic-slot: true` is set, `job-schedule-strategy: WAIT` has no effect and is forcibly changed to `job-schedule-strategy: REJECT`, because the strategy is meaningless with dynamic slots.
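
The fallback described above can be sketched as a small resolution step. This is only an illustration under stated assumptions: the `ScheduleStrategyResolver` class, its local `ScheduleStrategy` enum, and the `effectiveStrategy` method are hypothetical stand-ins written for this example, not the engine's own implementation.

```java
// Hypothetical sketch: the class, enum, and method names are illustrative only.
public class ScheduleStrategyResolver {

    /** Local stand-in for the two modes described in this section. */
    enum ScheduleStrategy {
        WAIT,
        REJECT
    }

    /**
     * Returns the strategy that takes effect. With dynamic slots enabled,
     * a configured WAIT falls back to REJECT, as the text above states.
     */
    static ScheduleStrategy effectiveStrategy(ScheduleStrategy configured, boolean dynamicSlot) {
        if (dynamicSlot && configured == ScheduleStrategy.WAIT) {
            return ScheduleStrategy.REJECT;
        }
        return configured;
    }

    public static void main(String[] args) {
        // WAIT is only honored when dynamic slots are disabled
        System.out.println(effectiveStrategy(ScheduleStrategy.WAIT, true));
        System.out.println(effectiveStrategy(ScheduleStrategy.WAIT, false));
        System.out.println(effectiveStrategy(ScheduleStrategy.REJECT, false));
    }
}
```

In other words, `WAIT` is only worth configuring together with `dynamic-slot: false` and a fixed slot count.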

## 5. Configure The SeaTunnel Engine Network Service

All SeaTunnel Engine network-related configurations are in the `hazelcast.yaml` file.
17 changes: 17 additions & 0 deletions docs/en/seatunnel-engine/separated-cluster-deployment.md
@@ -280,6 +280,23 @@ netty-common-4.1.89.Final.jar
seatunnel-hadoop3-3.1.4-uber.jar
```

### 4.7 Job Scheduling Strategy

When resources are insufficient, the job scheduling strategy can be configured in the following two modes:

1. `WAIT`: Wait for resources to be available.

2. `REJECT`: Reject the job. This is the default.

Example

```yaml
seatunnel:
  engine:
    job-schedule-strategy: WAIT
```
When `dynamic-slot: true` is set, `job-schedule-strategy: WAIT` has no effect and is forcibly changed to `job-schedule-strategy: REJECT`, because the strategy is meaningless with dynamic slots.
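
To make the difference between the two modes concrete, here is a toy model of a submission under a fixed slot count. It is a sketch under assumptions, not the engine's scheduler: the `PendingQueueSketch` class, its `Strategy` enum, and the `submit` method are hypothetical names invented for this example.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of the two strategies; not SeaTunnel's scheduler implementation.
public class PendingQueueSketch {

    enum Strategy {
        WAIT,
        REJECT
    }

    private final Strategy strategy;
    private int freeSlots;
    private final Queue<Long> pending = new ArrayDeque<>();

    PendingQueueSketch(Strategy strategy, int freeSlots) {
        this.strategy = strategy;
        this.freeSlots = freeSlots;
    }

    /** Returns "RUNNING" or "PENDING"; throws when the job is rejected. */
    String submit(long jobId, int requiredSlots) {
        if (requiredSlots <= freeSlots) {
            // Enough slots: the job starts immediately under either strategy
            freeSlots -= requiredSlots;
            return "RUNNING";
        }
        if (strategy == Strategy.WAIT) {
            // Not enough slots: park the job until resources become available
            pending.add(jobId);
            return "PENDING";
        }
        // REJECT: fail the submission right away
        throw new IllegalStateException("Not enough slots for job " + jobId);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingQueueSketch waiting = new PendingQueueSketch(Strategy.WAIT, 3);
        System.out.println(waiting.submit(1L, 2));
        System.out.println(waiting.submit(2L, 2));

        PendingQueueSketch rejecting = new PendingQueueSketch(Strategy.REJECT, 1);
        try {
            rejecting.submit(3L, 2);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The model leaves out what happens when slots are released. In the real engine a pending job is scheduled once enough resources appear, which is what the `enterPendingWhenResourcesNotEnough` test in this commit exercises.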

## 5. Configuring SeaTunnel Engine Network Services

All network-related configurations of the SeaTunnel Engine are in the `hazelcast-master.yaml` and `hazelcast-worker.yaml` files.
17 changes: 17 additions & 0 deletions docs/zh/seatunnel-engine/hybrid-cluster-deployment.md
@@ -136,6 +136,23 @@ seatunnel:
classloader-cache-mode: true
```

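### 4.6 Job Scheduling Strategy

When resources are insufficient, the job scheduling strategy can be configured in the following two modes:

1. `WAIT`: Wait for resources to become available.
2. `REJECT`: Reject the job. This is the default.

Example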

```yaml
seatunnel:
  engine:
    job-schedule-strategy: WAIT
```

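When `dynamic-slot: true` is set, `job-schedule-strategy: WAIT` has no effect and is forcibly changed to `job-schedule-strategy: REJECT`, because the strategy is meaningless with dynamic slots, where jobs can be submitted directly.

## 5. Configure The SeaTunnel Engine Network Service

All SeaTunnel Engine network-related configurations are in the `hazelcast.yaml` file.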
17 changes: 17 additions & 0 deletions docs/zh/seatunnel-engine/separated-cluster-deployment.md
@@ -284,6 +284,23 @@ netty-common-4.1.89.Final.jar
seatunnel-hadoop3-3.1.4-uber.jar
```
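### 4.7 Job Scheduling Strategy

When resources are insufficient, the job scheduling strategy can be configured in the following two modes:

1. `WAIT`: Wait for resources to become available.
2. `REJECT`: Reject the job. This is the default.

Example
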
```yaml
seatunnel:
  engine:
    job-schedule-strategy: WAIT
```

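When `dynamic-slot: true` is set, `job-schedule-strategy: WAIT` has no effect and is forcibly changed to `job-schedule-strategy: REJECT`, because the strategy is meaningless with dynamic slots, where jobs can be submitted directly.

## 5. Configuring SeaTunnel Engine Network Services

All network-related configurations of the SeaTunnel Engine are in the `hazelcast-master.yaml` and `hazelcast-worker.yaml` files.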
@@ -28,6 +28,7 @@
 import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
 import org.apache.seatunnel.engine.client.job.ClientJobProxy;
 import org.apache.seatunnel.engine.client.job.JobMetricsRunner;
+import org.apache.seatunnel.engine.client.job.JobStatusRunner;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.EngineConfig;
@@ -187,7 +188,8 @@ public void execute() throws CommandExecuteException {
                 long jobId = clientJobProxy.getJobId();
                 JobMetricsRunner jobMetricsRunner = new JobMetricsRunner(engineClient, jobId);
                 executorService =
-                        Executors.newSingleThreadScheduledExecutor(
+                        Executors.newScheduledThreadPool(
+                                2,
                                 new ThreadFactoryBuilder()
                                         .setNameFormat("job-metrics-runner-%d")
                                         .setDaemon(true)
@@ -197,6 +199,12 @@ public void execute() throws CommandExecuteException {
                         0,
                         seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(),
                         TimeUnit.SECONDS);
+
+                executorService.schedule(
+                        new JobStatusRunner(engineClient.getJobClient(), jobId),
+                        0,
+                        TimeUnit.SECONDS);
+
                 // wait for job complete
                 JobResult jobResult = clientJobProxy.waitForJobCompleteV2();
                 jobStatus = jobResult.getStatus();
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.client.job;

import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.core.job.JobStatus;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class JobStatusRunner implements Runnable {

    private final JobClient jobClient;
    private final Long jobId;
    // Set once the job has been observed in the PENDING state
    private boolean isEnterPending = false;

    public JobStatusRunner(JobClient jobClient, Long jobId) {
        this.jobClient = jobClient;
        this.jobId = jobId;
    }

    @Override
    public void run() {
        Thread.currentThread().setName("job-status-runner-" + jobId);
        try {
            // Poll every 5 seconds until the job reaches a state that ends polling
            while (isPrint(jobClient.getJobStatus(jobId))) {
                Thread.sleep(5000);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executor can stop this thread
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            log.error("Failed to get job runner status. {}", ExceptionUtils.getMessage(e));
        }
    }

    /** Logs the status and returns true while polling should continue. */
    private boolean isPrint(String jobStatus) {
        boolean isPrint = true;
        switch (JobStatus.fromString(jobStatus)) {
            case PENDING:
                isEnterPending = true;
                log.info(
                        "Job ID: {} entered the pending queue. Current status: {}. Waiting for the job to be scheduled",
                        jobId,
                        jobStatus);
                break;
            case RUNNING:
            case SCHEDULED:
            case FAILING:
            case FAILED:
            case DOING_SAVEPOINT:
            case SAVEPOINT_DONE:
            case CANCELING:
            case CANCELED:
            case FINISHED:
            case UNKNOWABLE:
                if (isEnterPending) {
                    // Log only if it transitioned from the PENDING state
                    log.info(
                            "Job ID: {} has been scheduled and entered the next state. Current status: {}",
                            jobId,
                            jobStatus);
                }
                isPrint = false;
                break;
            default:
                break;
        }
        return isPrint;
    }
}
@@ -435,8 +435,11 @@ public void testGetJobInfo() {
                     CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
             long jobId = clientJobProxy.getJobId();

-            // Running
-            Assertions.assertNotNull(jobClient.getJobInfo(jobId));
+            await().atMost(10, TimeUnit.SECONDS)
+                    .untilAsserted(
+                            () -> {
+                                Assertions.assertNotNull(jobClient.getJobInfo(jobId));
+                            });

             await().atMost(180000, TimeUnit.MILLISECONDS)
                     .untilAsserted(
@@ -22,10 +22,13 @@
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;

import org.awaitility.Awaitility;
@@ -173,6 +176,136 @@ public void canNotSubmitJobWhenHaveNoWorkerNode() {
}
}

    @SneakyThrows
    @Test
    public void enterPendingWhenResourcesNotEnough() {
        HazelcastInstanceImpl masterNode = null;
        String testClusterName = "Test_enterPendingWhenResourcesNotEnough";
        SeaTunnelClient seaTunnelClient = null;

        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        // Use WAIT with static slots so the job enters PENDING when resources are short
        EngineConfig engineConfig = seaTunnelConfig.getEngineConfig();
        engineConfig.setScheduleStrategy(ScheduleStrategy.WAIT);
        engineConfig.getSlotServiceConfig().setDynamicSlot(false);
        engineConfig.getSlotServiceConfig().setSlotNum(3);
        seaTunnelConfig
                .getHazelcastConfig()
                .setClusterName(TestUtils.getClusterName(testClusterName));

        // submit job
        Common.setDeployMode(DeployMode.CLIENT);
        String filePath = TestUtils.getResource("/client_test.conf");
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName("Test_enterPendingWhenResourcesNotEnough");

        try {
            // The master node must start first in CI
            masterNode = SeaTunnelServerStarter.createMasterHazelcastInstance(seaTunnelConfig);

            HazelcastInstanceImpl finalMasterNode = masterNode;
            Awaitility.await()
                    .atMost(10000, TimeUnit.MILLISECONDS)
                    .untilAsserted(
                            () ->
                                    Assertions.assertEquals(
                                            1, finalMasterNode.getCluster().getMembers().size()));

            // Create a SeaTunnel client and submit the job
            seaTunnelClient = createSeaTunnelClient(testClusterName);
            ClientJobExecutionEnvironment jobExecutionEnv =
                    seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
            // No worker node is running yet, so the job should enter PENDING
            Awaitility.await()
                    .atMost(10000, TimeUnit.MILLISECONDS)
                    .untilAsserted(
                            () ->
                                    Assertions.assertEquals(
                                            JobStatus.PENDING, clientJobProxy.getJobStatus()));
            // start two worker nodes
            SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);
            SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);

            // Resources are now available; wait for the job to be scheduled and finish
            Awaitility.await()
                    .atMost(60000, TimeUnit.MILLISECONDS)
                    .untilAsserted(
                            () ->
                                    Assertions.assertEquals(
                                            JobStatus.FINISHED, clientJobProxy.getJobStatus()));
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            if (seaTunnelClient != null) {
                seaTunnelClient.close();
            }
            if (masterNode != null) {
                masterNode.shutdown();
            }
        }
    }

    @SneakyThrows
    @Test
    public void pendingJobCancel() {
        HazelcastInstanceImpl masterNode = null;
        String clusterAndJobName = "Test_pendingJobCancel";
        SeaTunnelClient seaTunnelClient = null;

        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        // Use WAIT with static slots so the job enters PENDING when resources are short
        EngineConfig engineConfig = seaTunnelConfig.getEngineConfig();
        engineConfig.setScheduleStrategy(ScheduleStrategy.WAIT);
        engineConfig.getSlotServiceConfig().setDynamicSlot(false);
        engineConfig.getSlotServiceConfig().setSlotNum(1);

        seaTunnelConfig
                .getHazelcastConfig()
                .setClusterName(TestUtils.getClusterName(clusterAndJobName));

        // submit job
        Common.setDeployMode(DeployMode.CLIENT);
        String filePath = TestUtils.getResource("/client_test.conf");
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName(clusterAndJobName);

        try {
            // The master node must start first in CI
            masterNode = SeaTunnelServerStarter.createMasterHazelcastInstance(seaTunnelConfig);

            // Create a SeaTunnel client and submit the job
            seaTunnelClient = createSeaTunnelClient(clusterAndJobName);
            ClientJobExecutionEnvironment jobExecutionEnv =
                    seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
            Awaitility.await()
                    .atMost(10000, TimeUnit.MILLISECONDS)
                    .untilAsserted(
                            () ->
                                    Assertions.assertEquals(
                                            JobStatus.PENDING, clientJobProxy.getJobStatus()));

            // Cancel the job in the pending state
            seaTunnelClient.getJobClient().cancelJob(clientJobProxy.getJobId());
            Awaitility.await()
                    .atMost(60000, TimeUnit.MILLISECONDS)
                    .untilAsserted(
                            () ->
                                    Assertions.assertNotEquals(
                                            JobStatus.CANCELED, clientJobProxy.getJobStatus()));

        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            if (seaTunnelClient != null) {
                seaTunnelClient.close();
            }
            if (masterNode != null) {
                masterNode.shutdown();
            }
        }
    }

private SeaTunnelClient createSeaTunnelClient(String clusterName) {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(clusterName));