Skip to content

Commit

Permalink
[Improve][ST-Engine] Improve log output (#3651)
Browse files Browse the repository at this point in the history
* improve log output

* Update seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java

Co-authored-by: TaoZex <45089228+TaoZex@users.noreply.github.com>

Co-authored-by: TaoZex <45089228+TaoZex@users.noreply.github.com>
  • Loading branch information
EricJoy2048 and TaoZex authored Dec 6, 2022
1 parent 35c020f commit 0b1bdb7
Showing 1 changed file with 29 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,31 @@ public PhysicalVertex(int subTaskGroupIndex,
}

this.nodeEngine = nodeEngine;
this.taskFullName =
String.format(
"Job %s (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)], taskGroupLocation: [%s]",
jobImmutableInformation.getJobConfig().getName(),
jobImmutableInformation.getJobId(),
pipelineId,
totalPipelineNum,
taskGroup.getTaskGroupName(),
subTaskGroupIndex + 1,
parallelism,
taskGroupLocation);
if (LOGGER.isFineEnabled() || LOGGER.isFinestEnabled()) {
this.taskFullName =
String.format(
"Job %s (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)], taskGroupLocation: [%s]",
jobImmutableInformation.getJobConfig().getName(),
jobImmutableInformation.getJobId(),
pipelineId,
totalPipelineNum,
taskGroup.getTaskGroupName(),
subTaskGroupIndex + 1,
parallelism,
taskGroupLocation);
} else {
this.taskFullName =
String.format(
"Job %s (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)]",
jobImmutableInformation.getJobConfig().getName(),
jobImmutableInformation.getJobId(),
pipelineId,
totalPipelineNum,
taskGroup.getTaskGroupName(),
subTaskGroupIndex + 1,
parallelism);
}

this.taskFuture = new CompletableFuture<>();

this.runningJobStateIMap = runningJobStateIMap;
Expand All @@ -167,7 +181,8 @@ public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
this.taskFuture = new CompletableFuture<>();
ExecutionState executionState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation);
if (executionState != null) {
LOGGER.info(String.format("The task %s is in state %s when init state future", taskFullName, executionState));
LOGGER.info(
String.format("The task %s is in state %s when init state future", taskFullName, executionState));
}
// If the task state is CANCELING we need call noticeTaskExecutionServiceCancel().
if (ExecutionState.CANCELING.equals(executionState)) {
Expand Down Expand Up @@ -329,7 +344,8 @@ private void noticeTaskExecutionServiceCancel() {
while (!taskFuture.isDone() && nodeEngine.getClusterService().getMember(getCurrentExecutionAddress()) != null) {
try {
i++;
LOGGER.info(String.format("send cancel %s operator to member %s", taskFullName, getCurrentExecutionAddress()));
LOGGER.info(
String.format("Send cancel %s operator to member %s", taskFullName, getCurrentExecutionAddress()));
nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
new CancelTaskOperation(taskGroupLocation),
getCurrentExecutionAddress())
Expand Down

0 comments on commit 0b1bdb7

Please sign in to comment.