Skip to content

Commit

Permalink
chore(version): update to version 'v0.4.0'.
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Feb 22, 2022
2 parents e5aa680 + ee6004e commit 3ed2e13
Show file tree
Hide file tree
Showing 129 changed files with 4,467 additions and 3,912 deletions.
20 changes: 16 additions & 4 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
with:
python-version: '3.x'
architecture: 'x64'
- uses: actions/setup-node@v1
- uses: actions/setup-node@v2
with:
node-version: '12'
check-latest: true
Expand All @@ -53,15 +53,15 @@ jobs:
${{ runner.os }}-gradle-
- name: Npm cache
uses: actions/cache@v1
uses: actions/cache@v2
with:
path: ~/.npm
key: ${{ runner.os }}-npm-${{ hashFiles('**/package-lock.json') }}
restore-keys: |
${{ runner.os }}-npm-
- name: Node cache
uses: actions/cache@v1
uses: actions/cache@v2
with:
path: node
key: ${{ runner.os }}-node-${{ hashFiles('ui/*.gradle') }}
Expand Down Expand Up @@ -186,6 +186,7 @@ jobs:
context: .
push: true
tags: ${{ format('kestra/kestra:{0}', steps.vars.outputs.tag) }}
platforms: linux/amd64,linux/arm64

- name: Push to Docker Hub Full
if: github.ref == 'refs/heads/master' || github.ref == 'refs/heads/develop' || github.ref == 'refs/heads/release' || startsWith(github.ref, 'refs/tags/v')
Expand All @@ -194,10 +195,21 @@ jobs:
context: .
push: true
tags: ${{ format('kestra/kestra:{0}-full', steps.vars.outputs.tag) }}
platforms: linux/amd64,linux/arm64
build-args: |
KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }} io.kestra.storage:storage-gcs:LATEST io.kestra.storage:storage-minio:LATEST io.kestra.plugin:plugin-aws:LATEST io.kestra.plugin:plugin-compress:LATEST io.kestra.plugin:plugin-crypto:LATEST io.kestra.plugin:plugin-elasticsearch:LATEST io.kestra.plugin:plugin-fs:LATEST io.kestra.plugin:plugin-gcp:LATEST io.kestra.plugin:plugin-googleworkspace:LATEST io.kestra.plugin:plugin-jdbc-clickhouse:LATEST io.kestra.plugin:plugin-jdbc-mysql:LATEST io.kestra.plugin:plugin-jdbc-oracle:LATEST io.kestra.plugin:plugin-jdbc-postgres:LATEST io.kestra.plugin:plugin-jdbc-redshift:LATEST io.kestra.plugin:plugin-jdbc-vertica:LATEST io.kestra.plugin:plugin-jdbc-vectorwise:LATEST io.kestra.plugin:plugin-kubernetes:LATEST io.kestra.plugin:plugin-mongodb:LATEST io.kestra.plugin:plugin-notifications:LATEST io.kestra.plugin:plugin-script-groovy:LATEST io.kestra.plugin:plugin-script-jython:LATEST io.kestra.plugin:plugin-script-nashorn:LATEST io.kestra.plugin:plugin-serdes:LATEST io.kestra.plugin:plugin-singer:LATEST
KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }} io.kestra.storage:storage-gcs:LATEST io.kestra.storage:storage-minio:LATEST io.kestra.plugin:plugin-aws:LATEST io.kestra.plugin:plugin-compress:LATEST io.kestra.plugin:plugin-crypto:LATEST io.kestra.plugin:plugin-elasticsearch:LATEST io.kestra.plugin:plugin-fs:LATEST io.kestra.plugin:plugin-gcp:LATEST io.kestra.plugin:plugin-googleworkspace:LATEST io.kestra.plugin:plugin-jdbc-clickhouse:LATEST io.kestra.plugin:plugin-jdbc-mysql:LATEST io.kestra.plugin:plugin-jdbc-oracle:LATEST io.kestra.plugin:plugin-jdbc-postgres:LATEST io.kestra.plugin:plugin-jdbc-redshift:LATEST io.kestra.plugin:plugin-jdbc-vertica:LATEST io.kestra.plugin:plugin-jdbc-vectorwise:LATEST io.kestra.plugin:plugin-kafka:LATEST io.kestra.plugin:plugin-kubernetes:LATEST io.kestra.plugin:plugin-mongodb:LATEST io.kestra.plugin:plugin-notifications:LATEST io.kestra.plugin:plugin-script-groovy:LATEST io.kestra.plugin:plugin-script-jython:LATEST io.kestra.plugin:plugin-script-nashorn:LATEST io.kestra.plugin:plugin-serdes:LATEST io.kestra.plugin:plugin-singer:LATEST
APT_PACKAGES=python3-pip python3-wheel python3-setuptools python3-virtualenv nodejs
# GitHub Release
- name: Create GitHub release
uses: "marvinpinto/action-automatic-releases@latest"
if: startsWith(github.ref, 'refs/tags/v')
with:
repo_token: "${{ secrets.GITHUB_TOKEN }}"
draft: true
files: |
build/executable/*
# Slack
- name: Slack notification
uses: 8398a7/action-slack@v3
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package io.kestra.cli.commands.servers;

import com.google.common.collect.ImmutableMap;
import io.kestra.core.runners.ExecutorInterface;
import io.micronaut.context.ApplicationContext;
import lombok.extern.slf4j.Slf4j;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.AbstractExecutor;
import io.kestra.core.utils.Await;
import picocli.CommandLine;

Expand Down Expand Up @@ -36,12 +36,12 @@ public static Map<String, Object> propertiesOverrides() {
public Integer call() throws Exception {
super.call();

AbstractExecutor abstractExecutor = applicationContext.getBean(AbstractExecutor.class);
abstractExecutor.run();
ExecutorInterface executorService = applicationContext.getBean(ExecutorInterface.class);
executorService.run();

log.info("Executor started");

this.shutdownHook(abstractExecutor::close);
this.shutdownHook(executorService::close);

Await.until(() -> !this.applicationContext.isRunning());

Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
package io.kestra.cli.commands.servers;

import com.google.common.collect.ImmutableMap;
import io.micronaut.context.ApplicationContext;
import lombok.extern.slf4j.Slf4j;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.ServerType;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.AbstractExecutor;
import io.kestra.core.runners.StandAloneRunner;
import io.kestra.core.utils.Await;
import io.kestra.runner.kafka.KafkaExecutor;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import jakarta.inject.Inject;

@CommandLine.Command(
name = "standalone",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class FlowListenersRestoreCommand extends AbstractCommand {
private ApplicationContext applicationContext;

@CommandLine.Option(names = {"--timeout"}, description = "Timeout before quit, considering we complete the restore")
private Duration timeout = Duration.ofSeconds(15);
private Duration timeout = Duration.ofSeconds(60);

public FlowListenersRestoreCommand() {
super(false);
Expand All @@ -35,16 +35,20 @@ public Integer call() throws Exception {
super.call();

FlowListenersInterface flowListeners = applicationContext.getBean(FlowListenersInterface.class);

AtomicReference<ZonedDateTime> lastTime = new AtomicReference<>(ZonedDateTime.now());

flowListeners.run();
flowListeners.listen(flows -> {
stdOut("Received {0} flows", flows.size());
long count = flows.stream().filter(flow -> !flow.isDeleted()).count();

stdOut("Received {0} active flows", count);

lastTime.set(ZonedDateTime.now());
if (count > 0) {
lastTime.set(ZonedDateTime.now());
}
});

// we can't know when it's over to wait no more flow received
// we can't know when it's over, wait no more flow received
Await.until(() -> lastTime.get().compareTo(ZonedDateTime.now().minus(this.timeout)) < 0);

return 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.cli.services;

import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowSource;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.queues.QueueFactoryInterface;
Expand Down Expand Up @@ -32,6 +33,8 @@ public int flows(boolean noRecreate) {
.findAll()
.stream()
.flatMap(flow -> flowRepository.findRevisions(flow.getNamespace(), flow.getId()).stream())
// we can't resend FlowSource since deserialize failed & will be invalid
.filter(flow -> !(flow instanceof FlowSource))
.collect(Collectors.toList());

return this.send(flows, QueueFactoryInterface.FLOW_NAMED, Flow.class, noRecreate);
Expand Down
11 changes: 10 additions & 1 deletion cli/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ endpoints:
write-sensitive: false

kestra:
retries:
attempts: 5
multiplier: 2.0
delay: 1s
maxDelay: ""

kafka:
defaults:
topic-prefix: "kestra_"
Expand Down Expand Up @@ -105,7 +111,7 @@ kestra:
segment.bytes: "10485760"

executor:
name: "${kestra.kafka.defaults.topic-prefix}executor-executor-changelog"
name: "${kestra.kafka.defaults.topic-prefix}executor_main-executor-changelog"
cls: io.kestra.core.runners.Executor
properties:
cleanup.policy: "delete,compact"
Expand Down Expand Up @@ -232,6 +238,9 @@ kestra:
variables:
env-vars-prefix: KESTRA_
globals: {}
disable-handlebars: true
cache-enabled: true
cache-size: 1000

metrics:
prefix: kestra
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ void run() throws InterruptedException {

thread.join();

assertThat(out.toString(), containsString("Received 1 flows"));
assertThat(out.toString(), containsString("Received 5 flows"));
assertThat(out.toString(), containsString("Received 1 active flows"));
assertThat(out.toString(), containsString("Received 5 active flows"));
}
}
}
68 changes: 68 additions & 0 deletions core/src/main/java/io/kestra/core/annotations/Retryable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.kestra.core.annotations;

import io.micronaut.aop.Around;
import io.micronaut.context.annotation.AliasFor;
import io.micronaut.context.annotation.Type;
import io.micronaut.retry.annotation.DefaultRetryPredicate;
import io.micronaut.retry.annotation.RetryPredicate;
import io.micronaut.retry.intercept.OverrideRetryInterceptor;

import java.lang.annotation.*;

import javax.validation.constraints.Digits;

import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Inherited
@Documented
@Retention(RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE, ElementType.ANNOTATION_TYPE})
@Around
@Type(OverrideRetryInterceptor.class)
public @interface Retryable {
int MAX_INTEGRAL_DIGITS = 4;

/**
* @return The exception types to include (defaults to all)
*/
Class<? extends Throwable>[] value() default {};

/**
* @return The exception types to include (defaults to all)
*/
@AliasFor(member = "value")
Class<? extends Throwable>[] includes() default {};

/**
* @return The exception types to exclude (defaults to none)
*/
Class<? extends Throwable>[] excludes() default {};

/**
* @return The maximum number of retry attempts
*/
@Digits(integer = MAX_INTEGRAL_DIGITS, fraction = 0)
String attempts() default "${kestra.retries.attempts:5}";

/**
* @return The delay between retry attempts
*/
String delay() default "${kestra.retries.delay:1s}";

/**
* @return The maximum overall delay
*/
String maxDelay() default "${kestra.retries.max-delay:}";

/**
* @return The multiplier to use to calculate the delay
*/
@Digits(integer = 2, fraction = 2)
String multiplier() default "${kestra.retries.multiplier:2.0}";

/**
* @return The retry predicate class to use instead of {@link io.micronaut.retry.annotation.Retryable#includes} and {@link io.micronaut.retry.annotation.Retryable#excludes}
* (defaults to none)
*/
Class<? extends RetryPredicate> predicate() default DefaultRetryPredicate.class;
}
4 changes: 2 additions & 2 deletions core/src/main/java/io/kestra/core/listeners/RetryEvents.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ public class RetryEvents {
void onRetry(final RetryEvent event) {
log.info(
"Retry from '{}.{}()', attempt {}, overallDelay {}",
event.getSource().getExecutableMethod().getClass().getName(),
event.getSource().getName(),
event.getSource().getTarget().getClass().getName(),
event.getSource().getExecutableMethod().getName(),
event.getRetryState().currentAttempt(),
event.getRetryState().getOverallDelay()
);
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/io/kestra/core/metrics/MetricRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,16 @@ public String[] tags(SchedulerExecutionWithTrigger schedulerExecutionWithTrigger
);
}


/**
* Return globals tags
*
* @return tags to applied to metrics
*/
public Tags tags(String... tags) {
return Tags.of(tags);
}

/**
* Attach a {@link MeterBinder} to current registry
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public static FlowUsage of(FlowRepositoryInterface flowRepository) {
.namespacesCount(allFlows
.stream()
.map(Flow::getNamespace)
.distinct()
.count()
)
.taskTypeCount(allFlows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import oshi.software.os.OperatingSystem;

import java.lang.management.ManagementFactory;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -83,7 +84,8 @@ public static HostUsage of() {
processorIdentifier,
processorsCount
)
.filter(s -> s.equals("unknown"))
.filter(Objects::nonNull)
.filter(s -> !s.equals("unknown"))
.map(s -> String.format("%08x", s.hashCode()))
.collect(Collectors.joining("-"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public static List<PluginUsage> of(ApplicationContext applicationContext) {
KestraApplicationContext context = (KestraApplicationContext) applicationContext;
PluginRegistry pluginRegistry = context.getPluginRegistry();

if (pluginRegistry == null) {
return List.of();
}

return pluginRegistry.getPlugins()
.stream()
.map(registeredPlugin -> PluginUsage.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
code = {
"- conditions:",
" - type: io.kestra.core.models.conditions.types.DayWeekInMonthCondition",
" dayOfWeek: io.kestra.core.models.conditions.types.DayWeekInMonthCondition",
" dayOfWeek: MONDAY",
" dayInMonth: FIRST",
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,13 +434,13 @@ public boolean hasTaskRunJoinable(TaskRun taskRun) {
}

/**
* Convert an exception on {@link io.kestra.core.runners.AbstractExecutor} and add log to the current
* Convert an exception on Executor and add log to the current
* {@code RUNNING} taskRun, on the lastAttempts.
* If no Attempt is found, we create one (must be nominal case).
* The executor will catch the {@code FAILED} taskRun emitted and will failed the execution.
* In the worst case, we FAILED the execution (only from {@link io.kestra.core.models.triggers.types.Flow}).
*
* @param e the exception throw from {@link io.kestra.core.runners.AbstractExecutor}
* @param e the exception throw from Executor
* @return a new execution with taskrun failed if possible or execution failed is other case
*/
public FailedExecutionWithLog failedExecutionFromExecutor(Exception e) {
Expand Down
9 changes: 8 additions & 1 deletion core/src/main/java/io/kestra/core/models/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ public static String uid(String namespace, String id, Optional<Integer> revision
));
}

public static String uidWithoutRevision(String namespace, String id) {
return String.join("_", Arrays.asList(
namespace,
id
));
}

public static String uidWithoutRevision(Execution execution) {
return String.join("_", Arrays.asList(
execution.getNamespace(),
Expand All @@ -134,7 +141,7 @@ public Stream<Task> allTasks() {
.flatMap(Collection::stream);
}

private List<Task> allTasksWithChilds() {
public List<Task> allTasksWithChilds() {
return allTasks()
.flatMap(this::allTasksWithChilds)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
public interface PollingTriggerInterface {
Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception;

default ZonedDateTime nextEvaluationDate(Optional<? extends TriggerContext> last) {
default ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Optional<? extends TriggerContext> last) {
return ZonedDateTime.now();
}

Expand Down
Loading

0 comments on commit 3ed2e13

Please sign in to comment.