
Commit

chore(version): update to version 'v0.4.5'.
tchiotludo committed May 5, 2022
2 parents 2863d03 + 6929bc8 commit 316bf5a
Showing 25 changed files with 476 additions and 365 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
@@ -131,7 +131,7 @@ jobs:
packages: ""
- name: "-full"
plugins: io.kestra.storage:storage-azure:LATEST io.kestra.storage:storage-gcs:LATEST io.kestra.storage:storage-minio:LATEST io.kestra.plugin:plugin-aws:LATEST io.kestra.plugin:plugin-azure:LATEST io.kestra.plugin:plugin-cassandra:LATEST io.kestra.plugin:plugin-compress:LATEST io.kestra.plugin:plugin-crypto:LATEST io.kestra.plugin:plugin-dbt:LATEST io.kestra.plugin:plugin-debezium-mysql:LATEST io.kestra.plugin:plugin-debezium-postgres:LATEST io.kestra.plugin:plugin-debezium-sqlserver:LATEST io.kestra.plugin:plugin-elasticsearch:LATEST io.kestra.plugin:plugin-fs:LATEST io.kestra.plugin:plugin-gcp:LATEST io.kestra.plugin:plugin-googleworkspace:LATEST io.kestra.plugin:plugin-jdbc-clickhouse:LATEST io.kestra.plugin:plugin-jdbc-mysql:LATEST io.kestra.plugin:plugin-jdbc-oracle:LATEST io.kestra.plugin:plugin-jdbc-postgres:LATEST io.kestra.plugin:plugin-jdbc-redshift:LATEST io.kestra.plugin:plugin-jdbc-snowflake:LATEST io.kestra.plugin:plugin-jdbc-sqlserver:LATEST io.kestra.plugin:plugin-jdbc-vertica:LATEST io.kestra.plugin:plugin-jdbc-vectorwise:LATEST io.kestra.plugin:plugin-kafka:LATEST io.kestra.plugin:plugin-kubernetes:LATEST io.kestra.plugin:plugin-mongodb:LATEST io.kestra.plugin:plugin-mqtt:LATEST io.kestra.plugin:plugin-notifications:LATEST io.kestra.plugin:plugin-script-groovy:LATEST io.kestra.plugin:plugin-script-jython:LATEST io.kestra.plugin:plugin-script-nashorn:LATEST io.kestra.plugin:plugin-serdes:LATEST io.kestra.plugin:plugin-singer:LATEST io.kestra.plugin:plugin-spark:LATEST
packages: python3-pip python3-wheel python3-setuptools python3-virtualenv nodejs curl wait-for-it zip unzip
packages: python3-pip python3-wheel python3-setuptools python3-virtualenv python-is-python3 nodejs curl wait-for-it zip unzip
steps:
- uses: actions/checkout@v2

4 changes: 3 additions & 1 deletion cli/src/main/resources/application.yml
@@ -9,6 +9,9 @@ micronaut:
ui:
paths: classpath:ui
mapping: /ui/**
static:
paths: classpath:static
mapping: /static/**
server:
max-request-size: 10GB
multipart:
@@ -239,7 +242,6 @@ kestra:

variables:
env-vars-prefix: KESTRA_
globals: {}
disable-handlebars: true
cache-enabled: true
cache-size: 1000
29 changes: 11 additions & 18 deletions core/src/main/java/io/kestra/core/metrics/MetricRegistry.java
@@ -26,13 +26,13 @@ public class MetricRegistry {
public final static String METRIC_WORKER_ENDED_COUNT = "worker.ended.count";
public final static String METRIC_WORKER_ENDED_DURATION = "worker.ended.duration";

public final static String KESTRA_EXECUTOR_TASKRUN_NEXT_COUNT = "executor.taskrun.next.count";
public final static String KESTRA_EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count";
public final static String KESTRA_EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration";
public final static String KESTRA_EXECUTOR_WORKERTASKRESULT_COUNT = "executor.workertaskresult.count";
public final static String KESTRA_EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count";
public final static String KESTRA_EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count";
public final static String METRIC_EXECUTOR_EXECUTION_DURATION = "executor.execution.duration";
public final static String EXECUTOR_TASKRUN_NEXT_COUNT = "executor.taskrun.next.count";
public final static String EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count";
public final static String EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration";
public final static String EXECUTOR_WORKERTASKRESULT_COUNT = "executor.workertaskresult.count";
public final static String EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count";
public final static String EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count";
public final static String EXECUTOR_EXECUTION_DURATION = "executor.execution.duration";

public final static String METRIC_INDEXER_REQUEST_COUNT = "indexer.request.count";
public final static String METRIC_INDEXER_REQUEST_DURATION = "indexer.request.duration";
@@ -51,14 +51,13 @@ public class MetricRegistry {
public final static String SCHEDULER_EXECUTION_RUNNING_DURATION = "scheduler.execution.running.duration";
public final static String SCHEDULER_EXECUTION_MISSING_DURATION = "scheduler.execution.missing.duration";

public final static String TAG_TASK_ID = "task_id";
public final static String STREAMS_STATE_COUNT = "stream.state.count";

public final static String TAG_TASK_TYPE = "task_type";
public final static String TAG_FLOW_ID = "flow_id";
public final static String TAG_NAMESPACE_ID = "namespace_id";
public final static String TAG_TRIGGER_ID = "trigger_id";
public final static String TAG_STATE = "state";
public final static String TAG_ATTEMPT_COUNT = "attempt_count";
public final static String TAG_VALUE = "value";

@Inject
private MeterRegistry meterRegistry;
@@ -153,7 +152,6 @@ public String[] tags(WorkerTaskResult workerTaskResult, String... tags) {
tags,
TAG_NAMESPACE_ID, workerTaskResult.getTaskRun().getNamespace(),
TAG_FLOW_ID, workerTaskResult.getTaskRun().getFlowId(),
TAG_TASK_ID, workerTaskResult.getTaskRun().getTaskId(),
TAG_STATE, workerTaskResult.getTaskRun().getState().getCurrent().name()
);
}
@@ -166,7 +164,6 @@ public String[] tags(WorkerTaskResult workerTaskResult, String... tags) {
*/
public String[] tags(Task task) {
return new String[]{
TAG_TASK_ID, task.getId(),
TAG_TASK_TYPE, task.getType(),
};
}
@@ -195,7 +192,6 @@ public String[] tags(TriggerContext triggerContext) {
return new String[]{
TAG_FLOW_ID, triggerContext.getFlowId(),
TAG_NAMESPACE_ID, triggerContext.getNamespace(),
TAG_TRIGGER_ID, triggerContext.getTriggerId(),
};
}

@@ -207,11 +203,8 @@ public String[] tags(TriggerContext triggerContext) {
*/
public String[] tags(SchedulerExecutionWithTrigger schedulerExecutionWithTrigger, String... tags) {
return ArrayUtils.addAll(
ArrayUtils.addAll(
this.tags(schedulerExecutionWithTrigger.getExecution()),
tags
),
TAG_TRIGGER_ID, schedulerExecutionWithTrigger.getTriggerContext().getTriggerId()
this.tags(schedulerExecutionWithTrigger.getExecution()),
tags
);
}

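The renamed constants keep their metric name strings; only the Java identifiers lose the KESTRA_ and METRIC_ prefixes, and the task_id/trigger_id tags are dropped from the tag helpers. Below is a minimal Micrometer sketch of how one of these counters is incremented; the registry wiring and tag values are illustrative, not Kestra's actual bootstrap.

```java
// Minimal sketch (not Kestra's actual wiring): incrementing one of the renamed
// executor counters through Micrometer. The metric name string is unchanged by
// this commit; only the Java constant lost its KESTRA_ prefix.
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

public class ExecutorCounterSketch {
    public static final String EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count";

    public static void main(String[] args) {
        MeterRegistry registry = new SimpleMeterRegistry();

        // tag values are illustrative; Kestra derives them from the Execution
        registry.counter(EXECUTOR_EXECUTION_STARTED_COUNT,
                "namespace_id", "io.kestra.tests",
                "flow_id", "example-flow")
            .increment();

        System.out.println(registry.get(EXECUTOR_EXECUTION_STARTED_COUNT).counter().count()); // 1.0
    }
}
```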
13 changes: 6 additions & 7 deletions core/src/main/java/io/kestra/core/runners/ExecutorService.java
@@ -8,7 +8,6 @@
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.DynamicTask;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
@@ -118,7 +117,7 @@ public Execution onNexts(Flow flow, Execution execution, List<TaskRun> nexts) {

if (execution.getState().getCurrent() == State.Type.CREATED) {
metricRegistry
.counter(MetricRegistry.KESTRA_EXECUTOR_EXECUTION_STARTED_COUNT, metricRegistry.tags(execution))
.counter(MetricRegistry.EXECUTOR_EXECUTION_STARTED_COUNT, metricRegistry.tags(execution))
.increment();

flow.logger().info(
@@ -132,7 +131,7 @@ public Execution onNexts(Flow flow, Execution execution, List<TaskRun> nexts) {
}

metricRegistry
.counter(MetricRegistry.KESTRA_EXECUTOR_TASKRUN_NEXT_COUNT, metricRegistry.tags(execution))
.counter(MetricRegistry.EXECUTOR_TASKRUN_NEXT_COUNT, metricRegistry.tags(execution))
.increment(nexts.size());

return newExecution;
@@ -201,7 +200,7 @@ private Optional<WorkerTaskResult> childWorkerTaskTypeToWorkerTask(
.peek(workerTaskResult -> {
metricRegistry
.counter(
MetricRegistry.KESTRA_EXECUTOR_WORKERTASKRESULT_COUNT,
MetricRegistry.EXECUTOR_WORKERTASKRESULT_COUNT,
metricRegistry.tags(workerTaskResult)
)
.increment();
@@ -296,11 +295,11 @@ private Executor onEnd(Executor executor) {
}

metricRegistry
.counter(MetricRegistry.KESTRA_EXECUTOR_EXECUTION_END_COUNT, metricRegistry.tags(newExecution))
.counter(MetricRegistry.EXECUTOR_EXECUTION_END_COUNT, metricRegistry.tags(newExecution))
.increment();

metricRegistry
.timer(MetricRegistry.METRIC_EXECUTOR_EXECUTION_DURATION, metricRegistry.tags(newExecution))
.timer(MetricRegistry.EXECUTOR_EXECUTION_DURATION, metricRegistry.tags(newExecution))
.record(newExecution.getState().getDuration());

return executor.withExecution(newExecution, "onEnd");
@@ -481,7 +480,7 @@ private Executor handleRestart(Executor executor) {
}

metricRegistry
.counter(MetricRegistry.KESTRA_EXECUTOR_EXECUTION_STARTED_COUNT, metricRegistry.tags(executor.getExecution()))
.counter(MetricRegistry.EXECUTOR_EXECUTION_STARTED_COUNT, metricRegistry.tags(executor.getExecution()))
.increment();

executor.getFlow().logger().info(
16 changes: 2 additions & 14 deletions core/src/main/java/io/kestra/core/runners/RunContext.java
@@ -186,6 +186,8 @@ protected Map<String, Object> variables(Flow flow, Task task, Execution executio

if (applicationContext.getProperty("kestra.variables.globals", Map.class).isPresent()) {
builder.put("globals", applicationContext.getProperty("kestra.variables.globals", Map.class).get());
} else {
builder.put("globals", Map.of());
}

if (flow != null) {
@@ -546,20 +548,6 @@ private Map<String, String> metricsTags() {
.put(MetricRegistry.TAG_NAMESPACE_ID, ((Map<String, String>) this.variables.get("flow")).get("namespace"));
}

if (this.variables.containsKey("task")) {
builder
.put(MetricRegistry.TAG_TASK_ID, ((Map<String, String>) this.variables.get("task")).get("id"))
.put(MetricRegistry.TAG_TASK_TYPE, ((Map<String, String>) this.variables.get("task")).get("type"));
}

if (this.variables.containsKey("taskrun")) {
Map<String, String> taskrun = (Map<String, String>) this.variables.get("taskrun");

if (taskrun.containsValue("value")) {
builder.put(MetricRegistry.TAG_VALUE, taskrun.get("value"));
}
}

return builder.build();
}

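With `globals: {}` removed from application.yml, RunContext now guarantees the key itself: when no `kestra.variables.globals` property is configured, it falls back to an empty map. The sketch below mirrors that fallback; the helper name and Optional parameter are illustrative, not the RunContext API.

```java
// Sketch of the "globals always present" fallback added above; helper names are
// hypothetical, only the behaviour mirrors the diff.
import com.google.common.collect.ImmutableMap;

import java.util.Map;
import java.util.Optional;

public class GlobalsFallbackSketch {
    static Map<String, Object> variables(Optional<Map<String, Object>> configuredGlobals) {
        ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();

        // Before this change the "globals" key was only added when the property existed;
        // the default `globals: {}` in application.yml (removed above) supplied the empty map instead.
        builder.put("globals", configuredGlobals.orElse(Map.of()));

        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(variables(Optional.empty()));                                    // {globals={}}
        System.out.println(variables(Optional.of(Map.<String, Object>of("env", "prod")))); // {globals={env=prod}}
    }
}
```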
@@ -366,5 +366,13 @@ public static class DockerOptions {
)
@PluginProperty(dynamic = true)
protected String networkMode;

@Schema(
title = "List of volumes to mount",
description = "Must be a valid mount expression as string, example : `/home/user:/app`\n\n" +
"Volumes mount are disabled by default for security reasons, you must enabled on server configuration with `kestra.tasks.scripts.docker.volume-enabled` to `true`"
)
@PluginProperty(dynamic = true)
protected List<String> volumes;
}
}
@@ -41,8 +41,11 @@
public class DockerScriptRunner implements ScriptRunnerInterface {
private final RetryUtils retryUtils;

private final Boolean volumesEnabled;

public DockerScriptRunner(ApplicationContext applicationContext) {
this.retryUtils = applicationContext.getBean(RetryUtils.class);
this.volumesEnabled = applicationContext.getProperty("kestra.tasks.scripts.docker.volume-enabled", Boolean.class).orElse(false);
}

private DockerClient getDockerClient(AbstractBash abstractBash, RunContext runContext, Path workingDirectory) throws IllegalVariableEvaluationException, IOException {
@@ -172,6 +175,14 @@ public RunResult run(
hostConfig.withExtraHosts(runContext.render(abstractBash.getDockerOptions().getExtraHosts(), additionalVars).toArray(String[]::new));
}

if (this.volumesEnabled && abstractBash.getDockerOptions().getVolumes() != null) {
hostConfig.withBinds(runContext.render(abstractBash.getDockerOptions().getVolumes())
.stream()
.map(Bind::parse)
.collect(Collectors.toList())
);
}

if (abstractBash.getDockerOptions().getNetworkMode() != null) {
hostConfig.withNetworkMode(runContext.render(abstractBash.getDockerOptions().getNetworkMode(), additionalVars));
}
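The new `volumes` option is only honoured when `kestra.tasks.scripts.docker.volume-enabled` is `true` (it defaults to `false`). Below is a minimal docker-java sketch of the bind handling, with assumed paths and values.

```java
// Sketch (assumed values) of the bind handling above: each "host:container" string
// from the volumes option is parsed into a docker-java Bind and attached to the
// container's HostConfig, but only when volumes are enabled in the configuration.
import com.github.dockerjava.api.model.Bind;
import com.github.dockerjava.api.model.HostConfig;

import java.util.List;
import java.util.stream.Collectors;

public class VolumeBindSketch {
    public static void main(String[] args) {
        boolean volumesEnabled = true;                     // kestra.tasks.scripts.docker.volume-enabled
        List<String> volumes = List.of("/tmp/data:/host"); // rendered from the task's dockerOptions.volumes

        HostConfig hostConfig = HostConfig.newHostConfig();

        if (volumesEnabled && !volumes.isEmpty()) {
            hostConfig.withBinds(volumes.stream()
                .map(Bind::parse)
                .collect(Collectors.toList()));
        }

        System.out.println(hostConfig.getBinds().length); // 1
    }
}
```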
45 changes: 45 additions & 0 deletions core/src/test/java/io/kestra/core/tasks/DockerBashTest.java
@@ -1,10 +1,29 @@
package io.kestra.core.tasks;

import com.google.common.collect.ImmutableMap;
import com.google.common.io.CharStreams;
import io.kestra.core.runners.RunContext;
import io.kestra.core.tasks.scripts.AbstractBash;
import io.kestra.core.tasks.scripts.Bash;
import io.kestra.core.tasks.scripts.ScriptOutput;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.context.annotation.Property;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.Test;

import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

@MicronautTest
@Property(name = "kestra.tasks.scripts.docker.volume-enabled", value = "true")
class DockerBashTest extends AbstractBashTest {
@Override
protected Bash.BashBuilder<?, ?> configure(Bash.BashBuilder<?, ?> builder) {
@@ -17,4 +36,30 @@ class DockerBashTest extends AbstractBashTest {
.build()
);
}

@Test
void volume() throws Exception {
Path tmpDir = Files.createTempDirectory("tmpDirPrefix");
Path tmpFile = tmpDir.resolve("tmp.txt");
Files.write(tmpFile, "I'm here".getBytes());


Bash bash = configure(Bash.builder()
.commands(new String[]{
"echo '::{\"outputs\": {\"extract\":\"'$(cat /host/tmp.txt)'\"}}::'",
})
)
.dockerOptions(AbstractBash.DockerOptions.builder()
.image("ubuntu")
.volumes(List.of(tmpDir.toFile() + ":/host" ))
.build()
)
.build();

RunContext runContext = TestsUtils.mockRunContext(runContextFactory, bash, ImmutableMap.of());
ScriptOutput run = bash.run(runContext);

assertThat(run.getStdOutLineCount(), is(1));
assertThat(run.getVars().get("extract"), is("I'm here"));
}
}
2 changes: 1 addition & 1 deletion gradle.properties
@@ -1,4 +1,4 @@
version=0.4.4
version=0.4.5
opensearchVersion=1.3.1
micronautVersion=3.4.1
kafkaVersion=3.1.0
@@ -7,6 +7,7 @@
import io.micronaut.context.annotation.Value;
import io.micronaut.context.event.ApplicationEventPublisher;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -115,11 +116,23 @@ public KafkaStreamService.Stream of(Class<?> clientId, Class<?> groupId, Topolog

public static class Stream extends KafkaStreams {
private final Logger logger;

private final MetricRegistry meterRegistry;

private final String[] tags;

private KafkaStreamsMetrics metrics;

private boolean hasStarted = false;

private Stream(Topology topology, Properties props, MetricRegistry meterRegistry, Logger logger) {
super(topology, props);
this.meterRegistry = meterRegistry;

tags = new String[]{
"client_class_id",
(String) props.get(CommonClientConfigs.CLIENT_ID_CONFIG)
};

if (meterRegistry != null) {
metrics = new KafkaStreamsMetrics(
@@ -141,13 +154,25 @@ private Stream(Topology topology, Properties props, MetricRegistry meterRegistry

public synchronized void start(final KafkaStreams.StateListener listener) throws IllegalStateException, StreamsException {
this.setUncaughtExceptionHandler(e -> {
log.error("Uncaught exception in Kafka Stream, closing !", e);
this.logger.error("Uncaught exception in Kafka Stream, closing !", e);
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
});

this.setGlobalStateRestoreListener(new StateRestoreLoggerListeners(logger));

this.setStateListener((newState, oldState) -> {
meterRegistry.gauge(
MetricRegistry.STREAMS_STATE_COUNT,
0,
ArrayUtils.addAll(tags, "state", oldState.name())
);

meterRegistry.gauge(
MetricRegistry.STREAMS_STATE_COUNT,
1,
ArrayUtils.addAll(tags, "state", newState.name())
);

if (newState == State.RUNNING) {
this.hasStarted = true;
}
@@ -157,12 +182,12 @@ public synchronized void start(final KafkaStreams.StateListener listener) throws
newState == State.NOT_RUNNING ||
newState == State.PENDING_SHUTDOWN
) {
log.warn("Switching stream state from {} to {}", oldState, newState);
this.logger.warn("Switching stream state from {} to {}", oldState, newState);
} else if (
newState == State.PENDING_ERROR ||
newState == State.ERROR
) {
log.error("Switching stream state from {} to {}", oldState, newState);
this.logger.error("Switching stream state from {} to {}", oldState, newState);
} else {
logger.info("Switching stream state from {} to {}", oldState, newState);
}
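The state listener flips two `stream.state.count` gauges on every transition: 0 for the state being left, 1 for the state entered, each tagged with the stream's client class id. The rough Micrometer sketch below shows that pattern with AtomicInteger-backed gauges and an invented client id, rather than Kestra's own MetricRegistry wrapper.

```java
// Rough sketch of the state-gauge pattern above, using Micrometer directly;
// the client id and helper structure are invented for illustration.
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class StreamStateGaugeSketch {
    private static final Map<String, AtomicInteger> STATES = new ConcurrentHashMap<>();

    static void onStateChange(MeterRegistry registry, String clientClassId, String oldState, String newState) {
        // the state being left drops to 0, the state being entered rises to 1
        gaugeFor(registry, clientClassId, oldState).set(0);
        gaugeFor(registry, clientClassId, newState).set(1);
    }

    private static AtomicInteger gaugeFor(MeterRegistry registry, String clientClassId, String state) {
        // one gauge per (client_class_id, state) pair
        return STATES.computeIfAbsent(clientClassId + "/" + state, key ->
            registry.gauge("stream.state.count",
                Tags.of("client_class_id", clientClassId, "state", state),
                new AtomicInteger(0)));
    }

    public static void main(String[] args) {
        MeterRegistry registry = new SimpleMeterRegistry();

        onStateChange(registry, "ExecutorMain", "CREATED", "REBALANCING");
        onStateChange(registry, "ExecutorMain", "REBALANCING", "RUNNING");

        System.out.println(registry.get("stream.state.count")
            .tag("state", "RUNNING")
            .gauge()
            .value()); // 1.0
    }
}
```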
@@ -86,14 +86,14 @@ public Executor transform(final String key, final Executor value) {
if (workerTaskResult.getTaskRun().getState().isTerninated()) {
metricRegistry
.counter(
MetricRegistry.KESTRA_EXECUTOR_TASKRUN_ENDED_COUNT,
MetricRegistry.EXECUTOR_TASKRUN_ENDED_COUNT,
metricRegistry.tags(workerTaskResult)
)
.increment();

metricRegistry
.timer(
MetricRegistry.KESTRA_EXECUTOR_TASKRUN_ENDED_DURATION,
MetricRegistry.EXECUTOR_TASKRUN_ENDED_DURATION,
metricRegistry.tags(workerTaskResult)
)
.record(workerTaskResult.getTaskRun().getState().getDuration());