Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jan 20, 2023
1 parent 3fc8562 commit 2c0d535
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 66 deletions.
9 changes: 4 additions & 5 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ RUN apt-get update -y && \
if [ -n "${APT_PACKAGES}" ]; then apt-get install -y --no-install-recommends ${APT_PACKAGES}; fi && \
apt-get clean && \
rm -rf /var/lib/apt/lists/* /var/tmp/* /tmp/* && \
if [ -n "${KESTRA_PLUGINS}" ]; then /app/kestra plugins install ${KESTRA_PLUGINS} && rm -rf /tmp/*; fi

RUN groupadd kestra && \
useradd -m -g kestra kestra && \
chown -R kestra:kestra /app
if [ -n "${KESTRA_PLUGINS}" ]; then /app/kestra plugins install ${KESTRA_PLUGINS} && rm -rf /tmp/*; fi && \
groupadd kestra && \
useradd -m -g kestra kestra && \
chown -R kestra:kestra /app

USER kestra

Expand Down
2 changes: 2 additions & 0 deletions ui/src/utils/toast.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ export default {
title: title || self.$t("error"),
message: this._wrap(message),
type: "error",
duration: 0,
customClass: "large"
},
...(options || {})
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import lombok.*;
Expand Down Expand Up @@ -278,9 +280,12 @@ public HttpResponse<Void> delete(
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Executions"}, summary = "Delete a list of executions")
@ApiResponses(
@ApiResponse(responseCode = "204", description = "On success")
value = {
@ApiResponse(responseCode = "200", content = {@Content(schema = @Schema(implementation = BulkResponse.class))}),
@ApiResponse(responseCode = "422", content = {@Content(schema = @Schema(implementation = BulkErrorResponse.class))})
}
)
public MutableHttpResponse<?> bulkDelete(
public MutableHttpResponse<?> deleteByIds(
@Parameter(description = "The execution id") @Body List<String> executionsId
) {
List<Execution> executions = new ArrayList<>();
Expand Down Expand Up @@ -309,36 +314,39 @@ public MutableHttpResponse<?> bulkDelete(
.build()
);
}
for (Execution execution : executions) {
executionRepository.delete(execution);
}

executions
.forEach(execution -> executionRepository.delete(execution));

return HttpResponse.ok(BulkResponse.builder().count(executions.size()).build());
}

@Delete(uri = "executions/query", produces = MediaType.TEXT_JSON)
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Executions"}, summary = "Delete executions returned by the query")
public HttpResponse<BulkResponse> queryDelete(
@Operation(tags = {"Executions"}, summary = "Delete executions filter by query parameters")
public HttpResponse<BulkResponse> deleteByQuery(
@Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Parameter(description = "A namespace filter prefix") @Nullable String namespace,
@Parameter(description = "A flow id filter") @Nullable String flowId,
@Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime startDate,
@Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime endDate,
@Parameter(description = "A state filter") @Nullable @QueryValue(value = "state") List<State.Type> state
) {
Flowable<Execution> executions = executionRepository.find(
query,
namespace,
flowId,
startDate,
endDate,
state
);
Integer count = executions.map(e -> {
Integer count = executionRepository
.find(
query,
namespace,
flowId,
startDate,
endDate,
state
)
.map(e -> {
executionRepository.delete(e);
return 1;
}
).reduce(Integer::sum).blockingGet();
})
.reduce(Integer::sum)
.blockingGet();

return HttpResponse.ok(BulkResponse.builder().count(count).build());
}
Expand Down Expand Up @@ -591,14 +599,21 @@ public Execution restart(
@ExecuteOn(TaskExecutors.IO)
@Post(uri = "executions/restart/by-ids", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Restart a list of executions")
public MutableHttpResponse<?> bulkRestart(
@ApiResponses(
value = {
@ApiResponse(responseCode = "200", content = {@Content(schema = @Schema(implementation = BulkResponse.class))}),
@ApiResponse(responseCode = "422", content = {@Content(schema = @Schema(implementation = BulkErrorResponse.class))})
}
)
public MutableHttpResponse<?> restartByIds(
@Parameter(description = "The execution id") @Body List<String> executionsId
) throws Exception {
List<Execution> executions = new ArrayList<>();
Set<ManualConstraintViolation<String>> invalids = new HashSet<>();

for (String executionId : executionsId) {
Optional<Execution> execution = executionRepository.findById(executionId);

if (execution.isPresent() && !execution.get().getState().isFailed()) {
invalids.add(ManualConstraintViolation.of(
"execution not in state FAILED",
Expand Down Expand Up @@ -639,30 +654,32 @@ public MutableHttpResponse<?> bulkRestart(

@ExecuteOn(TaskExecutors.IO)
@Post(uri = "executions/restart/query", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Restart executions returned by the query")
public HttpResponse<BulkResponse> queryRestart(
@Operation(tags = {"Executions"}, summary = "Restart executions filter by query parameters")
public HttpResponse<BulkResponse> restartByQuery(
@Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Parameter(description = "A namespace filter prefix") @Nullable String namespace,
@Parameter(description = "A flow id filter") @Nullable String flowId,
@Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime startDate,
@Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime endDate,
@Parameter(description = "A state filter") @Nullable @QueryValue(value = "state") List<State.Type> state
) {
Flowable<Execution> executions = executionRepository.find(
query,
namespace,
flowId,
startDate,
endDate,
state
);
Integer count = executions.map(e -> {
Integer count = executionRepository
.find(
query,
namespace,
flowId,
startDate,
endDate,
state
)
.map(e -> {
Execution restart = executionService.restart(e, null);
executionQueue.emit(restart);
eventPublisher.publishEvent(new CrudEvent<>(restart, CrudEventType.UPDATE));
return 1;
}
).reduce(Integer::sum).blockingGet();
})
.reduce(Integer::sum)
.blockingGet();

return HttpResponse.ok(BulkResponse.builder().count(count).build());
}
Expand All @@ -673,8 +690,7 @@ public HttpResponse<BulkResponse> queryRestart(
public Execution replay(
@Parameter(description = "the original execution id to clone") String executionId,
@Parameter(description = "The taskrun id") @Nullable @QueryValue(value = "taskRunId") String taskRunId,
@Parameter(description = "The flow revision to use for new execution") @Nullable @QueryValue(value = "revision") Integer
revision
@Parameter(description = "The flow revision to use for new execution") @Nullable @QueryValue(value = "revision") Integer revision
) throws Exception {
Optional<Execution> execution = executionRepository.findById(executionId);
if (execution.isEmpty()) {
Expand Down Expand Up @@ -762,11 +778,11 @@ public HttpResponse<?> kill(
@Operation(tags = {"Executions"}, summary = "Kill a list of executions")
@ApiResponses(
value = {
@ApiResponse(responseCode = "204", description = "On success"),
@ApiResponse(responseCode = "409", description = "if the executions is already finished")
@ApiResponse(responseCode = "200", content = {@Content(schema = @Schema(implementation = BulkResponse.class))}),
@ApiResponse(responseCode = "422", content = {@Content(schema = @Schema(implementation = BulkErrorResponse.class))})
}
)
public MutableHttpResponse<?> bulkKill(
public MutableHttpResponse<?> killByIds(
@Parameter(description = "The execution id") @Body List<String> executionsId
) {
List<Execution> executions = new ArrayList<>();
Expand All @@ -776,15 +792,15 @@ public MutableHttpResponse<?> bulkKill(
Optional<Execution> execution = executionRepository.findById(executionId);
if (execution.isPresent() && execution.get().getState().isTerninated()) {
invalids.add(ManualConstraintViolation.of(
"execution already finished",
"Execution already finished",
executionId,
String.class,
"execution",
executionId
));
} else if (execution.isEmpty()) {
invalids.add(ManualConstraintViolation.of(
"execution not found",
"Execution not found",
executionId,
String.class,
"execution",
Expand All @@ -794,57 +810,63 @@ public MutableHttpResponse<?> bulkKill(
executions.add(execution.get());
}
}

if (invalids.size() > 0) {
return HttpResponse.unprocessableEntity()
.body(BulkErrorResponse
.builder()
.message("invalid bulk kill")
.message("Invalid bulk kill")
.invalids(invalids)
.build()
);
}
for (Execution execution : executions) {

executions.forEach(execution -> {
killQueue.emit(ExecutionKilled
.builder()
.executionId(execution.getId())
.build()
);
}
});

return HttpResponse.ok(BulkResponse.builder().count(executions.size()).build());
}

@ExecuteOn(TaskExecutors.IO)
@Delete(uri = "executions/kill/query", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Kill executions returned by the query")
public HttpResponse<BulkResponse> queryKill(
@Operation(tags = {"Executions"}, summary = "Kill executions filter by query parameters")
public HttpResponse<BulkResponse> killByQuery(
@Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Parameter(description = "A namespace filter prefix") @Nullable String namespace,
@Parameter(description = "A flow id filter") @Nullable String flowId,
@Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime startDate,
@Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime endDate,
@Parameter(description = "A state filter") @Nullable @QueryValue(value = "state") List<State.Type> state
) {
Flowable<Execution> executions = executionRepository.find(
query,
namespace,
flowId,
startDate,
endDate,
state
);
Integer count = executions.map(e -> {
if (!e.getState().isRunning()) {
throw new IllegalStateException("Execution must be running to be killed, " +
"current state is '" + e.getState().getCurrent() + "' !"
);
}
killQueue.emit(ExecutionKilled
Integer count = executionRepository
.find(
query,
namespace,
flowId,
startDate,
endDate,
state
)
.map(e -> {
if (!e.getState().isRunning()) {
throw new IllegalStateException("Execution must be running to be killed, " +
"current state is '" + e.getState().getCurrent() + "' !"
);
}

killQueue.emit(ExecutionKilled
.builder()
.executionId(e.getId())
.build());
return 1;
}
).reduce(Integer::sum).blockingGet();
})
.reduce(Integer::sum)
.blockingGet();

return HttpResponse.ok(BulkResponse.builder().count(count).build());
}
Expand Down

0 comments on commit 2c0d535

Please sign in to comment.