feat(kafka-runner): introduce async queue and send log async (#806)
tchiotludo authored Nov 8, 2022
1 parent 8a574b9 · commit a2b6578
Showing 8 changed files with 40 additions and 17 deletions.
core/src/main/java/io/kestra/core/queues/QueueInterface.java (2 changes: 2 additions & 0 deletions)
@@ -6,6 +6,8 @@
 public interface QueueInterface<T> extends Closeable {
     void emit(T message) throws QueueException;

+    void emitAsync(T message) throws QueueException;
+
     void delete(T message) throws QueueException;

     Runnable receive(Consumer<T> consumer);
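The interface itself does not document the difference, but the rest of the diff implies the contract: emit() blocks until the message has been handed off and propagates failures, while emitAsync() is fire-and-forget and failures are only logged. A minimal caller-side sketch of that distinction; the class name and the package locations of LogEntry and QueueException are assumptions, not part of the commit:

    import io.kestra.core.models.executions.LogEntry;
    import io.kestra.core.queues.QueueException;
    import io.kestra.core.queues.QueueInterface;

    // Sketch only: illustrates the contract implied by the Kafka implementation further down.
    class QueueUsageSketch {
        void publish(QueueInterface<LogEntry> logQueue, LogEntry entry) throws QueueException {
            logQueue.emit(entry);      // blocking: send failures reach the caller as QueueException
            logQueue.emitAsync(entry); // fire-and-forget: failures are logged by the queue, not thrown
        }
    }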
@@ -156,7 +156,7 @@ public ContextAppender(QueueInterface<LogEntry> logQueue, LogEntry logEntry) {
         @Override
         protected void append(ILoggingEvent e) {
             logEntries(e, logEntry)
-                .forEach(logQueue::emit);
+                .forEach(logQueue::emitAsync);
         }
     }

@@ -62,7 +62,7 @@ public Flow injectDefaults(Flow flow, Execution execution) {
                 Execution.loggingEventFromException(e),
                 LogEntry.of(execution)
             )
-                .forEach(logQueue::emit);
+                .forEach(logQueue::emitAsync);
             return flow;
         }
     }
jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java (2 changes: 1 addition & 1 deletion)
@@ -486,7 +486,7 @@ private boolean deduplicateFlowTrigger(Execution execution, ExecutorState execut
     private Executor handleFailedExecutionFromExecutor(Executor executor, Exception e) {
         Execution.FailedExecutionWithLog failedExecutionWithLog = executor.getExecution().failedExecutionFromExecutor(e);
         try {
-            failedExecutionWithLog.getLogs().forEach(logQueue::emit);
+            failedExecutionWithLog.getLogs().forEach(logQueue::emitAsync);

             return executor.withExecution(failedExecutionWithLog.getExecution(), "exception");
         } catch (Exception ex) {
jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java (5 changes: 5 additions & 0 deletions)
@@ -118,6 +118,11 @@ public void emit(T message) {
         this.produce(queueService.key(message), message, false);
     }

+    @Override
+    public void emitAsync(T message) throws QueueException {
+        this.emit(message);
+    }
+
     @Override
     public void delete(T message) throws QueueException {
         dslContextWrapper.transaction(configuration -> DSL
@@ -64,7 +64,7 @@ public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]>
         Execution.FailedExecutionWithLog failedExecutionWithLog = execution.failedExecutionFromExecutor(exception);
         Execution sendExecution = failedExecutionWithLog.getExecution();

-        failedExecutionWithLog.getLogs().forEach(logEntry -> logQueue.emit(logEntry));
+        failedExecutionWithLog.getLogs().forEach(logEntry -> logQueue.emitAsync(logEntry));

         if (exception instanceof RecordTooLargeException) {
             boolean exit = false;
runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaQueue.java (37 changes: 24 additions & 13 deletions)
@@ -19,6 +19,7 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
@@ -28,6 +29,7 @@
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -84,31 +86,40 @@ public KafkaQueue(String topicKey, Class<T> cls, ApplicationContext applicationC
         kafkaAdminService.createIfNotExist(topicKey);
     }

+    private Future<RecordMetadata> produceAsync(String key, T message) {
+        return kafkaProducer
+            .send(
+                new ProducerRecord<>(
+                    topicsConfig.getName(),
+                    key, message
+                ),
+                (metadata, e) -> {
+                    if (e != null) {
+                        log.error("Failed to produce on '{}' with key '{}', metadata '{}' ", this.cls, key, metadata, e);
+                    }
+                }
+            );
+    }
+
     private void produce(String key, T message) {
         try {
-            kafkaProducer
-                .send(
-                    new ProducerRecord<>(
-                        topicsConfig.getName(),
-                        key, message
-                    ),
-                    (metadata, e) -> {
-                        if (e != null) {
-                            log.error("Failed to produce on '{}' with key '{}', metadata '{}' ", this.cls, key, metadata, e);
-                        }
-                    }
-                )
-                .get();
+            this.produceAsync(key, message).get();
         } catch (InterruptedException | ExecutionException e) {
             throw new QueueException("Failed to produce on '" + this.cls + "' with key '" + key + "': ", e);
         }
     }

+
     @Override
     public void emit(T message) throws QueueException {
         this.produce(this.queueService.key(message), message);
     }

+    @Override
+    public void emitAsync(T message) throws QueueException {
+        this.produceAsync(this.queueService.key(message), message);
+    }
+
     @Override
     public void delete(T message) throws QueueException {
         this.produce(this.queueService.key(message), null);
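The refactor above rests on a standard Kafka producer pattern: KafkaProducer.send() is always asynchronous and returns a Future<RecordMetadata>, so emit() and emitAsync() differ only in whether the caller blocks on that Future. A self-contained sketch of the same two paths outside Kestra; the broker address, topic name and serializer choices are illustrative, not taken from the commit:

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ProducerSketch {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // illustrative broker address
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "key", "value");

                // Fire-and-forget, like emitAsync(): the callback runs on the producer's
                // I/O thread and errors are only reported, never thrown to the caller.
                producer.send(record, (metadata, e) -> {
                    if (e != null) {
                        System.err.println("send failed: " + e.getMessage());
                    }
                });

                // Blocking, like emit(): waiting on the Future surfaces any send error
                // as an ExecutionException on the calling thread.
                Future<RecordMetadata> future = producer.send(record);
                RecordMetadata metadata = future.get();
                System.out.println("acked at offset " + metadata.offset());
            }
        }
    }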
@@ -84,6 +84,11 @@ public void emit(T message) {
         this.produce(queueService.key(message), message);
     }

+    @Override
+    public void emitAsync(T message) throws QueueException {
+        this.emit(message);
+    }
+
     @Override
     public void delete(T message) throws QueueException {
         this.produce(queueService.key(message), null);
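Both the JDBC queue and this last queue (its file name is not shown above, presumably the in-memory implementation) satisfy the new method by delegating straight to the synchronous emit(); only the Kafka queue becomes genuinely asynchronous. A hypothetical alternative, not what this commit does, would be to express that fallback once as a default method on the interface so that only KafkaQueue needs an override:

    import java.io.Closeable;
    import java.util.function.Consumer;

    // Hypothetical variant of QueueInterface; QueueException is the project's existing
    // exception type, assumed here to live in the same package.
    public interface QueueInterface<T> extends Closeable {
        void emit(T message) throws QueueException;

        default void emitAsync(T message) throws QueueException {
            this.emit(message); // synchronous fallback; KafkaQueue would override this
        }

        void delete(T message) throws QueueException;

        Runnable receive(Consumer<T> consumer);
    }

The commit instead keeps the method abstract and repeats the delegation in each backend, which keeps every implementation explicit at the cost of a little duplication.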
