From c8b77bcd1c7a603935eb578f2a7e3ed35f4dde46 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Sun, 24 Dec 2023 23:16:31 +0800 Subject: [PATCH] Recreate new TaskInstance Working Directory when exist in worker --- .../worker/runner/WorkerTaskExecutor.java | 2 +- .../utils/TaskExecutionContextUtils.java | 35 +++++++++++-------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java index c335f3499441..e47a28d264ca 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java @@ -217,7 +217,7 @@ protected void beforeExecute() { taskExecutionContext.setTenantCode(tenant); log.info("TenantCode: {} check successfully", taskExecutionContext.getTenantCode()); - TaskExecutionContextUtils.createProcessLocalPathIfAbsent(taskExecutionContext); + TaskExecutionContextUtils.createTaskInstanceWorkingDirectory(taskExecutionContext); log.info("WorkflowInstanceExecDir: {} check successfully", taskExecutionContext.getExecutePath()); TaskChannel taskChannel = diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionContextUtils.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionContextUtils.java index bbe3b1ab4b4c..dcfeec4e1c18 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionContextUtils.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionContextUtils.java @@ -80,22 +80,29 @@ public static String getOrCreateTenant(WorkerConfig workerConfig, TaskExecutionC } } - public static void createProcessLocalPathIfAbsent(TaskExecutionContext taskExecutionContext) throws TaskException { + public static void createTaskInstanceWorkingDirectory(TaskExecutionContext taskExecutionContext) throws TaskException { + // local execute path + String taskInstanceWorkingDirectory = FileUtils.getProcessExecDir( + taskExecutionContext.getTenantCode(), + taskExecutionContext.getProjectCode(), + taskExecutionContext.getProcessDefineCode(), + taskExecutionContext.getProcessDefineVersion(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId()); try { - // local execute path - String execLocalPath = FileUtils.getProcessExecDir( - taskExecutionContext.getTenantCode(), - taskExecutionContext.getProjectCode(), - taskExecutionContext.getProcessDefineCode(), - taskExecutionContext.getProcessDefineVersion(), - taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId()); - taskExecutionContext.setExecutePath(execLocalPath); - taskExecutionContext.setAppInfoPath(FileUtils.getAppInfoPath(execLocalPath)); - Path executePath = Paths.get(taskExecutionContext.getExecutePath()); - FileUtils.createDirectoryIfNotPresent(executePath); + Path path = Paths.get(taskInstanceWorkingDirectory); + if (Files.deleteIfExists(path)) { + log.warn("The TaskInstance WorkingDirectory: {} is exist, will recreate again", + taskInstanceWorkingDirectory); + } + Files.createDirectories(path); + taskExecutionContext.setExecutePath(taskInstanceWorkingDirectory); + + taskExecutionContext.setExecutePath(taskInstanceWorkingDirectory); + taskExecutionContext.setAppInfoPath(FileUtils.getAppInfoPath(taskInstanceWorkingDirectory)); } catch (Throwable ex) { - throw new TaskException("Cannot create process execute dir", ex); + throw new TaskException( + "Cannot create TaskInstance WorkingDirectory: " + taskInstanceWorkingDirectory + " failed", ex); } }