Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature-13763][K8S Task] collect real-time log #14379

Merged
11 commits merged on Jul 20, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder,
// Wait the task log process finished.
taskOutputFuture.get();
} catch (ExecutionException e) {
logger.info("Handle task log error", e);
logger.error("Handle task log error", e);
}
}

Expand Down Expand Up @@ -272,7 +272,7 @@ private void collectPodLogIfNeeded() {
ThreadUtils.sleep(SLEEP_TIME_MILLIS * 5L);
try (
LogWatch watcher = ProcessUtils.getPodLogWatcher(taskRequest.getK8sTaskExecutionContext(),
taskRequest.getTaskAppId())) {
taskRequest.getTaskAppId(), "")) {
Radeity marked this conversation as resolved.
Show resolved Hide resolved
if (watcher == null) {
throw new RuntimeException("The driver pod does not exist.");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.dolphinscheduler.plugin.task.api.am;

import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.SLEEP_TIME_MILLIS;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.UNIQUE_LABEL_NAME;

import org.apache.dolphinscheduler.common.enums.ResourceManagerType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
Expand Down Expand Up @@ -54,6 +56,8 @@ public class KubernetesApplicationManager implements ApplicationManager {
private static final String FAILED = "Failed";
private static final String UNKNOWN = "Unknown";

private static final int MAX_RETRY_TIMES = 10;

/**
* cache k8s client for same task
*/
Expand All @@ -67,7 +71,7 @@ public boolean killApplication(ApplicationManagerContext applicationManagerConte
boolean isKill;
String labelValue = kubernetesApplicationManagerContext.getLabelValue();
FilterWatchListDeletable<Pod, PodList, PodResource> watchList =
getDriverPod(kubernetesApplicationManagerContext);
getListenPod(kubernetesApplicationManagerContext);
try {
if (getApplicationStatus(kubernetesApplicationManagerContext, watchList).isFailure()) {
log.error("Driver pod is in FAILED or UNKNOWN status.");
Expand Down Expand Up @@ -97,16 +101,24 @@ public ResourceManagerType getResourceManagerType() {
* @param kubernetesApplicationManagerContext
* @return
*/
// NOTE(review): this span appears to be a rendered diff whose +/- markers were lost, so
// removed and added lines sit side by side. The first signature below (getDriverPod) looks
// like the pre-change name and the second (getListenPod) its replacement - confirm against
// the actual commit before reading this as compilable code.
private FilterWatchListDeletable<Pod, PodList, PodResource> getDriverPod(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) {
private FilterWatchListDeletable<Pod, PodList, PodResource> getListenPod(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) {
KubernetesClient client = getClient(kubernetesApplicationManagerContext);
String labelValue = kubernetesApplicationManagerContext.getLabelValue();
// Presumably the removed body: one label query in the task's namespace that only logged a
// warning when the number of matching pods was not exactly 1.
FilterWatchListDeletable<Pod, PodList, PodResource> watchList = client.pods()
.inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace())
.withLabel(UNIQUE_LABEL_NAME, labelValue);
List<Pod> podList = watchList.list().getItems();
if (podList.size() != 1) {
log.warn("Expected driver pod 1, but get {}.", podList.size());
// Presumably the added body: repeat the same label query until at least one pod matches or
// MAX_RETRY_TIMES polls have been made, sleeping SLEEP_TIME_MILLIS between polls.
List<Pod> podList = null;
FilterWatchListDeletable<Pod, PodList, PodResource> watchList = null;
int retryTimes = 0;
while (CollectionUtils.isEmpty(podList) && retryTimes < MAX_RETRY_TIMES) {
watchList = client.pods()
.inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace())
.withLabel(UNIQUE_LABEL_NAME, labelValue);
podList = watchList.list().getItems();
// Leave immediately on a match so no sleep is taken after a successful lookup.
if (!CollectionUtils.isEmpty(podList)) {
break;
}
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
retryTimes += 1;
// (The next two lines are review-UI text captured with the page, not code.)
Radeity marked this conversation as resolved.
Show resolved Hide resolved
}

// Returns the label-filtered pod query itself, not the pod list. If no pod matched within
// the retry limit the caller still receives the query and must check its listing for emptiness.
return watchList;
}

Expand Down Expand Up @@ -153,7 +165,7 @@ private TaskExecutionStatus getApplicationStatus(KubernetesApplicationManagerCon
String phase;
try {
if (Objects.isNull(watchList)) {
watchList = getDriverPod(kubernetesApplicationManagerContext);
watchList = getListenPod(kubernetesApplicationManagerContext);
}
List<Pod> driverPod = watchList.list().getItems();
if (!driverPod.isEmpty()) {
Expand All @@ -180,16 +192,27 @@ private TaskExecutionStatus getApplicationStatus(KubernetesApplicationManagerCon
*/
// NOTE(review): this span appears to be a rendered diff whose +/- markers were lost, so
// removed and added lines are interleaved; it is not compilable as shown.
// Purpose (from the visible code): find the pod selected by the context's label and open a
// log watch on it; returns null when no matching pod is listed.
public LogWatch getPodLogWatcher(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) {
KubernetesClient client = getClient(kubernetesApplicationManagerContext);
// Presumably removed lines: a single lookup through getDriverPod with an immediate null
// return when nothing was found.
FilterWatchListDeletable<Pod, PodList, PodResource> watchList =
getDriverPod(kubernetesApplicationManagerContext);
List<Pod> driverPod = watchList.list().getItems();
if (CollectionUtils.isEmpty(driverPod)) {
return null;
// Presumably added lines: poll until the first matching pod is no longer in the PENDING or
// UNKNOWN phase, sleeping SLEEP_TIME_MILLIS between polls.
boolean podIsReady = false;
Pod pod = null;
// NOTE(review): this loop has no iteration cap of its own. It ends only when the pod list
// comes back empty (null is returned) or the phase is neither PENDING nor UNKNOWN, so a pod
// that stays in one of those phases keeps the caller polling - confirm this is intended.
while (!podIsReady) {
FilterWatchListDeletable<Pod, PodList, PodResource> watchList =
getListenPod(kubernetesApplicationManagerContext);
List<Pod> podList = watchList == null ? null : watchList.list().getItems();
if (CollectionUtils.isEmpty(podList)) {
return null;
// (The next two lines are review-UI text captured with the page, not code.)
Radeity marked this conversation as resolved.
Show resolved Hide resolved
}
// Only the first pod in the listing is considered.
pod = podList.get(0);
// NOTE(review): getStatus() and getPhase() are dereferenced without a null check; confirm
// neither can be null for a freshly created pod.
String phase = pod.getStatus().getPhase();
if (phase.equals(PENDING) || phase.equals(UNKNOWN)) {
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
} else {
podIsReady = true;
}
}
// Presumably removed lines: the pre-change code addressed the pod through the 'driver' local.
Pod driver = driverPod.get(0);

return client.pods().inNamespace(driver.getMetadata().getNamespace())
.withName(driver.getMetadata().getName())
// Presumably added lines: the same lookup by namespace and name, now through 'pod', with the
// log watch narrowed to the container named in the context.
return client.pods().inNamespace(pod.getMetadata().getNamespace())
.withName(pod.getMetadata().getName())
// (The next two lines are review-UI text captured with the page, not code.)
Radeity marked this conversation as resolved.
Show resolved Hide resolved
// NOTE(review): the context documents containerName as optional and a caller in this change
// passes ""; confirm how inContainer treats an empty or missing name.
.inContainer(kubernetesApplicationManagerContext.getContainerName())
.watchLog();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@ public class KubernetesApplicationManagerContext implements ApplicationManagerCo
private final K8sTaskExecutionContext k8sTaskExecutionContext;

/**
* driver pod label value
* pod label value
*/
private final String labelValue;

/**
* container name (optional)
*/
private final String containerName;

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void handle(TaskCallBack taskCallBack) throws TaskException {
setExitStatusCode(response.getExitStatusCode());
setAppIds(response.getAppIds());
} catch (Exception e) {
log.error("k8s task submit failed with error", e);
log.error("k8s task submit failed with error");
exitStatusCode = -1;
throw new TaskException("Execute k8s task error", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.dolphinscheduler.plugin.task.api.k8s;

import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;

import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
Expand All @@ -32,14 +30,12 @@ public abstract class AbstractK8sTaskExecutor {
protected Logger log;
protected TaskExecutionContext taskRequest;
protected K8sUtils k8sUtils;
protected StringBuilder logStringBuffer;
protected Yaml yaml;

/**
 * Stores the supplied logger and task request and creates the K8sUtils and Yaml helpers
 * used by this executor.
 *
 * @param log logger assigned to the {@code log} field
 * @param taskRequest task execution context assigned to the {@code taskRequest} field
 */
protected AbstractK8sTaskExecutor(Logger log, TaskExecutionContext taskRequest) {
this.log = log;
this.taskRequest = taskRequest;
this.k8sUtils = new K8sUtils();
// NOTE(review): the logStringBuffer line below appears to be a removed diff line (the field
// declaration is dropped in the same hunk) - confirm against the actual commit.
this.logStringBuffer = new StringBuilder();
this.yaml = new Yaml();
}

Expand All @@ -53,14 +49,6 @@ public void waitTimeout(Boolean timeout) throws TaskException {
}
}

// NOTE(review): this whole method appears to be deleted by the change shown here, together
// with the logStringBuffer field it reads - confirm against the actual commit.
/**
 * Writes the buffered log text in one call: at error level when the response's exit status
 * code equals EXIT_CODE_FAILURE, otherwise at info level. Does nothing when the buffer is empty.
 *
 * @param taskResponse response whose exit status code selects the log level
 */
public void flushLog(TaskResponse taskResponse) {
if (logStringBuffer.length() != 0 && taskResponse.getExitStatusCode() == EXIT_CODE_FAILURE) {
log.error(logStringBuffer.toString());
} else if (logStringBuffer.length() != 0) {
log.info(logStringBuffer.toString());
}
}

public abstract void submitJob2k8s(String k8sParameterStr);

public abstract void stopJobOnK8s(String k8sParameterStr);
Expand Down
Loading