diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java index 3928ce5e03068..7e49c499fbafa 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java @@ -210,6 +210,13 @@ List findAnnotationsOnMethods(DotName annotation) { .collect(Collectors.toList()); } + List findRepeatableAnnotationsOnMethods(DotName annotation) { + return index.getAnnotationsWithRepeatable(annotation, index) + .stream() + .filter(it -> it.target().kind() == AnnotationTarget.Kind.METHOD) + .collect(Collectors.toList()); + } + List findAnnotationsOnInjectionPoints(DotName annotation) { return index.getAnnotations(annotation) .stream() diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java index 9c98b31b82e65..c78452f945515 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java @@ -161,7 +161,7 @@ public void defaultChannelConfiguration( if (launchMode.getLaunchMode().isDevOrTest()) { if (!runtimeConfig.enableGracefulShutdownInDevAndTestMode) { - List incomings = discoveryState.findAnnotationsOnMethods(DotNames.INCOMING); + List incomings = discoveryState.findRepeatableAnnotationsOnMethods(DotNames.INCOMING); List channels = discoveryState.findAnnotationsOnInjectionPoints(DotNames.CHANNEL); List annotations = new ArrayList<>(); annotations.addAll(incomings); @@ -188,7 +188,7 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery, BuildProducer reflection) { Map alreadyGeneratedSerializers = new HashMap<>(); Map alreadyGeneratedDeserializers = new HashMap<>(); - for (AnnotationInstance annotation : discovery.findAnnotationsOnMethods(DotNames.INCOMING)) { + for (AnnotationInstance annotation : discovery.findRepeatableAnnotationsOnMethods(DotNames.INCOMING)) { String channelName = annotation.value().asString(); if (!discovery.isKafkaConnector(channelsManagedByConnectors, true, channelName)) { continue; diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java index be99802f9ea0e..0f941cc60d1bd 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java @@ -6,6 +6,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -63,7 +64,9 @@ private static void doTest(Tuple[] expectations, Class... classesToIndex) { private static void doTest(Config customConfig, Tuple[] expectations, Class... classesToIndex) { List configs = new ArrayList<>(); - DefaultSerdeDiscoveryState discovery = new DefaultSerdeDiscoveryState(index(classesToIndex)) { + List> classes = new ArrayList<>(Arrays.asList(classesToIndex)); + classes.add(Incoming.class); + DefaultSerdeDiscoveryState discovery = new DefaultSerdeDiscoveryState(index(classes)) { @Override Config getConfig() { return customConfig != null ? customConfig : super.getConfig(); @@ -89,7 +92,7 @@ boolean isKafkaConnector(List list, boolean in } } - private static IndexView index(Class... classes) { + private static IndexView index(List> classes) { Indexer indexer = new Indexer(); for (Class clazz : classes) { try { @@ -2696,4 +2699,33 @@ private static class TransactionalProducer { } + @Test + void repeatableIncomings() { + Tuple[] expectations = { + tuple("mp.messaging.incoming.channel1.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.incoming.channel2.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.incoming.channel3.value.deserializer", "io.quarkus.kafka.client.serialization.JsonObjectDeserializer"), + tuple("mp.messaging.incoming.channel4.value.deserializer", "io.quarkus.kafka.client.serialization.JsonObjectDeserializer"), + }; + doTest(expectations, RepeatableIncomingsChannels.class); + } + + + private static class RepeatableIncomingsChannels { + + @Incoming("channel1") + @Incoming("channel2") + void method1(String msg) { + + } + + @Incoming("channel3") + @Incoming("channel4") + void method2(JsonObject msg) { + + } + + } + + } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java index 9300b53b1dafc..c78440905aa61 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java @@ -142,6 +142,9 @@ public List removalExclusions() { new UnremovableBeanBuildItem( new BeanClassAnnotationExclusion( ReactiveMessagingDotNames.INCOMING)), + new UnremovableBeanBuildItem( + new BeanClassAnnotationExclusion( + ReactiveMessagingDotNames.INCOMINGS)), new UnremovableBeanBuildItem( new BeanClassAnnotationExclusion( ReactiveMessagingDotNames.OUTGOING))); diff --git a/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/KafkaRepeatableReceivers.java b/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/KafkaRepeatableReceivers.java new file mode 100644 index 0000000000000..489319d635185 --- /dev/null +++ b/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/KafkaRepeatableReceivers.java @@ -0,0 +1,28 @@ +package io.quarkus.it.kafka; + +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CopyOnWriteArrayList; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; + +@ApplicationScoped +public class KafkaRepeatableReceivers { + + private final List prices = new CopyOnWriteArrayList<>(); + + @Incoming("prices-in") + @Incoming("prices-in2") + public CompletionStage consume(Message msg) { + prices.add(msg.getPayload()); + return msg.ack(); + } + + public List getPrices() { + return prices; + } + +} diff --git a/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/PricesProducer.java b/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/PricesProducer.java new file mode 100644 index 0000000000000..cd47f37c2a158 --- /dev/null +++ b/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/PricesProducer.java @@ -0,0 +1,21 @@ +package io.quarkus.it.kafka; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Outgoing; + +import io.smallrye.mutiny.Multi; + +@ApplicationScoped +public class PricesProducer { + + @Outgoing("prices-out") + public Multi generatePrices() { + return Multi.createFrom().items(1.2, 2.2, 3.4); + } + + @Outgoing("prices-out2") + public Multi generatePrices2() { + return Multi.createFrom().items(4.5, 5.6, 6.7); + } +} diff --git a/integration-tests/reactive-messaging-kafka/src/main/resources/application.properties b/integration-tests/reactive-messaging-kafka/src/main/resources/application.properties index 4a3e4b4dffe17..e1c86a9bd6319 100644 --- a/integration-tests/reactive-messaging-kafka/src/main/resources/application.properties +++ b/integration-tests/reactive-messaging-kafka/src/main/resources/application.properties @@ -25,3 +25,9 @@ mp.messaging.outgoing.pets-out.topic=pets mp.messaging.incoming.pets-in.topic=pets quarkus.redis.my-redis.hosts=${quarkus.redis.hosts} + +mp.messaging.outgoing.prices-out.topic=prices +mp.messaging.outgoing.prices-out2.topic=prices2 + +mp.messaging.incoming.prices-in.topic=prices +mp.messaging.incoming.prices-in2.topic=prices2 diff --git a/integration-tests/reactive-messaging-kafka/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java b/integration-tests/reactive-messaging-kafka/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java index 1acd4d3c45a77..b7e6e64b59dfa 100644 --- a/integration-tests/reactive-messaging-kafka/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java +++ b/integration-tests/reactive-messaging-kafka/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java @@ -8,7 +8,9 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import io.quarkus.arc.Arc; import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.DisabledOnIntegrationTest; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.kafka.KafkaCompanionResource; import io.restassured.common.mapper.TypeRef; @@ -48,4 +50,11 @@ public void testFruits() { await().untilAsserted(() -> Assertions.assertEquals(get("/kafka/fruits").as(TYPE_REF).size(), 4)); } + @Test + @DisabledOnIntegrationTest + public void testPrices() { + KafkaRepeatableReceivers repeatableReceivers = Arc.container().instance(KafkaRepeatableReceivers.class).get(); + await().untilAsserted(() -> Assertions.assertEquals(repeatableReceivers.getPrices().size(), 6)); + } + }