From ac8da5e285a598471ed7437f6b6bc58962a93ea8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Fri, 5 Jan 2024 09:50:52 +0100 Subject: [PATCH] feat(*): Migrate from RxJava2 to Reactor --- build.gradle | 2 +- .../java/io/kestra/plugin/amqp/Publish.java | 23 ++++++++++--------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/build.gradle b/build.gradle index 4507d2a..2d2469b 100644 --- a/build.gradle +++ b/build.gradle @@ -46,7 +46,7 @@ dependencies { compileOnly platform("io.micronaut.platform:micronaut-platform:$micronautVersion") compileOnly "io.micronaut:micronaut-inject" compileOnly "io.micronaut.validation:micronaut-validation" - compileOnly "io.micronaut.rxjava2:micronaut-rxjava2" + compileOnly "io.micronaut.reactor:micronaut-reactor" compileOnly "io.micronaut:micronaut-jackson-databind" // kestra diff --git a/src/main/java/io/kestra/plugin/amqp/Publish.java b/src/main/java/io/kestra/plugin/amqp/Publish.java index 624958d..2de2109 100644 --- a/src/main/java/io/kestra/plugin/amqp/Publish.java +++ b/src/main/java/io/kestra/plugin/amqp/Publish.java @@ -15,13 +15,14 @@ import io.kestra.core.serializers.JacksonMapper; import io.kestra.plugin.amqp.models.Message; import io.kestra.plugin.amqp.models.SerdeType; -import io.reactivex.BackpressureStrategy; -import io.reactivex.Flowable; import io.swagger.v3.oas.annotations.media.Schema; import lombok.*; import lombok.experimental.SuperBuilder; import jakarta.validation.constraints.NotNull; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; + import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; @@ -93,8 +94,8 @@ public Publish.Output run(RunContext runContext) throws Exception { Channel channel = connection.createChannel(); Integer count = 1; - Flowable flowable; - Flowable resultFlowable; + Flux flowable; + Flux resultFlowable; if (this.from instanceof String) { if (!isValidURI((String) this.from)) { @@ -103,16 +104,16 @@ public Publish.Output run(RunContext runContext) throws Exception { URI from = new URI(runContext.render((String) this.from)); try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)))) { - flowable = Flowable.create(FileSerde.reader(inputStream, Message.class), BackpressureStrategy.BUFFER); + flowable = Flux.create(FileSerde.reader(inputStream, Message.class), FluxSink.OverflowStrategy.BUFFER); resultFlowable = this.buildFlowable(flowable, channel, runContext); count = resultFlowable .reduce(Integer::sum) - .blockingGet(); + .block(); } } else if (this.from instanceof List) { - flowable = Flowable.fromArray(((List) this.from) + flowable = Flux.fromArray(((List) this.from) .stream() .map(throwFunction(row -> { if (row instanceof Map) { @@ -129,7 +130,7 @@ public Publish.Output run(RunContext runContext) throws Exception { count = resultFlowable .reduce(Integer::sum) - .blockingGet(); + .block(); } else { publish(channel, JacksonMapper.toMap(runContext.render((Map) this.from), Message.class), runContext); } @@ -146,12 +147,12 @@ public Publish.Output run(RunContext runContext) throws Exception { } - private Flowable buildFlowable(Flowable flowable, Channel channel, RunContext runContext) { + private Flux buildFlowable(Flux flowable, Channel channel, RunContext runContext) throws Exception { return flowable - .map(message -> { + .map(throwFunction(message -> { publish(channel, message, runContext); return 1; - }); + })); }