[Feature][Zeta] Submit job scheduling support pending #7693

Merged: 58 commits, Oct 18, 2024
cf958f9
Merge remote-tracking branch 'upstream/dev' into dev
zhangshenghang Sep 1, 2024
9e5b72b
Merge remote-tracking branch 'upstream/dev' into HEAD
zhangshenghang Sep 11, 2024
00f30b7
init
zhangshenghang Sep 12, 2024
40be145
[feature]fix some problem
zhangshenghang Sep 12, 2024
a19d833
fix some problem
zhangshenghang Sep 12, 2024
dd77a10
fix some problem
zhangshenghang Sep 13, 2024
5c7ca61
Modify the expected result judgment when there are no resources avail…
zhangshenghang Sep 13, 2024
708e04a
improve
zhangshenghang Sep 17, 2024
a0a3e1c
improve
zhangshenghang Sep 17, 2024
f187a81
add job-pending params
zhangshenghang Sep 18, 2024
e30d357
improve
zhangshenghang Sep 18, 2024
f1ee1c5
add test case
zhangshenghang Sep 18, 2024
041e366
improve
zhangshenghang Sep 18, 2024
0744cc6
improve
zhangshenghang Sep 18, 2024
5da49f2
improve
zhangshenghang Sep 18, 2024
d77d2bb
improve
zhangshenghang Sep 18, 2024
9667bb2
add test case
zhangshenghang Sep 19, 2024
dd3fdce
format
zhangshenghang Sep 18, 2024
1be10d3
improve
zhangshenghang Sep 20, 2024
8dd9fab
Merge branch 'apache:dev' into feature-task-pending
zhangshenghang Sep 21, 2024
3a31cc2
Update docs/en/seatunnel-engine/hybrid-cluster-deployment.md
zhangshenghang Sep 21, 2024
a4dc283
Update docs/en/seatunnel-engine/separated-cluster-deployment.md
zhangshenghang Sep 21, 2024
4d5d8b3
Update seatunnel-engine/seatunnel-engine-common/src/main/java/org/apa…
zhangshenghang Sep 21, 2024
bc1a16a
Merge branch 'feature-task-pending' of github.com:zhangshenghang/seat…
zhangshenghang Sep 21, 2024
47b4933
improve
zhangshenghang Sep 22, 2024
fc34cd5
Merge remote-tracking branch 'upstream/dev' into feature-task-pending
zhangshenghang Sep 23, 2024
390450f
Merge remote-tracking branch 'upstream/dev' into feature-task-pending
zhangshenghang Sep 24, 2024
ba8798a
fix problem
zhangshenghang Sep 24, 2024
137e3bc
Merge remote-tracking branch 'upstream/dev' into feature-task-pending
zhangshenghang Sep 25, 2024
66c730a
fix problem
zhangshenghang Sep 25, 2024
c154fee
improve
zhangshenghang Sep 26, 2024
d0810f3
improve
zhangshenghang Sep 26, 2024
166fe19
[improve] fix ci
zhangshenghang Sep 27, 2024
9b173e7
[improve] code style
zhangshenghang Sep 27, 2024
e2edb12
[improve] code style
zhangshenghang Sep 27, 2024
2cece10
[improve]
zhangshenghang Sep 28, 2024
2b68f05
Merge branch 'apache:dev' into feature-task-pending
zhangshenghang Sep 29, 2024
fe05a67
improve
zhangshenghang Oct 7, 2024
c44c9d1
improve
zhangshenghang Oct 7, 2024
7ab4b7d
improve
zhangshenghang Oct 8, 2024
b76ce60
merge
zhangshenghang Oct 8, 2024
953387c
improve
zhangshenghang Oct 9, 2024
7e3102c
update doc
zhangshenghang Oct 9, 2024
6ef8c2a
update doc
zhangshenghang Oct 9, 2024
ecfa1cf
improve
zhangshenghang Oct 9, 2024
bdf245c
improve
zhangshenghang Oct 10, 2024
8d73f21
improve
zhangshenghang Oct 10, 2024
bd738ed
improve
zhangshenghang Oct 11, 2024
9906a6f
improve
zhangshenghang Oct 11, 2024
7fc01bf
improve
zhangshenghang Oct 11, 2024
a6a2dd8
Merge remote-tracking branch 'upstream/dev' into feature-task-pending
zhangshenghang Oct 12, 2024
da4cd11
improve
zhangshenghang Oct 12, 2024
696e942
improve
zhangshenghang Oct 12, 2024
91c7f93
improve
zhangshenghang Oct 14, 2024
54946ce
add-license
zhangshenghang Oct 14, 2024
fd3a431
add desc
zhangshenghang Oct 16, 2024
b222b24
update job status runner
zhangshenghang Oct 17, 2024
785f085
improve peekqueue
zhangshenghang Oct 17, 2024
18 changes: 18 additions & 0 deletions docs/en/seatunnel-engine/hybrid-cluster-deployment.md
@@ -136,6 +136,24 @@ seatunnel:
classloader-cache-mode: true
```

### 4.6 Job Scheduling Strategy

When resources are insufficient, the job scheduling strategy can be configured in the following two modes:

1. `WAIT`: Wait for resources to be available.

2. `REJECT`: Reject the job. This is the default.

Example

```yaml
seatunnel:
  engine:
    job-schedule-strategy: WAIT
```

When `dynamic-slot: true` is set, `job-schedule-strategy: WAIT` has no effect: the engine forcibly changes it to `job-schedule-strategy: REJECT`, because waiting for resources is meaningless with dynamic slots.
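
The rule described above can be sketched as a small decision function. This is only an illustration under stated assumptions: the class `ScheduleStrategySketch`, the `Decision` enum, and the slot-counting parameters are hypothetical and are not taken from the SeaTunnel code base; the only behavior drawn from the documentation is that `WAIT` queues a job when resources are short, `REJECT` refuses it, and `dynamic-slot: true` overrides `WAIT` with `REJECT`.

```java
/** Hypothetical sketch; names are illustrative, not SeaTunnel's actual classes. */
class ScheduleStrategySketch {

    enum Strategy { WAIT, REJECT }

    enum Decision { RUN, ENQUEUE, REJECT }

    /** With dynamic slots, the configured strategy is ignored and REJECT is used. */
    static Strategy effectiveStrategy(Strategy configured, boolean dynamicSlot) {
        return dynamicSlot ? Strategy.REJECT : configured;
    }

    /** Assumption: resources are modelled as a simple count of free slots. */
    static Decision decide(
            Strategy configured, boolean dynamicSlot, int freeSlots, int requiredSlots) {
        if (freeSlots >= requiredSlots) {
            return Decision.RUN;
        }
        return effectiveStrategy(configured, dynamicSlot) == Strategy.WAIT
                ? Decision.ENQUEUE
                : Decision.REJECT;
    }

    public static void main(String[] args) {
        System.out.println(decide(Strategy.WAIT, false, 1, 3));   // ENQUEUE
        System.out.println(decide(Strategy.REJECT, false, 1, 3)); // REJECT
        System.out.println(decide(Strategy.WAIT, true, 1, 3));    // REJECT
        System.out.println(decide(Strategy.WAIT, false, 3, 3));   // RUN
    }
}
```

Keeping the override in one place (`effectiveStrategy`) means the rest of the scheduling path never has to check `dynamic-slot` itself.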

## 5. Configure The SeaTunnel Engine Network Service

All SeaTunnel Engine network-related configurations are in the `hazelcast.yaml` file.
17 changes: 17 additions & 0 deletions docs/en/seatunnel-engine/separated-cluster-deployment.md
@@ -280,6 +280,23 @@ netty-common-4.1.89.Final.jar
seatunnel-hadoop3-3.1.4-uber.jar
```

### 4.7 Job Scheduling Strategy

When resources are insufficient, the job scheduling strategy can be configured in the following two modes:

1. `WAIT`: Wait for resources to be available.

2. `REJECT`: Reject the job. This is the default.

Example

```yaml
seatunnel:
  engine:
    job-schedule-strategy: WAIT
```

When `dynamic-slot: true` is set, `job-schedule-strategy: WAIT` has no effect: the engine forcibly changes it to `job-schedule-strategy: REJECT`, because waiting for resources is meaningless with dynamic slots.

## 5. Configuring SeaTunnel Engine Network Services

All network-related configurations of the SeaTunnel Engine are in the `hazelcast-master.yaml` and `hazelcast-worker.yaml` files.
17 changes: 17 additions & 0 deletions docs/zh/seatunnel-engine/hybrid-cluster-deployment.md
@@ -136,6 +136,23 @@ seatunnel:
classloader-cache-mode: true
```

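### 4.6 Job Scheduling Strategy

When resources are insufficient, the job scheduling strategy can be configured in one of the following two modes:

1. `WAIT`: Wait for resources to become available.
2. `REJECT`: Reject the job. This is the default.

Example

```yaml
seatunnel:
  engine:
    job-schedule-strategy: WAIT
```

When `dynamic-slot: true` is set, the `job-schedule-strategy: WAIT` setting has no effect and is forcibly changed to `job-schedule-strategy: REJECT`, because the parameter is meaningless with dynamic slots: jobs can be submitted directly.

## 5. Configure The SeaTunnel Engine Network Service

All SeaTunnel Engine network-related configurations are in the `hazelcast.yaml` file.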
17 changes: 17 additions & 0 deletions docs/zh/seatunnel-engine/separated-cluster-deployment.md
@@ -284,6 +284,23 @@ netty-common-4.1.89.Final.jar
seatunnel-hadoop3-3.1.4-uber.jar
```

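### 4.7 Job Scheduling Strategy

When resources are insufficient, the job scheduling strategy can be configured in one of the following two modes:

1. `WAIT`: Wait for resources to become available.
2. `REJECT`: Reject the job. This is the default.

Example

```yaml
seatunnel:
  engine:
    job-schedule-strategy: WAIT
```

When `dynamic-slot: true` is set, the `job-schedule-strategy: WAIT` setting has no effect and is forcibly changed to `job-schedule-strategy: REJECT`, because the parameter is meaningless with dynamic slots: jobs can be submitted directly.

## 5. Configuring SeaTunnel Engine Network Services

All network-related configurations of the SeaTunnel Engine are in the `hazelcast-master.yaml` and `hazelcast-worker.yaml` files.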
@@ -28,6 +28,7 @@
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobMetricsRunner;
import org.apache.seatunnel.engine.client.job.JobStatusRunner;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.EngineConfig;
@@ -187,7 +188,8 @@ public void execute() throws CommandExecuteException {
                 long jobId = clientJobProxy.getJobId();
                 JobMetricsRunner jobMetricsRunner = new JobMetricsRunner(engineClient, jobId);
                 executorService =
-                        Executors.newSingleThreadScheduledExecutor(
+                        Executors.newScheduledThreadPool(
+                                2,
                                 new ThreadFactoryBuilder()
                                         .setNameFormat("job-metrics-runner-%d")
                                         .setDaemon(true)
@@ -197,6 +199,12 @@ public void execute() throws CommandExecuteException {
                         0,
                         seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(),
                         TimeUnit.SECONDS);
+
+                executorService.schedule(
+                        new JobStatusRunner(engineClient.getJobClient(), jobId),
+                        0,
+                        TimeUnit.SECONDS);
+
                 // wait for job complete
                 JobResult jobResult = clientJobProxy.waitForJobCompleteV2();
                 jobStatus = jobResult.getStatus();
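
A likely reason for moving from a single-thread scheduler to a pool of two is that the status runner occupies its thread for as long as the job stays pending, which would starve the periodic metrics task on a one-thread executor. The sketch below demonstrates that effect with plain `java.util.concurrent` classes. The class name `SchedulerPoolSizeSketch` and the stand-in tasks are hypothetical; nothing here is SeaTunnel code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo: a blocking task starves a periodic task on a one-thread scheduler. */
class SchedulerPoolSizeSketch {

    /** Counts periodic runs observed while a blocking task holds one pool thread. */
    static int periodicRunsWhileBlocked(int poolSize) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(poolSize);
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch blockerStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Stand-in for a long-lived poller such as a status watcher.
            pool.schedule(
                    () -> {
                        blockerStarted.countDown();
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    },
                    0,
                    TimeUnit.SECONDS);
            blockerStarted.await();
            // Stand-in for a periodic reporter such as a metrics printer.
            pool.scheduleAtFixedRate(runs::incrementAndGet, 0, 20, TimeUnit.MILLISECONDS);
            Thread.sleep(300);
            return runs.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while measuring", e);
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool of 1: " + periodicRunsWhileBlocked(1) + " periodic runs");
        System.out.println("pool of 2: " + periodicRunsWhileBlocked(2) + " periodic runs");
    }
}
```

With one thread the periodic task never runs while the blocker is active; with two threads it keeps ticking.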
@@ -0,0 +1,82 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.engine.client.job;

import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.core.job.JobStatus;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class JobStatusRunner implements Runnable {

    private final JobClient jobClient;
    private final Long jobId;
    private boolean isEnterPending = false;

    public JobStatusRunner(JobClient jobClient, Long jobId) {
        this.jobClient = jobClient;
        this.jobId = jobId;
    }

    @Override
    public void run() {
        Thread.currentThread().setName("job-status-runner-" + jobId);
        try {
            while (isPrint(jobClient.getJobStatus(jobId))) {
                Thread.sleep(5000);
            }
        } catch (Exception e) {
            log.error("Failed to get job runner status. {}", ExceptionUtils.getMessage(e));
        }
    }

    private boolean isPrint(String jobStatus) {
        boolean isPrint = true;
        switch (JobStatus.fromString(jobStatus)) {
            case PENDING:
                isEnterPending = true;
                log.info(
                        "Job Id : {} enter pending queue, current status:{} ,please wait task schedule",
                        jobId,
                        jobStatus);
                break;
            case RUNNING:
            case SCHEDULED:
            case FAILING:
            case FAILED:
            case DOING_SAVEPOINT:
            case SAVEPOINT_DONE:
            case CANCELING:
            case CANCELED:
            case FINISHED:
            case UNKNOWABLE:
                if (isEnterPending) {
                    // Log only if it transitioned from the PENDING state
                    log.info(
                            "Job ID: {} has been scheduled and entered the next state. Current status: {}",
                            jobId,
                            jobStatus);
                }
                isPrint = false;
            default:
                break;
        }
        return isPrint;
    }
}
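
The polling pattern in `JobStatusRunner` (report while the job is pending, then report once when it leaves the pending state) can be exercised in isolation. The following is a minimal sketch, not the PR's implementation: `PendingWatcherSketch`, its reduced `Status` enum, and the injected `Supplier` are all hypothetical, and messages are collected in a list instead of being logged. As a deliberate variation, it restores the thread's interrupt flag if the sleep is interrupted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical sketch of a "watch until the job leaves PENDING" loop. */
class PendingWatcherSketch {

    enum Status { PENDING, SCHEDULED, RUNNING, FINISHED }

    /** Polls the status source and returns the messages a logger would have emitted. */
    static List<String> watch(Supplier<Status> statusSource, long pollMillis) {
        List<String> messages = new ArrayList<>();
        boolean sawPending = false;
        while (true) {
            Status status = statusSource.get();
            if (status != Status.PENDING) {
                // Report the transition only if the job was ever seen pending.
                if (sawPending) {
                    messages.add("left pending: " + status);
                }
                return messages;
            }
            sawPending = true;
            messages.add("pending");
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt so the owning executor can shut down cleanly.
                Thread.currentThread().interrupt();
                return messages;
            }
        }
    }

    public static void main(String[] args) {
        Iterator<Status> statuses =
                Arrays.asList(Status.PENDING, Status.PENDING, Status.RUNNING).iterator();
        // prints [pending, pending, left pending: RUNNING]
        System.out.println(watch(statuses::next, 1));
    }
}
```

A job that is already running on the first poll produces no messages, which matches the "log only if it transitioned from the PENDING state" comment in the class above.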
@@ -435,8 +435,11 @@ public void testGetJobInfo() {
                     CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
             long jobId = clientJobProxy.getJobId();
 
-            // Running
-            Assertions.assertNotNull(jobClient.getJobInfo(jobId));
+            await().atMost(10, TimeUnit.SECONDS)
+                    .untilAsserted(
+                            () -> {
+                                Assertions.assertNotNull(jobClient.getJobInfo(jobId));
+                            });
 
             await().atMost(180000, TimeUnit.MILLISECONDS)
                     .untilAsserted(
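
The change above replaces an immediate assertion with one that is retried until it passes or a timeout expires, which is presumably needed because job info may not be available the instant a job is submitted. The retry idea can be sketched without any test library. `AwaitSketch` and its `untilAsserted` helper are hypothetical and only approximate what Awaitility provides.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, dependency-free sketch of a retry-until-asserted helper. */
class AwaitSketch {

    /** Re-runs the assertion until it passes; rethrows the last failure after the timeout. */
    static void untilAsserted(long timeoutMillis, long pollMillis, Runnable assertion) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            try {
                assertion.run();
                return;
            } catch (AssertionError e) {
                if (System.nanoTime() >= deadline) {
                    throw e;
                }
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // The condition becomes true on the third attempt.
        untilAsserted(
                1000,
                5,
                () -> {
                    if (attempts.incrementAndGet() < 3) {
                        throw new AssertionError("not ready yet");
                    }
                });
        System.out.println("passed after " + attempts.get() + " attempts");
    }
}
```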
@@ -22,10 +22,13 @@
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;

import org.awaitility.Awaitility;
@@ -173,6 +176,136 @@ public void canNotSubmitJobWhenHaveNoWorkerNode() {
        }
    }

    @SneakyThrows
    @Test
    public void enterPendingWhenResourcesNotEnough() {
        HazelcastInstanceImpl masterNode = null;
        String testClusterName = "Test_enterPendingWhenResourcesNotEnough";
        SeaTunnelClient seaTunnelClient = null;

        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        // set job pending
        EngineConfig engineConfig = seaTunnelConfig.getEngineConfig();
        engineConfig.setScheduleStrategy(ScheduleStrategy.WAIT);
        engineConfig.getSlotServiceConfig().setDynamicSlot(false);
        engineConfig.getSlotServiceConfig().setSlotNum(3);
        seaTunnelConfig
                .getHazelcastConfig()
                .setClusterName(TestUtils.getClusterName(testClusterName));

        // submit job
        Common.setDeployMode(DeployMode.CLIENT);
        String filePath = TestUtils.getResource("/client_test.conf");
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName("Test_enterPendingWhenResourcesNotEnough");

        try {
            // master node must start first in ci
            masterNode = SeaTunnelServerStarter.createMasterHazelcastInstance(seaTunnelConfig);

            HazelcastInstanceImpl finalMasterNode = masterNode;
            Awaitility.await()
                    .atMost(10000, TimeUnit.MILLISECONDS)
                    .untilAsserted(
                            () ->
                                    Assertions.assertEquals(
                                            1, finalMasterNode.getCluster().getMembers().size()));

            // new seatunnel client and submit job
            seaTunnelClient = createSeaTunnelClient(testClusterName);
            ClientJobExecutionEnvironment jobExecutionEnv =
                    seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
            Awaitility.await()
                    .atMost(10000, TimeUnit.MILLISECONDS)
                    .untilAsserted(
                            () ->
                                    Assertions.assertEquals(
                                            JobStatus.PENDING, clientJobProxy.getJobStatus()));
            // start two worker nodes
            SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);
            SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);

            // Resources are now available; wait for the job to finish
            Awaitility.await()
                    .atMost(60000, TimeUnit.MILLISECONDS)
                    .untilAsserted(
                            () ->
                                    Assertions.assertEquals(
                                            JobStatus.FINISHED, clientJobProxy.getJobStatus()));
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            if (seaTunnelClient != null) {
                seaTunnelClient.close();
            }
            if (masterNode != null) {
                masterNode.shutdown();
            }
        }
    }

    @SneakyThrows
    @Test
    public void pendingJobCancel() {
        HazelcastInstanceImpl masterNode = null;
        String clusterAndJobName = "Test_pendingJobCancel";
        SeaTunnelClient seaTunnelClient = null;

        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        // set job pending
        EngineConfig engineConfig = seaTunnelConfig.getEngineConfig();
        engineConfig.setScheduleStrategy(ScheduleStrategy.WAIT);
        engineConfig.getSlotServiceConfig().setDynamicSlot(false);
        engineConfig.getSlotServiceConfig().setSlotNum(1);

        seaTunnelConfig
                .getHazelcastConfig()
                .setClusterName(TestUtils.getClusterName(clusterAndJobName));

        // submit job
        Common.setDeployMode(DeployMode.CLIENT);
        String filePath = TestUtils.getResource("/client_test.conf");
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName(clusterAndJobName);

        try {
            // master node must start first in ci
            masterNode = SeaTunnelServerStarter.createMasterHazelcastInstance(seaTunnelConfig);

            // new seatunnel client and submit job
            seaTunnelClient = createSeaTunnelClient(clusterAndJobName);
            ClientJobExecutionEnvironment jobExecutionEnv =
                    seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
            Awaitility.await()
                    .atMost(10000, TimeUnit.MILLISECONDS)
                    .untilAsserted(
                            () ->
                                    Assertions.assertEquals(
                                            JobStatus.PENDING, clientJobProxy.getJobStatus()));

            // Cancel the job in the pending state
            seaTunnelClient.getJobClient().cancelJob(clientJobProxy.getJobId());
            Awaitility.await()
                    .atMost(60000, TimeUnit.MILLISECONDS)
                    .untilAsserted(
                            () ->
                                    Assertions.assertNotEquals(
                                            clientJobProxy.getJobStatus(), JobStatus.CANCELED));

        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            if (seaTunnelClient != null) {
                seaTunnelClient.close();
            }
            if (masterNode != null) {
                masterNode.shutdown();
            }
        }
    }

    private SeaTunnelClient createSeaTunnelClient(String clusterName) {
        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
        clientConfig.setClusterName(TestUtils.getClusterName(clusterName));
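
The two new tests in this file cover a job that waits in the pending state until worker nodes join, and a pending job that is cancelled before it ever runs. The sketch below models that behavior with a simple in-memory queue. It is an assumption-laden illustration only: `PendingQueueSketch`, its strict first-in-first-out policy (the head of the queue blocks later jobs), and the slot arithmetic are hypothetical and are not taken from the server-side code of this PR, which is not shown here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch of a WAIT-style pending queue; not SeaTunnel's implementation. */
class PendingQueueSketch {

    private static final class Job {
        final long id;
        final int requiredSlots;

        Job(long id, int requiredSlots) {
            this.id = id;
            this.requiredSlots = requiredSlots;
        }
    }

    private final Deque<Job> pending = new ArrayDeque<>();
    private int freeSlots;

    PendingQueueSketch(int freeSlots) {
        this.freeSlots = freeSlots;
    }

    /** Returns true if the job starts immediately, false if it is queued as pending. */
    boolean submit(long jobId, int requiredSlots) {
        if (pending.isEmpty() && requiredSlots <= freeSlots) {
            freeSlots -= requiredSlots;
            return true;
        }
        pending.addLast(new Job(jobId, requiredSlots));
        return false;
    }

    /** Removes a pending job; returns false if the job was not waiting in the queue. */
    boolean cancel(long jobId) {
        return pending.removeIf(job -> job.id == jobId);
    }

    /** Adds capacity (for example, a worker joining) and starts as many queued jobs as fit. */
    List<Long> addSlots(int slots) {
        freeSlots += slots;
        List<Long> started = new ArrayList<>();
        Job head;
        // Peek first: the head is only removed once enough slots are available for it.
        while ((head = pending.peekFirst()) != null && head.requiredSlots <= freeSlots) {
            pending.pollFirst();
            freeSlots -= head.requiredSlots;
            started.add(head.id);
        }
        return started;
    }

    public static void main(String[] args) {
        PendingQueueSketch queue = new PendingQueueSketch(0);
        System.out.println(queue.submit(1L, 2)); // false: no slots yet, job 1 is pending
        System.out.println(queue.submit(2L, 1)); // false: queued behind job 1
        System.out.println(queue.cancel(2L));    // true: job 2 is cancelled while pending
        System.out.println(queue.addSlots(1));   // []: still not enough for job 1
        System.out.println(queue.addSlots(1));   // [1]: job 1 can now start
    }
}
```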