Wrong context/worker when terminating push queries #8251

Open
agavra opened this issue Oct 12, 2021 · 0 comments
Labels
bug query-engine Issues owned by the ksqlDB Query Engine team

agavra commented Oct 12, 2021

I got this report from a community member that seems to indicate a bug in our Vert.x threading:

I'm working on a solution based on ksqlDB running on AWS Fargate with an ALB in front of it. For now, we have just one node (to narrow down the possible causes of the error) and we're running push queries. On the client side we're using the Java client.
The streamQuery method works as expected and I can see results flowing. Our problem starts when calling the terminateQuery method, which returns a 500 from the server: "On wrong context or worker".

We're using:

  • Ksql 6.2 (Confluent)
  • Kafka 1.0.0
  • Ksql client 6.2.1

server stack trace

[2021-10-11 17:52:32,976] INFO stream-client [_confluent-ksql-default_transient_transient_VISION_STREAM_3555441465477918522_1633974534213-50ee1416-c10d-4840-b829-b2351ff580af] State transition from RUNNING to PENDING_SHUTDOWN (org.apache.kafka.streams.KafkaStreams:314)
[2021-10-11 17:52:32,976] ERROR Failed to handle request 500 /close-query (io.confluent.ksql.api.server.FailureHandler:38)
java.lang.IllegalStateException: On wrong context or worker
	at io.confluent.ksql.util.VertxUtils.checkContext(VertxUtils.java:38)
	at io.confluent.ksql.api.server.ConnectionQueryManager.checkContext(ConnectionQueryManager.java:61)
	at io.confluent.ksql.api.server.ConnectionQueryManager.access$100(ConnectionQueryManager.java:31)
	at io.confluent.ksql.api.server.ConnectionQueryManager$ConnectionQueries.removeQuery(ConnectionQueryManager.java:81)
	at io.confluent.ksql.api.server.PushQueryHolder.close(PushQueryHolder.java:50)
	at io.confluent.ksql.api.server.CloseQueryHandler.handle(CloseQueryHandler.java:58)
	at io.confluent.ksql.api.server.CloseQueryHandler.handle(CloseQueryHandler.java:31)
	at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1038)
	at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:101)
	at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:132)
	at io.vertx.ext.web.handler.impl.BodyHandlerImpl$BHandler.doEnd(BodyHandlerImpl.java:296)
	at io.vertx.ext.web.handler.impl.BodyHandlerImpl$BHandler.end(BodyHandlerImpl.java:276)
	at io.vertx.ext.web.handler.impl.BodyHandlerImpl.lambda$handle$0(BodyHandlerImpl.java:87)
	at io.vertx.core.http.impl.Http2ServerRequestImpl.handleEnd(Http2ServerRequestImpl.java:198)
	at io.vertx.core.http.impl.VertxHttp2Stream.lambda$new$1(VertxHttp2Stream.java:68)
	at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:237)
	at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:127)
	at io.vertx.core.http.impl.VertxHttp2Stream.onEnd(VertxHttp2Stream.java:107)
	at io.vertx.core.http.impl.VertxHttp2Stream.onEnd(VertxHttp2Stream.java:100)
	at io.vertx.core.http.impl.Http2ConnectionBase.lambda$onDataRead$10(Http2ConnectionBase.java:323)
	at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
	at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
	at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:229)
	at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:221)
	at io.vertx.core.http.impl.Http2ConnectionBase.onDataRead(Http2ConnectionBase.java:323)
	at io.vertx.core.http.impl.Http2ServerConnection.onDataRead(Http2ServerConnection.java:42)
	at io.netty.handler.codec.http2.Http2FrameListenerDecorator.onDataRead(Http2FrameListenerDecorator.java:36)
	at io.netty.handler.codec.http2.Http2EmptyDataFrameListener.onDataRead(Http2EmptyDataFrameListener.java:49)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:318)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$PrefaceFrameListener.onDataRead(DefaultHttp2ConnectionDecoder.java:701)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:416)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:251)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:181)
	at io.netty.handler.codec.http2.DecoratingHttp2ConnectionDecoder.decodeFrame(DecoratingHttp2ConnectionDecoder.java:63)
	at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
	at io.netty.handler.codec.http2.Http2ConnectionHandler$PrefaceDecoder.decode(Http2ConnectionHandler.java:242)
	at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	at io.vertx.core.http.impl.VertxHttp2ConnectionHandler.channelRead(VertxHttp2ConnectionHandler.java:418)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.vertx.core.http.impl.Http1xOrH2CHandler.end(Http1xOrH2CHandler.java:61)
	at io.vertx.core.http.impl.Http1xOrH2CHandler.channelRead(Http1xOrH2CHandler.java:44)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
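
The top frames of the server trace (VertxUtils.checkContext reached from ConnectionQueryManager$ConnectionQueries.removeQuery) suggest that the close-query handler asserts it is running on the same context that owns the per-connection query state. As a rough illustration of that kind of ownership check, here is a minimal plain-Java sketch that uses single-threaded executors in place of Vert.x contexts. The class and method names (ContextCheckSketch, OwnedQueries, attempt, run) are hypothetical, and this is an analogy for the failure mode, not the ksqlDB implementation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextCheckSketch {

  // Hypothetical stand-in for per-connection query bookkeeping.
  // It remembers the thread that created it and rejects calls from any other thread.
  static final class OwnedQueries {
    private final Thread owner = Thread.currentThread();
    private final Set<String> queryIds = new HashSet<>();

    private void checkOwner() {
      if (Thread.currentThread() != owner) {
        throw new IllegalStateException("On wrong context or worker");
      }
    }

    void add(String id) {
      checkOwner();
      queryIds.add(id);
    }

    void remove(String id) {
      checkOwner();
      queryIds.remove(id);
    }
  }

  // Runs remove() on the given "event loop" and reports what happened.
  static String attempt(String name, ExecutorService loop, OwnedQueries q, String id)
      throws InterruptedException {
    try {
      loop.submit(() -> q.remove(id)).get();
      return name + ": removed";
    } catch (ExecutionException e) {
      return name + ": " + e.getCause().getMessage();
    }
  }

  static List<String> run() throws Exception {
    // Two single-threaded executors play the role of two event loops.
    ExecutorService loopA = Executors.newSingleThreadExecutor();
    ExecutorService loopB = Executors.newSingleThreadExecutor();
    try {
      // The query is registered on loop A, so loop A's thread becomes the owner.
      OwnedQueries queries = loopA.submit(() -> {
        OwnedQueries q = new OwnedQueries();
        q.add("q1");
        return q;
      }).get();

      List<String> results = new ArrayList<>();
      // A close request that arrives on a different loop fails the ownership check.
      results.add(attempt("loop B", loopB, queries, "q1"));
      // The same request dispatched on the owning loop succeeds.
      results.add(attempt("loop A", loopA, queries, "q1"));
      return results;
    } finally {
      loopA.shutdown();
      loopB.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    run().forEach(System.out::println);
  }
}
```

In this sketch the request handled on loop B is rejected with the same message seen in the report, while the one handled on loop A goes through. If the real cause is similar, the terminate request may be arriving on a different connection or event loop than the one that started the push query, but that is an assumption to verify rather than a confirmed diagnosis.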

client stack trace

2021-10-11 17:48:53.769 [clientInboundChannel-137] INFO  c.d.d.v.k.handler.QueryService - QueryInstance: EED103A3898E43256C8ECCCCF02AB9CC, Session ID: da870490-615e-450a-ae5e-e510b77f3d93. Running query: SELECT page_url,session_id FROM VISION_STREAM WHERE session_id = '200' EMIT CHANGES;
2021-10-11 17:48:54.611 [clientInboundChannel-137] INFO  c.d.d.v.k.u.KsqlClientService - Query Hash: EED103A3898E43256C8ECCCCF02AB9CC. Query started with KSQL Server ID: 44163b35-bcb4-4b8d-8d71-854816c341e0
2021-10-11 17:48:54.611 [vert.x-eventloop-thread-2] INFO  c.d.d.v.k.utils.RowSubscriber - KSQL query with ID 44163b35-bcb4-4b8d-8d71-854816c341e0 pushing to stomp topic /topic/EED103A3898E43256C8ECCCCF02AB9CC
2021-10-11 17:48:54.616 [clientInboundChannel-137] INFO  c.d.d.v.k.h.QuerySessionAdministrator - Query Hash: EED103A3898E43256C8ECCCCF02AB9CC, Session ID: da870490-615e-450a-ae5e-e510b77f3d93. Session started at: 2021-10-11T17:48:54.611
2021-10-11 17:52:32.969 [clientInboundChannel-138] INFO  c.d.d.v.k.h.QuerySessionAdministrator - Query Hash: EED103A3898E43256C8ECCCCF02AB9CC, Session ID: da870490-615e-450a-ae5e-e510b77f3d93. Removing session
2021-10-11 17:52:32.969 [clientInboundChannel-138] INFO  c.d.d.v.k.handler.QueryService - Query Hash: EED103A3898E43256C8ECCCCF02AB9CC. No remaining sessions. Removing query from KSQL with ID 44163b35-bcb4-4b8d-8d71-854816c341e0
2021-10-11 17:52:32.978 [vert.x-eventloop-thread-2] INFO  c.d.d.v.k.utils.RowSubscriber - KSQL Query with ID 44163b35-bcb4-4b8d-8d71-854816c341e0 with stomp topic /topic/EED103A3898E43256C8ECCCCF02AB9CC closed
2021-10-11 17:52:32.984 [clientInboundChannel-138] ERROR c.d.d.v.k.w.WebSocketController - Error
java.util.concurrent.ExecutionException: io.confluent.ksql.api.client.exception.KsqlClientException: Received 500 response from server: On wrong context or worker. Error code: 50000
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
	at com.disney.data.vision.ksql.utils.KsqlClientService.terminateQuery(KsqlClientService.java:39)
	at com.disney.data.vision.ksql.handler.QueryService.unsubscribe(QueryService.java:106)
	at com.disney.data.vision.ksql.websocket.WebSocketController.unsubscribe(WebSocketController.java:41)
	at sun.reflect.GeneratedMethodAccessor76.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:180)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:112)
	at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMatch(AbstractMethodMessageHandler.java:502)
	at org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler.handleMatch(SimpAnnotationMethodMessageHandler.java:497)
	at org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler.handleMatch(SimpAnnotationMethodMessageHandler.java:87)
	at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessageInternal(AbstractMethodMessageHandler.java:461)
	at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:399)
	at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:135)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: io.confluent.ksql.api.client.exception.KsqlClientException: Received 500 response from server: On wrong context or worker. Error code: 50000
	at io.confluent.ksql.api.client.impl.ClientImpl.lambda$handleErrorResponse$19(ClientImpl.java:504)
	at io.vertx.core.http.impl.HttpClientResponseImpl$BodyHandler.notifyHandler(HttpClientResponseImpl.java:292)
	at io.vertx.core.http.impl.HttpClientResponseImpl.lambda$bodyHandler$0(HttpClientResponseImpl.java:193)
	at io.vertx.core.http.impl.HttpClientResponseImpl.handleEnd(HttpClientResponseImpl.java:248)
	at io.vertx.core.http.impl.Http2ClientConnection$Http2ClientStream.handleEnd(Http2ClientConnection.java:260)
	at io.vertx.core.http.impl.VertxHttp2Stream.lambda$new$1(VertxHttp2Stream.java:68)
	at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:237)
	at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:127)
	at io.vertx.core.http.impl.VertxHttp2Stream.onEnd(VertxHttp2Stream.java:107)
	at io.vertx.core.http.impl.VertxHttp2Stream.onEnd(VertxHttp2Stream.java:100)
	at io.vertx.core.http.impl.Http2ConnectionBase.lambda$onDataRead$10(Http2ConnectionBase.java:323)
	at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
	at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
	at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:229)
	at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:221)
	at io.vertx.core.http.impl.Http2ConnectionBase.onDataRead(Http2ConnectionBase.java:323)
	at io.netty.handler.codec.http2.Http2FrameListenerDecorator.onDataRead(Http2FrameListenerDecorator.java:36)
	at io.netty.handler.codec.http2.Http2EmptyDataFrameListener.onDataRead(Http2EmptyDataFrameListener.java:49)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:318)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:422)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:251)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:181)
	at io.netty.handler.codec.http2.DecoratingHttp2ConnectionDecoder.decodeFrame(DecoratingHttp2ConnectionDecoder.java:63)
	at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
	at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	at io.vertx.core.http.impl.VertxHttp2ConnectionHandler.channelRead(VertxHttp2ConnectionHandler.java:418)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1534)
	at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1283)
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1330)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	... 1 common frames omitted
@agavra agavra added bug needs-triage query-engine Issues owned by the ksqlDB Query Engine team labels Oct 12, 2021