Skip to content

Commit

Permalink
feat: new getLog task
Browse files Browse the repository at this point in the history
  • Loading branch information
Skraye committed Mar 13, 2023
1 parent 0f6a784 commit 838f608
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 8 deletions.
111 changes: 111 additions & 0 deletions core/src/main/java/io/kestra/core/tasks/debugs/Fetch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package io.kestra.core.tasks.debugs;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.slf4j.event.Level;

import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;

import static io.kestra.core.utils.Rethrow.throwConsumer;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Output execution logs in a file.",
description = "This task is useful to propagate your logs."
)
@Plugin(
examples = {
@Example(
code = {
"level: ERROR",
}
),
@Example(
code = {
"level: WARN",
"tasksId: " +
" - \"previous-task-id\""
}
)
}
)
public class Fetch extends Task implements RunnableTask<Fetch.Output> {

@Schema(
title = "Filter on specific task(s)"
)
@PluginProperty
private Collection<String> tasksId;

@Schema(
title = "Minimum log level you want to fetch"
)
@Builder.Default
@PluginProperty
private Level level = Level.INFO;

@Override
public Output run(RunContext runContext) throws Exception {
String executionId = (String) new HashMap<>((Map<String, Object>) runContext.getVariables().get("execution")).get("id");
LogRepositoryInterface logRepository = runContext.getApplicationContext().getBean(LogRepositoryInterface.class);
List<LogEntry> logs = new ArrayList<>();

if(this.tasksId != null){
for (String taskId : tasksId) {
logs.addAll(logRepository.findByExecutionIdAndTaskId(executionId, taskId, level));
}
} else {
logs = logRepository.findByExecutionId(executionId, level);
}

File tempFile = runContext.tempFile(".ion").toFile();
AtomicLong count = new AtomicLong();

try (OutputStream output = new FileOutputStream(tempFile)) {
logs.forEach(throwConsumer(log -> {
count.incrementAndGet();
FileSerde.write(output, log);
}));
}

return Output
.builder()
.uri(runContext.putTempFile(tempFile))
.size(count.get())
.build();
}

@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(
title = "The size of the rows fetch"
)
private Long size;

@Schema(
title = "The uri of store result",
description = "File format is ion"
)
private URI uri;
}
}
2 changes: 1 addition & 1 deletion core/src/test/java/io/kestra/core/Helpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.function.Consumer;

public class Helpers {
public static long FLOWS_COUNT = 55;
public static long FLOWS_COUNT = 57;

public static ApplicationContext applicationContext() throws URISyntaxException {
return applicationContext(
Expand Down
38 changes: 38 additions & 0 deletions core/src/test/java/io/kestra/core/tasks/FetchTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.kestra.core.tasks;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.AbstractMemoryRunnerTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

public class FetchTest extends AbstractMemoryRunnerTest {
@Inject
FlowRepositoryInterface flowRepository;

@Test
void fetch() throws Exception {
Execution execution = runnerUtils.runOne("io.kestra.tests", "get-log");

assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList(), hasSize(3));
TaskRun fetch = execution.getTaskRunList().get(2);
assertThat(fetch.getOutputs().get("size"), is(2));
}

@Test
void fetchWithTaskId() throws Exception {
Execution execution = runnerUtils.runOne("io.kestra.tests", "get-log-taskid");

assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList(), hasSize(3));
TaskRun fetch = execution.getTaskRunList().get(2);
assertThat(fetch.getOutputs().get("size"), is(1));
}
}
13 changes: 13 additions & 0 deletions core/src/test/resources/flows/valids/get-log-taskid.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
id: get-log-taskid
namespace: io.kestra.tests
tasks:
- type: io.kestra.core.tasks.debugs.Echo
id: task-1
format: task 1
- type: io.kestra.core.tasks.debugs.Echo
id: task-2
format: task 2
- type: io.kestra.core.tasks.debugs.Fetch
id: get-log-task
tasksId:
- task-1
11 changes: 11 additions & 0 deletions core/src/test/resources/flows/valids/get-log.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
id: get-log
namespace: io.kestra.tests
tasks:
- type: io.kestra.core.tasks.debugs.Echo
id: task-1
format: task 1
- type: io.kestra.core.tasks.debugs.Echo
id: task-2
format: task 2
- type: io.kestra.core.tasks.debugs.Fetch
id: get-log-task
2 changes: 1 addition & 1 deletion core/src/test/resources/flows/valids/pause-delay.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace: io.kestra.tests
tasks:
- id: pause
type: io.kestra.core.tasks.flows.Pause
delay: PT1S
delay: PT10S
tasks:
- id: ko
type: io.kestra.core.tasks.scripts.Bash
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package io.kestra.repository.memory;

import io.kestra.core.models.executions.Execution;
import io.micronaut.data.model.Pageable;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Singleton;
import org.slf4j.event.Level;

import javax.annotation.Nullable;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import jakarta.inject.Singleton;

import javax.annotation.Nullable;
import java.util.stream.Collectors;

@Singleton
@MemoryRepositoryEnabled
Expand All @@ -21,12 +21,12 @@ public class MemoryLogRepository implements LogRepositoryInterface {

@Override
public List<LogEntry> findByExecutionId(String id, Level minLevel) {
throw new UnsupportedOperationException();
return logs.stream().filter(logEntry -> logEntry.getExecutionId().equals(id) && logEntry.getLevel().equals(minLevel)).collect(Collectors.toList());
}

@Override
public List<LogEntry> findByExecutionIdAndTaskId(String executionId, String taskId, Level minLevel) {
throw new UnsupportedOperationException();
return logs.stream().filter(logEntry -> logEntry.getExecutionId().equals(executionId) && logEntry.getTaskId().equals(taskId) && logEntry.getLevel().equals(minLevel)).collect(Collectors.toList());
}

@Override
Expand Down

0 comments on commit 838f608

Please sign in to comment.