Skip to content

Commit

Permalink
feat(core): flows validation (#871)
Browse files Browse the repository at this point in the history
Flow are now also saved as String to keep source code 
All properties are validated, the invalid properties will be refused 
Task default will be also injected during validation process  

close #160
close #357
close #388
close #748

Co-authored-by: Ludovic DEHON <tchiot.ludo@gmail.com>
  • Loading branch information
Skraye and tchiotludo authored Jan 31, 2023
1 parent ec20485 commit 8340a4e
Show file tree
Hide file tree
Showing 60 changed files with 2,541 additions and 2,873 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import javax.validation.ConstraintViolationException;

@CommandLine.Command(
name = "test",
Expand Down Expand Up @@ -105,6 +106,8 @@ public Integer call() throws Exception {
throw new CommandLine.ParameterException(this.spec.commandLine(), e.getMessage());
} catch (IOException | TimeoutException e) {
throw new IllegalStateException(e);
} catch (ConstraintViolationException e) {
throw new CommandLine.ParameterException(this.spec.commandLine(), "Invalid flow", e);
} finally {
applicationContext.getProperty("kestra.storage.local.base-path", Path.class)
.ifPresent(path -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.serializers.YamlFlowParser;
import picocli.CommandLine;
Expand All @@ -21,6 +22,9 @@ public class ValidateCommand extends AbstractCommand {
@Inject
private YamlFlowParser yamlFlowParser;

@Inject
private ModelValidator modelValidator;

@CommandLine.Parameters(index = "0", description = "the flow file to test")
private Path file;

Expand All @@ -30,6 +34,7 @@ public Integer call() throws Exception {

try {
Flow parse = yamlFlowParser.parse(file.toFile());
modelValidator.validate(parse);
stdOut(mapper.writeValueAsString(parse));
} catch (ConstraintViolationException e) {
ValidateCommand.handleException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.commands.flows.ValidateCommand;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.serializers.YamlFlowParser;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
Expand All @@ -28,6 +29,9 @@ public class FlowNamespaceUpdateCommand extends AbstractApiCommand {
@Inject
private YamlFlowParser yamlFlowParser;

@Inject
private ModelValidator modelValidator;

@CommandLine.Parameters(index = "0", description = "the namespace of flow to update")
private String namespace;

Expand All @@ -42,7 +46,12 @@ public Integer call() throws Exception {
List<Flow> flows = Files.walk(directory)
.filter(Files::isRegularFile)
.filter(YamlFlowParser::isValidExtension)
.map(path -> yamlFlowParser.parse(path.toFile()))
.map(path -> {
Flow flow = yamlFlowParser.parse(path.toFile());
modelValidator.validate(flow);

return flow;
})
.collect(Collectors.toList());

if (flows.size() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.StandAloneRunner;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package io.kestra.cli.services;

import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowSource;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.queues.QueueFactoryInterface;
Expand Down Expand Up @@ -34,7 +35,8 @@ public int flows(boolean noRecreate) {
.stream()
.flatMap(flow -> flowRepository.findRevisions(flow.getNamespace(), flow.getId()).stream())
// we can't resend FlowSource since deserialize failed & will be invalid
.filter(flow -> !(flow instanceof FlowSource))
.filter(flow -> !(flow instanceof FlowWithException))
.map(FlowWithSource::toFlow)
.collect(Collectors.toList());

return this.send(flows, QueueFactoryInterface.FLOW_NAMED, Flow.class, noRecreate);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.kestra.cli.commands.sys;

import io.kestra.core.models.flows.Flow;
import io.kestra.core.services.TaskDefaultService;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
Expand All @@ -16,6 +18,7 @@
import static org.hamcrest.core.StringContains.containsString;

class FlowListenersRestoreCommandTest {

@BeforeAll
static void init() {
if (!KestraClassLoader.isInit()) {
Expand All @@ -32,15 +35,16 @@ void run() throws InterruptedException {

try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
TaskDefaultService taskDefaultService = ctx.getBean(TaskDefaultService.class);

Thread thread = new Thread(() -> {
Integer result = PicocliRunner.call(FlowListenersRestoreCommand.class, ctx, "--timeout=PT1S");
assertThat(result, is(0));
});
thread.start();

for (int i = 0; i < COUNT; i++) {
flowRepository.create(RestoreQueueCommandTest.create());
Flow flow = RestoreQueueCommandTest.create();
flowRepository.create(flow, flow.generateSource(), taskDefaultService.injectDefaults(flow));
Thread.sleep(100);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.cli.commands.sys;

import io.kestra.core.services.TaskDefaultService;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
Expand Down Expand Up @@ -46,11 +47,13 @@ void run() throws InterruptedException {
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
QueueInterface<Flow> flowQueue = ctx.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.FLOW_NAMED));
TaskDefaultService taskDefaultService = ctx.getBean(TaskDefaultService.class);

AtomicInteger atomicInteger = new AtomicInteger();

for (int i = 0; i < COUNT; i++) {
flowRepository.create(create());
Flow flow = create();
flowRepository.create(flow, flow.generateSource(), taskDefaultService.injectDefaults(flow));
}
CountDownLatch countDownLatch = new CountDownLatch(COUNT);

Expand Down
70 changes: 22 additions & 48 deletions core/src/main/java/io/kestra/core/models/flows/Flow.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.core.models.flows;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
Expand All @@ -11,10 +12,10 @@
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.TaskValidationInterface;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.validations.FlowValidation;
import io.micronaut.core.annotation.Introspected;
import lombok.*;
import lombok.experimental.SuperBuilder;
Expand All @@ -29,19 +30,20 @@
import javax.validation.Valid;
import javax.validation.constraints.*;

@SuperBuilder
@SuperBuilder(toBuilder = true)
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
@EqualsAndHashCode
@FlowValidation
public class Flow implements DeletedInterface {
private static final ObjectMapper jsonMapper = JacksonMapper.ofJson().copy()
.setAnnotationIntrospector(new JacksonAnnotationIntrospector() {
@Override
public boolean hasIgnoreMarker(final AnnotatedMember m) {
List<String> exclusions = Arrays.asList("revision", "deleted");
List<String> exclusions = Arrays.asList("revision", "deleted", "source");
return exclusions.contains(m.getName()) || super.hasIgnoreMarker(m);
}
});
Expand Down Expand Up @@ -232,54 +234,9 @@ public boolean equalsWithoutRevision(Flow o) {
}
}

public Optional<ConstraintViolationException> validate() {
Set<ConstraintViolation<?>> violations = new HashSet<>();

List<Task> allTasks = allTasksWithChilds();

// unique id
List<String> ids = allTasks
.stream()
.map(Task::getId)
.collect(Collectors.toList());

List<String> duplicates = ids
.stream()
.distinct()
.filter(entry -> Collections.frequency(ids, entry) > 1).collect(Collectors.toList());

if (duplicates.size() > 0) {
violations.add(ManualConstraintViolation.of(
"Duplicate task id with name [" + String.join(", ", duplicates) + "]",
this,
Flow.class,
"flow.tasks",
String.join(", ", duplicates)
));
}

// validate tasks
allTasks
.forEach(task -> {
if (task instanceof TaskValidationInterface) {
violations.addAll(((TaskValidationInterface<?>) task).failedConstraints());
}
});

if (violations.size() > 0) {
return Optional.of(new ConstraintViolationException(violations));
} else {
return Optional.empty();
}
}

public Optional<ConstraintViolationException> validateUpdate(Flow updated) {
Set<ConstraintViolation<?>> violations = new HashSet<>();

// validate flow
updated.validate()
.ifPresent(e -> violations.addAll(e.getConstraintViolations()));

// change flow id
if (!updated.getId().equals(this.getId())) {
violations.add(ManualConstraintViolation.of(
Expand Down Expand Up @@ -309,6 +266,23 @@ public Optional<ConstraintViolationException> validateUpdate(Flow updated) {
}
}

public String generateSource() {
try {
return JacksonMapper.ofYaml()
.writeValueAsString(
JacksonMapper
.ofJson()
.readTree(
jsonMapper.copy()
.setSerializationInclusion(JsonInclude.Include.NON_DEFAULT)
.writeValueAsString(this)
)
);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

public Flow toDeleted() {
return new Flow(
this.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
@Introspected
@ToString
@EqualsAndHashCode
public class FlowSource extends Flow {
String source;
public class FlowWithException extends FlowWithSource {
String exception;
}
Loading

0 comments on commit 8340a4e

Please sign in to comment.