From ed3d9d94d87a6e05f8726df06b331d2850b3444e Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Tue, 20 Jun 2023 01:56:34 +0800 Subject: [PATCH 1/6] [Improvement-13763][K8S Task] collect real-time log --- .../api/am/KubernetesApplicationManager.java | 18 ++-- .../plugin/task/api/k8s/AbstractK8sTask.java | 2 +- .../task/api/k8s/AbstractK8sTaskExecutor.java | 12 --- .../task/api/k8s/impl/K8sTaskExecutor.java | 93 +++++++++++++++---- .../plugin/task/api/utils/ProcessUtils.java | 13 +-- .../task/api/k8s/K8sTaskExecutorTest.java | 6 +- 6 files changed, 96 insertions(+), 48 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java index 92aa6cb44796..ad360e53c6e1 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java @@ -67,7 +67,7 @@ public boolean killApplication(ApplicationManagerContext applicationManagerConte boolean isKill; String labelValue = kubernetesApplicationManagerContext.getLabelValue(); FilterWatchListDeletable watchList = - getDriverPod(kubernetesApplicationManagerContext); + getListenPod(kubernetesApplicationManagerContext); try { if (getApplicationStatus(kubernetesApplicationManagerContext, watchList).isFailure()) { log.error("Driver pod is in FAILED or UNKNOWN status."); @@ -97,7 +97,7 @@ public ResourceManagerType getResourceManagerType() { * @param kubernetesApplicationManagerContext * @return */ - private FilterWatchListDeletable getDriverPod(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { + private FilterWatchListDeletable getListenPod(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { KubernetesClient client = getClient(kubernetesApplicationManagerContext); String labelValue = kubernetesApplicationManagerContext.getLabelValue(); FilterWatchListDeletable watchList = client.pods() @@ -153,7 +153,7 @@ private TaskExecutionStatus getApplicationStatus(KubernetesApplicationManagerCon String phase; try { if (Objects.isNull(watchList)) { - watchList = getDriverPod(kubernetesApplicationManagerContext); + watchList = getListenPod(kubernetesApplicationManagerContext); } List driverPod = watchList.list().getItems(); if (!driverPod.isEmpty()) { @@ -181,15 +181,15 @@ private TaskExecutionStatus getApplicationStatus(KubernetesApplicationManagerCon public LogWatch getPodLogWatcher(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { KubernetesClient client = getClient(kubernetesApplicationManagerContext); FilterWatchListDeletable watchList = - getDriverPod(kubernetesApplicationManagerContext); - List driverPod = watchList.list().getItems(); - if (CollectionUtils.isEmpty(driverPod)) { + getListenPod(kubernetesApplicationManagerContext); + List podList = watchList.list().getItems(); + if (CollectionUtils.isEmpty(podList)) { return null; } - Pod driver = driverPod.get(0); + Pod pod = podList.get(0); - return client.pods().inNamespace(driver.getMetadata().getNamespace()) - .withName(driver.getMetadata().getName()) + return client.pods().inNamespace(pod.getMetadata().getNamespace()) + .withName(pod.getMetadata().getName()) .watchLog(); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java index 6f08dde14df5..0dbff0ba3305 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java @@ -48,7 +48,7 @@ public void handle(TaskCallBack taskCallBack) throws TaskException { setExitStatusCode(response.getExitStatusCode()); setAppIds(response.getAppIds()); } catch (Exception e) { - log.error("k8s task submit failed with error", e); + log.error("k8s task submit failed with error"); exitStatusCode = -1; throw new TaskException("Execute k8s task error", e); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java index 6bdf8d39ed66..045af3a484e8 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.plugin.task.api.k8s; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; - import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; @@ -32,14 +30,12 @@ public abstract class AbstractK8sTaskExecutor { protected Logger log; protected TaskExecutionContext taskRequest; protected K8sUtils k8sUtils; - protected StringBuilder logStringBuffer; protected Yaml yaml; protected AbstractK8sTaskExecutor(Logger log, TaskExecutionContext taskRequest) { this.log = log; this.taskRequest = taskRequest; this.k8sUtils = new K8sUtils(); - this.logStringBuffer = new StringBuilder(); this.yaml = new Yaml(); } @@ -53,14 +49,6 @@ public void waitTimeout(Boolean timeout) throws TaskException { } } - public void flushLog(TaskResponse taskResponse) { - if (logStringBuffer.length() != 0 && taskResponse.getExitStatusCode() == EXIT_CODE_FAILURE) { - log.error(logStringBuffer.toString()); - } else if (logStringBuffer.length() != 0) { - log.info(logStringBuffer.toString()); - } - } - public abstract void submitJob2k8s(String k8sParameterStr); public abstract void stopJobOnK8s(String k8sParameterStr); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java index 109304675ed1..a134cbfff7fb 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java @@ -31,7 +31,9 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAME_LABEL; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RESTART_POLICY; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_INSTANCE_ID; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.UNIQUE_LABEL_NAME; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; @@ -42,16 +44,23 @@ import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils; import org.apache.commons.lang3.StringUtils; +import java.io.BufferedReader; +import java.io.InputStreamReader; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -66,6 +75,7 @@ import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; +import io.fabric8.kubernetes.client.dsl.LogWatch; /** * K8sTaskExecutor used to submit k8s task to K8S @@ -73,6 +83,9 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { private Job job; + protected boolean podLogOutputIsFinished = false; + protected Future podLogOutputFuture; + public K8sTaskExecutor(Logger logger, TaskExecutionContext taskRequest) { super(logger, taskRequest); } @@ -98,6 +111,8 @@ public Job buildK8sJob(K8sTaskMainParameters k8STaskMainParameters) { Map labelMap = k8STaskMainParameters.getLabelMap(); labelMap.put(LAYER_LABEL, LAYER_LABEL_VALUE); labelMap.put(NAME_LABEL, k8sJobName); + Map podLabelMap = new HashMap<>(); + podLabelMap.put(UNIQUE_LABEL_NAME, taskRequest.getTaskAppId()); EnvVar taskInstanceIdVar = new EnvVar(TASK_INSTANCE_ID, taskInstanceId, null); List envVars = new ArrayList<>(); envVars.add(taskInstanceIdVar); @@ -139,6 +154,9 @@ public Job buildK8sJob(K8sTaskMainParameters k8STaskMainParameters) { .withNewSpec() .withTtlSecondsAfterFinished(JOB_TTL_SECONDS) .withNewTemplate() + .withNewMetadata() + .withLabels(podLabelMap) + .endMetadata() .withNewSpec() .addNewContainer() .withName(k8sJobName) @@ -172,22 +190,25 @@ public void registerBatchJobWatcher(Job job, String taskInstanceId, TaskResponse @Override public void eventReceived(Action action, Job job) { - log.info("event received : job:{} action:{}", job.getMetadata().getName(), action); - if (action != Action.ADDED) { - int jobStatus = getK8sJobStatus(job); - log.info("job {} status {}", job.getMetadata().getName(), jobStatus); - if (jobStatus == TaskConstants.RUNNING_CODE) { - return; + try ( + final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = + LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath())) { + log.info("event received : job:{} action:{}", job.getMetadata().getName(), action); + if (action != Action.ADDED) { + int jobStatus = getK8sJobStatus(job); + log.info("job {} status {}", job.getMetadata().getName(), jobStatus); + if (jobStatus == TaskConstants.RUNNING_CODE) { + return; + } + setTaskStatus(jobStatus, taskInstanceId, taskResponse, k8STaskMainParameters); + countDownLatch.countDown(); } - setTaskStatus(jobStatus, taskInstanceId, taskResponse, k8STaskMainParameters); - countDownLatch.countDown(); } } @Override public void onClose(WatcherException e) { - logStringBuffer.append(String.format("[K8sJobExecutor-%s] fail in k8s: %s", job.getMetadata().getName(), - e.getMessage())); + log.error("[K8sJobExecutor-{}] fail in k8s: {}", job.getMetadata().getName(), e.getMessage()); taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); countDownLatch.countDown(); } @@ -203,7 +224,7 @@ public void onClose(WatcherException e) { } else { countDownLatch.await(); } - flushLog(taskResponse); + // flushLog(taskResponse); } catch (InterruptedException e) { log.error("job failed in k8s: {}", e.getMessage(), e); Thread.currentThread().interrupt(); @@ -218,6 +239,32 @@ public void onClose(WatcherException e) { } } + private void parsePodLogOutput() { + ExecutorService collectPodLogExecutorService = ThreadUtils + .newSingleDaemonScheduledExecutorService("CollectPodLogOutput-thread-" + taskRequest.getTaskName()); + + podLogOutputFuture = collectPodLogExecutorService.submit(() -> { + try ( + final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = + LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); + LogWatch watcher = ProcessUtils.getPodLogWatcher(taskRequest.getK8sTaskExecutionContext(), + taskRequest.getTaskAppId())) { + String line; + try (BufferedReader reader = new BufferedReader(new InputStreamReader(watcher.getOutput()))) { + while ((line = reader.readLine()) != null) { + log.info("[K8S-pod-log] {}", line); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + podLogOutputIsFinished = true; + } + }); + + collectPodLogExecutorService.shutdown(); + } + @Override public TaskResponse run(String k8sParameterStr) throws Exception { TaskResponse result = new TaskResponse(); @@ -237,7 +284,17 @@ public TaskResponse run(String k8sParameterStr) throws Exception { String configYaml = k8sTaskExecutionContext.getConfigYaml(); k8sUtils.buildClient(configYaml); submitJob2k8s(k8sParameterStr); + parsePodLogOutput(); registerBatchJobWatcher(job, Integer.toString(taskInstanceId), result, k8STaskMainParameters); + + if (podLogOutputFuture != null) { + try { + // Wait kubernetes pod log collection finished + podLogOutputFuture.get(); + } catch (ExecutionException e) { + log.error("Handle pod log error", e); + } + } } catch (Exception e) { cancelApplication(k8sParameterStr); result.setExitStatusCode(EXIT_CODE_FAILURE); @@ -265,9 +322,9 @@ public void submitJob2k8s(String k8sParameterStr) { stopJobOnK8s(k8sParameterStr); String namespaceName = k8STaskMainParameters.getNamespaceName(); k8sUtils.createJob(namespaceName, job); - log.info("[K8sJobExecutor-{}-{}] submitted job successfully", taskName, taskInstanceId); + log.info("[K8sJobExecutor-{}-{}] submitted job successfully", taskName, taskInstanceId); } catch (Exception e) { - log.error("[K8sJobExecutor-{}-{}] fail to submit job", taskName, taskInstanceId); + log.error("[K8sJobExecutor-{}-{}] fail to submit job", taskName, taskInstanceId); throw new TaskException("K8sJobExecutor fail to submit job", e); } } @@ -283,7 +340,7 @@ public void stopJobOnK8s(String k8sParameterStr) { k8sUtils.deleteJob(jobName, namespaceName); } } catch (Exception e) { - log.error("[K8sJobExecutor-{}] fail to stop job", jobName); + log.error("[K8sJobExecutor-{}] fail to stop job", jobName); throw new TaskException("K8sJobExecutor fail to stop job", e); } } @@ -303,17 +360,15 @@ public void setTaskStatus(int jobStatus, String taskInstanceId, TaskResponse tas K8sTaskMainParameters k8STaskMainParameters) { if (jobStatus == EXIT_CODE_SUCCESS || jobStatus == EXIT_CODE_FAILURE) { if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(Integer.valueOf(taskInstanceId))) { - logStringBuffer.append(String.format("[K8sJobExecutor-%s] killed", job.getMetadata().getName())); + log.info("[K8sJobExecutor-{}] killed", job.getMetadata().getName()); taskResponse.setExitStatusCode(EXIT_CODE_KILL); } else if (jobStatus == EXIT_CODE_SUCCESS) { - logStringBuffer - .append(String.format("[K8sJobExecutor-%s] succeed in k8s", job.getMetadata().getName())); + log.info("[K8sJobExecutor-{}] succeed in k8s", job.getMetadata().getName()); taskResponse.setExitStatusCode(EXIT_CODE_SUCCESS); } else { String errorMessage = k8sUtils.getPodLog(job.getMetadata().getName(), k8STaskMainParameters.getNamespaceName()); - logStringBuffer.append(String.format("[K8sJobExecutor-%s] fail in k8s: %s", job.getMetadata().getName(), - errorMessage)); + log.error("[K8sJobExecutor-{}] fail in k8s: {}", job.getMetadata().getName(), errorMessage); taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java index d60c007c36b4..2956edead624 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java @@ -142,12 +142,13 @@ public static String getPidsStr(int processId) throws Exception { */ public static void cancelApplication(TaskExecutionContext taskExecutionContext) { try { - if (Objects.nonNull(taskExecutionContext.getK8sTaskExecutionContext()) && - !TASK_TYPE_SET_K8S.contains(taskExecutionContext.getTaskType())) { - applicationManagerMap.get(ResourceManagerType.KUBERNETES) - .killApplication(new KubernetesApplicationManagerContext( - taskExecutionContext.getK8sTaskExecutionContext(), - taskExecutionContext.getTaskAppId())); + if (Objects.nonNull(taskExecutionContext.getK8sTaskExecutionContext())) { + if (!TASK_TYPE_SET_K8S.contains(taskExecutionContext.getTaskType())) { + applicationManagerMap.get(ResourceManagerType.KUBERNETES) + .killApplication(new KubernetesApplicationManagerContext( + taskExecutionContext.getK8sTaskExecutionContext(), + taskExecutionContext.getTaskAppId())); + } } else { String host = taskExecutionContext.getHost(); String executePath = taskExecutionContext.getExecutePath(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java index 5ee023f1bc26..0119bce5adb8 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java @@ -34,6 +34,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.NodeSelectorRequirement; import io.fabric8.kubernetes.api.model.batch.v1.Job; @@ -41,6 +43,8 @@ public class K8sTaskExecutorTest { + private static final Logger logger = LoggerFactory.getLogger(K8sTaskExecutorTest.class); + private K8sTaskExecutor k8sTaskExecutor = null; private K8sTaskMainParameters k8sTaskMainParameters = null; private final String image = "ds-dev"; @@ -65,7 +69,7 @@ public void before() { requirement.setKey("node-label"); requirement.setOperator("In"); requirement.setValues(Arrays.asList("1234", "123456")); - k8sTaskExecutor = new K8sTaskExecutor(null, taskRequest); + k8sTaskExecutor = new K8sTaskExecutor(logger, taskRequest); k8sTaskMainParameters = new K8sTaskMainParameters(); k8sTaskMainParameters.setImage(image); k8sTaskMainParameters.setNamespaceName(namespaceName); From 4eea14f306064780122e1cc72a0e559d2f84f3cd Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Tue, 20 Jun 2023 11:06:12 +0800 Subject: [PATCH 2/6] fix codesmell --- .../plugin/task/api/AbstractCommandExecutor.java | 2 +- .../plugin/task/api/k8s/impl/K8sTaskExecutor.java | 10 ++-------- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java index 5fcdab0f5d75..154ea296d6d7 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java @@ -261,7 +261,7 @@ public TaskResponse run(String execCommand, TaskCallBack taskCallBack) throws Ex // Wait the task log process finished. taskOutputFuture.get(); } catch (ExecutionException e) { - logger.info("Handle task log error", e); + logger.error("Handle task log error", e); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java index a134cbfff7fb..d0caf2b8a711 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java @@ -213,9 +213,7 @@ public void onClose(WatcherException e) { countDownLatch.countDown(); } }; - Watch watch = null; - try { - watch = k8sUtils.createBatchJobWatcher(job.getMetadata().getName(), watcher); + try (Watch watch = k8sUtils.createBatchJobWatcher(job.getMetadata().getName(), watcher)) { boolean timeoutFlag = taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED || taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED; if (timeoutFlag) { @@ -224,7 +222,6 @@ public void onClose(WatcherException e) { } else { countDownLatch.await(); } - // flushLog(taskResponse); } catch (InterruptedException e) { log.error("job failed in k8s: {}", e.getMessage(), e); Thread.currentThread().interrupt(); @@ -232,10 +229,6 @@ public void onClose(WatcherException e) { } catch (Exception e) { log.error("job failed in k8s: {}", e.getMessage(), e); taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); - } finally { - if (watch != null) { - watch.close(); - } } } @@ -297,6 +290,7 @@ public TaskResponse run(String k8sParameterStr) throws Exception { } } catch (Exception e) { cancelApplication(k8sParameterStr); + Thread.currentThread().interrupt(); result.setExitStatusCode(EXIT_CODE_FAILURE); throw e; } From ba99579d13ee9fcbd5aa0364be7e7254517f2da3 Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Mon, 10 Jul 2023 22:26:20 +0800 Subject: [PATCH 3/6] get pod watcher until pod is ready --- .../api/am/KubernetesApplicationManager.java | 24 ++++++++++++++----- .../task/api/k8s/impl/K8sTaskExecutor.java | 14 ++++------- .../task/api/k8s/K8sTaskExecutorTest.java | 3 +-- 3 files changed, 24 insertions(+), 17 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java index ad360e53c6e1..03f7bae4709e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java @@ -17,9 +17,11 @@ package org.apache.dolphinscheduler.plugin.task.api.am; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.SLEEP_TIME_MILLIS; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.UNIQUE_LABEL_NAME; import org.apache.dolphinscheduler.common.enums.ResourceManagerType; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; @@ -180,13 +182,23 @@ private TaskExecutionStatus getApplicationStatus(KubernetesApplicationManagerCon */ public LogWatch getPodLogWatcher(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { KubernetesClient client = getClient(kubernetesApplicationManagerContext); - FilterWatchListDeletable watchList = - getListenPod(kubernetesApplicationManagerContext); - List podList = watchList.list().getItems(); - if (CollectionUtils.isEmpty(podList)) { - return null; + boolean podReady = false; + Pod pod = null; + while (!podReady) { + FilterWatchListDeletable watchList = + getListenPod(kubernetesApplicationManagerContext); + List podList = watchList.list().getItems(); + if (CollectionUtils.isEmpty(podList)) { + return null; + } + pod = podList.get(0); + System.out.println("!!!!!" + pod.getStatus().getPhase()); + if (pod.getStatus().getPhase().equals(PENDING)) { + ThreadUtils.sleep(SLEEP_TIME_MILLIS); + } else { + podReady = true; + } } - Pod pod = podList.get(0); return client.pods().inNamespace(pod.getMetadata().getNamespace()) .withName(pod.getMetadata().getName()) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java index 734b97fb7f75..d51b01566af9 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java @@ -188,8 +188,7 @@ public Job buildK8sJob(K8sTaskMainParameters k8STaskMainParameters) { return jobBuilder.build(); } - public void registerBatchJobWatcher(Job job, String taskInstanceId, TaskResponse taskResponse, - K8sTaskMainParameters k8STaskMainParameters) { + public void registerBatchJobWatcher(Job job, String taskInstanceId, TaskResponse taskResponse) { CountDownLatch countDownLatch = new CountDownLatch(1); Watcher watcher = new Watcher() { @@ -205,7 +204,7 @@ public void eventReceived(Action action, Job job) { if (jobStatus == TaskConstants.RUNNING_CODE) { return; } - setTaskStatus(jobStatus, taskInstanceId, taskResponse, k8STaskMainParameters); + setTaskStatus(jobStatus, taskInstanceId, taskResponse); countDownLatch.countDown(); } } @@ -283,7 +282,7 @@ public TaskResponse run(String k8sParameterStr) throws Exception { k8sUtils.buildClient(configYaml); submitJob2k8s(k8sParameterStr); parsePodLogOutput(); - registerBatchJobWatcher(job, Integer.toString(taskInstanceId), result, k8STaskMainParameters); + registerBatchJobWatcher(job, Integer.toString(taskInstanceId), result); if (podLogOutputFuture != null) { try { @@ -355,8 +354,7 @@ public int getK8sJobStatus(Job job) { } } - public void setTaskStatus(int jobStatus, String taskInstanceId, TaskResponse taskResponse, - K8sTaskMainParameters k8STaskMainParameters) { + public void setTaskStatus(int jobStatus, String taskInstanceId, TaskResponse taskResponse) { if (jobStatus == EXIT_CODE_SUCCESS || jobStatus == EXIT_CODE_FAILURE) { if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(Integer.valueOf(taskInstanceId))) { log.info("[K8sJobExecutor-{}] killed", job.getMetadata().getName()); @@ -365,9 +363,7 @@ public void setTaskStatus(int jobStatus, String taskInstanceId, TaskResponse tas log.info("[K8sJobExecutor-{}] succeed in k8s", job.getMetadata().getName()); taskResponse.setExitStatusCode(EXIT_CODE_SUCCESS); } else { - String errorMessage = - k8sUtils.getPodLog(job.getMetadata().getName(), k8STaskMainParameters.getNamespaceName()); - log.error("[K8sJobExecutor-{}] fail in k8s: {}", job.getMetadata().getName(), errorMessage); + log.error("[K8sJobExecutor-{}] fail in k8s", job.getMetadata().getName()); taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java index f1eda9db601d..46b226268cf3 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java @@ -94,9 +94,8 @@ public void testGetK8sJobStatusNormal() { public void testSetTaskStatusNormal() { int jobStatus = 0; TaskResponse taskResponse = new TaskResponse(); - K8sTaskMainParameters k8STaskMainParameters = new K8sTaskMainParameters(); k8sTaskExecutor.setJob(job); - k8sTaskExecutor.setTaskStatus(jobStatus, String.valueOf(taskInstanceId), taskResponse, k8STaskMainParameters); + k8sTaskExecutor.setTaskStatus(jobStatus, String.valueOf(taskInstanceId), taskResponse); Assertions.assertEquals(0, Integer.compare(EXIT_CODE_KILL, taskResponse.getExitStatusCode())); } @Test From 2f9d91527af6c662f9bcb1ba17ece7c1d5c950b2 Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Mon, 10 Jul 2023 23:21:33 +0800 Subject: [PATCH 4/6] fix codesmell --- .../plugin/task/api/am/KubernetesApplicationManager.java | 7 +++---- .../plugin/task/api/k8s/impl/K8sTaskExecutor.java | 2 -- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java index 03f7bae4709e..05853cc8553b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java @@ -182,9 +182,9 @@ private TaskExecutionStatus getApplicationStatus(KubernetesApplicationManagerCon */ public LogWatch getPodLogWatcher(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { KubernetesClient client = getClient(kubernetesApplicationManagerContext); - boolean podReady = false; + boolean podIsReady = false; Pod pod = null; - while (!podReady) { + while (!podIsReady) { FilterWatchListDeletable watchList = getListenPod(kubernetesApplicationManagerContext); List podList = watchList.list().getItems(); @@ -192,11 +192,10 @@ public LogWatch getPodLogWatcher(KubernetesApplicationManagerContext kubernetesA return null; } pod = podList.get(0); - System.out.println("!!!!!" + pod.getStatus().getPhase()); if (pod.getStatus().getPhase().equals(PENDING)) { ThreadUtils.sleep(SLEEP_TIME_MILLIS); } else { - podReady = true; + podIsReady = true; } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java index d51b01566af9..5ae1b20819ce 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java @@ -266,8 +266,6 @@ private void parsePodLogOutput() { public TaskResponse run(String k8sParameterStr) throws Exception { TaskResponse result = new TaskResponse(); int taskInstanceId = taskRequest.getTaskInstanceId(); - K8sTaskMainParameters k8STaskMainParameters = - JSONUtils.parseObject(k8sParameterStr, K8sTaskMainParameters.class); try { if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) { result.setExitStatusCode(EXIT_CODE_KILL); From 6385fbb999d4f5db3a0fa1aceff10dd0a5efcda3 Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Tue, 11 Jul 2023 20:11:42 +0800 Subject: [PATCH 5/6] specify container name & loop waiting pod creation --- .../task/api/AbstractCommandExecutor.java | 2 +- .../api/am/KubernetesApplicationManager.java | 22 +++++++++++++------ .../KubernetesApplicationManagerContext.java | 7 +++++- .../task/api/k8s/impl/K8sTaskExecutor.java | 5 ++++- .../plugin/task/api/utils/ProcessUtils.java | 11 ++++++---- 5 files changed, 33 insertions(+), 14 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java index 69b816c32f2c..89114b1f6b9a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java @@ -327,7 +327,7 @@ private void collectPodLogIfNeeded() { ThreadUtils.sleep(SLEEP_TIME_MILLIS * 5L); try ( LogWatch watcher = ProcessUtils.getPodLogWatcher(taskRequest.getK8sTaskExecutionContext(), - taskRequest.getTaskAppId())) { + taskRequest.getTaskAppId(), "")) { if (watcher == null) { throw new RuntimeException("The driver pod does not exist."); } else { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java index 05853cc8553b..e93d9ca38491 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java @@ -56,6 +56,8 @@ public class KubernetesApplicationManager implements ApplicationManager { private static final String FAILED = "Failed"; private static final String UNKNOWN = "Unknown"; + private static final int MAX_RETRY_TIMES = 10; + /** * cache k8s client for same task */ @@ -102,13 +104,17 @@ public ResourceManagerType getResourceManagerType() { private FilterWatchListDeletable getListenPod(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { KubernetesClient client = getClient(kubernetesApplicationManagerContext); String labelValue = kubernetesApplicationManagerContext.getLabelValue(); - FilterWatchListDeletable watchList = client.pods() - .inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace()) - .withLabel(UNIQUE_LABEL_NAME, labelValue); - List podList = watchList.list().getItems(); - if (podList.size() != 1) { - log.warn("Expected driver pod 1, but get {}.", podList.size()); + List podList = null; + FilterWatchListDeletable watchList = null; + int retryTimes = 0; + while (CollectionUtils.isEmpty(podList) && retryTimes < MAX_RETRY_TIMES) { + watchList = client.pods() + .inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace()) + .withLabel(UNIQUE_LABEL_NAME, labelValue); + podList = watchList.list().getItems(); + retryTimes += 1; } + return watchList; } @@ -192,7 +198,8 @@ public LogWatch getPodLogWatcher(KubernetesApplicationManagerContext kubernetesA return null; } pod = podList.get(0); - if (pod.getStatus().getPhase().equals(PENDING)) { + String phase = pod.getStatus().getPhase(); + if (phase.equals(PENDING) || phase.equals(UNKNOWN)) { ThreadUtils.sleep(SLEEP_TIME_MILLIS); } else { podIsReady = true; @@ -201,6 +208,7 @@ public LogWatch getPodLogWatcher(KubernetesApplicationManagerContext kubernetesA return client.pods().inNamespace(pod.getMetadata().getNamespace()) .withName(pod.getMetadata().getName()) + .inContainer(kubernetesApplicationManagerContext.getContainerName()) .watchLog(); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java index 19dd223477db..2a37d16d353a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java @@ -32,8 +32,13 @@ public class KubernetesApplicationManagerContext implements ApplicationManagerCo private final K8sTaskExecutionContext k8sTaskExecutionContext; /** - * driver pod label value + * pod label value */ private final String labelValue; + /** + * container name (optional) + */ + private final String containerName; + } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java index 5ae1b20819ce..aecb8b1aac4b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java @@ -240,12 +240,15 @@ private void parsePodLogOutput() { ExecutorService collectPodLogExecutorService = ThreadUtils .newSingleDaemonScheduledExecutorService("CollectPodLogOutput-thread-" + taskRequest.getTaskName()); + String taskInstanceId = String.valueOf(taskRequest.getTaskInstanceId()); + String taskName = taskRequest.getTaskName().toLowerCase(Locale.ROOT); + String containerName = String.format("%s-%s", taskName, taskInstanceId); podLogOutputFuture = collectPodLogExecutorService.submit(() -> { try ( final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); LogWatch watcher = ProcessUtils.getPodLogWatcher(taskRequest.getK8sTaskExecutionContext(), - taskRequest.getTaskAppId())) { + taskRequest.getTaskAppId(), containerName)) { String line; try (BufferedReader reader = new BufferedReader(new InputStreamReader(watcher.getOutput()))) { while ((line = reader.readLine()) != null) { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java index 2956edead624..d7a96360adaf 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java @@ -144,10 +144,11 @@ public static void cancelApplication(TaskExecutionContext taskExecutionContext) try { if (Objects.nonNull(taskExecutionContext.getK8sTaskExecutionContext())) { if (!TASK_TYPE_SET_K8S.contains(taskExecutionContext.getTaskType())) { + // Set empty container name for Spark on K8S task applicationManagerMap.get(ResourceManagerType.KUBERNETES) .killApplication(new KubernetesApplicationManagerContext( taskExecutionContext.getK8sTaskExecutionContext(), - taskExecutionContext.getTaskAppId())); + taskExecutionContext.getTaskAppId(), "")); } } else { String host = taskExecutionContext.getHost(); @@ -197,7 +198,7 @@ public static TaskExecutionStatus getApplicationStatus(K8sTaskExecutionContext k KubernetesApplicationManager applicationManager = (KubernetesApplicationManager) applicationManagerMap.get(ResourceManagerType.KUBERNETES); return applicationManager - .getApplicationStatus(new KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId)); + .getApplicationStatus(new KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId, "")); } /** @@ -207,12 +208,14 @@ public static TaskExecutionStatus getApplicationStatus(K8sTaskExecutionContext k * @param taskAppId * @return */ - public static LogWatch getPodLogWatcher(K8sTaskExecutionContext k8sTaskExecutionContext, String taskAppId) { + public static LogWatch getPodLogWatcher(K8sTaskExecutionContext k8sTaskExecutionContext, String taskAppId, + String containerName) { KubernetesApplicationManager applicationManager = (KubernetesApplicationManager) applicationManagerMap.get(ResourceManagerType.KUBERNETES); return applicationManager - .getPodLogWatcher(new KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId)); + .getPodLogWatcher( + new KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId, containerName)); } } From 7da8707e88ee2e07b44734b456c3679438e74029 Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Wed, 12 Jul 2023 21:29:55 +0800 Subject: [PATCH 6/6] sleep when pod is not initialized --- .../plugin/task/api/am/KubernetesApplicationManager.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java index e93d9ca38491..ac1ce69f7608 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java @@ -112,6 +112,10 @@ private FilterWatchListDeletable getListenPod(Kuberne .inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace()) .withLabel(UNIQUE_LABEL_NAME, labelValue); podList = watchList.list().getItems(); + if (!CollectionUtils.isEmpty(podList)) { + break; + } + ThreadUtils.sleep(SLEEP_TIME_MILLIS); retryTimes += 1; } @@ -193,7 +197,7 @@ public LogWatch getPodLogWatcher(KubernetesApplicationManagerContext kubernetesA while (!podIsReady) { FilterWatchListDeletable watchList = getListenPod(kubernetesApplicationManagerContext); - List podList = watchList.list().getItems(); + List podList = watchList == null ? null : watchList.list().getItems(); if (CollectionUtils.isEmpty(podList)) { return null; }