diff --git a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java index 81ce8346..ca368d1b 100644 --- a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java +++ b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java @@ -148,24 +148,15 @@ public CompletionStage publish(List> eve LOGGER.debug("Publishing event in memory : \n{} ", events); return Flux .fromIterable(events) - .concatMap(t -> - Mono.defer(() -> { - Sinks.EmitResult emitResult = queue.tryEmitNext(t); - LOGGER.debug("Event publisher {}, {} buffered elements ( capacity = {} ), emitResult = {}, event = {}", topic, queue.scan(Scannable.Attr.BUFFERED), queue.scan(Scannable.Attr.CAPACITY), emitResult, t); - if (emitResult.isFailure()) { - return Mono.error(new RuntimeException("Error publishing to queue for %s : %s".formatted(topic, emitResult))); - } else { - return Mono.just(""); - } - }) - .retryWhen(Retry - .backoff(5, Duration.ofMillis(500)) - .doBeforeRetry(ctx -> { - LOGGER.error("Error publishing to queue %s retrying for the %s time".formatted(topic, ctx.totalRetries()), ctx.failure()); - }) - ) - .onErrorReturn("") - ) + .map(t -> { + try { + queue.emitNext(t, Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(1))); + return Tuple.empty(); + } catch (Exception e) { + LOGGER.error("Error publishing to topic %s".formatted(topic), e); + return Tuple.empty(); + } + }) .collectList() .thenReturn(Tuple.empty()) .toFuture();