Skip to content

Commit

Permalink
feat(webserver): allow restarting a trigger
Browse files Browse the repository at this point in the history
This will kill it then restart it
  • Loading branch information
loicmathieu committed Jul 11, 2024
1 parent 0990855 commit 9e04169
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 3 deletions.
33 changes: 33 additions & 0 deletions ui/src/components/admin/Triggers.vue
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,23 @@
</el-button>
</template>
</el-table-column>
<el-table-column
v-if="user.hasAnyAction(permission.EXECUTION, action.UPDATE)"
column-key="restart"
class-name="row-action"
>
<template #default="scope">
<el-button size="small" v-if=" scope.row.evaluateRunningDate">
<kicon
:tooltip="$t(`restart trigger.tooltip`)"
placement="left"
@click="restart(scope.row)"
>
<Restart />
</kicon>
</el-button>
</template>
</el-table-column>

<el-table-column :label="$t('backfill')" column-key="backfill">
<template #default="scope">
Expand Down Expand Up @@ -231,6 +248,7 @@
import AlertCircle from "vue-material-design-icons/AlertCircle.vue";
import SelectTable from "../layout/SelectTable.vue";
import BulkSelect from "../layout/BulkSelect.vue";
import Restart from "vue-material-design-icons/Restart.vue";
</script>
<script>
import NamespaceSelect from "../namespace/NamespaceSelect.vue";
Expand Down Expand Up @@ -314,6 +332,21 @@
this.triggerToUnlock = undefined;
},
restart(trigger) {
this.$store.dispatch("trigger/restart", {
namespace: trigger.namespace,
flowId: trigger.flowId,
triggerId: trigger.triggerId
}).then(newTrigger => {
this.$toast().saved(newTrigger.id);
this.triggers = this.triggers.map(t => {
if (t.id === newTrigger.id) {
return newTrigger
}
return t
})
})
},
setDisabled(trigger, value) {
if (trigger.codeDisabled) {
this.$message({
Expand Down
26 changes: 26 additions & 0 deletions ui/src/components/flows/FlowTriggers.vue
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@
</template>
</el-table-column>

<el-table-column column-key="restart" class-name="row-action" v-if="userCan(action.UPDATE)">
<template #default="scope">
<el-button size="small" v-if="scope.row.evaluateRunningDate" @click="restart(scope.row)">
<kicon :tooltip="$t('restart trigger.button')">
<Restart />
</kicon>
</el-button>
</template>
</el-table-column>

<el-table-column column-key="unlock" class-name="row-action" v-if="userCan(action.UPDATE)">
<template #default="scope">
<el-button size="small" v-if="scope.row.executionId" @click="unlock(scope.row)">
Expand Down Expand Up @@ -188,6 +198,7 @@
import Delete from "vue-material-design-icons/Delete.vue";
import LockOff from "vue-material-design-icons/LockOff.vue";
import Check from "vue-material-design-icons/Check.vue";
import Restart from "vue-material-design-icons/Restart.vue";
import CalendarCollapseHorizontalOutline from "vue-material-design-icons/CalendarCollapseHorizontalOutline.vue"
import FlowRun from "./FlowRun.vue";
import RefreshButton from "../layout/RefreshButton.vue";
Expand Down Expand Up @@ -384,6 +395,21 @@
})
})
},
restart(trigger) {
this.$store.dispatch("trigger/restart", {
namespace: trigger.namespace,
flowId: trigger.flowId,
triggerId: trigger.triggerId
}).then(newTrigger => {
this.$toast().saved(newTrigger.id);
this.triggers = this.triggers.map(t => {
if (t.id === newTrigger.id) {
return newTrigger
}
return t
})
})
},
backfillProgression(backfill) {
const startMoment = moment(backfill.start);
const endMoment = moment(backfill.end);
Expand Down
3 changes: 3 additions & 0 deletions ui/src/stores/trigger.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ export default {
async unlock({_commit}, options) {
return (await this.$http.post(`${apiUrl(this)}/triggers/${options.namespace}/${options.flowId}/${options.triggerId}/unlock`)).data;
},
async restart({_commit}, options) {
return (await this.$http.post(`${apiUrl(this)}/triggers/${options.namespace}/${options.flowId}/${options.triggerId}/restart`)).data;
},
find({_commit}, options) {
return this.$http.get(`${apiUrl(this)}/triggers/${options.namespace}/${options.flowId}`, {params: options}).then(response => {
return response.data;
Expand Down
7 changes: 5 additions & 2 deletions ui/src/translations/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,9 @@
"button": "Unlock trigger",
"success": "Trigger is unlocked"
},
"restart trigger": {
"button": "Restart trigger"
},
"date format": "Date format",
"timezone": "Timezone",
"add task": "Add a task",
Expand Down Expand Up @@ -849,8 +852,8 @@
},
"no_namespaces": "Zero namespaces match the search criteria.",
"plugin defaults": "Plugin defaults",
"secret": {
"names": "Secrets",
"secret": {
"names": "Secrets",
"inherited": "Inherited secrets"
},
"workerId": "Worker Id",
Expand Down
3 changes: 3 additions & 0 deletions ui/src/translations/fr.json
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,9 @@
"button": "Débloquer le déclencheur",
"success": "Le déclencheur est débloqué"
},
"restart trigger": {
"button": "Redémarrer trigger"
},
"date format": "Format de date",
"timezone": "Fuseau horaire",
"add task": "Ajouter une tâche",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.kestra.webserver.controllers.api;

import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.ExecutionKilledTrigger;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.*;
import io.kestra.core.queues.QueueInterface;
Expand Down Expand Up @@ -44,6 +46,9 @@ public class TriggerController {
@Inject
private QueueInterface<Trigger> triggerQueue;

@Inject
private QueueInterface<ExecutionKilled> executionKilledQueue;

@Inject
private FlowRepositoryInterface flowRepository;

Expand Down Expand Up @@ -236,7 +241,7 @@ public HttpResponse<Trigger> update(
if (abstractTrigger instanceof RealtimeTriggerInterface && !newTrigger.getDisabled().equals(current.getDisabled())) {
throw new IllegalArgumentException("Realtime triggers can not be disabled through the API, please edit the trigger from the flow.");
}
Trigger updated = null;
Trigger updated;
ZonedDateTime nextExecutionDate = null;
try {
RunContext runContext = runContextFactory.of(maybeFlow.get(), abstractTrigger);
Expand All @@ -263,6 +268,56 @@ public HttpResponse<Trigger> update(
return HttpResponse.ok(updatedTrigger);
}

@ExecuteOn(TaskExecutors.IO)
@Post(uri = "/{namespace}/{flowId}/{triggerId}/restart")
@Operation(tags = {"Triggers"}, summary = "Restart a trigger")
public HttpResponse<?> restart(
@Parameter(description = "The namespace") @PathVariable String namespace,
@Parameter(description = "The flow id") @PathVariable String flowId,
@Parameter(description = "The trigger id") @PathVariable String triggerId
) throws HttpStatusException {
Optional<Trigger> triggerOpt = triggerRepository.findLast(TriggerContext.builder()
.tenantId(tenantService.resolveTenant())
.namespace(namespace)
.flowId(flowId)
.triggerId(triggerId)
.build());

if (triggerOpt.isEmpty()) {
return HttpResponse.notFound();
}

var trigger = triggerOpt.get().toBuilder()
.workerId(null)
.evaluateRunningDate(null)
.date(null)
.build();

this.executionKilledQueue.emit(ExecutionKilledTrigger
.builder()
.tenantId(trigger.getTenantId())
.namespace(trigger.getNamespace())
.flowId(trigger.getFlowId())
.triggerId(trigger.getTriggerId())
.build()
);

// this will make the trigger restarting
// be careful that, as everything is asynchronous, it can be restarted before it is killed
this.triggerQueue.emit(trigger);

return HttpResponse.ok(trigger);
}

@ExecuteOn(TaskExecutors.IO)
@Post(uri = "/restart")
@Operation(tags = {"Triggers"}, summary = "Restart a trigger")
public void restart(
@Parameter(description = "The trigger") @Body final Trigger trigger
) throws HttpStatusException {

}

@ExecuteOn(TaskExecutors.IO)
@Put(uri = "/backfill/pause")
@Operation(tags = {"Triggers"}, summary = "Pause a backfill")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.kestra.webserver.responses.PagedResults;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
Expand Down Expand Up @@ -172,6 +173,27 @@ void updated() {
assertThat(afterUpdated.getDisabled(), is(false));
}

@Test
void restart() {
Flow flow = generateFlow("flow-with-triggers");
jdbcFlowRepository.create(flow, flow.generateSource(), flow);

Trigger trigger = Trigger.builder()
.flowId(flow.getId())
.namespace(flow.getNamespace())
.triggerId("trigger-to-restart")
.executionId(IdUtils.create())
.disabled(true)
.build();

jdbcTriggerRepository.create(trigger);

HttpResponse<?> restarted = client.toBlocking().exchange(HttpRequest.POST(("/api/v1/triggers/io.kestra.tests.schedule/flow-with-triggers/trigger-to-restart/restart"), null));
assertThat(restarted.getStatus(), is(HttpStatus.OK));

assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(HttpRequest.POST(("/api/v1/triggers/notfound/notfound/notfound/restart"), null)));
}

@Test
void unlockByTriggers() {
Trigger triggerLock = Trigger.builder()
Expand Down

0 comments on commit 9e04169

Please sign in to comment.