Skip to content

Commit

Permalink
feat(core): store metrics in a dedicated repository (#1047)
Browse files Browse the repository at this point in the history
close #969
  • Loading branch information
loicmathieu authored Mar 20, 2023
1 parent 3e08b16 commit 570374e
Show file tree
Hide file tree
Showing 43 changed files with 888 additions and 59 deletions.
3 changes: 3 additions & 0 deletions cli/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ kestra:
logs:
table: "logs"
cls: io.kestra.core.models.executions.LogEntry
metrics:
table: "metrics"
cls: io.kestra.core.models.executions.MetricEntry
multipleconditions:
table: "multipleconditions"
cls: io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import lombok.NoArgsConstructor;
import lombok.ToString;

import java.time.Instant;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -23,7 +24,7 @@
@JsonSubTypes.Type(value = Timer.class, name = "timer"),
})
@ToString
@EqualsAndHashCode
@EqualsAndHashCode(exclude="timestamp")
@Getter
@NoArgsConstructor
@Introspected
Expand All @@ -35,6 +36,8 @@ abstract public class AbstractMetricEntry<T> {

protected Map<String, String> tags;

protected Instant timestamp = Instant.now();

protected AbstractMetricEntry(@NotNull String name, String[] tags) {
this.name = name;
this.tags = tagsAsMap(tags);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package io.kestra.core.models.executions;

import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.Timer;
import io.micronaut.core.annotation.Nullable;
import lombok.Builder;
import lombok.Value;

import java.time.Instant;
import java.util.Map;
import javax.validation.constraints.NotNull;

@Value
@Builder(toBuilder = true)
public class MetricEntry implements DeletedInterface {
@NotNull
String namespace;

@NotNull
String flowId;

@Nullable
String taskId;

@Nullable
String executionId;

@Nullable
String taskRunId;

@NotNull
String type;

@NotNull
String name;

@NotNull
Double value;

@NotNull
Instant timestamp;

@Nullable
Map<String, String> tags;

@NotNull
@Builder.Default
boolean deleted = false;

public static MetricEntry of(TaskRun taskRun, AbstractMetricEntry<?> metricEntry) {
return MetricEntry.builder()
.namespace(taskRun.getNamespace())
.flowId(taskRun.getFlowId())
.executionId(taskRun.getExecutionId())
.taskId(taskRun.getTaskId())
.taskRunId(taskRun.getId())
.type(metricEntry.getType())
.name(metricEntry.name)
.tags(metricEntry.getTags())
.value(computeValue(metricEntry))
.timestamp(metricEntry.getTimestamp())
.build();
}

private static Double computeValue(AbstractMetricEntry<?> metricEntry) {
if (metricEntry instanceof Counter) {
return ((Counter) metricEntry).getValue();
}

if (metricEntry instanceof Timer) {
return (double) ((Timer) metricEntry).getValue().toMillis();
}

throw new IllegalArgumentException("Unknown metric type: " + metricEntry.getClass());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import lombok.Builder;
import lombok.Value;
import lombok.With;
import io.kestra.core.models.flows.State;

import java.util.List;
Expand All @@ -12,23 +11,20 @@
@Value
@Builder
public class TaskRunAttempt {
@With
List<AbstractMetricEntry<?>> metrics;
/**
* @deprecated Should always be null, we need to keep it for backward compatibility or the deserialization of old attempt will no longer work.
*/
@Deprecated
public void setMetrics(List<AbstractMetricEntry<?>> metrics) {

}

@NotNull
State state;

public TaskRunAttempt withState(State.Type state) {
return new TaskRunAttempt(
this.metrics,
this.state.withState(state)
);
}

public Optional<AbstractMetricEntry<?>> findMetrics(String name) {
return this.metrics
.stream()
.filter(metricEntry -> metricEntry.getName().equals(name))
.findFirst();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.runners.*;
import io.kestra.core.models.flows.Flow;
Expand All @@ -16,11 +17,11 @@ public interface QueueFactoryInterface {
String FLOW_NAMED = "flowQueue";
String TEMPLATE_NAMED = "templateQueue";
String WORKERTASKLOG_NAMED = "workerTaskLogQueue";
String METRIC_QUEUE = "workerTaskMetricQueue";
String KILL_NAMED = "executionKilledQueue";
String WORKERINSTANCE_NAMED = "workerInstanceQueue";
String WORKERTASKRUNNING_NAMED = "workerTaskRuninngQueue";
String TRIGGER_NAMED = "triggerQueue";
String LOG_NAMED = "logQueue";

QueueInterface<Execution> execution();

Expand All @@ -32,6 +33,8 @@ public interface QueueFactoryInterface {

QueueInterface<LogEntry> logEntry();

QueueInterface<MetricEntry> metricEntry();

QueueInterface<Flow> flow();

QueueInterface<ExecutionKilled> kill();
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/io/kestra/core/queues/QueueService.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.topologies.FlowTopology;
Expand Down Expand Up @@ -49,6 +50,8 @@ public String key(Object object) {
return ((Executor) object).getExecution().getId();
} else if (object.getClass() == FlowTopology.class) {
return ((FlowTopology) object).uid();
} else if (object.getClass() == MetricEntry.class) {
return null;
} else {
throw new IllegalArgumentException("Unknown type '" + object.getClass().getName() + "'");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.kestra.core.repositories;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.MetricEntry;
import io.micronaut.data.model.Pageable;

import java.util.List;

public interface MetricRepositoryInterface extends SaveRepositoryInterface<MetricEntry> {
ArrayListTotal<MetricEntry> findByExecutionId(String id, Pageable pageable);

ArrayListTotal<MetricEntry> findByExecutionIdAndTaskId(String executionId, String taskId, Pageable pageable);

ArrayListTotal<MetricEntry> findByExecutionIdAndTaskRunId(String executionId, String taskRunId, Pageable pageable);

Integer purge(Execution execution);
}
11 changes: 11 additions & 0 deletions core/src/main/java/io/kestra/core/runners/Indexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.repositories.MetricRepositoryInterface;
import io.kestra.core.repositories.SaveRepositoryInterface;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.micronaut.context.annotation.Requires;
Expand All @@ -23,6 +25,9 @@ public class Indexer implements IndexerInterface {
private final QueueInterface<Execution> executionQueue;
private final LogRepositoryInterface logRepository;
private final QueueInterface<LogEntry> logQueue;

private final MetricRepositoryInterface metricRepository;
private final QueueInterface<MetricEntry> metricQueue;
private final MetricRegistry metricRegistry;

@Inject
Expand All @@ -31,19 +36,24 @@ public Indexer(
@Named(QueueFactoryInterface.EXECUTION_NAMED) QueueInterface<Execution> executionQueue,
LogRepositoryInterface logRepository,
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED) QueueInterface<LogEntry> logQueue,
MetricRepositoryInterface metricRepositor,
@Named(QueueFactoryInterface.METRIC_QUEUE) QueueInterface<MetricEntry> metricQueue,
MetricRegistry metricRegistry
) {
this.executionRepository = executionRepository;
this.executionQueue = executionQueue;
this.logRepository = logRepository;
this.logQueue = logQueue;
this.metricRepository = metricRepositor;
this.metricQueue = metricQueue;
this.metricRegistry = metricRegistry;
}

@Override
public void run() {
this.send(executionQueue, executionRepository);
this.send(logQueue, logRepository);
this.send(metricQueue, metricRepository);
}

protected <T> void send(QueueInterface<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
Expand All @@ -62,5 +72,6 @@ protected <T> void send(QueueInterface<T> queueInterface, SaveRepositoryInterfac
public void close() throws IOException {
this.executionQueue.close();
this.logQueue.close();
this.metricQueue.close();
}
}
4 changes: 0 additions & 4 deletions core/src/main/java/io/kestra/core/runners/RunContext.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.kestra.core.runners;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
import com.google.common.collect.ImmutableMap;
Expand All @@ -17,7 +16,6 @@
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.Slugify;
Expand All @@ -36,8 +34,6 @@

@NoArgsConstructor
public class RunContext {
private final static ObjectMapper MAPPER = JacksonMapper.ofJson();

// Injected
private ApplicationContext applicationContext;
private VariableRenderer variableRenderer;
Expand Down
19 changes: 14 additions & 5 deletions core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.hash.Hashing;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.tasks.Task;
import io.micronaut.context.ApplicationContext;
Expand Down Expand Up @@ -52,6 +53,7 @@ public class Worker implements Runnable, Closeable {
private final WorkerTaskQueueInterface workerTaskQueue;
private final QueueInterface<WorkerTaskResult> workerTaskResultQueue;
private final QueueInterface<ExecutionKilled> executionKilledQueue;
private final QueueInterface<MetricEntry> metricEntryQueue;
private final MetricRegistry metricRegistry;

private final Set<String> killedExecution = ConcurrentHashMap.newKeySet();
Expand All @@ -75,6 +77,10 @@ public Worker(ApplicationContext applicationContext, int thread) {
QueueInterface.class,
Qualifiers.byName(QueueFactoryInterface.KILL_NAMED)
);
this.metricEntryQueue = (QueueInterface<MetricEntry>) applicationContext.getBean(
QueueInterface.class,
Qualifiers.byName(QueueFactoryInterface.METRIC_QUEUE)
);
this.metricRegistry = applicationContext.getBean(MetricRegistry.class);

ExecutorsUtils executorsUtils = applicationContext.getBean(ExecutorsUtils.class);
Expand All @@ -87,9 +93,7 @@ public void run() {
if (executionKilled != null) {
// @FIXME: the hashset will never expire killed execution
killedExecution.add(executionKilled.getExecutionId());
}

if (executionKilled != null) {
synchronized (this) {
workerThreadReferences
.stream()
Expand Down Expand Up @@ -189,6 +193,8 @@ private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) throws Queu

this.logTerminated(workerTask);

//FIXME should we remove it from the killedExecution set ?

return workerTaskResult;
}

Expand Down Expand Up @@ -350,19 +356,21 @@ private WorkerTask runAttempt(WorkerTask workerTask) {

// attempt
TaskRunAttempt taskRunAttempt = builder
.metrics(runContext.metrics())
.build()
.withState(state);

// logs
if (workerThread.getTaskOutput() != null) {
if (workerThread.getTaskOutput() != null && log.isDebugEnabled()) {
log.debug("Outputs\n{}", JacksonMapper.log(workerThread.getTaskOutput()));
}

if (runContext.metrics().size() > 0) {
if (runContext.metrics().size() > 0 && log.isTraceEnabled()) {
log.trace("Metrics\n{}", JacksonMapper.log(runContext.metrics()));
}

// metrics
runContext.metrics().forEach(metric -> this.metricEntryQueue.emit(MetricEntry.of(workerTask.getTaskRun(), metric)));

// save outputs
List<TaskRunAttempt> attempts = this.addAttempt(workerTask, taskRunAttempt);

Expand Down Expand Up @@ -448,6 +456,7 @@ public void close() throws IOException {
workerTaskQueue.close();
executionKilledQueue.close();
workerTaskResultQueue.close();
metricEntryQueue.close();
}

@Getter
Expand Down
Loading

0 comments on commit 570374e

Please sign in to comment.