From 6ddcb9a41a51870f4e26210941004b6cf88b284c Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Thu, 28 Apr 2022 17:02:49 +0200 Subject: [PATCH 1/9] fix(core): don't setup globals in configuration since we set up with {}, it can't be overridden during configuration --- cli/src/main/resources/application.yml | 1 - core/src/main/java/io/kestra/core/runners/RunContext.java | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/cli/src/main/resources/application.yml b/cli/src/main/resources/application.yml index db4e04bc81..477be73b69 100644 --- a/cli/src/main/resources/application.yml +++ b/cli/src/main/resources/application.yml @@ -239,7 +239,6 @@ kestra: variables: env-vars-prefix: KESTRA_ - globals: {} disable-handlebars: true cache-enabled: true cache-size: 1000 diff --git a/core/src/main/java/io/kestra/core/runners/RunContext.java b/core/src/main/java/io/kestra/core/runners/RunContext.java index 77ad86b2fb..c8442a64d5 100644 --- a/core/src/main/java/io/kestra/core/runners/RunContext.java +++ b/core/src/main/java/io/kestra/core/runners/RunContext.java @@ -186,6 +186,8 @@ protected Map variables(Flow flow, Task task, Execution executio if (applicationContext.getProperty("kestra.variables.globals", Map.class).isPresent()) { builder.put("globals", applicationContext.getProperty("kestra.variables.globals", Map.class).get()); + } else { + builder.put("globals", Map.of()); } if (flow != null) { From 7fe8adb26fc05c26dabdb31a9a8db4d25ad9d594 Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Fri, 29 Apr 2022 10:53:46 +0200 Subject: [PATCH 2/9] fix(docker): missing python-is-python3 --- .github/workflows/main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 523741d946..9dd93101d1 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -131,7 +131,7 @@ jobs: packages: "" - name: "-full" plugins: io.kestra.storage:storage-azure:LATEST io.kestra.storage:storage-gcs:LATEST io.kestra.storage:storage-minio:LATEST io.kestra.plugin:plugin-aws:LATEST io.kestra.plugin:plugin-azure:LATEST io.kestra.plugin:plugin-cassandra:LATEST io.kestra.plugin:plugin-compress:LATEST io.kestra.plugin:plugin-crypto:LATEST io.kestra.plugin:plugin-dbt:LATEST io.kestra.plugin:plugin-debezium-mysql:LATEST io.kestra.plugin:plugin-debezium-postgres:LATEST io.kestra.plugin:plugin-debezium-sqlserver:LATEST io.kestra.plugin:plugin-elasticsearch:LATEST io.kestra.plugin:plugin-fs:LATEST io.kestra.plugin:plugin-gcp:LATEST io.kestra.plugin:plugin-googleworkspace:LATEST io.kestra.plugin:plugin-jdbc-clickhouse:LATEST io.kestra.plugin:plugin-jdbc-mysql:LATEST io.kestra.plugin:plugin-jdbc-oracle:LATEST io.kestra.plugin:plugin-jdbc-postgres:LATEST io.kestra.plugin:plugin-jdbc-redshift:LATEST io.kestra.plugin:plugin-jdbc-snowflake:LATEST io.kestra.plugin:plugin-jdbc-sqlserver:LATEST io.kestra.plugin:plugin-jdbc-vertica:LATEST io.kestra.plugin:plugin-jdbc-vectorwise:LATEST io.kestra.plugin:plugin-kafka:LATEST io.kestra.plugin:plugin-kubernetes:LATEST io.kestra.plugin:plugin-mongodb:LATEST io.kestra.plugin:plugin-mqtt:LATEST io.kestra.plugin:plugin-notifications:LATEST io.kestra.plugin:plugin-script-groovy:LATEST io.kestra.plugin:plugin-script-jython:LATEST io.kestra.plugin:plugin-script-nashorn:LATEST io.kestra.plugin:plugin-serdes:LATEST io.kestra.plugin:plugin-singer:LATEST io.kestra.plugin:plugin-spark:LATEST - packages: python3-pip python3-wheel python3-setuptools python3-virtualenv nodejs curl wait-for-it zip unzip + packages: python3-pip python3-wheel python3-setuptools python3-virtualenv python-is-python3 nodejs curl wait-for-it zip unzip steps: - uses: actions/checkout@v2 From c5e81db2d122b31c77a8944526eb3b380f0a0442 Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Fri, 29 Apr 2022 15:06:00 +0200 Subject: [PATCH 3/9] fix(core): too many metrics are created with task_id that overflow metrics server --- .../io/kestra/core/metrics/MetricRegistry.java | 4 ---- .../java/io/kestra/core/runners/RunContext.java | 14 -------------- 2 files changed, 18 deletions(-) diff --git a/core/src/main/java/io/kestra/core/metrics/MetricRegistry.java b/core/src/main/java/io/kestra/core/metrics/MetricRegistry.java index 6aea377c05..a063fdabd8 100644 --- a/core/src/main/java/io/kestra/core/metrics/MetricRegistry.java +++ b/core/src/main/java/io/kestra/core/metrics/MetricRegistry.java @@ -51,14 +51,12 @@ public class MetricRegistry { public final static String SCHEDULER_EXECUTION_RUNNING_DURATION = "scheduler.execution.running.duration"; public final static String SCHEDULER_EXECUTION_MISSING_DURATION = "scheduler.execution.missing.duration"; - public final static String TAG_TASK_ID = "task_id"; public final static String TAG_TASK_TYPE = "task_type"; public final static String TAG_FLOW_ID = "flow_id"; public final static String TAG_NAMESPACE_ID = "namespace_id"; public final static String TAG_TRIGGER_ID = "trigger_id"; public final static String TAG_STATE = "state"; public final static String TAG_ATTEMPT_COUNT = "attempt_count"; - public final static String TAG_VALUE = "value"; @Inject private MeterRegistry meterRegistry; @@ -153,7 +151,6 @@ public String[] tags(WorkerTaskResult workerTaskResult, String... tags) { tags, TAG_NAMESPACE_ID, workerTaskResult.getTaskRun().getNamespace(), TAG_FLOW_ID, workerTaskResult.getTaskRun().getFlowId(), - TAG_TASK_ID, workerTaskResult.getTaskRun().getTaskId(), TAG_STATE, workerTaskResult.getTaskRun().getState().getCurrent().name() ); } @@ -166,7 +163,6 @@ public String[] tags(WorkerTaskResult workerTaskResult, String... tags) { */ public String[] tags(Task task) { return new String[]{ - TAG_TASK_ID, task.getId(), TAG_TASK_TYPE, task.getType(), }; } diff --git a/core/src/main/java/io/kestra/core/runners/RunContext.java b/core/src/main/java/io/kestra/core/runners/RunContext.java index c8442a64d5..4d47839d40 100644 --- a/core/src/main/java/io/kestra/core/runners/RunContext.java +++ b/core/src/main/java/io/kestra/core/runners/RunContext.java @@ -548,20 +548,6 @@ private Map metricsTags() { .put(MetricRegistry.TAG_NAMESPACE_ID, ((Map) this.variables.get("flow")).get("namespace")); } - if (this.variables.containsKey("task")) { - builder - .put(MetricRegistry.TAG_TASK_ID, ((Map) this.variables.get("task")).get("id")) - .put(MetricRegistry.TAG_TASK_TYPE, ((Map) this.variables.get("task")).get("type")); - } - - if (this.variables.containsKey("taskrun")) { - Map taskrun = (Map) this.variables.get("taskrun"); - - if (taskrun.containsValue("value")) { - builder.put(MetricRegistry.TAG_VALUE, taskrun.get("value")); - } - } - return builder.build(); } From 3111f389911c3be3a06c6f8f6081e34bf5b2b1f3 Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Fri, 29 Apr 2022 16:51:03 +0200 Subject: [PATCH 4/9] feat(kafka-runner): add a metric on kafka stream state --- .../kestra/core/metrics/MetricRegistry.java | 16 ++++++------ .../kestra/core/runners/ExecutorService.java | 13 +++++----- .../kafka/services/KafkaStreamService.java | 25 +++++++++++++++++++ .../streams/ExecutorJoinerTransformer.java | 4 +-- .../kestra/runner/memory/MemoryExecutor.java | 4 +-- 5 files changed, 44 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/io/kestra/core/metrics/MetricRegistry.java b/core/src/main/java/io/kestra/core/metrics/MetricRegistry.java index a063fdabd8..3ed491b809 100644 --- a/core/src/main/java/io/kestra/core/metrics/MetricRegistry.java +++ b/core/src/main/java/io/kestra/core/metrics/MetricRegistry.java @@ -26,13 +26,13 @@ public class MetricRegistry { public final static String METRIC_WORKER_ENDED_COUNT = "worker.ended.count"; public final static String METRIC_WORKER_ENDED_DURATION = "worker.ended.duration"; - public final static String KESTRA_EXECUTOR_TASKRUN_NEXT_COUNT = "executor.taskrun.next.count"; - public final static String KESTRA_EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count"; - public final static String KESTRA_EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration"; - public final static String KESTRA_EXECUTOR_WORKERTASKRESULT_COUNT = "executor.workertaskresult.count"; - public final static String KESTRA_EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count"; - public final static String KESTRA_EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count"; - public final static String METRIC_EXECUTOR_EXECUTION_DURATION = "executor.execution.duration"; + public final static String EXECUTOR_TASKRUN_NEXT_COUNT = "executor.taskrun.next.count"; + public final static String EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count"; + public final static String EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration"; + public final static String EXECUTOR_WORKERTASKRESULT_COUNT = "executor.workertaskresult.count"; + public final static String EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count"; + public final static String EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count"; + public final static String EXECUTOR_EXECUTION_DURATION = "executor.execution.duration"; public final static String METRIC_INDEXER_REQUEST_COUNT = "indexer.request.count"; public final static String METRIC_INDEXER_REQUEST_DURATION = "indexer.request.duration"; @@ -51,6 +51,8 @@ public class MetricRegistry { public final static String SCHEDULER_EXECUTION_RUNNING_DURATION = "scheduler.execution.running.duration"; public final static String SCHEDULER_EXECUTION_MISSING_DURATION = "scheduler.execution.missing.duration"; + public final static String STREAMS_STATE_COUNT = "stream.state.count"; + public final static String TAG_TASK_TYPE = "task_type"; public final static String TAG_FLOW_ID = "flow_id"; public final static String TAG_NAMESPACE_ID = "namespace_id"; diff --git a/core/src/main/java/io/kestra/core/runners/ExecutorService.java b/core/src/main/java/io/kestra/core/runners/ExecutorService.java index b4421c287b..f24dc0e585 100644 --- a/core/src/main/java/io/kestra/core/runners/ExecutorService.java +++ b/core/src/main/java/io/kestra/core/runners/ExecutorService.java @@ -8,7 +8,6 @@ import io.kestra.core.models.executions.TaskRun; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.State; -import io.kestra.core.models.tasks.DynamicTask; import io.kestra.core.models.tasks.FlowableTask; import io.kestra.core.models.tasks.ResolvedTask; import io.kestra.core.models.tasks.Task; @@ -118,7 +117,7 @@ public Execution onNexts(Flow flow, Execution execution, List nexts) { if (execution.getState().getCurrent() == State.Type.CREATED) { metricRegistry - .counter(MetricRegistry.KESTRA_EXECUTOR_EXECUTION_STARTED_COUNT, metricRegistry.tags(execution)) + .counter(MetricRegistry.EXECUTOR_EXECUTION_STARTED_COUNT, metricRegistry.tags(execution)) .increment(); flow.logger().info( @@ -132,7 +131,7 @@ public Execution onNexts(Flow flow, Execution execution, List nexts) { } metricRegistry - .counter(MetricRegistry.KESTRA_EXECUTOR_TASKRUN_NEXT_COUNT, metricRegistry.tags(execution)) + .counter(MetricRegistry.EXECUTOR_TASKRUN_NEXT_COUNT, metricRegistry.tags(execution)) .increment(nexts.size()); return newExecution; @@ -201,7 +200,7 @@ private Optional childWorkerTaskTypeToWorkerTask( .peek(workerTaskResult -> { metricRegistry .counter( - MetricRegistry.KESTRA_EXECUTOR_WORKERTASKRESULT_COUNT, + MetricRegistry.EXECUTOR_WORKERTASKRESULT_COUNT, metricRegistry.tags(workerTaskResult) ) .increment(); @@ -296,11 +295,11 @@ private Executor onEnd(Executor executor) { } metricRegistry - .counter(MetricRegistry.KESTRA_EXECUTOR_EXECUTION_END_COUNT, metricRegistry.tags(newExecution)) + .counter(MetricRegistry.EXECUTOR_EXECUTION_END_COUNT, metricRegistry.tags(newExecution)) .increment(); metricRegistry - .timer(MetricRegistry.METRIC_EXECUTOR_EXECUTION_DURATION, metricRegistry.tags(newExecution)) + .timer(MetricRegistry.EXECUTOR_EXECUTION_DURATION, metricRegistry.tags(newExecution)) .record(newExecution.getState().getDuration()); return executor.withExecution(newExecution, "onEnd"); @@ -481,7 +480,7 @@ private Executor handleRestart(Executor executor) { } metricRegistry - .counter(MetricRegistry.KESTRA_EXECUTOR_EXECUTION_STARTED_COUNT, metricRegistry.tags(executor.getExecution())) + .counter(MetricRegistry.EXECUTOR_EXECUTION_STARTED_COUNT, metricRegistry.tags(executor.getExecution())) .increment(); executor.getFlow().logger().info( diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamService.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamService.java index db444bf517..0332b746a6 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamService.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamService.java @@ -7,6 +7,7 @@ import io.micronaut.context.annotation.Value; import io.micronaut.context.event.ApplicationEventPublisher; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ArrayUtils; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; @@ -115,11 +116,23 @@ public KafkaStreamService.Stream of(Class clientId, Class groupId, Topolog public static class Stream extends KafkaStreams { private final Logger logger; + + private final MetricRegistry meterRegistry; + + private final String[] tags; + private KafkaStreamsMetrics metrics; + private boolean hasStarted = false; private Stream(Topology topology, Properties props, MetricRegistry meterRegistry, Logger logger) { super(topology, props); + this.meterRegistry = meterRegistry; + + tags = new String[]{ + "client_class_id", + (String) props.get(CommonClientConfigs.CLIENT_ID_CONFIG) + }; if (meterRegistry != null) { metrics = new KafkaStreamsMetrics( @@ -148,6 +161,18 @@ public synchronized void start(final KafkaStreams.StateListener listener) throws this.setGlobalStateRestoreListener(new StateRestoreLoggerListeners(logger)); this.setStateListener((newState, oldState) -> { + meterRegistry.gauge( + MetricRegistry.STREAMS_STATE_COUNT, + 0, + ArrayUtils.addAll(tags, "state", oldState.name()) + ); + + meterRegistry.gauge( + MetricRegistry.STREAMS_STATE_COUNT, + 1, + ArrayUtils.addAll(tags, "state", newState.name()) + ); + if (newState == State.RUNNING) { this.hasStarted = true; } diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/ExecutorJoinerTransformer.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/ExecutorJoinerTransformer.java index 7fa7d6c627..096bab7b8f 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/ExecutorJoinerTransformer.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/ExecutorJoinerTransformer.java @@ -86,14 +86,14 @@ public Executor transform(final String key, final Executor value) { if (workerTaskResult.getTaskRun().getState().isTerninated()) { metricRegistry .counter( - MetricRegistry.KESTRA_EXECUTOR_TASKRUN_ENDED_COUNT, + MetricRegistry.EXECUTOR_TASKRUN_ENDED_COUNT, metricRegistry.tags(workerTaskResult) ) .increment(); metricRegistry .timer( - MetricRegistry.KESTRA_EXECUTOR_TASKRUN_ENDED_DURATION, + MetricRegistry.EXECUTOR_TASKRUN_ENDED_DURATION, metricRegistry.tags(workerTaskResult) ) .record(workerTaskResult.getTaskRun().getState().getDuration()); diff --git a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java index e26f630004..9f779e4ebc 100644 --- a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java +++ b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java @@ -299,11 +299,11 @@ private void workerTaskResultQueue(WorkerTaskResult message) { // send metrics on terminated if (message.getTaskRun().getState().isTerninated()) { metricRegistry - .counter(MetricRegistry.KESTRA_EXECUTOR_TASKRUN_ENDED_COUNT, metricRegistry.tags(message)) + .counter(MetricRegistry.EXECUTOR_TASKRUN_ENDED_COUNT, metricRegistry.tags(message)) .increment(); metricRegistry - .timer(MetricRegistry.KESTRA_EXECUTOR_TASKRUN_ENDED_DURATION, metricRegistry.tags(message)) + .timer(MetricRegistry.EXECUTOR_TASKRUN_ENDED_DURATION, metricRegistry.tags(message)) .record(message.getTaskRun().getState().getDuration()); } From fc76401d5b5372075db7745a32399bf48f3bbcc1 Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Fri, 29 Apr 2022 22:40:06 +0200 Subject: [PATCH 5/9] fix(ui): show tasks logs on topology execution can open 2 side popup close #559 --- ui/src/components/graph/TopologyTree.vue | 1 + ui/src/mixins/VueManual.js | 9 ++++++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/ui/src/components/graph/TopologyTree.vue b/ui/src/components/graph/TopologyTree.vue index 5a48ac3e9d..9e08971546 100644 --- a/ui/src/components/graph/TopologyTree.vue +++ b/ui/src/components/graph/TopologyTree.vue @@ -59,6 +59,7 @@ cy: undefined, watch: { flowGraph() { + this.destroyedManualComponent() this.generateGraph(); } }, diff --git a/ui/src/mixins/VueManual.js b/ui/src/mixins/VueManual.js index 47cf6c9c9e..263e98b756 100644 --- a/ui/src/mixins/VueManual.js +++ b/ui/src/mixins/VueManual.js @@ -23,11 +23,14 @@ export default { this.manualComponents.push(mount); return mount; + }, + destroyedManualComponent() { + (this.manualComponents || []).forEach(value => { + value.$destroy(); + }); } }, destroyed() { - (this.manualComponents || []).forEach(value => { - value.$destroy(); - }); + this.destroyedManualComponent(); } } From 1bdc9215628c7d49c9064dfaa5fcd53c6de19113 Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Tue, 3 May 2022 21:51:07 +0200 Subject: [PATCH 6/9] feat(webserver): generate a proper swagger specs --- cli/src/main/resources/application.yml | 3 + .../java/io/kestra/webserver/Application.java | 12 +- .../webserver/controllers/ApiController.java | 93 ++++++++ .../controllers/ExecutionController.java | 217 +++++++----------- .../webserver/controllers/FlowController.java | 138 ++++++----- .../webserver/controllers/LogController.java | 52 ++--- .../webserver/controllers/MiscController.java | 5 +- .../controllers/PluginController.java | 13 +- .../controllers/RedirectController.java | 2 + .../controllers/StatsController.java | 52 ++--- .../controllers/TemplateController.java | 66 +++--- webserver/src/main/resources/static/logo.svg | 12 + 12 files changed, 352 insertions(+), 313 deletions(-) create mode 100644 webserver/src/main/java/io/kestra/webserver/controllers/ApiController.java create mode 100644 webserver/src/main/resources/static/logo.svg diff --git a/cli/src/main/resources/application.yml b/cli/src/main/resources/application.yml index 477be73b69..54e02aa9b5 100644 --- a/cli/src/main/resources/application.yml +++ b/cli/src/main/resources/application.yml @@ -9,6 +9,9 @@ micronaut: ui: paths: classpath:ui mapping: /ui/** + static: + paths: classpath:static + mapping: /static/** server: max-request-size: 10GB multipart: diff --git a/webserver/src/main/java/io/kestra/webserver/Application.java b/webserver/src/main/java/io/kestra/webserver/Application.java index c14fe9dc5c..f90ce2ca4c 100644 --- a/webserver/src/main/java/io/kestra/webserver/Application.java +++ b/webserver/src/main/java/io/kestra/webserver/Application.java @@ -4,12 +4,22 @@ import io.swagger.v3.oas.annotations.OpenAPIDefinition; import io.swagger.v3.oas.annotations.info.Info; import io.swagger.v3.oas.annotations.info.License; +import io.swagger.v3.oas.annotations.tags.Tag; @OpenAPIDefinition( info = @Info( title = "Kestra", license = @License(name = "Apache 2.0", url = "https://raw.githubusercontent.com/kestra-io/kestra/master/LICENSE") - ) + ), + tags = { + @Tag(name = "Flows", description = "Flows api"), + @Tag(name = "Templates", description = "Templates api"), + @Tag(name = "Executions", description = "Executions api"), + @Tag(name = "Logs", description = "Logs api"), + @Tag(name = "Plugins", description = "Plugins api"), + @Tag(name = "Stats", description = "Stats api"), + @Tag(name = "Misc", description = "Misc api"), + } ) public class Application { public static void main(String[] args) { diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/ApiController.java b/webserver/src/main/java/io/kestra/webserver/controllers/ApiController.java new file mode 100644 index 0000000000..aaec3eacd0 --- /dev/null +++ b/webserver/src/main/java/io/kestra/webserver/controllers/ApiController.java @@ -0,0 +1,93 @@ +package io.kestra.webserver.controllers; + +import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.exceptions.InternalException; +import io.kestra.core.models.SearchResult; +import io.kestra.core.models.flows.Flow; +import io.kestra.core.models.hierarchies.FlowGraph; +import io.kestra.core.models.tasks.Task; +import io.kestra.core.models.validations.ManualConstraintViolation; +import io.kestra.core.repositories.FlowRepositoryInterface; +import io.kestra.webserver.responses.PagedResults; +import io.kestra.webserver.utils.PageableUtils; +import io.micronaut.context.annotation.Value; +import io.micronaut.core.annotation.Nullable; +import io.micronaut.http.HttpResponse; +import io.micronaut.http.HttpStatus; +import io.micronaut.http.MediaType; +import io.micronaut.http.annotation.*; +import io.micronaut.http.exceptions.HttpStatusException; +import io.micronaut.scheduling.TaskExecutors; +import io.micronaut.scheduling.annotation.ExecuteOn; +import io.micronaut.validation.Validated; +import io.swagger.v3.oas.annotations.Hidden; +import jakarta.inject.Inject; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import javax.validation.ConstraintViolationException; +import javax.validation.Valid; + +import static io.kestra.core.utils.Rethrow.throwFunction; + +@Validated +@Controller("/api") +public class ApiController { + @Value("${micronaut.server.context-path:}") + protected String basePath; + + protected String getBasePath() { + return basePath.replaceAll("/$",""); + } + + protected String getSwaggerFilename() { + return "kestra.yml"; + } + + @Get() + @Hidden + public HttpResponse rapidoc() { + String doc = "\n" + + "\n" + + "\n" + + " Api | Kestra\n" + + " \n" + + " \n" + + " \n" + + " \n" + + " \n" + + "\n" + + "\n" + + " \n" + + " \"logo\"\n" + + "\n" + + " \n" + + " \n" + + "\n" + + "\n"; + + return HttpResponse + .ok() + .contentType(MediaType.TEXT_HTML_TYPE) + .body(doc); + } + +} diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java b/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java index 12050441dd..75f6bd991e 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java @@ -15,6 +15,11 @@ import io.micronaut.validation.Validated; import io.reactivex.BackpressureStrategy; import io.reactivex.Flowable; +import io.swagger.v3.oas.annotations.Hidden; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.responses.ApiResponses; import org.apache.commons.io.FilenameUtils; import io.kestra.core.events.CrudEvent; import io.kestra.core.events.CrudEventType; @@ -93,12 +98,13 @@ public class ExecutionController { @ExecuteOn(TaskExecutors.IO) @Get(uri = "executions/search", produces = MediaType.TEXT_JSON) + @Operation(tags = {"Executions"}, summary = "Search for executions") public PagedResults find( - @QueryValue(value = "q") String query, - @QueryValue(value = "page", defaultValue = "1") int page, - @QueryValue(value = "size", defaultValue = "10") int size, - @Nullable @QueryValue(value = "state") List state, - @Nullable @QueryValue(value = "sort") List sort + @Parameter(description = "Lucene string filter") @QueryValue(value = "q") String query, + @Parameter(description = "The current page") @QueryValue(value = "page", defaultValue = "1") int page, + @Parameter(description = "The current page size") @QueryValue(value = "size", defaultValue = "10") int size, + @Parameter(description = "The sort of current page") @Nullable @QueryValue(value = "sort") List sort, + @Parameter(description = "A state filter") @Nullable @QueryValue(value = "state") List state ) { return PagedResults.of( executionRepository @@ -108,12 +114,13 @@ public PagedResults find( @ExecuteOn(TaskExecutors.IO) @Get(uri = "taskruns/search", produces = MediaType.TEXT_JSON) + @Operation(tags = {"Executions"}, summary = "Search for taskruns") public PagedResults findTaskRun( - @QueryValue(value = "q") String query, - @QueryValue(value = "page", defaultValue = "1") int page, - @QueryValue(value = "size", defaultValue = "10") int size, + @Parameter(description = "Lucene string filter") @QueryValue(value = "q") String query, + @Parameter(description = "The current page") @QueryValue(value = "page", defaultValue = "1") int page, + @Parameter(description = "The current page size") @QueryValue(value = "size", defaultValue = "10") int size, @Nullable @QueryValue(value = "state") List state, - @Nullable @QueryValue(value = "sort") List sort + @Parameter(description = "The sort of current page") @Nullable @QueryValue(value = "sort") List sort ) { return PagedResults.of( executionRepository @@ -123,19 +130,17 @@ public PagedResults findTaskRun( @ExecuteOn(TaskExecutors.IO) @Get(uri = "taskruns/maxTaskRunSetting") + @Hidden public Integer maxTaskRunSetting() { return executionRepository.maxTaskRunSetting(); } - /** - * Get an execution flow tree - * - * @param executionId The execution identifier - * @return the flow tree with the provided identifier - */ @ExecuteOn(TaskExecutors.IO) @Get(uri = "executions/{executionId}/graph", produces = MediaType.TEXT_JSON) - public FlowGraph flowGraph(String executionId) throws IllegalVariableEvaluationException { + @Operation(tags = {"Executions"}, summary = "Generate a graph for an execution") + public FlowGraph flowGraph( + @Parameter(description = "The execution id") String executionId + ) throws IllegalVariableEvaluationException { return executionRepository .findById(executionId) .map(throwFunction(execution -> { @@ -152,94 +157,63 @@ public FlowGraph flowGraph(String executionId) throws IllegalVariableEvaluationE .orElse(null); } - /** - * Get a execution - * - * @param executionId The execution identifier - * @return the execution with the provided identifier - */ @ExecuteOn(TaskExecutors.IO) @Get(uri = "executions/{executionId}", produces = MediaType.TEXT_JSON) - public Execution get(String executionId) { + @Operation(tags = {"Executions"}, summary = "Get an execution") + public Execution get( + @Parameter(description = "The execution id") String executionId + ) { return executionRepository .findById(executionId) .orElse(null); } - /** - * Find and returns all executions for a specific namespace and flow identifier - * - * @param namespace The flow namespace - * @param flowId The flow identifier - * @param page The number of result pages to return - * @param size The number of result by page - * @return a list of found executions - */ @ExecuteOn(TaskExecutors.IO) @Get(uri = "executions", produces = MediaType.TEXT_JSON) + @Operation(tags = {"Executions"}, summary = "Search for executions for a flow") public PagedResults findByFlowId( - @QueryValue(value = "namespace") String namespace, - @QueryValue(value = "flowId") String flowId, - @QueryValue(value = "page", defaultValue = "1") int page, - @QueryValue(value = "size", defaultValue = "10") int size) { + @Parameter(description = "The flow namespace") @QueryValue(value = "namespace") String namespace, + @Parameter(description = "The flow id") @QueryValue(value = "flowId") String flowId, + @Parameter(description = "The current page") @QueryValue(value = "page", defaultValue = "1") int page, + @Parameter(description = "The current page size") @QueryValue(value = "size", defaultValue = "10") int size + ) { return PagedResults.of( executionRepository .findByFlowId(namespace, flowId, Pageable.from(page, size)) ); } - /** - * Trigger a new execution for a webhook trigger - * - * @param namespace The flow namespace - * @param id The flow id - * @param key The webhook trigger uid - * @return execution created - */ @ExecuteOn(TaskExecutors.IO) @Post(uri = "executions/webhook/{namespace}/{id}/{key}", produces = MediaType.TEXT_JSON) + @Operation(tags = {"Executions"}, summary = "Trigger a new execution by POST webhook trigger") public Execution webhookTriggerPost( - String namespace, - String id, - String key, + @Parameter(description = "The flow namespace") String namespace, + @Parameter(description = "The flow id") String id, + @Parameter(description = "The webhook trigger uid") String key, HttpRequest request ) { return this.webhook(namespace, id, key, request); } - /** - * Trigger a new execution for a webhook trigger - * - * @param namespace The flow namespace - * @param id The flow id - * @param key The webhook trigger uid - * @return execution created - */ @ExecuteOn(TaskExecutors.IO) @Get(uri = "executions/webhook/{namespace}/{id}/{key}", produces = MediaType.TEXT_JSON) + @Operation(tags = {"Executions"}, summary = "Trigger a new execution by GET webhook trigger") public Execution webhookTriggerGet( - String namespace, - String id, - String key, + @Parameter(description = "The flow namespace") String namespace, + @Parameter(description = "The flow id") String id, + @Parameter(description = "The webhook trigger uid") String key, HttpRequest request ) { return this.webhook(namespace, id, key, request); } - /** - * Trigger a new execution for a webhook trigger - * - * @param namespace The flow namespace - * @param id The flow id - * @param key The webhook trigger uid - * @return execution created - */ @ExecuteOn(TaskExecutors.IO) @Put(uri = "executions/webhook/{namespace}/{id}/{key}", produces = MediaType.TEXT_JSON) + @Operation(tags = {"Executions"}, summary = "Trigger a new execution by PUT webhook trigger") public Execution webhookTriggerPut( - String namespace, - String id, - String key, + @Parameter(description = "The flow namespace") String namespace, + @Parameter(description = "The flow id") String id, + @Parameter(description = "The webhook trigger uid") String key, HttpRequest request ) { return this.webhook(namespace, id, key, request); @@ -284,19 +258,13 @@ private Execution webhook( return execution.get(); } - /** - * Trigger a new execution for current flow - * - * @param namespace The flow namespace - * @param id The flow id - * @return execution created - * @throws IllegalStateException if the flow is disabled - */ @ExecuteOn(TaskExecutors.IO) @Post(uri = "executions/trigger/{namespace}/{id}", produces = MediaType.TEXT_JSON, consumes = MediaType.MULTIPART_FORM_DATA) + @Operation(tags = {"Executions"}, summary = "Trigger a new execution for a flow") + @ApiResponse(responseCode = "409", description = "if the flow is disabled") public Execution trigger( - String namespace, - String id, + @Parameter(description = "The flow namespace") String namespace, + @Parameter(description = "The flow id") String id, @Nullable Map inputs, @Nullable Publisher files ) { @@ -353,17 +321,13 @@ protected HttpResponse validateFile(String executionId, URI path, String throw new IllegalArgumentException("Invalid prefix path"); } - /** - * Download file binary from uri parameter - * - * @param path The file URI to return - * @return data binary content - */ + @ExecuteOn(TaskExecutors.IO) @Get(uri = "executions/{executionId}/file", produces = MediaType.APPLICATION_OCTET_STREAM) + @Operation(tags = {"Executions"}, summary = "Download file for an execution") public HttpResponse file( - String executionId, - @QueryValue(value = "path") URI path + @Parameter(description = "The execution id") String executionId, + @Parameter(description = "The internal storage uri") @QueryValue(value = "path") URI path ) throws IOException, URISyntaxException { HttpResponse httpResponse = this.validateFile(executionId, path, "/api/v1/executions/{executionId}/file?path=" + path); if (httpResponse != null) { @@ -376,17 +340,12 @@ public HttpResponse file( ); } - /** - * Get file meta information from given path - * - * @param path The file URI to gather metas values - * @return metadata about given file - */ @ExecuteOn(TaskExecutors.IO) @Get(uri = "executions/{executionId}/file/metas", produces = MediaType.TEXT_JSON) + @Operation(tags = {"Executions"}, summary = "Get file meta information for an execution") public HttpResponse filesize( - String executionId, - @QueryValue(value = "path") URI path + @Parameter(description = "The execution id") String executionId, + @Parameter(description = "The internal storage uri") @QueryValue(value = "path") URI path ) throws IOException { HttpResponse httpResponse =this.validateFile(executionId, path, "/api/v1/executions/{executionId}/file/metas?path=" + path); if (httpResponse != null) { @@ -399,18 +358,13 @@ public HttpResponse filesize( ); } - /** - * Restart a new execution from an old one - * - * @param executionId the origin execution id to clone - * @return the restarted execution - */ @ExecuteOn(TaskExecutors.IO) @Post(uri = "executions/{executionId}/restart", produces = MediaType.TEXT_JSON) + @Operation(tags = {"Executions"}, summary = "Restart a new execution from an old one") public Execution restart( - String executionId, - @Nullable @QueryValue(value = "revision") Integer revision - ) throws Exception { + @Parameter(description = "The execution id") String executionId, + @Parameter(description = "The flow revision to use for new execution") @Nullable @QueryValue(value = "revision") Integer revision + ) throws Exception { Optional execution = executionRepository.findById(executionId); if (execution.isEmpty()) { return null; @@ -425,19 +379,13 @@ public Execution restart( return restart; } - /** - * Create a new execution from an old one and start it from a specified task run id - * - * @param executionId the origin execution id to clone - * @param taskRunId the reference taskRun id - * @return the restarted execution - */ @ExecuteOn(TaskExecutors.IO) @Post(uri = "executions/{executionId}/replay", produces = MediaType.TEXT_JSON) + @Operation(tags = {"Executions"}, summary = "Create a new execution from an old one and start it from a specified task run id") public Execution replay( - String executionId, - @Nullable @QueryValue(value = "taskRunId") String taskRunId, - @Nullable @QueryValue(value = "revision") Integer revision + @Parameter(description = "the original execution id to clone") String executionId, + @Parameter(description = "The taskrun id") @Nullable @QueryValue(value = "taskRunId") String taskRunId, + @Parameter(description = "The flow revision to use for new execution") @Nullable @QueryValue(value = "revision") Integer revision ) throws Exception { Optional execution = executionRepository.findById(executionId); if (execution.isEmpty()) { @@ -469,16 +417,13 @@ private void controlRevision(Execution execution, Integer revision) { } } - /** - * Create a new execution from an old one and start it from a specified task run id - * - * @param executionId the origin execution id to clone - * @param stateRequest the taskRun id & state to apply - * @return the restarted execution - */ @ExecuteOn(TaskExecutors.IO) @Post(uri = "executions/{executionId}/state", produces = MediaType.TEXT_JSON) - public Execution changeState(String executionId, @Body StateRequest stateRequest) throws Exception { + @Operation(tags = {"Executions"}, summary = "Change state for a taskrun in an execution") + public Execution changeState( + @Parameter(description = "The execution id") String executionId, + @Parameter(description = "the taskRun id and state to apply") @Body StateRequest stateRequest + ) throws Exception { Optional execution = executionRepository.findById(executionId); if (execution.isEmpty()) { return null; @@ -497,15 +442,18 @@ public static class StateRequest { State.Type state; } - /** - * Kill an execution and stop all works - * - * @param executionId the execution id to kill - * @throws IllegalStateException if the executions is already finished - */ @ExecuteOn(TaskExecutors.IO) @Delete(uri = "executions/{executionId}/kill", produces = MediaType.TEXT_JSON) - public HttpResponse kill(String executionId) { + @Operation(tags = {"Executions"}, summary = "Kill an execution") + @ApiResponses( + value = { + @ApiResponse(responseCode = "204", description = "On success"), + @ApiResponse(responseCode = "409", description = "if the executions is already finished") + } + ) + public HttpResponse kill( + @Parameter(description = "The execution id") String executionId + ) { Optional execution = executionRepository.findById(executionId); if (execution.isPresent() && execution.get().getState().isTerninated()) { throw new IllegalStateException("Execution is already finished, can't kill it"); @@ -525,15 +473,12 @@ private boolean isStopFollow(Flow flow, Execution execution) { execution.getState().getCurrent() != State.Type.PAUSED; } - /** - * Trigger a new execution for current flow and follow execution - * - * @param executionId The execution id to follow - * @return execution sse event - */ @ExecuteOn(TaskExecutors.IO) @Get(uri = "executions/{executionId}/follow", produces = MediaType.TEXT_EVENT_STREAM) - public Flowable> follow(String executionId) { + @Operation(tags = {"Executions"}, summary = "Follow an execution") + public Flowable> follow( + @Parameter(description = "The execution id") String executionId + ) { AtomicReference cancel = new AtomicReference<>(); return Flowable diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/FlowController.java b/webserver/src/main/java/io/kestra/webserver/controllers/FlowController.java index f459499707..22cb2db628 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/FlowController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/FlowController.java @@ -25,6 +25,10 @@ import java.util.Set; import java.util.stream.Collectors; import io.micronaut.core.annotation.Nullable; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.responses.ApiResponses; import jakarta.inject.Inject; import javax.validation.ConstraintViolationException; import javax.validation.Valid; @@ -37,85 +41,72 @@ public class FlowController { @Inject private FlowRepositoryInterface flowRepository; - /** - * @param namespace The flow namespace - * @param id The flow id - * @return flow tree found - */ @ExecuteOn(TaskExecutors.IO) @Get(uri = "{namespace}/{id}/graph", produces = MediaType.TEXT_JSON) - public FlowGraph flowGraph(String namespace, String id, Optional revision) throws IllegalVariableEvaluationException { + @Operation(tags = {"Flows"}, summary = "Generate a graph for a flow") + public FlowGraph flowGraph( + @Parameter(description = "The flow namespace") String namespace, + @Parameter(description = "The flow id") String id, + @Parameter(description = "The flow revision") Optional revision + ) throws IllegalVariableEvaluationException { return flowRepository .findById(namespace, id, revision) .map(throwFunction(FlowGraph::of)) .orElse(null); } - /** - * @param namespace The flow namespace - * @param id The flow id - * @return flow found - */ @ExecuteOn(TaskExecutors.IO) @Get(uri = "{namespace}/{id}", produces = MediaType.TEXT_JSON) - public Flow index(String namespace, String id) { + @Operation(tags = {"Flows"}, summary = "Get a flow") + public Flow index( + @Parameter(description = "The flow namespace") String namespace, + @Parameter(description = "The flow id") String id + ) { return flowRepository .findById(namespace, id) .orElse(null); } - /** - * @param namespace The flow namespace - * @param id The flow id - * @return flow revisions found - */ @ExecuteOn(TaskExecutors.IO) @Get(uri = "{namespace}/{id}/revisions", produces = MediaType.TEXT_JSON) - public List revisions(String namespace, String id) { + @Operation(tags = {"Flows"}, summary = "Get revisions for a flow") + public List revisions( + @Parameter(description = "The flow namespace") String namespace, + @Parameter(description = "The flow id") String id + ) { return flowRepository.findRevisions(namespace, id); } - /** - * @param query The flow query that is a lucene string - * @param page Page in flow pagination - * @param size Element count in pagination selection - * @return flow list - */ @ExecuteOn(TaskExecutors.IO) @Get(uri = "/search", produces = MediaType.TEXT_JSON) + @Operation(tags = {"Flows"}, summary = "Search for flows") public PagedResults find( - @QueryValue(value = "q") String query, //Search by namespace using lucene - @QueryValue(value = "page", defaultValue = "1") int page, - @QueryValue(value = "size", defaultValue = "10") int size, - @Nullable @QueryValue(value = "sort") List sort + @Parameter(description = "Lucene string filter") @QueryValue(value = "q") String query, + @Parameter(description = "The current page") @QueryValue(value = "page", defaultValue = "1") int page, + @Parameter(description = "The current page size") @QueryValue(value = "size", defaultValue = "10") int size, + @Parameter(description = "The sort of current page") @Nullable @QueryValue(value = "sort") List sort ) throws HttpStatusException { return PagedResults.of(flowRepository.find(query, PageableUtils.from(page, size, sort))); } - /** - * @param query The flow query that is a lucene string - * @param page Page in flow pagination - * @param size Element count in pagination selection - * @return flow search list - */ @ExecuteOn(TaskExecutors.IO) @Get(uri = "/source", produces = MediaType.TEXT_JSON) + @Operation(tags = {"Flows"}, summary = "Search for flows source code") public PagedResults> source( - @QueryValue(value = "q") String query, //Search by namespace using lucene - @QueryValue(value = "page", defaultValue = "1") int page, - @QueryValue(value = "size", defaultValue = "10") int size, - @Nullable @QueryValue(value = "sort") List sort + @Parameter(description = "Lucene string filter") @QueryValue(value = "q") String query, + @Parameter(description = "The current page") @QueryValue(value = "page", defaultValue = "1") int page, + @Parameter(description = "The current page size") @QueryValue(value = "size", defaultValue = "10") int size, + @Parameter(description = "The sort of current page") @Nullable @QueryValue(value = "sort") List sort ) throws HttpStatusException { return PagedResults.of(flowRepository.findSourceCode(query, PageableUtils.from(page, size, sort))); } - /** - * @param flow The flow content - * @return flow created - */ @ExecuteOn(TaskExecutors.IO) @Post(produces = MediaType.TEXT_JSON) - public HttpResponse create(@Body @Valid Flow flow) throws ConstraintViolationException { + @Operation(tags = {"Flows"}, summary = "Create a flow") + public HttpResponse create( + @Parameter(description = "The flow") @Body @Valid Flow flow + ) throws ConstraintViolationException { if (flowRepository.findById(flow.getNamespace(), flow.getId()).isPresent()) { throw new ConstraintViolationException(Collections.singleton(ManualConstraintViolation.of( "Flow id already exists", @@ -129,15 +120,18 @@ public HttpResponse create(@Body @Valid Flow flow) throws ConstraintViolat return HttpResponse.ok(flowRepository.create(flow)); } - /** - * @param namespace The namespace to update - * @param flows The flows content, all flow will be created / updated for this namespace. - * Flow in repository but not in {@code flows} will also be deleted - * @return flows created or updated - */ @ExecuteOn(TaskExecutors.IO) @Post(uri = "{namespace}", produces = MediaType.TEXT_JSON) - public List updateNamespace(String namespace, @Body @Valid List flows) throws ConstraintViolationException { + @Operation( + tags = {"Flows"}, + summary = "Update a complete namespace", + description = "All flow will be created / updated for this namespace.\n" + + "Flow that already created but not in `flows` will be deleted" + ) + public List updateNamespace( + @Parameter(description = "The flow namespace") String namespace, + @Parameter(description = "A list of flows") @Body @Valid List flows + ) throws ConstraintViolationException { // control namespace to update Set> invalids = flows .stream() @@ -199,14 +193,14 @@ public List updateNamespace(String namespace, @Body @Valid List flo .collect(Collectors.toList()); } - /** - * @param namespace flow namespace - * @param id flow id to update - * @return flow updated - */ @Put(uri = "{namespace}/{id}", produces = MediaType.TEXT_JSON) @ExecuteOn(TaskExecutors.IO) - public HttpResponse update(String namespace, String id, @Body @Valid Flow flow) throws ConstraintViolationException { + @Operation(tags = {"Flows"}, summary = "Update a flow") + public HttpResponse update( + @Parameter(description = "The flow namespace") String namespace, + @Parameter(description = "The flow id") String id, + @Parameter(description = "The flow") @Body @Valid Flow flow + ) throws ConstraintViolationException { Optional existingFlow = flowRepository.findById(namespace, id); if (existingFlow.isEmpty()) { @@ -216,15 +210,15 @@ public HttpResponse update(String namespace, String id, @Body @Valid Flow return HttpResponse.ok(flowRepository.update(flow, existingFlow.get())); } - /** - * @param namespace flow namespace - * @param id flow id to update - * @param taskId taskId id to update - * @return flow updated - */ @Patch(uri = "{namespace}/{id}/{taskId}", produces = MediaType.TEXT_JSON) @ExecuteOn(TaskExecutors.IO) - public HttpResponse updateTask(String namespace, String id, String taskId, @Valid @Body Task task) throws ConstraintViolationException { + @Operation(tags = {"Flows"}, summary = "Update a single task on a flow") + public HttpResponse updateTask( + @Parameter(description = "The flow namespace") String namespace, + @Parameter(description = "The flow id") String id, + @Parameter(description = "The task id") String taskId, + @Parameter(description = "The task") @Valid @Body Task task + ) throws ConstraintViolationException { Optional existingFlow = flowRepository.findById(namespace, id); if (existingFlow.isEmpty()) { @@ -244,14 +238,16 @@ public HttpResponse updateTask(String namespace, String id, String taskId, } } - /** - * @param namespace flow namespace - * @param id flow id to delete - * @return Http 204 on delete or Http 404 when not found - */ @Delete(uri = "{namespace}/{id}", produces = MediaType.TEXT_JSON) @ExecuteOn(TaskExecutors.IO) - public HttpResponse delete(String namespace, String id) { + @Operation(tags = {"Flows"}, summary = "Delete a flow") + @ApiResponses( + @ApiResponse(responseCode = "204", description = "On success") + ) + public HttpResponse delete( + @Parameter(description = "The flow namespace") String namespace, + @Parameter(description = "The flow id") String id + ) { Optional flow = flowRepository.findById(namespace, id); if (flow.isPresent()) { flowRepository.delete(flow.get()); @@ -261,11 +257,9 @@ public HttpResponse delete(String namespace, String id) { } } - /** - * @return The flow's namespaces set - */ @ExecuteOn(TaskExecutors.IO) @Get(uri = "distinct-namespaces", produces = MediaType.TEXT_JSON) + @Operation(tags = {"Flows"}, summary = "List all distinct namespaces") public List listDistinctNamespace() { return flowRepository.findDistinctNamespace(); } diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/LogController.java b/webserver/src/main/java/io/kestra/webserver/controllers/LogController.java index 863e3cc5e3..fcd1e26e7b 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/LogController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/LogController.java @@ -18,6 +18,8 @@ import io.kestra.core.repositories.LogRepositoryInterface; import io.kestra.webserver.responses.PagedResults; import io.kestra.webserver.utils.PageableUtils; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; import org.slf4j.event.Level; import java.util.List; @@ -37,43 +39,29 @@ public class LogController { @Named(QueueFactoryInterface.WORKERTASKLOG_NAMED) protected QueueInterface logQueue; - /** - * Search for logs - * - * @param query The lucene query - * @param page The current page - * @param size The current page size - * @param sort The sort of current page - * @return Paged log result - */ @ExecuteOn(TaskExecutors.IO) @Get(uri = "logs/search", produces = MediaType.TEXT_JSON) + @Operation(tags = {"Logs"}, summary = "Search for logs") public PagedResults find( - @QueryValue(value = "q") String query, - @QueryValue(value = "page", defaultValue = "1") int page, - @QueryValue(value = "size", defaultValue = "10") int size, - @Nullable @QueryValue(value = "minLevel") Level minLevel, - @Nullable @QueryValue(value = "sort") List sort - ) { + @Parameter(description = "Lucene string filter") @QueryValue(value = "q") String query, + @Parameter(description = "The current page") @QueryValue(value = "page", defaultValue = "1") int page, + @Parameter(description = "The current page size") @QueryValue(value = "size", defaultValue = "10") int size, + @Parameter(description = "The sort of current page") @Nullable @QueryValue(value = "sort") List sort, + @Parameter(description = "The min log level filter") @Nullable @QueryValue(value = "minLevel") Level minLevel + ) { return PagedResults.of( logRepository.find(query, PageableUtils.from(page, size, sort), minLevel) ); } - /** - * Get execution log - * - * @param executionId The execution identifier - - * @return Paged log result - */ @ExecuteOn(TaskExecutors.IO) @Get(uri = "logs/{executionId}", produces = MediaType.TEXT_JSON) + @Operation(tags = {"Logs"}, summary = "Get logs for a specific execution") public List findByExecution( - String executionId, - @Nullable @QueryValue(value = "minLevel") Level minLevel, - @Nullable @QueryValue(value = "taskRunId") String taskRunId, - @Nullable @QueryValue(value = "taskId") String taskId + @Parameter(description = "The execution id") String executionId, + @Parameter(description = "The min log level filter") @Nullable @QueryValue(value = "minLevel") Level minLevel, + @Parameter(description = "The taskrun id") @Nullable @QueryValue(value = "taskRunId") String taskRunId, + @Parameter(description = "The task id") @Nullable @QueryValue(value = "taskId") String taskId ) { if (taskId != null) { return logRepository.findByExecutionIdAndTaskId(executionId, taskId, minLevel); @@ -84,15 +72,13 @@ public List findByExecution( } } - /** - * Follow log for a specific execution - * - * @param executionId The execution id to follow - * @return execution log sse event - */ @ExecuteOn(TaskExecutors.IO) @Get(uri = "logs/{executionId}/follow", produces = MediaType.TEXT_EVENT_STREAM) - public Flowable> follow(String executionId, @Nullable @QueryValue(value = "minLevel") Level minLevel) { + @Operation(tags = {"Logs"}, summary = "Follow log for a specific execution") + public Flowable> follow( + @Parameter(description = "The execution id") String executionId, + @Parameter(description = "The min log level filter") @Nullable @QueryValue(value = "minLevel") Level minLevel + ) { AtomicReference cancel = new AtomicReference<>(); List levels = LogEntry.findLevelsByMin(minLevel); diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/MiscController.java b/webserver/src/main/java/io/kestra/webserver/controllers/MiscController.java index f479e618d1..594a86951f 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/MiscController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/MiscController.java @@ -5,6 +5,8 @@ import io.micronaut.http.annotation.Get; import io.micronaut.scheduling.TaskExecutors; import io.micronaut.scheduling.annotation.ExecuteOn; +import io.swagger.v3.oas.annotations.Hidden; +import io.swagger.v3.oas.annotations.Operation; import lombok.Value; import lombok.extern.slf4j.Slf4j; import io.kestra.core.utils.VersionProvider; @@ -18,13 +20,14 @@ public class MiscController { VersionProvider versionProvider; @Get("/ping") + @Hidden public HttpResponse ping() { return HttpResponse.ok("pong"); } - @Get("/api/v1/version") @ExecuteOn(TaskExecutors.IO) + @Operation(tags = {"Misc"}, summary = "Get current version") public Version version() { return new Version(versionProvider.getVersion()); } diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/PluginController.java b/webserver/src/main/java/io/kestra/webserver/controllers/PluginController.java index fd5dd3da42..a94dd1ebf4 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/PluginController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/PluginController.java @@ -12,6 +12,8 @@ import io.micronaut.scheduling.TaskExecutors; import io.micronaut.scheduling.annotation.ExecuteOn; import io.micronaut.validation.Validated; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -33,7 +35,8 @@ public class PluginController { @Get @ExecuteOn(TaskExecutors.IO) - public List search() throws HttpStatusException { + @Operation(tags = {"Plugins"}, summary = "Get list of plugins") + public List search() { return pluginService .allPlugins() .stream() @@ -42,7 +45,8 @@ public List search() throws HttpStatusException { } @Get(uri = "icons") - public Map icons() throws HttpStatusException { + @Operation(tags = {"Plugins"}, summary = "Get plugins icons") + public Map icons() { return pluginService .allPlugins() .stream() @@ -69,7 +73,10 @@ public Map icons() throws HttpStatusException { @SuppressWarnings({"rawtypes", "unchecked"}) @Get(uri = "{cls}") @ExecuteOn(TaskExecutors.IO) - public Doc pluginDocumentation(String cls) throws HttpStatusException, IOException { + @Operation(tags = {"Plugins"}, summary = "Get plugin documentation") + public Doc pluginDocumentation( + @Parameter(description = "The plugin full class name") String cls + ) throws IOException { ClassPluginDocumentation classPluginDocumentation = pluginDocumentation( pluginService.allPlugins(), cls diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/RedirectController.java b/webserver/src/main/java/io/kestra/webserver/controllers/RedirectController.java index 274b88893a..67041852fb 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/RedirectController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/RedirectController.java @@ -3,6 +3,7 @@ import io.micronaut.http.HttpResponse; import io.micronaut.http.annotation.Controller; import io.micronaut.http.annotation.Get; +import io.swagger.v3.oas.annotations.Hidden; import lombok.extern.slf4j.Slf4j; import java.net.URI; @@ -11,6 +12,7 @@ @Controller public class RedirectController { @Get + @Hidden public HttpResponse slash() { return HttpResponse.temporaryRedirect(URI.create("/ui/")); } diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/StatsController.java b/webserver/src/main/java/io/kestra/webserver/controllers/StatsController.java index bb862a3b3b..e8349ea208 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/StatsController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/StatsController.java @@ -4,6 +4,7 @@ import io.micronaut.http.MediaType; import io.micronaut.http.annotation.Controller; import io.micronaut.http.annotation.Post; +import io.micronaut.http.annotation.QueryValue; import io.micronaut.scheduling.TaskExecutors; import io.micronaut.scheduling.annotation.ExecuteOn; import io.micronaut.validation.Validated; @@ -15,6 +16,8 @@ import java.util.List; import java.util.Map; import io.micronaut.core.annotation.Nullable; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; import jakarta.inject.Inject; @Validated @@ -23,20 +26,14 @@ public class StatsController { @Inject protected ExecutionRepositoryInterface executionRepository; - /** - * Return daily statistics for all executions filter optionnaly by a lucene query - * - * @param q Lucene string to filter execution - * @param startDate default to now - 30 days - * @param endDate default to now - * @return a list of DailyExecutionStatistics - */ + @ExecuteOn(TaskExecutors.IO) @Post(uri = "executions/daily", produces = MediaType.TEXT_JSON) + @Operation(tags = {"Stats"}, summary = "Get daily statistics for executions") public List dailyStatistics( - @Nullable String q, - @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime startDate, - @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime endDate + @Parameter(description = "Lucene string filter") @QueryValue(value = "q") String q, + @Parameter(description = "The start datetime, default to now - 30 days") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime startDate, + @Parameter(description = "The end datetime, default to now") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime endDate ) { // @TODO: seems to be converted back to utc by micronaut return executionRepository.dailyStatistics( @@ -47,20 +44,14 @@ public List dailyStatistics( ); } - /** - * Return daily statistics for all taskRuns filter optionnaly by a lucene query - * - * @param q Lucene string to filter execution - * @param startDate default to now - 30 days - * @param endDate default to now - * @return a list of DailyExecutionStatistics - */ + @ExecuteOn(TaskExecutors.IO) @Post(uri = "taskruns/daily", produces = MediaType.TEXT_JSON) + @Operation(tags = {"Stats"}, summary = "Get daily statistics for taskRuns") public List taskRunsDailyStatistics( - @Nullable String q, - @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime startDate, - @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime endDate + @Parameter(description = "Lucene string filter") @QueryValue(value = "q") String q, + @Parameter(description = "The start datetime, default to now - 30 days") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime startDate, + @Parameter(description = "The end datetime, default to now") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime endDate ) { return executionRepository.dailyStatistics( q, @@ -70,21 +61,14 @@ public List taskRunsDailyStatistics( ); } - /** - * Return daily statistics for all executions filter optionnaly by a lucene query group by namespace & flow - * - * @param q Lucene string to filter execution - * @param startDate default to now - 30 days - * @param endDate default to now - * @return map of namespace, containing a Map of flow, DailyExecutionStatistics - */ @ExecuteOn(TaskExecutors.IO) @Post(uri = "executions/daily/group-by-flow", produces = MediaType.TEXT_JSON) + @Operation(tags = {"Stats"}, summary = "Get daily statistics for executions group by namespaces and flows") public Map>> dailyGroupByFlowStatistics( - @Nullable String q, - @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime startDate, - @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime endDate, - @Nullable Boolean namespaceOnly + @Parameter(description = "Lucene string filter") @QueryValue(value = "q") String q, + @Parameter(description = "The start datetime, default to now - 30 days") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime startDate, + @Parameter(description = "The end datetime, default to now") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime endDate, + @Parameter(description = "Return only namespace result and skip flows") @Nullable Boolean namespaceOnly ) { return executionRepository.dailyGroupByFlowStatistics( diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/TemplateController.java b/webserver/src/main/java/io/kestra/webserver/controllers/TemplateController.java index 53c8c1e032..8ca7b14963 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/TemplateController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/TemplateController.java @@ -18,6 +18,10 @@ import java.util.List; import java.util.Optional; import io.micronaut.core.annotation.Nullable; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.responses.ApiResponses; import jakarta.inject.Inject; import javax.validation.ConstraintViolationException; import javax.validation.Valid; @@ -28,42 +32,36 @@ public class TemplateController { @Inject private TemplateRepositoryInterface templateRepository; - /** - * @param id The template id - * @return template found - */ @ExecuteOn(TaskExecutors.IO) @Get(uri = "{namespace}/{id}", produces = MediaType.TEXT_JSON) - public Template index(String namespace, String id) { + @Operation(tags = {"Template"}, summary = "Get a template") + public Template index( + @Parameter(description = "The template namespace") String namespace, + @Parameter(description = "The template id") String id + ) { return templateRepository .findById(namespace, id) .orElse(null); } - /** - * @param query The template query that is a lucen string - * @param page Page in template pagination - * @param size Element count in pagination selection - * @return template list - */ @ExecuteOn(TaskExecutors.IO) @Get(uri = "/search", produces = MediaType.TEXT_JSON) + @Operation(tags = {"Template"}, summary = "Search for templates") public PagedResults