From 05b229e3fc60db874f153602f0d4978073cd09fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Del=C3=A8gue?= Date: Fri, 24 Nov 2023 09:41:19 +0100 Subject: [PATCH] More control on event publishing --- .../ReactorKafkaEventPublisher.java | 27 +++++++------------ 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java index 81ce8346..ca368d1b 100644 --- a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java +++ b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java @@ -148,24 +148,15 @@ public CompletionStage publish(List> eve LOGGER.debug("Publishing event in memory : \n{} ", events); return Flux .fromIterable(events) - .concatMap(t -> - Mono.defer(() -> { - Sinks.EmitResult emitResult = queue.tryEmitNext(t); - LOGGER.debug("Event publisher {}, {} buffered elements ( capacity = {} ), emitResult = {}, event = {}", topic, queue.scan(Scannable.Attr.BUFFERED), queue.scan(Scannable.Attr.CAPACITY), emitResult, t); - if (emitResult.isFailure()) { - return Mono.error(new RuntimeException("Error publishing to queue for %s : %s".formatted(topic, emitResult))); - } else { - return Mono.just(""); - } - }) - .retryWhen(Retry - .backoff(5, Duration.ofMillis(500)) - .doBeforeRetry(ctx -> { - LOGGER.error("Error publishing to queue %s retrying for the %s time".formatted(topic, ctx.totalRetries()), ctx.failure()); - }) - ) - .onErrorReturn("") - ) + .map(t -> { + try { + queue.emitNext(t, Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(1))); + return Tuple.empty(); + } catch (Exception e) { + LOGGER.error("Error publishing to topic %s".formatted(topic), e); + return Tuple.empty(); + } + }) .collectList() .thenReturn(Tuple.empty()) .toFuture();