Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new getLog task #1059

Merged
merged 6 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ subprojects {

task sourcesJar(type: Jar) {
dependsOn = [':core:copyGradleProperties']
dependsOn = [':ui:assembleFrontend']
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it needed or is it a leftover of something else ?
Seems not related to this PR

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably leftover of this : f4e378c

archiveClassifier.set('sources')
from sourceSets.main.allSource
}
Expand Down
128 changes: 128 additions & 0 deletions core/src/main/java/io/kestra/core/tasks/log/Fetch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package io.kestra.core.tasks.log;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.slf4j.event.Level;

import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static io.kestra.core.utils.Rethrow.throwConsumer;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Output execution logs in a file.",
description = "This task is useful to propagate your logs."
)
@Plugin(
examples = {
@Example(
code = {
"level: ERROR",
}
),
@Example(
code = {
"level: WARN",
"tasksId: ",
" - \"previous-task-id\""
}
),
@Example(
code = {
"level: WARN",
"executionId: \"{{execution.id}}\""
}
)
}
)
public class Fetch extends Task implements RunnableTask<Fetch.Output> {
@Schema(
title = "Filter on specific execution",
description = "If not set, will use the current execution"
)
@PluginProperty(dynamic = true)
private String executionId;

@Schema(
title = "Filter on specific task(s)"
)
@PluginProperty
private Collection<String> tasksId;

@Schema(
title = "Minimum log level you want to fetch"
)
@Builder.Default
@PluginProperty
private Level level = Level.INFO;

@Override
public Output run(RunContext runContext) throws Exception {
String executionId = this.executionId != null ? runContext.render(this.executionId) : (String) new HashMap<>((Map<String, Object>) runContext.getVariables().get("execution")).get("id");
LogRepositoryInterface logRepository = runContext.getApplicationContext().getBean(LogRepositoryInterface.class);

File tempFile = runContext.tempFile(".ion").toFile();
AtomicLong count = new AtomicLong();

try (OutputStream output = new FileOutputStream(tempFile)) {
if (this.tasksId != null) {
for (String taskId : tasksId) {
logRepository
.findByExecutionIdAndTaskId(executionId, taskId, level)
.forEach(throwConsumer(log -> {
count.incrementAndGet();
FileSerde.write(output, log);
}));
}
} else {
logRepository
.findByExecutionId(executionId, level)
.forEach(throwConsumer(log -> {
count.incrementAndGet();
FileSerde.write(output, log);
}));
}
}

return Output
.builder()
.uri(runContext.putTempFile(tempFile))
.size(count.get())
.build();
}

@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(
title = "The size of the fetched rows"
)
private Long size;

@Schema(
title = "The uri of stored results",
description = "File format is ion"
)
private URI uri;
}
}
2 changes: 1 addition & 1 deletion core/src/test/java/io/kestra/core/Helpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.function.Consumer;

public class Helpers {
public static long FLOWS_COUNT = 58;
public static long FLOWS_COUNT = 61;

public static ApplicationContext applicationContext() throws URISyntaxException {
return applicationContext(
Expand Down
48 changes: 48 additions & 0 deletions core/src/test/java/io/kestra/core/tasks/FetchTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.kestra.core.tasks;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.AbstractMemoryRunnerTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

public class FetchTest extends AbstractMemoryRunnerTest {
@Inject
FlowRepositoryInterface flowRepository;

@Test
void fetch() throws Exception {
Execution execution = runnerUtils.runOne("io.kestra.tests", "get-log");

assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList(), hasSize(3));
TaskRun fetch = execution.getTaskRunList().get(2);
assertThat(fetch.getOutputs().get("size"), is(2));
}

@Test
void fetchWithTaskId() throws Exception {
Execution execution = runnerUtils.runOne("io.kestra.tests", "get-log-taskid");

assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList(), hasSize(3));
TaskRun fetch = execution.getTaskRunList().get(2);
assertThat(fetch.getOutputs().get("size"), is(1));
}

@Test
void fetchWithExecutionId() throws Exception {
Execution execution = runnerUtils.runOne("io.kestra.tests", "get-log-executionid");

assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList(), hasSize(3));
TaskRun fetch = execution.getTaskRunList().get(2);
assertThat(fetch.getOutputs().get("size"), is(2));
}
}
12 changes: 12 additions & 0 deletions core/src/test/resources/flows/valids/get-log-executionid.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
id: get-log-executionid
namespace: io.kestra.tests
tasks:
- type: io.kestra.core.tasks.debugs.Echo
id: task-1
format: task 1
- type: io.kestra.core.tasks.debugs.Echo
id: task-2
format: task 2
- type: io.kestra.core.tasks.log.Fetch
id: get-log-task
executionId: "{{execution.id}}"
13 changes: 13 additions & 0 deletions core/src/test/resources/flows/valids/get-log-taskid.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
id: get-log-taskid
namespace: io.kestra.tests
tasks:
- type: io.kestra.core.tasks.debugs.Echo
id: task-1
format: task 1
- type: io.kestra.core.tasks.debugs.Echo
id: task-2
format: task 2
- type: io.kestra.core.tasks.log.Fetch
id: get-log-task
tasksId:
- task-1
11 changes: 11 additions & 0 deletions core/src/test/resources/flows/valids/get-log.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
id: get-log
namespace: io.kestra.tests
tasks:
- type: io.kestra.core.tasks.debugs.Echo
id: task-1
format: task 1
- type: io.kestra.core.tasks.debugs.Echo
id: task-2
format: task 2
- type: io.kestra.core.tasks.log.Fetch
id: get-log-task
2 changes: 1 addition & 1 deletion core/src/test/resources/flows/valids/pause-delay.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace: io.kestra.tests
tasks:
- id: pause
type: io.kestra.core.tasks.flows.Pause
delay: PT1S
delay: PT10S
tasks:
- id: ko
type: io.kestra.core.tasks.scripts.Bash
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package io.kestra.repository.memory;

import io.kestra.core.models.executions.Execution;
import io.micronaut.data.model.Pageable;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Singleton;
import org.slf4j.event.Level;

import javax.annotation.Nullable;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import jakarta.inject.Singleton;

import javax.annotation.Nullable;
import java.util.stream.Collectors;

@Singleton
@MemoryRepositoryEnabled
Expand All @@ -21,12 +21,18 @@ public class MemoryLogRepository implements LogRepositoryInterface {

@Override
public List<LogEntry> findByExecutionId(String id, Level minLevel) {
throw new UnsupportedOperationException();
return logs
.stream()
.filter(logEntry -> logEntry.getExecutionId().equals(id) && logEntry.getLevel().equals(minLevel))
.collect(Collectors.toList());
}

@Override
public List<LogEntry> findByExecutionIdAndTaskId(String executionId, String taskId, Level minLevel) {
throw new UnsupportedOperationException();
return logs
.stream()
.filter(logEntry -> logEntry.getExecutionId().equals(executionId) && logEntry.getTaskId().equals(taskId) && logEntry.getLevel().equals(minLevel))
.collect(Collectors.toList());
}

@Override
Expand Down