Skip to content

Commit

Permalink
feat(kafka-runner): create multiple stream thread to separate load (#452
Browse files Browse the repository at this point in the history
)
  • Loading branch information
tchiotludo committed Jan 28, 2022
1 parent 8673acc commit 95d2b2d
Show file tree
Hide file tree
Showing 22 changed files with 1,227 additions and 973 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package io.kestra.cli.commands.servers;

import com.google.common.collect.ImmutableMap;
import io.kestra.core.runners.ExecutorInterface;
import io.micronaut.context.ApplicationContext;
import lombok.extern.slf4j.Slf4j;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.AbstractExecutor;
import io.kestra.core.utils.Await;
import picocli.CommandLine;

Expand Down Expand Up @@ -36,12 +36,12 @@ public static Map<String, Object> propertiesOverrides() {
public Integer call() throws Exception {
super.call();

AbstractExecutor abstractExecutor = applicationContext.getBean(AbstractExecutor.class);
abstractExecutor.run();
ExecutorInterface executorService = applicationContext.getBean(ExecutorInterface.class);
executorService.run();

log.info("Executor started");

this.shutdownHook(abstractExecutor::close);
this.shutdownHook(executorService::close);

Await.until(() -> !this.applicationContext.isRunning());

Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
package io.kestra.cli.commands.servers;

import com.google.common.collect.ImmutableMap;
import io.micronaut.context.ApplicationContext;
import lombok.extern.slf4j.Slf4j;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.ServerType;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.AbstractExecutor;
import io.kestra.core.runners.StandAloneRunner;
import io.kestra.core.utils.Await;
import io.kestra.runner.kafka.KafkaExecutor;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import jakarta.inject.Inject;

@CommandLine.Command(
name = "standalone",
Expand Down
2 changes: 1 addition & 1 deletion cli/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ kestra:
segment.bytes: "10485760"

executor:
name: "${kestra.kafka.defaults.topic-prefix}executor-executor-changelog"
name: "${kestra.kafka.defaults.topic-prefix}executor_main-executor-changelog"
cls: io.kestra.core.runners.Executor
properties:
cleanup.policy: "delete,compact"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,13 +434,13 @@ public boolean hasTaskRunJoinable(TaskRun taskRun) {
}

/**
* Convert an exception on {@link io.kestra.core.runners.AbstractExecutor} and add log to the current
* Convert an exception on Executor and add log to the current
* {@code RUNNING} taskRun, on the lastAttempts.
* If no Attempt is found, we create one (must be nominal case).
* The executor will catch the {@code FAILED} taskRun emitted and will failed the execution.
* In the worst case, we FAILED the execution (only from {@link io.kestra.core.models.triggers.types.Flow}).
*
* @param e the exception throw from {@link io.kestra.core.runners.AbstractExecutor}
* @param e the exception throw from Executor
* @return a new execution with taskrun failed if possible or execution failed is other case
*/
public FailedExecutionWithLog failedExecutionFromExecutor(Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.kestra.core.runners;

import java.io.Closeable;

public interface ExecutorInterface extends Closeable, Runnable {

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,43 @@
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.services.ConditionService;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import static io.kestra.core.utils.Rethrow.throwFunction;
import static io.kestra.core.utils.Rethrow.throwPredicate;

public abstract class AbstractExecutor implements Runnable, Closeable {
protected static final Logger log = org.slf4j.LoggerFactory.getLogger(AbstractExecutor.class);
@Singleton
@Slf4j
public class ExecutorService {
@Inject
protected ApplicationContext applicationContext;

@Inject
protected RunContextFactory runContextFactory;

@Inject
protected MetricRegistry metricRegistry;

@Inject
protected ConditionService conditionService;

protected FlowExecutorInterface flowExecutorInterface;

public AbstractExecutor(
RunContextFactory runContextFactory,
MetricRegistry metricRegistry,
ConditionService conditionService
) {
this.runContextFactory = runContextFactory;
this.metricRegistry = metricRegistry;
this.conditionService = conditionService;
protected FlowExecutorInterface flowExecutorInterface() {
// bean is injected late, so we need to wait
if (this.flowExecutorInterface == null) {
this.flowExecutorInterface = applicationContext.getBean(FlowExecutorInterface.class);
}

return this.flowExecutorInterface;
}

public Executor process(Executor executor) {
Expand Down Expand Up @@ -250,7 +259,7 @@ private List<TaskRun> saveFlowableOutput(
ImmutableMap.of()
);
} catch (Exception e) {
log.warn("Unable to save output on taskRun '{}'", taskRun, e);
executor.getFlow().logger().warn("Unable to save output on taskRun '{}'", taskRun, e);
}

return taskRun;
Expand Down Expand Up @@ -510,7 +519,7 @@ private Executor handleFlowTask(final Executor executor) {
);

try {
Execution execution = flowTask.createExecution(runContext, flowExecutorInterface);
Execution execution = flowTask.createExecution(runContext, flowExecutorInterface());

WorkerTaskExecution workerTaskExecution = WorkerTaskExecution.builder()
.task(flowTask)
Expand Down Expand Up @@ -552,7 +561,7 @@ private Executor handleFlowTask(final Executor executor) {
return resultExecutor;
}

protected static void log(Logger log, Boolean in, WorkerTask value) {
public void log(Logger log, Boolean in, WorkerTask value) {
log.debug(
"{} {} : {}",
in ? "<< IN " : ">> OUT",
Expand All @@ -561,7 +570,7 @@ protected static void log(Logger log, Boolean in, WorkerTask value) {
);
}

protected static void log(Logger log, Boolean in, WorkerTaskResult value) {
public void log(Logger log, Boolean in, WorkerTaskResult value) {
log.debug(
"{} {} : {}",
in ? "<< IN " : ">> OUT",
Expand All @@ -570,7 +579,7 @@ protected static void log(Logger log, Boolean in, WorkerTaskResult value) {
);
}

protected static void log(Logger log, Boolean in, Execution value) {
public void log(Logger log, Boolean in, Execution value) {
log.debug(
"{} {} [key='{}']\n{}",
in ? "<< IN " : ">> OUT",
Expand All @@ -580,7 +589,7 @@ protected static void log(Logger log, Boolean in, Execution value) {
);
}

protected static void log(Logger log, Boolean in, Executor value) {
public void log(Logger log, Boolean in, Executor value) {
log.debug(
"{} {} [key='{}', from='{}', offset='{}']\n{}",
in ? "<< IN " : ">> OUT",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import java.util.Optional;

public class MemoryFlowExecutor implements FlowExecutorInterface {
private FlowRepositoryInterface flowRepositoryInterface;
private final FlowRepositoryInterface flowRepositoryInterface;

public MemoryFlowExecutor(FlowRepositoryInterface flowRepositoryInterface) {
this.flowRepositoryInterface = flowRepositoryInterface;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;

import jakarta.inject.Inject;
import jakarta.inject.Named;

@Slf4j
public class StandAloneRunner implements RunnerInterface, Closeable {
@Setter private ExecutorService poolExecutor;
@Setter private java.util.concurrent.ExecutorService poolExecutor;
@Setter protected int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors());
@Setter protected boolean schedulerEnabled = true;

Expand Down Expand Up @@ -47,7 +47,7 @@ public void run() {

poolExecutor = executorsUtils.cachedThreadPool("standalone-runner");

poolExecutor.execute(applicationContext.getBean(AbstractExecutor.class));
poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));

Worker worker = new Worker(applicationContext, workerThread);
applicationContext.registerSingleton(worker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.micronaut.context.ApplicationContext;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.core5.http.ConnectionClosedException;
import org.slf4j.Logger;

import java.io.IOException;
Expand Down Expand Up @@ -186,7 +187,8 @@ public RunResult run(
.maxAttempt(5)
.build()
).run(
InternalServerErrorException.class,
(bool, throwable) -> throwable instanceof InternalServerErrorException ||
throwable.getCause() instanceof ConnectionClosedException,
() -> {
pull
.withTag(!imageParse.tag.equals("") ? imageParse.tag : "latest")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.is;

@Singleton
public class FlowTriggerCaseTest {
Expand Down Expand Up @@ -49,7 +49,8 @@ public void trigger() throws InterruptedException, TimeoutException {

logEntryQueue.receive(logEntry -> {
if (logEntry.getMessage().contains("Failed to trigger flow") &&
logEntry.getTriggerId().equals("listen-flow-invalid")
logEntry.getTriggerId() != null &&
logEntry.getTriggerId().equals("listen-flow-invalid")
) {
countDownLatch.countDown();
}
Expand Down
Loading

0 comments on commit 95d2b2d

Please sign in to comment.