Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kafka-runner): used an in memory storage for flow & template to avoid performance issue on deserialization from rockdb #467

Merged
merged 1 commit into from
Feb 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/src/main/java/io/kestra/core/models/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ public static String uid(String namespace, String id, Optional<Integer> revision
));
}

public static String uidWithoutRevision(String namespace, String id) {
return String.join("_", Arrays.asList(
namespace,
id
));
}

public static String uidWithoutRevision(Execution execution) {
return String.join("_", Arrays.asList(
execution.getNamespace(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,29 @@
package io.kestra.core.runners;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;

import java.util.Collection;
import java.util.Optional;

public interface FlowExecutorInterface {
Flow findById(String namespace, String id, Optional<Integer> revision, String fromNamespace, String fromId);
Collection<Flow> allLastVersion();

Optional<Flow> findById(String namespace, String id, Optional<Integer> revision);

default Optional<Flow> findByIdFromFlowTask(String namespace, String id, Optional<Integer> revision, String fromNamespace, String fromId) {
return this.findById(
namespace,
id,
revision
);
}

default Optional<Flow> findByExecution(Execution execution) {
return this.findById(
execution.getNamespace(),
execution.getFlowId(),
Optional.of(execution.getFlowRevision())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.FlowRepositoryInterface;

import java.util.Collection;
import java.util.Optional;

public class MemoryFlowExecutor implements FlowExecutorInterface {
Expand All @@ -13,8 +14,12 @@ public MemoryFlowExecutor(FlowRepositoryInterface flowRepositoryInterface) {
}

@Override
public Flow findById(String namespace, String id, Optional<Integer> revision, String fromNamespace, String flowId) {
return flowRepositoryInterface.findById(namespace, id, revision)
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + namespace + "." + id + "' with revision + '" + revision + "'"));
public Collection<Flow> allLastVersion() {
return flowRepositoryInterface.findAll();
}

@Override
public Optional<Flow> findById(String namespace, String id, Optional<Integer> revision) {
return flowRepositoryInterface.findById(namespace, id, revision);
}
}
7 changes: 3 additions & 4 deletions core/src/main/java/io/kestra/core/tasks/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.*;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.slf4j.Logger;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -116,13 +114,14 @@ public Execution createExecution(RunContext runContext, FlowExecutorInterface fl

Map<String, String> flowVars = (Map<String, String>) runContext.getVariables().get("flow");

io.kestra.core.models.flows.Flow flow = flowExecutorInterface.findById(
io.kestra.core.models.flows.Flow flow = flowExecutorInterface.findByIdFromFlowTask(
runContext.render(this.namespace),
runContext.render(this.flowId),
this.revision != null ? Optional.of(this.revision) : Optional.empty(),
flowVars.get("namespace"),
flowVars.get("id")
);
)
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + namespace + "." + id + "' with revision + '" + revision + "'"));

return runnerUtils
.newExecution(
Expand Down
19 changes: 6 additions & 13 deletions core/src/main/java/io/kestra/core/tasks/flows/Template.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;

import java.util.AbstractMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -190,12 +187,8 @@ public Template.Output outputs(RunContext runContext, Execution execution, TaskR
protected io.kestra.core.models.templates.Template findTemplate(ApplicationContext applicationContext) throws IllegalVariableEvaluationException {
TemplateExecutorInterface templateExecutor = applicationContext.getBean(TemplateExecutorInterface.class);

io.kestra.core.models.templates.Template template = templateExecutor.findById(this.namespace, this.templateId);
if (template == null) {
throw new IllegalVariableEvaluationException("Can't find flow template '" + this.namespace + "." + this.templateId + "'");
}

return template;
return templateExecutor.findById(this.namespace, this.templateId)
.orElseThrow(() -> new IllegalVariableEvaluationException("Can't find flow template '" + this.namespace + "." + this.templateId + "'"));
}

@SuperBuilder(toBuilder = true)
Expand Down Expand Up @@ -280,15 +273,15 @@ void onStartup(final StartupEvent event) {
}

public interface TemplateExecutorInterface {
io.kestra.core.models.templates.Template findById(String namespace, String templateId);
Optional<io.kestra.core.models.templates.Template> findById(String namespace, String templateId);
}

public static class MemoryTemplateExecutor implements io.kestra.core.tasks.flows.Template.TemplateExecutorInterface {
@Inject
private TemplateRepositoryInterface templateRepository;

public io.kestra.core.models.templates.Template findById(String namespace, String templateId) {
return this.templateRepository.findById(namespace, templateId).orElse(null);
public Optional<io.kestra.core.models.templates.Template> findById(String namespace, String templateId) {
return this.templateRepository.findById(namespace, templateId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,68 @@
import io.kestra.core.models.flows.Flow;
import io.kestra.core.runners.FlowExecutorInterface;
import io.kestra.core.services.FlowService;
import io.kestra.runner.kafka.services.SafeKeyValueStore;
import io.micronaut.context.ApplicationContext;
import io.kestra.core.utils.Await;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import java.util.Optional;
import java.time.Duration;
import java.util.*;
import java.util.stream.Collectors;

@Slf4j
@Singleton
public class KafkaFlowExecutor implements FlowExecutorInterface {
private final FlowService flowService;
private final SafeKeyValueStore<String, Flow> store;
@Inject
private FlowService flowService;
private Map<String, Flow> flows;
private Map<String, Flow> flowsLast;

public KafkaFlowExecutor(ReadOnlyKeyValueStore<String, Flow> store, String name, ApplicationContext applicationContext) {
this.store = new SafeKeyValueStore<>(store, name);
this.flowService = applicationContext.getBean(FlowService.class);
public synchronized void setFlows(List<Flow> flows) {
this.flows = flows
.stream()
.map(flow -> new AbstractMap.SimpleEntry<>(
flow.uid(),
flow
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

this.flowsLast = flowService.keepLastVersion(flows)
.stream()
.map(flow -> new AbstractMap.SimpleEntry<>(
flow.uidWithoutRevision(),
flow
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@SneakyThrows
private void await() {
if (flows == null) {
Await.until(() -> this.flows != null, Duration.ofMillis(100), Duration.ofMinutes(5));
}
}

@Override
public Collection<Flow> allLastVersion() {
this.await();

return this.flowsLast.values();
}

@Override
public Flow findById(String namespace, String id, Optional<Integer> revision, String fromNamespace, String fromId) {
public Optional<Flow> findById(String namespace, String id, Optional<Integer> revision) {
this.await();

if (revision.isPresent()) {
return this.store.get(Flow.uid(namespace, id, revision))
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + namespace + "." + id + "'"));
} else {
return flowService.keepLastVersion(
this.store.all(),
namespace,
id
);
String uid = Flow.uid(namespace, id, revision);

return Optional.ofNullable(this.flows.get(uid));
}

String uid = Flow.uidWithoutRevision(namespace, id);

return Optional.ofNullable(this.flowsLast.get(uid));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,20 @@
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.DefaultScheduler;
import io.kestra.core.schedulers.SchedulerExecutionWithTrigger;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.utils.IdUtils;
import io.kestra.runner.kafka.configs.TopicsConfig;
import io.kestra.runner.kafka.serializers.JsonSerde;
import io.kestra.runner.kafka.services.*;
import io.kestra.runner.kafka.services.KafkaAdminService;
import io.kestra.runner.kafka.services.KafkaProducerService;
import io.kestra.runner.kafka.services.KafkaStreamService;
import io.kestra.runner.kafka.services.KafkaStreamsBuilder;
import io.kestra.runner.kafka.streams.GlobalStateLockProcessor;
import io.kestra.runner.kafka.streams.GlobalStateProcessor;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Replaces;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand All @@ -30,15 +33,13 @@
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.Stores;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import jakarta.inject.Singleton;

@KafkaQueueEnabled
@Singleton
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,49 @@
package io.kestra.runner.kafka;

import io.kestra.runner.kafka.services.SafeKeyValueStore;
import io.kestra.core.models.templates.Template;
import io.kestra.core.utils.Await;
import io.micronaut.context.annotation.Replaces;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import io.kestra.core.models.templates.Template;

import java.time.Duration;
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

@Slf4j
@KafkaQueueEnabled
@Replaces(io.kestra.core.tasks.flows.Template.MemoryTemplateExecutor.class)
@Requires(property = "kestra.server-type", value = "EXECUTOR")
@Requires(property = "kestra.server-type", pattern = "(EXECUTOR|STANDALONE)")
@Singleton
public class KafkaTemplateExecutor implements io.kestra.core.tasks.flows.Template.TemplateExecutorInterface {
private final SafeKeyValueStore<String, Template> store;
private Map<String, Template> templates;


public KafkaTemplateExecutor(ReadOnlyKeyValueStore<String, Template> store, String name) {
this.store = new SafeKeyValueStore<>(store, name);
public synchronized void setTemplates(List<Template> templates) {
this.templates = templates
.stream()
.map(template -> new AbstractMap.SimpleEntry<>(
template.uid(),
template
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

public Template findById(String namespace, String templateId) {
return this.store.get(Template.uid(namespace, templateId)).orElse(null);
@SneakyThrows
private void await() {
if (templates == null) {
Await.until(() -> this.templates != null, Duration.ofMillis(100), Duration.ofMinutes(5));
}
}

public Optional<Template> findById(String namespace, String templateId) {
this.await();

return Optional.ofNullable(this.templates.get(Template.uid(namespace, templateId)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.core.services.FlowService;
import io.kestra.runner.kafka.KafkaFlowExecutor;
import io.kestra.runner.kafka.KafkaQueueEnabled;
import io.kestra.runner.kafka.serializers.JsonSerde;
import io.kestra.runner.kafka.services.KafkaAdminService;
import io.kestra.runner.kafka.services.KafkaStreamSourceService;
import io.kestra.runner.kafka.services.KafkaStreamsBuilder;
import io.kestra.runner.kafka.streams.FlowTriggerWithExecutionTransformer;
import jakarta.inject.Inject;
Expand Down Expand Up @@ -34,13 +34,11 @@ public class ExecutorFlowTrigger implements KafkaExecutorInterface {
private FlowService flowService;

@Inject
private KafkaStreamSourceService kafkaStreamSourceService;
private KafkaFlowExecutor kafkaFlowExecutor;

public StreamsBuilder topology() {
StreamsBuilder builder = new KafkaStreamsBuilder();

kafkaStreamSourceService.flowGlobalKTable(builder);

// trigger
builder.addStateStore(
Stores.keyValueStoreBuilder(
Expand All @@ -62,6 +60,7 @@ public StreamsBuilder topology() {
.transformValues(
() -> new FlowTriggerWithExecutionTransformer(
TRIGGER_MULTIPLE_STATE_STORE_NAME,
kafkaFlowExecutor,
flowService
),
Named.as("ExecutorFlowTrigger.transformToExecutionList"),
Expand Down
Loading