Skip to content

Commit

Permalink
feat(*): Migrate from RxJava2 to Reactor
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Jan 30, 2024
1 parent 3498136 commit ac8da5e
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ dependencies {
compileOnly platform("io.micronaut.platform:micronaut-platform:$micronautVersion")
compileOnly "io.micronaut:micronaut-inject"
compileOnly "io.micronaut.validation:micronaut-validation"
compileOnly "io.micronaut.rxjava2:micronaut-rxjava2"
compileOnly "io.micronaut.reactor:micronaut-reactor"
compileOnly "io.micronaut:micronaut-jackson-databind"

// kestra
Expand Down
23 changes: 12 additions & 11 deletions src/main/java/io/kestra/plugin/amqp/Publish.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.plugin.amqp.models.Message;
import io.kestra.plugin.amqp.models.SerdeType;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;

import jakarta.validation.constraints.NotNull;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
Expand Down Expand Up @@ -93,8 +94,8 @@ public Publish.Output run(RunContext runContext) throws Exception {
Channel channel = connection.createChannel();

Integer count = 1;
Flowable<Message> flowable;
Flowable<Integer> resultFlowable;
Flux<Message> flowable;
Flux<Integer> resultFlowable;

if (this.from instanceof String) {
if (!isValidURI((String) this.from)) {
Expand All @@ -103,16 +104,16 @@ public Publish.Output run(RunContext runContext) throws Exception {

URI from = new URI(runContext.render((String) this.from));
try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)))) {
flowable = Flowable.create(FileSerde.reader(inputStream, Message.class), BackpressureStrategy.BUFFER);
flowable = Flux.create(FileSerde.reader(inputStream, Message.class), FluxSink.OverflowStrategy.BUFFER);
resultFlowable = this.buildFlowable(flowable, channel, runContext);

count = resultFlowable
.reduce(Integer::sum)
.blockingGet();
.block();
}

} else if (this.from instanceof List) {
flowable = Flowable.fromArray(((List<?>) this.from)
flowable = Flux.fromArray(((List<?>) this.from)
.stream()
.map(throwFunction(row -> {
if (row instanceof Map) {
Expand All @@ -129,7 +130,7 @@ public Publish.Output run(RunContext runContext) throws Exception {

count = resultFlowable
.reduce(Integer::sum)
.blockingGet();
.block();
} else {
publish(channel, JacksonMapper.toMap(runContext.render((Map<String, Object>) this.from), Message.class), runContext);
}
Expand All @@ -146,12 +147,12 @@ public Publish.Output run(RunContext runContext) throws Exception {
}


private Flowable<Integer> buildFlowable(Flowable<Message> flowable, Channel channel, RunContext runContext) {
private Flux<Integer> buildFlowable(Flux<Message> flowable, Channel channel, RunContext runContext) throws Exception {
return flowable
.map(message -> {
.map(throwFunction(message -> {
publish(channel, message, runContext);
return 1;
});
}));
}


Expand Down

0 comments on commit ac8da5e

Please sign in to comment.