From dfb0296456d6181cbf232c0cf7594772fd06a5ee Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alexandre=20Del=C3=A8gue?=
Date: Thu, 1 Aug 2024 16:51:29 +0200
Subject: [PATCH] Expose a way to force unpublished events to be published

---
 .../eventsourcing/InMemoryEventStore.java     |  5 +
 .../eventsourcing/KafkaEventPublisher.java    | 48 ++++++----
 .../eventsourcing/InMemoryEventStore.java     |  5 +
 .../ReactorKafkaEventPublisher.java           | 93 +++++++++----------
 .../fr/maif/eventsourcing/EventPublisher.java |  7 +-
 .../maif/eventsourcing/ReactorEventStore.java | 24 +++++
 6 files changed, 108 insertions(+), 74 deletions(-)

diff --git a/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java b/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java
index 9828f1f8..5b0e0bc5 100644
--- a/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java
+++ b/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java
@@ -147,6 +147,11 @@ public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> eve
         return _this.publish(events);
     }
 
+    @Override
+    public <TxCtx> CompletionStage<Tuple0> publishNonAcknowledgedFromDb(EventStore<TxCtx, E, Meta, Context> eventStore, ConcurrentReplayStrategy concurrentReplayStrategy) {
+        return CompletionStages.completedStage(Tuple.empty());
+    }
+
     @Override
     public void close() throws IOException {
 
diff --git a/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/KafkaEventPublisher.java b/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/KafkaEventPublisher.java
index 333d10f8..f603e244 100644
--- a/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/KafkaEventPublisher.java
+++ b/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/KafkaEventPublisher.java
@@ -79,6 +79,12 @@ public KafkaEventPublisher(ActorSystem system, ProducerSettings<String, EventEnv
 
+    @Override
+    public <TxCtx> CompletionStage<Tuple0> publishNonAcknowledgedFromDb(EventStore<TxCtx, E, Meta, Context> eventStore, ConcurrentReplayStrategy concurrentReplayStrategy) {
+        return republishFromDBSource(eventStore, concurrentReplayStrategy).runWith(Sink.ignore(), materializer)
+                .thenApply(d -> Tuple.empty());
+    }
+
     @Override
     public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, ConcurrentReplayStrategy concurrentReplayStrategy) {
         killSwitch = RestartSource
             .withBackoff(
                 RestartSettings.create(restartInterval, maxRestartInterval, 0),
                 () -> {
                     LOGGER.info("Starting/Restarting publishing event to kafka on topic {}", topic);
-                    return Source.completionStage(eventStore.openTransaction().toCompletableFuture())
-                        .flatMapConcat(tx -> {
-
-                            LOGGER.info("Replaying not published in DB for {}", topic);
-                            ConcurrentReplayStrategy strategy = Objects.isNull(concurrentReplayStrategy) ? WAIT : concurrentReplayStrategy;
-                            return Source.fromPublisher(eventStore.loadEventsUnpublished(tx, strategy))
-                                .via(publishToKafka(eventStore, Option.some(tx), groupFlow))
-                                .alsoTo(logProgress(100))
-                                .watchTermination((nu, cs) ->
-                                    cs.whenComplete((d, e) -> {
-                                        eventStore.commitOrRollback(Option.of(e), tx);
-                                        if (e != null) {
-                                            LOGGER.error("Error replaying non published events to kafka for "+topic, e);
-                                        } else {
-                                            LOGGER.info("Replaying events not published in DB is finished for {}", topic);
-                                        }
-                                    })
-                                );
-                        })
+                    return republishFromDBSource(eventStore, concurrentReplayStrategy)
                         .concat(
                             this.eventsSource.via(publishToKafka(
                                 eventStore,
@@ -131,6 +119,28 @@ public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, Concur
             .run(materializer).first();
     }
 
+    private <TxCtx> Source<EventEnvelope<E, Meta, Context>, NotUsed> republishFromDBSource(EventStore<TxCtx, E, Meta, Context> eventStore, ConcurrentReplayStrategy concurrentReplayStrategy) {
+        return Source.completionStage(eventStore.openTransaction().toCompletableFuture())
+                .flatMapConcat(tx -> {
+
+                    LOGGER.info("Replaying not published in DB for {}", topic);
+                    ConcurrentReplayStrategy strategy = Objects.isNull(concurrentReplayStrategy) ? WAIT : concurrentReplayStrategy;
+                    return Source.fromPublisher(eventStore.loadEventsUnpublished(tx, strategy))
+                            .via(publishToKafka(eventStore, Option.some(tx), groupFlow))
+                            .alsoTo(logProgress(100))
+                            .watchTermination((nu, cs) ->
+                                cs.whenComplete((d, e) -> {
+                                    eventStore.commitOrRollback(Option.of(e), tx);
+                                    if (e != null) {
+                                        LOGGER.error("Error replaying non published events to kafka for " + topic, e);
+                                    } else {
+                                        LOGGER.info("Replaying events not published in DB is finished for {}", topic);
+                                    }
+                                })
+                            );
+                });
+    }
+
     private <TxCtx> Flow<EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>, NotUsed> publishToKafka(EventStore<TxCtx, E, Meta, Context> eventStore, Option<TxCtx> tx, Flow<ProducerMessage.Results<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>, List<ProducerMessage.Results<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>>, NotUsed> groupFlow) {
         Flow<ProducerMessage.Envelope<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>, ProducerMessage.Results<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>, NotUsed> publishToKafkaFlow = Producer.<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>flexiFlow(producerSettings);
 
diff --git a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java
index c5419d10..cb577ae9 100644
--- a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java
+++ b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java
@@ -185,6 +185,11 @@ public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> eve
         return _this.publish(events);
     }
 
+    @Override
+    public <TxCtx> CompletionStage<Tuple0> publishNonAcknowledgedFromDb(EventStore<TxCtx, E, Meta, Context> eventStore, ConcurrentReplayStrategy concurrentReplayStrategy) {
+        return CompletionStages.completedStage(Tuple.empty());
+    }
+
     @Override
     public void close() throws IOException {
 
diff --git a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java
index 6f83e504..e22036d7 100644
--- a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java
+++ b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java
@@ -30,7 +30,6 @@
 import java.util.Objects;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -44,10 +43,7 @@ public class ReactorKafkaEventPublisher<E extends Event, Meta, Context> implemen
     private final String topic;
     private final Integer queueBufferSize;
-    private final AtomicReference<Sinks.Many<EventEnvelope<E, Meta, Context>>> queue = new AtomicReference<>();
-//    private final Sinks.Many<EventEnvelope<E, Meta, Context>> queue;
-    private final AtomicReference<Flux<EventEnvelope<E, Meta, Context>>> eventSource = new AtomicReference<>();
-//    private final Flux<EventEnvelope<E, Meta, Context>> eventSource;
+    private final Sinks.Many<EventEnvelope<E, Meta, Context>> queue;
     private final SenderOptions<String, EventEnvelope<E, Meta, Context>> senderOptions;
     private final Duration restartInterval;
     private final Duration maxRestartInterval;
@@ -68,23 +64,11 @@ public ReactorKafkaEventPublisher(SenderOptions<String, EventEnvelope<E, Meta, C
     public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, ConcurrentReplayStrategy concurrentReplayStrategy) {
         Sinks.Many<EventEnvelope<E, Meta, Context>> logProgressSink = Sinks.many().unicast().onBackpressureBuffer();
         logProgress(logProgressSink.asFlux(), 100).subscribe();
 
-        killSwitch = Mono.defer(() -> fromCS(eventStore::openTransaction)
+        killSwitch = publishFromDb(eventStore, concurrentReplayStrategy, logProgressSink)
+                .concatMap(countAndLastSeqNum -> {
+//        Flux.defer(() -> {
+                    LOGGER.debug("Starting consuming in memory queue for {}. Events with a sequence number lower than {} are ignored", topic, countAndLastSeqNum.lastSeqNum);
+                    return queue.asFlux()
+                            .filter(e -> e.sequenceNum > countAndLastSeqNum.lastSeqNum)
+                            .transform(publishToKafka(
+                                    eventStore,
+                                    Option.none(),
+                                    bufferTimeout(200, Duration.ofMillis(20)),
+                                    bufferTimeout(200, Duration.ofSeconds(1))
+                            ));
+                })
+                .doOnError(e -> {
+                    LOGGER.error("Error publishing events to kafka", e);
+                })
+                .retryWhen(Retry.backoff(Long.MAX_VALUE, restartInterval)
+                        .transientErrors(true)
+                        .maxBackoff(maxRestartInterval)
+                        .doBeforeRetry(ctx -> {
+                            LOGGER.error("Error handling events for topic %s retrying for the %s time".formatted(topic, ctx.totalRetries() + 1), ctx.failure());
+                        })
+                )
+                .subscribe();
+    }
+
+    @Override
+    public <TxCtx> CompletionStage<Tuple0> publishNonAcknowledgedFromDb(EventStore<TxCtx, E, Meta, Context> eventStore, ConcurrentReplayStrategy concurrentReplayStrategy) {
+        Sinks.Many<EventEnvelope<E, Meta, Context>> logProgressSink = Sinks.many().unicast().onBackpressureBuffer();
+        logProgress(logProgressSink.asFlux(), 100).subscribe();
+        return publishFromDb(eventStore, concurrentReplayStrategy, logProgressSink)
+                .collectList()
+                .map(any -> Tuple.empty())
+                .toFuture();
+    }
+
+    private <TxCtx> Flux<CountAndLastSeqNum> publishFromDb(EventStore<TxCtx, E, Meta, Context> eventStore, ConcurrentReplayStrategy concurrentReplayStrategy, Sinks.Many<EventEnvelope<E, Meta, Context>> logProgressSink) {
+        return Mono.defer(() -> fromCS(eventStore::openTransaction)
                 .flatMap(tx -> {
                     LOGGER.info("Replaying events not published from DB in topic {}", topic);
                     ConcurrentReplayStrategy strategy = Objects.isNull(concurrentReplayStrategy) ? WAIT : concurrentReplayStrategy;
@@ -136,31 +157,7 @@ public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, Concur
                     }
                 });
             }))
-            .flux()
-            .concatMap(countAndLastSeqNum -> {
-//        Flux.defer(() -> {
-                LOGGER.debug("Starting consuming in memory queue for {}. Event lower than {} are ignored", topic, countAndLastSeqNum.lastSeqNum);
-                return eventSource.get()
-                        .filter(e -> e.sequenceNum > countAndLastSeqNum.lastSeqNum)
-                        .transform(publishToKafka(
-                                eventStore,
-                                Option.none(),
-                                bufferTimeout(200, Duration.ofMillis(20)),
-                                bufferTimeout(200, Duration.ofSeconds(1))
-                        ));
-            })
-            .doOnError(e -> {
-                reinitQueue();
-                LOGGER.error("Error publishing events to kafka", e);
-            })
-            .retryWhen(Retry.backoff(Long.MAX_VALUE, restartInterval)
-                    .transientErrors(true)
-                    .maxBackoff(maxRestartInterval)
-                    .doBeforeRetry(ctx -> {
-                        LOGGER.error("Error handling events for topic %s retrying for the %s time".formatted(topic, ctx.totalRetries() + 1), ctx.failure());
-                    })
-            )
-            .subscribe();
+            .flux();
     }
 
     private <TxCtx> Function<Flux<EventEnvelope<E, Meta, Context>>, Flux<EventEnvelope<E, Meta, Context>>> publishToKafka(EventStore<TxCtx, E, Meta, Context> eventStore,
@@ -197,7 +194,7 @@ public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> eve
         return Flux
             .fromIterable(events)
             .map(t -> {
-                queue.get().tryEmitNext(t).orThrow();
+                queue.tryEmitNext(t).orThrow();
                 return Tuple.empty();
             })
             .retryWhen(Retry.fixedDelay(50, Duration.ofMillis(1))
@@ -248,10 +245,4 @@ private Flux logProgress(Flux logProgress, int every) {
             }
         });
     }
-
-    public Integer getBufferedElementCount() {
-//        return this.queue.scan(Scannable.Attr.BUFFERED);
-        return 0;
-    }
-
 }
diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/EventPublisher.java b/thoth-core/src/main/java/fr/maif/eventsourcing/EventPublisher.java
index 9a20a8ed..f2bd4a5a 100644
--- a/thoth-core/src/main/java/fr/maif/eventsourcing/EventPublisher.java
+++ b/thoth-core/src/main/java/fr/maif/eventsourcing/EventPublisher.java
@@ -1,9 +1,7 @@
 package fr.maif.eventsourcing;
 
-import fr.maif.concurrent.CompletionStages;
 import io.vavr.Tuple0;
 import io.vavr.collection.List;
-import io.vavr.concurrent.Future;
 
 import java.io.Closeable;
 import java.util.concurrent.CompletionStage;
@@ -11,6 +9,7 @@ public interface EventPublisher<E extends Event, Meta, Context> extends Closeable {
 
     CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> events);
 
-    default <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, EventStore.ConcurrentReplayStrategy concurrentReplayStrategy) {
-    }
+    <TxCtx> CompletionStage<Tuple0> publishNonAcknowledgedFromDb(EventStore<TxCtx, E, Meta, Context> eventStore, EventStore.ConcurrentReplayStrategy concurrentReplayStrategy);
+
+    default <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, EventStore.ConcurrentReplayStrategy concurrentReplayStrategy) {}
 }
diff --git a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactorEventStore.java b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactorEventStore.java
index 0ec0eb32..1913728b 100644
--- a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactorEventStore.java
+++ b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactorEventStore.java
@@ -5,6 +5,10 @@
 import io.vavr.control.Option;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import scala.util.hashing.MurmurHash3$;
+
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.function.Function;
 
 public interface ReactorEventStore<TxCtx, E extends Event, Meta, Context> {
 
@@ -51,6 +55,26 @@ default Mono<List<EventEnvelope<E, Meta, Context>>> markAsPublished(List<EventEn
 
     Mono<Tuple0> commitOrRollback(Option<Throwable> of, TxCtx tx);
 
+    /**
+     * Stream elements from the journal and execute a handling function concurrently.
+     * Events are sharded by entity id, so events for the same entity won't be handled concurrently.
+     *
+     * @param fromSequenceNum sequence num to start with
+     * @param parallelism concurrency factor
+     * @param maxEventsToHandle optional limit on the number of events to handle
+     * @param handle the handling function, for example to build a new projection
+     * @return the last sequence num handled
+     */
+    default Mono<Long> concurrentReplay(Long fromSequenceNum, Integer parallelism, Option<Integer> maxEventsToHandle, Function<Flux<EventEnvelope<E, Meta, Context>>, Mono<Tuple0>> handle) {
+        LongAccumulator lastSeqNum = new LongAccumulator(Long::max, 0);
+        EventStore.Query.Builder tmpQuery = EventStore.Query.builder().withSequenceFrom(fromSequenceNum);
+        return this.loadEventsByQuery(maxEventsToHandle.fold(() -> tmpQuery, tmpQuery::withSize).build())
+                .groupBy(evt -> Math.floorMod(MurmurHash3$.MODULE$.stringHash(evt.entityId), parallelism))
+                .flatMap(flux -> handle.apply(flux.doOnNext(evt -> lastSeqNum.accumulate(evt.sequenceNum))), parallelism)
+                .last()
+                .map(any -> lastSeqNum.get());
+    }
+
     EventStore<TxCtx, E, Meta, Context> toEventStore();
 
     static <TxCtx, E extends Event, Meta, Context> ReactorEventStore<TxCtx, E, Meta, Context> fromEventStore(EventStore<TxCtx, E, Meta, Context> eventStore) {
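
Usage sketch (illustrative, not part of the patch): publishNonAcknowledgedFromDb
exposes on EventPublisher the replay of events still flagged as unpublished in
the journal, which previously only ran inside start() at boot. A minimal caller,
assuming the usual Thoth type parameters; the helper class and its names are
hypothetical:

    import fr.maif.eventsourcing.Event;
    import fr.maif.eventsourcing.EventPublisher;
    import fr.maif.eventsourcing.EventStore;
    import fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy;

    public class ForceRepublish {

        // Forces a replay of every event still marked as unpublished in the store.
        // WAIT mirrors the fallback the publishers use when the strategy is null.
        public static <TxCtx, E extends Event, Meta, Context> void force(
                EventPublisher<E, Meta, Context> eventPublisher,
                EventStore<TxCtx, E, Meta, Context> eventStore) {
            eventPublisher
                    .publishNonAcknowledgedFromDb(eventStore, ConcurrentReplayStrategy.WAIT)
                    .whenComplete((done, error) -> {
                        if (error != null) {
                            System.err.println("Forced republish failed: " + error);
                        } else {
                            System.out.println("Unpublished events were sent to Kafka");
                        }
                    });
        }
    }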
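
Likewise for the new ReactorEventStore#concurrentReplay: events are sharded by
the MurmurHash3 of their entity id, each shard sees its events in journal order,
and up to parallelism shards are handled concurrently (taking the modulo with
Math.floorMod keeps shard keys non-negative, so the group count never exceeds
the flatMap concurrency). A sketch of rebuilding a projection with it; the
Projection interface is a hypothetical stand-in for any per-event handler:

    import fr.maif.eventsourcing.Event;
    import fr.maif.eventsourcing.EventEnvelope;
    import fr.maif.eventsourcing.ReactorEventStore;
    import io.vavr.Tuple;
    import io.vavr.control.Option;
    import reactor.core.publisher.Mono;

    public class RebuildProjection {

        // Hypothetical projection handler, called once per event envelope.
        public interface Projection<E extends Event, Meta, Context> {
            Mono<Void> handle(EventEnvelope<E, Meta, Context> envelope);
        }

        // Replays the whole journal through the projection, four shards at a
        // time, and completes with the last handled sequence number.
        public static <TxCtx, E extends Event, Meta, Context> Mono<Long> rebuild(
                ReactorEventStore<TxCtx, E, Meta, Context> eventStore,
                Projection<E, Meta, Context> projection) {
            return eventStore.concurrentReplay(
                    0L,               // fromSequenceNum: start at the beginning
                    4,                // parallelism: four concurrent shards
                    Option.none(),    // maxEventsToHandle: no limit
                    events -> events
                            // concatMap keeps per-shard, hence per-entity, order
                            .concatMap(projection::handle)
                            .then(Mono.just(Tuple.empty())));
        }
    }

Note that concurrentReplay ends with .last(), so the returned Mono errors if the
query matches no events; callers replaying a possibly empty journal may want to
add a fallback.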