Kafka serializer cannot be autodetected when injecting an Instance #44500

Closed
rokkolesa opened this issue Nov 14, 2024 · 4 comments · Fixed by #44526
Labels
area/kafka kind/enhancement New feature or request

@rokkolesa

Describe the bug

When an Emitter or MutinyEmitter is injected wrapped in Instance, the serializer is not autodetected and application startup fails.

Expected behavior

Serializer autodetection should work the same way whether the emitter is injected directly or via Instance.

Actual behavior

The application startup fails with the error:

 Failed to start quarkus: io.quarkus.dev.appstate.ApplicationStartException: java.lang.RuntimeException: Failed to start quarkus
        at io.quarkus.dev.appstate.ApplicationStateNotification.waitForApplicationStart(ApplicationStateNotification.java:63)
        at io.quarkus.runner.bootstrap.StartupActionImpl.runMainClass(StartupActionImpl.java:142)
        at io.quarkus.deployment.dev.IsolatedDevModeMain.restartApp(IsolatedDevModeMain.java:202)
        at io.quarkus.deployment.dev.IsolatedDevModeMain.restartCallback(IsolatedDevModeMain.java:183)
        at io.quarkus.deployment.dev.RuntimeUpdatesProcessor.doScan(RuntimeUpdatesProcessor.java:555)
        at io.quarkus.deployment.console.ConsoleStateManager.forceRestart(ConsoleStateManager.java:175)
        at io.quarkus.deployment.console.ConsoleStateManager.lambda$installBuiltins$0(ConsoleStateManager.java:112)
        at io.quarkus.deployment.console.ConsoleStateManager$1.accept(ConsoleStateManager.java:77)
        at io.quarkus.deployment.console.ConsoleStateManager$1.accept(ConsoleStateManager.java:49)
        at io.quarkus.deployment.console.AeshConsole.lambda$setup$1(AeshConsole.java:278)
        at org.aesh.terminal.EventDecoder.accept(EventDecoder.java:118)
        at org.aesh.terminal.EventDecoder.accept(EventDecoder.java:31)
        at org.aesh.terminal.io.Decoder.write(Decoder.java:133)
        at org.aesh.readline.tty.terminal.TerminalConnection.openBlocking(TerminalConnection.java:216)
        at org.aesh.readline.tty.terminal.TerminalConnection.openBlocking(TerminalConnection.java:203)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.RuntimeException: Failed to start quarkus
        at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
        at io.quarkus.runtime.Application.start(Application.java:101)
        at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:121)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
        at io.quarkus.runner.GeneratedMain.main(Unknown Source)
        at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
        at java.base/java.lang.reflect.Method.invoke(Method.java:580)
        at io.quarkus.runner.bootstrap.StartupActionImpl$1.run(StartupActionImpl.java:116)
        ... 1 more
Caused by: jakarta.enterprise.inject.spi.DeploymentException: java.lang.IllegalArgumentException: The attribute `value.serializer` on connector 'smallrye-kafka' (channel: foo_out) must be set
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:58)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_qTrMuLFyQ1IvGfeSxRVitl6CCBQ.notify(Unknown Source)
        at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:351)
        at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:333)
        at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:80)
        at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:163)
        at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:114)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(Unknown Source)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(Unknown Source)
        ... 11 more
Caused by: java.lang.IllegalArgumentException: The attribute `value.serializer` on connector 'smallrye-kafka' (channel: foo_out) must be set
        at io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration.lambda$getValueSerializer$0(KafkaConnectorOutgoingConfiguration.java:40)
        at java.base/java.util.Optional.orElseThrow(Optional.java:403)
        at io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration.getValueSerializer(KafkaConnectorOutgoingConfiguration.java:40)
        at io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration.validate(KafkaConnectorOutgoingConfiguration.java:295)
        at io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration.<init>(KafkaConnectorOutgoingConfiguration.java:16)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector.getSubscriber(KafkaConnector.java:272)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getSubscriber$$superforward(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass$$function$$10.apply(Unknown Source)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:73)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext$NextAroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:97)
        at io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactoryInterceptor.intercept(DevModeSupportConnectorFactoryInterceptor.java:97)
        at io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactoryInterceptor_Bean.intercept(Unknown Source)
        at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:42)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:70)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:62)
        at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor.intercept(DuplicatedContextConnectorFactoryInterceptor.java:37)
        at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor_Bean.intercept(Unknown Source)
        at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:42)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:30)
        at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:27)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getSubscriber(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_ClientProxy.getSubscriber(Unknown Source)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.createSubscriber(ConfiguredChannelFactory.java:226)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:167)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:115)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_ClientProxy.initialize(Unknown Source)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1939)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
        at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:250)
        at io.smallrye.reactive.messaging.providers.extension.MediatorManager_ClientProxy.start(Unknown Source)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:53)
        ... 19 more

If I inject the emitter directly instead, no error is thrown and the emitter works correctly.

How to Reproduce?

The snippet below causes an exception at startup, even if the bean is never used.

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.reactive.messaging.MutinyEmitter;

@ApplicationScoped
public class Foo
{
    @Channel("foo_out")
    Instance<MutinyEmitter<String>> emitter;

    public void emit()
    {
        emitter.get().sendAndAwait("bar");
    }
}

Output of uname -a or ver

No response

Output of java -version

openjdk version "21.0.5" 2024-10-15
OpenJDK Runtime Environment Homebrew (build 21.0.5)
OpenJDK 64-Bit Server VM Homebrew (build 21.0.5, mixed mode, sharing)

Quarkus version or git rev

3.16.3

Build tool (ie. output of mvnw --version or gradlew --version)

Apache Maven 3.9.9

Additional information

I can work around this issue by setting the serializer explicitly in application.properties:

mp.messaging.outgoing.foo_out.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

@rokkolesa rokkolesa added the kind/bug Something isn't working label Nov 14, 2024

quarkus-bot bot commented Nov 14, 2024

/cc @alesj (kafka), @cescoffier (kafka), @ozangunalp (kafka)

@cescoffier cescoffier added kind/enhancement New feature or request and removed kind/bug Something isn't working labels Nov 14, 2024
@cescoffier
Member

@ozangunalp @Ladicek Do you think we can extend the detection to handle this case?

@Ladicek
Contributor

Ladicek commented Nov 15, 2024

Yes, that should be possible.

@ozangunalp
Contributor

Quarkus already recognizes an Emitter injected through Instance (unlike upstream), so the injection itself is not a problem.
Extending serde detection to cover this case should not be a problem either.
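
To illustrate what extending the detection involves, here is a minimal sketch of finding the payload type when the emitter is wrapped in Instance. It is not the Quarkus implementation, which is not shown in this thread and works on build-time metadata; this version uses plain java.lang.reflect instead. The Instance and MutinyEmitter interfaces below are hypothetical stand-ins declared locally so the example compiles without Quarkus on the classpath, and the class and method names are made up for this example.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Illustrative sketch only: NOT the actual Quarkus serde detection code.
class PayloadTypeSketch {
    // Hypothetical stand-ins for jakarta.enterprise.inject.Instance and
    // io.smallrye.reactive.messaging.MutinyEmitter.
    interface Instance<T> {}
    interface MutinyEmitter<T> {}

    // The two injection styles discussed in this issue.
    static class Direct {
        MutinyEmitter<String> emitter;
    }
    static class Wrapped {
        Instance<MutinyEmitter<String>> emitter;
    }

    // Returns the first type argument if 'type' is a parameterization of 'raw', else null.
    static Type typeArgumentOf(Type type, Class<?> raw) {
        if (type instanceof ParameterizedType) {
            ParameterizedType p = (ParameterizedType) type;
            if (p.getRawType() == raw) {
                return p.getActualTypeArguments()[0];
            }
        }
        return null;
    }

    // Peels off an optional Instance<...> wrapper, then reads the emitter's payload type.
    // Returns null when the injected type is not an emitter at all.
    static Type payloadType(Type injected) {
        Type inner = typeArgumentOf(injected, Instance.class);
        Type emitterType = (inner != null) ? inner : injected;
        return typeArgumentOf(emitterType, MutinyEmitter.class);
    }

    // Convenience wrapper that looks up a declared field by name.
    static Type payloadTypeOfField(Class<?> owner, String fieldName) {
        try {
            return payloadType(owner.getDeclaredField(fieldName).getGenericType());
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No field " + fieldName + " on " + owner, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(payloadTypeOfField(Direct.class, "emitter"));
        System.out.println(payloadTypeOfField(Wrapped.class, "emitter"));
    }
}
```

In this sketch both injection styles resolve to the same payload type (String), which is the behavior the issue asks for. A serializer could then be chosen from that type in the same way for both cases.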

ozangunalp added a commit to ozangunalp/quarkus that referenced this issue Nov 15, 2024
@quarkus-bot quarkus-bot bot added this to the 3.18 - main milestone Nov 15, 2024
bschuhmann pushed a commit to bschuhmann/quarkus that referenced this issue Nov 16, 2024
@gsmet gsmet modified the milestones: 3.18 - main, 3.17.0 Nov 19, 2024
gsmet pushed a commit to gsmet/quarkus that referenced this issue Nov 19, 2024