Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/waitfor issue #4421

Merged
merged 3 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions core/src/main/java/io/kestra/core/models/tasks/ResolvedTask.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package io.kestra.core.models.tasks;

import lombok.Builder;
import lombok.Value;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.NextTaskRun;
import io.kestra.core.models.executions.TaskRun;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Value;

import java.util.List;
import java.util.stream.Collectors;
import jakarta.validation.constraints.NotNull;

@Builder
@Value
Expand All @@ -27,6 +26,13 @@ public NextTaskRun toNextTaskRun(Execution execution) {
);
}

public NextTaskRun toNextTaskRunIncrementIteration(Execution execution, Integer iteration) {
return new NextTaskRun(
TaskRun.of(execution, this).withIteration(iteration != null ? iteration : 1),
this.getTask()
);
}

public static ResolvedTask of(Task task) {
return ResolvedTask.builder()
.task(task)
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/io/kestra/core/runners/FlowableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public static List<NextTaskRun> resolveWaitForNext(
List<TaskRun> taskRuns = execution.findTaskRunByTasks(currentTasks, parentTaskRun);
if (taskRuns.isEmpty()) {
return Collections.singletonList(
currentTasks.getFirst().toNextTaskRun(execution)
currentTasks.getFirst().toNextTaskRunIncrementIteration(execution, parentTaskRun.getIteration())
);
}

Expand All @@ -127,9 +127,9 @@ public static List<NextTaskRun> resolveWaitForNext(
int lastIndex = taskRuns.indexOf(lastTerminated.get());

if (currentTasks.size() > lastIndex + 1) {
return Collections.singletonList(currentTasks.get(lastIndex + 1).toNextTaskRun(execution));
return Collections.singletonList(currentTasks.get(lastIndex + 1).toNextTaskRunIncrementIteration(execution, parentTaskRun.getIteration()));
} else {
return Collections.singletonList(currentTasks.getFirst().toNextTaskRun(execution));
return Collections.singletonList(currentTasks.getFirst().toNextTaskRunIncrementIteration(execution, parentTaskRun.getIteration()));
}
}

Expand Down
20 changes: 8 additions & 12 deletions core/src/main/java/io/kestra/core/services/ExecutionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.plugin.core.flow.Pause;
import io.kestra.plugin.core.flow.WorkingDirectory;
import io.kestra.core.utils.GraphUtils;
import io.kestra.core.utils.IdUtils;
import io.kestra.plugin.core.flow.Pause;
import io.kestra.plugin.core.flow.WorkingDirectory;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.http.multipart.CompletedPart;
import io.micronaut.http.multipart.StreamingFileUpload;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
Expand All @@ -42,6 +41,7 @@
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -100,29 +100,25 @@ public Execution retryTask(Execution execution, String taskRunId) {
}

public Execution retryWaitFor(Execution execution, String flowableTaskRunId) {
AtomicReference<Boolean> firstDone = new AtomicReference<>(false);
List<TaskRun> newTaskRuns = execution
.getTaskRunList()
.stream()
.map(taskRun -> {
if (taskRun.getId().equals(flowableTaskRunId)) {
// Keep only CREATED/RUNNING
// To avoid having large history
return taskRun.replaceState(
new State(
State.Type.RUNNING,
taskRun.getState().getHistories().subList(0,2)
)
);
return taskRun.resetAttempts().incrementIteration();
}

if (flowableTaskRunId.equals(taskRun.getParentTaskRunId())) {
// Clean attempts and only increment iteration
// to avoid having large history
return taskRun.resetAttempts().incrementIteration();
// Clean children
return null;
}

return taskRun;
})
.filter(Objects::nonNull)
.toList();

return execution.withTaskRunList(newTaskRuns).withState(State.Type.RUNNING);
Expand Down
34 changes: 33 additions & 1 deletion ui/src/components/executions/Logs.vue
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@
{{ logDisplayButtonText }}
</el-button>
</el-form-item>
<el-form-item>
<el-tooltip
:content="!raw_view ? $t('logs_view.raw_details') : $t('logs_view.compact_details')"
>
<el-button @click="setRawView()">
{{ !raw_view ? $t('logs_view.raw') : $t('logs_view.compact') }}
</el-button>
</el-tooltip>
</el-form-item>
<el-form-item>
<el-button-group>
<el-button @click="downloadContent()">
Expand All @@ -35,6 +44,7 @@
</collapse>

<task-run-details
v-if="!raw_view"
ref="logs"
:level="level"
:exclude-metas="['namespace', 'flowId', 'taskId', 'executionId']"
Expand All @@ -45,6 +55,16 @@
:target-flow="flow"
:show-progress-bar="false"
/>
<el-card v-else>
<template v-for="log in logs" :key="`${log.timestamp}-${log.taskRun}`">
<log-line
:level="level"
filter=""
:log="log"
title
/>
</template>
</el-card>
</div>
</template>

Expand All @@ -58,9 +78,11 @@
import Collapse from "../layout/Collapse.vue";
import State from "../../utils/state";
import Utils from "../../utils/utils";
import LogLine from "../logs/LogLine.vue";

export default {
components: {
LogLine,
TaskRunDetails,
LogLevelSelector,
Kicon,
Expand All @@ -73,7 +95,8 @@
fullscreen: false,
level: undefined,
filter: undefined,
openedTaskrunsCount: 0
openedTaskrunsCount: 0,
raw_view: false
};
},
created() {
Expand Down Expand Up @@ -114,6 +137,15 @@
},
expandCollapseAll() {
this.$refs.logs.toggleExpandCollapseAll();
},
setRawView() {
this.raw_view = !this.raw_view;
if(this.raw_view) {
this.$store.dispatch("execution/loadLogs", {
executionId: this.execution.id,
minLevel: this.level
})
}
}
}
};
Expand Down
7 changes: 6 additions & 1 deletion ui/src/components/logs/LogLine.vue
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
<div class="line font-monospace" v-if="filtered">
<span :class="levelClass" class="header-badge log-level el-tag noselect fw-bold">{{ log.level }}</span>
<div class="log-content d-inline-block">
<span v-if="title" class="fw-bold">{{ (log.taskId ?? log.flowId ?? "").capitalize() }}</span>
<div
class="header"
:class="{'d-inline-block': metaWithValue.length === 0, 'me-3': metaWithValue.length === 0}"
Expand Down Expand Up @@ -53,6 +54,10 @@
excludeMetas: {
type: Array,
default: () => [],
},
title: {
type: Boolean,
default: false
}
},
data() {
Expand Down Expand Up @@ -184,11 +189,11 @@
}

.header-badge {
display: inline-block;
font-size: 95%;
text-align: center;
white-space: nowrap;
vertical-align: baseline;
width: 40px;

span:first-child {
margin-right: 6px;
Expand Down
2 changes: 2 additions & 0 deletions ui/src/components/logs/TaskRunDetails.vue
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@
return Download
},
currentTaskRuns() {
// console.log(this.followedExecution?.taskRunList?.filter(tr => this.taskRunId ? tr.id === this.taskRunId : true))
// return this.logs.map(log => log.taskRunId).filter(tr => this.taskRunId ? tr.id === this.taskRunId : true)
return this.followedExecution?.taskRunList?.filter(tr => this.taskRunId ? tr.id === this.taskRunId : true) ?? [];
},
params() {
Expand Down
6 changes: 1 addition & 5 deletions ui/src/stores/executions.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,7 @@ export default {
if (options.store === false) {
return response.data
}
if(options.params.page !== 1) {
commit("appendLogs", response.data)
} else {
commit("setLogs", response.data)
}
commit("setLogs", response.data)

return response.data
});
Expand Down
6 changes: 6 additions & 0 deletions ui/src/translations/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,12 @@
"curl": {
"command": "cURL command",
"note": "Note that for SECRET and FILE input type, the command must be accommodate to match the real values."
},
"logs_view": {
"raw": "Temporal view",
"raw_details": "Show full task logs and flow logs in a raw timestamp-ordered format",
"compact": "Default view",
"compact_details": "Show task logs in a compact view grouped by each task"
}
}
}
6 changes: 6 additions & 0 deletions ui/src/translations/fr.json
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,12 @@
"curl": {
"command": "Commande cURL",
"note": "Notez que pour les types d'entrée SECRET et FILE, la commande doit être adaptée pour correspondre aux valeurs réelles."
},
"logs_view": {
"raw": "Vue temporelle",
"raw_details": "Affiche les logs des tâches et du flow dans un format brut ordonné par horodatage",
"compact": "Vue par défaut",
"compact_details": "Affiche les logs dans un format compacte regroupé par tâche"
}
}
}