Skip to content

Commit

Permalink
[Hotfix][Zeta] Fix job can not restore when last checkpoint failed (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
EricJoy2048 authored and chaorongzhi committed Aug 21, 2024
1 parent 9d018ee commit 3e8c69e
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ private void initResourceManager() {
@Override
public List<MultiTableAggregatedCommitInfo> commit(
List<MultiTableAggregatedCommitInfo> aggregatedCommitInfo) throws IOException {
List<MultiTableAggregatedCommitInfo> errorList = new ArrayList<>();
for (String sinkIdentifier : aggCommitters.keySet()) {
SinkAggregatedCommitter<?, ?> sinkCommitter = aggCommitters.get(sinkIdentifier);
if (sinkCommitter != null) {
Expand All @@ -85,10 +86,20 @@ public List<MultiTableAggregatedCommitInfo> commit(
.get(sinkIdentifier))
.filter(Objects::nonNull)
.collect(Collectors.toList());
sinkCommitter.commit(commitInfo);
List errCommitList = sinkCommitter.commit(commitInfo);
if (errCommitList.size() == 0) {
continue;
}

for (int i = 0; i < errCommitList.size(); i++) {
if (errorList.size() < i + 1) {
errorList.add(i, new MultiTableAggregatedCommitInfo(new HashMap<>()));
}
errorList.get(i).getCommitInfo().put(sinkIdentifier, errCommitList.get(i));
}
}
}
return new ArrayList<>();
return errorList;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,31 @@ void afterClass() {
hazelcastInstance.shutdown();
}
}

/**
 * Runs the batch job defined in {@code batch_last_checkpoint_error.conf} and expects it to
 * finish with {@link JobStatus#FAILED}.
 *
 * <p>The client is closed in a {@code finally} block so that its connection does not outlive
 * the test, whether the assertion passes, fails, or times out.
 *
 * @throws Exception if the client or execution environment cannot be created, or the job cannot
 *     be submitted
 */
@Test
public void testLastCheckpointErrorJob() throws Exception {
    Common.setDeployMode(DeployMode.CLIENT);
    String filePath = TestUtils.getResource("batch_last_checkpoint_error.conf");
    JobConfig jobConfig = new JobConfig();
    jobConfig.setName("batch_last_checkpoint_error");

    ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
    clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
    SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
    try {
        ClientJobExecutionEnvironment jobExecutionEnv =
                engineClient.createExecutionContext(filePath, jobConfig, SEATUNNEL_CONFIG);

        final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

        // Wait for completion on another thread so the polling below can enforce a timeout.
        CompletableFuture<JobStatus> objectCompletableFuture =
                CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);

        // Poll for up to 10 minutes; get() is only reached once the future is done.
        await().atMost(600000, TimeUnit.MILLISECONDS)
                .untilAsserted(
                        () ->
                                Assertions.assertTrue(
                                        objectCompletableFuture.isDone()
                                                && JobStatus.FAILED.equals(
                                                        objectCompletableFuture.get())));
    } finally {
        // Release the client connection; the original test leaked it.
        engineClient.close();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public class RestApiIT {

private static HazelcastInstanceImpl node2;

private static SeaTunnelClient engineClient;

private static final String jobName = "test测试";
private static final String paramJobName = "param_test测试";

Expand All @@ -80,7 +82,7 @@ void beforeClass() throws Exception {

ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(testClusterName);
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
engineClient = new SeaTunnelClient(clientConfig);
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);

Expand Down Expand Up @@ -456,6 +458,10 @@ public void testEncryptConfig() {

@AfterEach
void afterClass() {
if (engineClient != null) {
engineClient.close();
}

if (node1 != null) {
node1.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of batch processing in seatunnel config
######

env {
job.mode = "BATCH"
}

source {
# This is an example source plugin **only for testing and demonstrating the source plugin feature**
FakeSource {
row.num = 10
map.size = 10
array.size = 10
bytes.length = 10
string.length = 10
parallelism = 1
result_table_name = "fake"
schema = {
fields {
c_map = "map<string, array<int>>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_null = "null"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
c_row = {
c_map = "map<string, map<string, string>>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_null = "null"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
}
}
}

transform {
}

sink {
LocalFile {
path = "/hive/warehouse/test1"
field_delimiter = "\t"
row_delimiter = "\n"
partition_by = ["c_string"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
file_name_expression = "${transactionId}_${now}"
file_format_type = "text"
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
save_mode = "error"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -470,18 +470,24 @@ public static Map<Long, Integer> getPipelineTasks(Set<TaskLocation> pipelineSubt
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().size()));
}

@SneakyThrows
public PassiveCompletableFuture<CompletedCheckpoint> startSavepoint() {
LOG.info(String.format("Start save point for Job (%s)", jobId));
if (!isAllTaskReady) {
CompletableFuture savepointFuture = new CompletableFuture();
CompletableFuture<CompletedCheckpoint> savepointFuture = new CompletableFuture<>();
savepointFuture.completeExceptionally(
new CheckpointException(
CheckpointCloseReason.TASK_NOT_ALL_READY_WHEN_SAVEPOINT));
return new PassiveCompletableFuture<>(savepointFuture);
}
CompletableFuture<PendingCheckpoint> savepoint =
createPendingCheckpoint(Instant.now().toEpochMilli(), SAVEPOINT_TYPE);
startTriggerPendingCheckpoint(savepoint);
CompletableFuture<PendingCheckpoint> savepoint;
synchronized (lock) {
while (pendingCounter.get() > 0) {
Thread.sleep(500);
}
savepoint = createPendingCheckpoint(Instant.now().toEpochMilli(), SAVEPOINT_TYPE);
startTriggerPendingCheckpoint(savepoint);
}
PendingCheckpoint savepointPendingCheckpoint = savepoint.join();
LOG.info(
String.format(
Expand Down Expand Up @@ -827,6 +833,18 @@ public boolean isCompleted() {
&& !latestCompletedCheckpoint.isRestored();
}

/**
 * Reports whether the latest completed checkpoint marks an error-free completion.
 *
 * <p>All of the following must hold: a checkpoint has completed, its type is a final
 * checkpoint, it was not restored, and the coordinator status stored under
 * {@code checkpointStateImapKey} is {@code FINISHED} or {@code SUSPEND}.
 *
 * @return {@code true} only when every condition above holds; {@code false} when no checkpoint
 *     has completed yet or when no coordinator status is stored
 */
public boolean isNoErrorCompleted() {
    if (latestCompletedCheckpoint == null) {
        return false;
    }
    CheckpointCoordinatorStatus status =
            (CheckpointCoordinatorStatus) runningJobStateIMap.get(checkpointStateImapKey);
    // Constant-first equals: a missing map entry yields null, which must read as
    // "not completed" instead of throwing a NullPointerException.
    boolean finishedOrSuspended =
            CheckpointCoordinatorStatus.FINISHED.equals(status)
                    || CheckpointCoordinatorStatus.SUSPEND.equals(status);
    return latestCompletedCheckpoint.getCheckpointType().isFinalCheckpoint()
            && finishedOrSuspended
            && !latestCompletedCheckpoint.isRestored();
}

public boolean isEndOfSavePoint() {
if (latestCompletedCheckpoint == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,14 +164,6 @@ public PassiveCompletableFuture<CompletedCheckpoint>[] triggerSavePoints() {
.toArray(PassiveCompletableFuture[]::new);
}

/**
 * Starts a savepoint for a single pipeline by delegating to that pipeline's checkpoint
 * coordinator.
 *
 * <p>Called by the JobMaster, actually triggered by the user. <br>
 * After the savepoint is triggered, it will cause the pipeline to stop automatically.
 *
 * @param pipelineId id used to look up the pipeline's checkpoint coordinator
 * @return the future returned by the coordinator's {@code startSavepoint()} call
 */
public PassiveCompletableFuture<CompletedCheckpoint> triggerSavepoint(int pipelineId) {
return getCheckpointCoordinator(pipelineId).startSavepoint();
}

public void reportedPipelineRunning(int pipelineId, boolean alreadyStarted) {
log.info(
"reported pipeline running stack: "
Expand Down Expand Up @@ -253,7 +245,7 @@ public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
* the pipeline has been completed;
*/
public boolean isCompletedPipeline(int pipelineId) {
return getCheckpointCoordinator(pipelineId).isCompleted();
return getCheckpointCoordinator(pipelineId).isNoErrorCompleted();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,9 @@ public void handleCheckpointError() {
log.warn(
String.format(
"%s checkpoint have error, cancel the pipeline", getPipelineFullName()));
this.cancelPipeline();
if (!getPipelineState().isEndState()) {
updatePipelineState(PipelineStatus.CANCELING);
}
}

public void startSubPlanStateProcess() {
Expand Down

0 comments on commit 3e8c69e

Please sign in to comment.