From c23bf35a61f88c4f4788d96530a46ab9c0819b4b Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Mon, 13 Mar 2023 09:18:43 +0100 Subject: [PATCH 1/5] fix(build): missing dependsOn for gradle 8 --- build.gradle | 1 - 1 file changed, 1 deletion(-) diff --git a/build.gradle b/build.gradle index dd5fecd8948..d29b929d6b6 100644 --- a/build.gradle +++ b/build.gradle @@ -357,7 +357,6 @@ subprojects { } task sourcesJar(type: Jar) { - dependsOn = [':core:copyGradleProperties'] archiveClassifier.set('sources') from sourceSets.main.allSource } From 05cd340e92d4684dab14fbfb1fe4b37863e905de Mon Sep 17 00:00:00 2001 From: Yann C Date: Mon, 13 Mar 2023 10:12:50 +0100 Subject: [PATCH 2/5] feat: new getLog task --- .../io/kestra/core/tasks/debugs/Fetch.java | 111 ++++++++++++++++++ .../src/test/java/io/kestra/core/Helpers.java | 2 +- .../java/io/kestra/core/tasks/FetchTest.java | 38 ++++++ .../flows/valids/get-log-taskid.yaml | 13 ++ .../test/resources/flows/valids/get-log.yaml | 11 ++ .../resources/flows/valids/pause-delay.yaml | 2 +- .../memory/MemoryLogRepository.java | 12 +- 7 files changed, 181 insertions(+), 8 deletions(-) create mode 100644 core/src/main/java/io/kestra/core/tasks/debugs/Fetch.java create mode 100644 core/src/test/java/io/kestra/core/tasks/FetchTest.java create mode 100644 core/src/test/resources/flows/valids/get-log-taskid.yaml create mode 100644 core/src/test/resources/flows/valids/get-log.yaml diff --git a/core/src/main/java/io/kestra/core/tasks/debugs/Fetch.java b/core/src/main/java/io/kestra/core/tasks/debugs/Fetch.java new file mode 100644 index 00000000000..39f19ea54be --- /dev/null +++ b/core/src/main/java/io/kestra/core/tasks/debugs/Fetch.java @@ -0,0 +1,111 @@ +package io.kestra.core.tasks.debugs; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.executions.LogEntry; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.models.tasks.Task; +import io.kestra.core.repositories.LogRepositoryInterface; +import io.kestra.core.runners.RunContext; +import io.kestra.core.serializers.FileSerde; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.*; +import lombok.experimental.SuperBuilder; +import org.slf4j.event.Level; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.net.URI; +import java.util.*; +import java.util.concurrent.atomic.AtomicLong; + +import static io.kestra.core.utils.Rethrow.throwConsumer; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema( + title = "Output execution logs in a file.", + description = "This task is useful to propagate your logs." +) +@Plugin( + examples = { + @Example( + code = { + "level: ERROR", + } + ), + @Example( + code = { + "level: WARN", + "tasksId: " + + " - \"previous-task-id\"" + } + ) + } +) +public class Fetch extends Task implements RunnableTask { + + @Schema( + title = "Filter on specific task(s)" + ) + @PluginProperty + private Collection tasksId; + + @Schema( + title = "Minimum log level you want to fetch" + ) + @Builder.Default + @PluginProperty + private Level level = Level.INFO; + + @Override + public Output run(RunContext runContext) throws Exception { + String executionId = (String) new HashMap<>((Map) runContext.getVariables().get("execution")).get("id"); + LogRepositoryInterface logRepository = runContext.getApplicationContext().getBean(LogRepositoryInterface.class); + List logs = new ArrayList<>(); + + if(this.tasksId != null){ + for (String taskId : tasksId) { + logs.addAll(logRepository.findByExecutionIdAndTaskId(executionId, taskId, level)); + } + } else { + logs = logRepository.findByExecutionId(executionId, level); + } + + File tempFile = runContext.tempFile(".ion").toFile(); + AtomicLong count = new AtomicLong(); + + try (OutputStream output = new FileOutputStream(tempFile)) { + logs.forEach(throwConsumer(log -> { + count.incrementAndGet(); + FileSerde.write(output, log); + })); + } + + return Output + .builder() + .uri(runContext.putTempFile(tempFile)) + .size(count.get()) + .build(); + } + + @Builder + @Getter + public static class Output implements io.kestra.core.models.tasks.Output { + @Schema( + title = "The size of the rows fetch" + ) + private Long size; + + @Schema( + title = "The uri of store result", + description = "File format is ion" + ) + private URI uri; + } +} diff --git a/core/src/test/java/io/kestra/core/Helpers.java b/core/src/test/java/io/kestra/core/Helpers.java index 3fe142ef906..34675523134 100644 --- a/core/src/test/java/io/kestra/core/Helpers.java +++ b/core/src/test/java/io/kestra/core/Helpers.java @@ -19,7 +19,7 @@ import java.util.function.Consumer; public class Helpers { - public static long FLOWS_COUNT = 55; + public static long FLOWS_COUNT = 57; public static ApplicationContext applicationContext() throws URISyntaxException { return applicationContext( diff --git a/core/src/test/java/io/kestra/core/tasks/FetchTest.java b/core/src/test/java/io/kestra/core/tasks/FetchTest.java new file mode 100644 index 00000000000..d115f37005d --- /dev/null +++ b/core/src/test/java/io/kestra/core/tasks/FetchTest.java @@ -0,0 +1,38 @@ +package io.kestra.core.tasks; + +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.executions.TaskRun; +import io.kestra.core.models.flows.State; +import io.kestra.core.repositories.FlowRepositoryInterface; +import io.kestra.core.runners.AbstractMemoryRunnerTest; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +public class FetchTest extends AbstractMemoryRunnerTest { + @Inject + FlowRepositoryInterface flowRepository; + + @Test + void fetch() throws Exception { + Execution execution = runnerUtils.runOne("io.kestra.tests", "get-log"); + + assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); + assertThat(execution.getTaskRunList(), hasSize(3)); + TaskRun fetch = execution.getTaskRunList().get(2); + assertThat(fetch.getOutputs().get("size"), is(2)); + } + + @Test + void fetchWithTaskId() throws Exception { + Execution execution = runnerUtils.runOne("io.kestra.tests", "get-log-taskid"); + + assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); + assertThat(execution.getTaskRunList(), hasSize(3)); + TaskRun fetch = execution.getTaskRunList().get(2); + assertThat(fetch.getOutputs().get("size"), is(1)); + } +} diff --git a/core/src/test/resources/flows/valids/get-log-taskid.yaml b/core/src/test/resources/flows/valids/get-log-taskid.yaml new file mode 100644 index 00000000000..d9c7c73b37e --- /dev/null +++ b/core/src/test/resources/flows/valids/get-log-taskid.yaml @@ -0,0 +1,13 @@ +id: get-log-taskid +namespace: io.kestra.tests +tasks: + - type: io.kestra.core.tasks.debugs.Echo + id: task-1 + format: task 1 + - type: io.kestra.core.tasks.debugs.Echo + id: task-2 + format: task 2 + - type: io.kestra.core.tasks.debugs.Fetch + id: get-log-task + tasksId: + - task-1 \ No newline at end of file diff --git a/core/src/test/resources/flows/valids/get-log.yaml b/core/src/test/resources/flows/valids/get-log.yaml new file mode 100644 index 00000000000..0e634f04088 --- /dev/null +++ b/core/src/test/resources/flows/valids/get-log.yaml @@ -0,0 +1,11 @@ +id: get-log +namespace: io.kestra.tests +tasks: + - type: io.kestra.core.tasks.debugs.Echo + id: task-1 + format: task 1 + - type: io.kestra.core.tasks.debugs.Echo + id: task-2 + format: task 2 + - type: io.kestra.core.tasks.debugs.Fetch + id: get-log-task \ No newline at end of file diff --git a/core/src/test/resources/flows/valids/pause-delay.yaml b/core/src/test/resources/flows/valids/pause-delay.yaml index ace4f4a6aaf..ccbb5555e88 100644 --- a/core/src/test/resources/flows/valids/pause-delay.yaml +++ b/core/src/test/resources/flows/valids/pause-delay.yaml @@ -4,7 +4,7 @@ namespace: io.kestra.tests tasks: - id: pause type: io.kestra.core.tasks.flows.Pause - delay: PT1S + delay: PT10S tasks: - id: ko type: io.kestra.core.tasks.scripts.Bash diff --git a/repository-memory/src/main/java/io/kestra/repository/memory/MemoryLogRepository.java b/repository-memory/src/main/java/io/kestra/repository/memory/MemoryLogRepository.java index 08841447714..be1f7783228 100644 --- a/repository-memory/src/main/java/io/kestra/repository/memory/MemoryLogRepository.java +++ b/repository-memory/src/main/java/io/kestra/repository/memory/MemoryLogRepository.java @@ -1,18 +1,18 @@ package io.kestra.repository.memory; import io.kestra.core.models.executions.Execution; -import io.micronaut.data.model.Pageable; import io.kestra.core.models.executions.LogEntry; import io.kestra.core.repositories.ArrayListTotal; import io.kestra.core.repositories.LogRepositoryInterface; +import io.micronaut.data.model.Pageable; +import jakarta.inject.Singleton; import org.slf4j.event.Level; +import javax.annotation.Nullable; import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.List; -import jakarta.inject.Singleton; - -import javax.annotation.Nullable; +import java.util.stream.Collectors; @Singleton @MemoryRepositoryEnabled @@ -21,12 +21,12 @@ public class MemoryLogRepository implements LogRepositoryInterface { @Override public List findByExecutionId(String id, Level minLevel) { - throw new UnsupportedOperationException(); + return logs.stream().filter(logEntry -> logEntry.getExecutionId().equals(id) && logEntry.getLevel().equals(minLevel)).collect(Collectors.toList()); } @Override public List findByExecutionIdAndTaskId(String executionId, String taskId, Level minLevel) { - throw new UnsupportedOperationException(); + return logs.stream().filter(logEntry -> logEntry.getExecutionId().equals(executionId) && logEntry.getTaskId().equals(taskId) && logEntry.getLevel().equals(minLevel)).collect(Collectors.toList()); } @Override From f4e378c8af62b6d76a0e763cfe4ae20e9f07ab1f Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Mon, 13 Mar 2023 09:18:43 +0100 Subject: [PATCH 3/5] fix(build): missing dependsOn for gradle 8 --- build.gradle | 2 ++ 1 file changed, 2 insertions(+) diff --git a/build.gradle b/build.gradle index d29b929d6b6..b209cfe0b5e 100644 --- a/build.gradle +++ b/build.gradle @@ -357,6 +357,8 @@ subprojects { } task sourcesJar(type: Jar) { + dependsOn = [':core:copyGradleProperties'] + dependsOn = [':ui:assembleFrontend'] archiveClassifier.set('sources') from sourceSets.main.allSource } From 0e101c7974a0de22f6be4baa16bb6e8b3169234f Mon Sep 17 00:00:00 2001 From: Yann C Date: Mon, 13 Mar 2023 10:12:50 +0100 Subject: [PATCH 4/5] feat: new getLog task --- .../core/tasks/{debugs => log}/Fetch.java | 54 ++++++++++++------- .../src/test/java/io/kestra/core/Helpers.java | 2 +- .../java/io/kestra/core/tasks/FetchTest.java | 10 ++++ .../flows/valids/get-log-executionid.yaml | 12 +++++ .../flows/valids/get-log-taskid.yaml | 2 +- .../test/resources/flows/valids/get-log.yaml | 2 +- 6 files changed, 59 insertions(+), 23 deletions(-) rename core/src/main/java/io/kestra/core/tasks/{debugs => log}/Fetch.java (62%) create mode 100644 core/src/test/resources/flows/valids/get-log-executionid.yaml diff --git a/core/src/main/java/io/kestra/core/tasks/debugs/Fetch.java b/core/src/main/java/io/kestra/core/tasks/log/Fetch.java similarity index 62% rename from core/src/main/java/io/kestra/core/tasks/debugs/Fetch.java rename to core/src/main/java/io/kestra/core/tasks/log/Fetch.java index 39f19ea54be..bd602392837 100644 --- a/core/src/main/java/io/kestra/core/tasks/debugs/Fetch.java +++ b/core/src/main/java/io/kestra/core/tasks/log/Fetch.java @@ -1,9 +1,8 @@ -package io.kestra.core.tasks.debugs; +package io.kestra.core.tasks.log; import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; -import io.kestra.core.models.executions.LogEntry; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.models.tasks.Task; import io.kestra.core.repositories.LogRepositoryInterface; @@ -18,7 +17,9 @@ import java.io.FileOutputStream; import java.io.OutputStream; import java.net.URI; -import java.util.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import static io.kestra.core.utils.Rethrow.throwConsumer; @@ -43,13 +44,26 @@ code = { "level: WARN", "tasksId: " + - " - \"previous-task-id\"" + " - \"previous-task-id\"" + } + ), + @Example( + code = { + "level: WARN", + "executionId: \"{{execution.id}}\"" } ) } ) public class Fetch extends Task implements RunnableTask { + @Schema( + title = "Filter on specific execution", + description = "If not set, will use the current execution" + ) + @PluginProperty(dynamic = true) + private String executionId; + @Schema( title = "Filter on specific task(s)" ) @@ -65,26 +79,26 @@ public class Fetch extends Task implements RunnableTask { @Override public Output run(RunContext runContext) throws Exception { - String executionId = (String) new HashMap<>((Map) runContext.getVariables().get("execution")).get("id"); + String executionId = this.executionId != null ? runContext.render(this.executionId) : (String) new HashMap<>((Map) runContext.getVariables().get("execution")).get("id"); LogRepositoryInterface logRepository = runContext.getApplicationContext().getBean(LogRepositoryInterface.class); - List logs = new ArrayList<>(); - - if(this.tasksId != null){ - for (String taskId : tasksId) { - logs.addAll(logRepository.findByExecutionIdAndTaskId(executionId, taskId, level)); - } - } else { - logs = logRepository.findByExecutionId(executionId, level); - } File tempFile = runContext.tempFile(".ion").toFile(); AtomicLong count = new AtomicLong(); try (OutputStream output = new FileOutputStream(tempFile)) { - logs.forEach(throwConsumer(log -> { - count.incrementAndGet(); - FileSerde.write(output, log); - })); + if (this.tasksId != null) { + for (String taskId : tasksId) { + logRepository.findByExecutionIdAndTaskId(executionId, taskId, level).forEach(throwConsumer(log -> { + count.incrementAndGet(); + FileSerde.write(output, log); + })); + } + } else { + logRepository.findByExecutionId(executionId, level).forEach(throwConsumer(log -> { + count.incrementAndGet(); + FileSerde.write(output, log); + })); + } } return Output @@ -98,12 +112,12 @@ public Output run(RunContext runContext) throws Exception { @Getter public static class Output implements io.kestra.core.models.tasks.Output { @Schema( - title = "The size of the rows fetch" + title = "The size of the fetched rows" ) private Long size; @Schema( - title = "The uri of store result", + title = "The uri of stored results", description = "File format is ion" ) private URI uri; diff --git a/core/src/test/java/io/kestra/core/Helpers.java b/core/src/test/java/io/kestra/core/Helpers.java index 34675523134..fc6e06bb4ef 100644 --- a/core/src/test/java/io/kestra/core/Helpers.java +++ b/core/src/test/java/io/kestra/core/Helpers.java @@ -19,7 +19,7 @@ import java.util.function.Consumer; public class Helpers { - public static long FLOWS_COUNT = 57; + public static long FLOWS_COUNT = 58; public static ApplicationContext applicationContext() throws URISyntaxException { return applicationContext( diff --git a/core/src/test/java/io/kestra/core/tasks/FetchTest.java b/core/src/test/java/io/kestra/core/tasks/FetchTest.java index d115f37005d..27253a5e797 100644 --- a/core/src/test/java/io/kestra/core/tasks/FetchTest.java +++ b/core/src/test/java/io/kestra/core/tasks/FetchTest.java @@ -35,4 +35,14 @@ void fetchWithTaskId() throws Exception { TaskRun fetch = execution.getTaskRunList().get(2); assertThat(fetch.getOutputs().get("size"), is(1)); } + + @Test + void fetchWithExecutionId() throws Exception { + Execution execution = runnerUtils.runOne("io.kestra.tests", "get-log-executionid"); + + assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); + assertThat(execution.getTaskRunList(), hasSize(3)); + TaskRun fetch = execution.getTaskRunList().get(2); + assertThat(fetch.getOutputs().get("size"), is(2)); + } } diff --git a/core/src/test/resources/flows/valids/get-log-executionid.yaml b/core/src/test/resources/flows/valids/get-log-executionid.yaml new file mode 100644 index 00000000000..9559714b917 --- /dev/null +++ b/core/src/test/resources/flows/valids/get-log-executionid.yaml @@ -0,0 +1,12 @@ +id: get-log-executionid +namespace: io.kestra.tests +tasks: + - type: io.kestra.core.tasks.debugs.Echo + id: task-1 + format: task 1 + - type: io.kestra.core.tasks.debugs.Echo + id: task-2 + format: task 2 + - type: io.kestra.core.tasks.log.Fetch + id: get-log-task + executionId: "{{execution.id}}" \ No newline at end of file diff --git a/core/src/test/resources/flows/valids/get-log-taskid.yaml b/core/src/test/resources/flows/valids/get-log-taskid.yaml index d9c7c73b37e..41c7a017664 100644 --- a/core/src/test/resources/flows/valids/get-log-taskid.yaml +++ b/core/src/test/resources/flows/valids/get-log-taskid.yaml @@ -7,7 +7,7 @@ tasks: - type: io.kestra.core.tasks.debugs.Echo id: task-2 format: task 2 - - type: io.kestra.core.tasks.debugs.Fetch + - type: io.kestra.core.tasks.log.Fetch id: get-log-task tasksId: - task-1 \ No newline at end of file diff --git a/core/src/test/resources/flows/valids/get-log.yaml b/core/src/test/resources/flows/valids/get-log.yaml index 0e634f04088..4a32ac7d759 100644 --- a/core/src/test/resources/flows/valids/get-log.yaml +++ b/core/src/test/resources/flows/valids/get-log.yaml @@ -7,5 +7,5 @@ tasks: - type: io.kestra.core.tasks.debugs.Echo id: task-2 format: task 2 - - type: io.kestra.core.tasks.debugs.Fetch + - type: io.kestra.core.tasks.log.Fetch id: get-log-task \ No newline at end of file From 01c8eec338707cc49646bbedb1e7704f5a879994 Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Thu, 16 Mar 2023 17:54:26 +0100 Subject: [PATCH 5/5] cleanup --- .../java/io/kestra/core/tasks/log/Fetch.java | 23 +++++++++++-------- .../src/test/java/io/kestra/core/Helpers.java | 2 +- .../memory/MemoryLogRepository.java | 10 ++++++-- 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/io/kestra/core/tasks/log/Fetch.java b/core/src/main/java/io/kestra/core/tasks/log/Fetch.java index bd602392837..2a9ccbf474f 100644 --- a/core/src/main/java/io/kestra/core/tasks/log/Fetch.java +++ b/core/src/main/java/io/kestra/core/tasks/log/Fetch.java @@ -43,8 +43,8 @@ @Example( code = { "level: WARN", - "tasksId: " + - " - \"previous-task-id\"" + "tasksId: ", + " - \"previous-task-id\"" } ), @Example( @@ -56,7 +56,6 @@ } ) public class Fetch extends Task implements RunnableTask { - @Schema( title = "Filter on specific execution", description = "If not set, will use the current execution" @@ -88,16 +87,20 @@ public Output run(RunContext runContext) throws Exception { try (OutputStream output = new FileOutputStream(tempFile)) { if (this.tasksId != null) { for (String taskId : tasksId) { - logRepository.findByExecutionIdAndTaskId(executionId, taskId, level).forEach(throwConsumer(log -> { + logRepository + .findByExecutionIdAndTaskId(executionId, taskId, level) + .forEach(throwConsumer(log -> { + count.incrementAndGet(); + FileSerde.write(output, log); + })); + } + } else { + logRepository + .findByExecutionId(executionId, level) + .forEach(throwConsumer(log -> { count.incrementAndGet(); FileSerde.write(output, log); })); - } - } else { - logRepository.findByExecutionId(executionId, level).forEach(throwConsumer(log -> { - count.incrementAndGet(); - FileSerde.write(output, log); - })); } } diff --git a/core/src/test/java/io/kestra/core/Helpers.java b/core/src/test/java/io/kestra/core/Helpers.java index fc6e06bb4ef..62e15377b2f 100644 --- a/core/src/test/java/io/kestra/core/Helpers.java +++ b/core/src/test/java/io/kestra/core/Helpers.java @@ -19,7 +19,7 @@ import java.util.function.Consumer; public class Helpers { - public static long FLOWS_COUNT = 58; + public static long FLOWS_COUNT = 61; public static ApplicationContext applicationContext() throws URISyntaxException { return applicationContext( diff --git a/repository-memory/src/main/java/io/kestra/repository/memory/MemoryLogRepository.java b/repository-memory/src/main/java/io/kestra/repository/memory/MemoryLogRepository.java index be1f7783228..d1ad03b4275 100644 --- a/repository-memory/src/main/java/io/kestra/repository/memory/MemoryLogRepository.java +++ b/repository-memory/src/main/java/io/kestra/repository/memory/MemoryLogRepository.java @@ -21,12 +21,18 @@ public class MemoryLogRepository implements LogRepositoryInterface { @Override public List findByExecutionId(String id, Level minLevel) { - return logs.stream().filter(logEntry -> logEntry.getExecutionId().equals(id) && logEntry.getLevel().equals(minLevel)).collect(Collectors.toList()); + return logs + .stream() + .filter(logEntry -> logEntry.getExecutionId().equals(id) && logEntry.getLevel().equals(minLevel)) + .collect(Collectors.toList()); } @Override public List findByExecutionIdAndTaskId(String executionId, String taskId, Level minLevel) { - return logs.stream().filter(logEntry -> logEntry.getExecutionId().equals(executionId) && logEntry.getTaskId().equals(taskId) && logEntry.getLevel().equals(minLevel)).collect(Collectors.toList()); + return logs + .stream() + .filter(logEntry -> logEntry.getExecutionId().equals(executionId) && logEntry.getTaskId().equals(taskId) && logEntry.getLevel().equals(minLevel)) + .collect(Collectors.toList()); } @Override