
[BUG] Search pipeline seen executing on transport_worker thread #10248

Closed
austintlee opened this issue Sep 27, 2023 · 16 comments · Fixed by #10598
Assignees
Labels
bug (Something isn't working), Search:Relevance, Search (Search query, autocomplete ...etc), v2.12.0 (Issues and PRs related to version 2.12.0), v3.0.0 (Issues and PRs related to version 3.0.0)

Comments

@austintlee
Contributor

Describe the bug
I have a search processor that uses an org.opensearch.client.Client object to execute a TransportAction. This invocation returns a BaseFuture and the processor calls .get(), blocking until the client returns a response. Occasionally, the get() call blocks indefinitely and brings the whole cluster into a bad state.
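
For context, the blocking pattern looks roughly like the sketch below. This is a minimal illustration, not the actual ml-commons code; a plain search action stands in for the ML transport action, and the class and index names are made up.

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;

// Minimal sketch of the blocking pattern (hypothetical class; a search action stands in
// for the actual ML transport action invoked by the generative QA processor).
final class BlockingCallSketch {
    private BlockingCallSketch() {}

    static SearchResponse blockingCall(Client client) {
        // client.* transport calls return an ActionFuture backed by BaseFuture.
        // actionGet() with no timeout parks the calling thread until a response arrives.
        // If that thread is a transport_worker, the thread that should deliver the
        // response may be the very thread parked here, so the call can hang forever.
        return client.search(new SearchRequest("my-index")).actionGet();
    }
}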

A thread dump on the node that hung revealed that the search processor was executing on a transport_worker thread.

"opensearch[opensearch-node1][transport_worker][T#2]" #32 daemon prio=5 os_prio=0 cpu=61810.65ms elapsed=43062.56s allocated=2771M defined_classes=251      tid=0x0000fffef4009140 nid=0x13d waiting on condition  [0x0000ffff88cda000]
    java.lang.Thread.State: WAITING (parking)
     at jdk.internal.misc.Unsafe.park(java.base@17.0.8/Native Method)
     - parking to wait for  <0x00000000ee75b640> (a org.opensearch.common.util.concurrent.BaseFuture$Sync)
     at java.util.concurrent.locks.LockSupport.park(java.base@17.0.8/LockSupport.java:211)
     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@17.0.8/AbstractQueuedSynchronizer.java:715)
     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@17.0.8/AbstractQueuedSynchronizer.java:1047)
     at org.opensearch.common.util.concurrent.BaseFuture$Sync.get(BaseFuture.java:272)
     at org.opensearch.common.util.concurrent.BaseFuture.get(BaseFuture.java:104)
     at org.opensearch.common.util.concurrent.FutureUtils.get(FutureUtils.java:74)
     at org.opensearch.action.support.AdapterActionFuture.actionGet(AdapterActionFuture.java:55)
     at org.opensearch.searchpipelines.questionanswering.generative.llm.DefaultLlmImpl.doChatCompletion(DefaultLlmImpl.java:84)
     at org.opensearch.searchpipelines.questionanswering.generative.GenerativeQAResponseProcessor.processResponse(GenerativeQAResponseProcessor.java:109)
     at org.opensearch.search.pipeline.Pipeline.transformResponse(Pipeline.java:177)
     at org.opensearch.search.pipeline.PipelinedRequest.transformResponse(PipelinedRequest.java:31)
     at org.opensearch.action.search.TransportSearchAction.lambda$executeRequest$0(TransportSearchAction.java:398)
     at org.opensearch.action.search.TransportSearchAction$$Lambda$4884/0x0000003001d5d850.accept(Unknown Source)
     at org.opensearch.core.action.ActionListener$1.onResponse(ActionListener.java:82)
     at org.opensearch.core.action.ActionListener$5.onResponse(ActionListener.java:268)
     at org.opensearch.action.search.AbstractSearchAsyncAction.sendSearchResponse(AbstractSearchAsyncAction.java:671)
     at org.opensearch.action.search.ExpandSearchPhase.run(ExpandSearchPhase.java:132)
     at org.opensearch.action.search.AbstractSearchAsyncAction.executePhase(AbstractSearchAsyncAction.java:428)
     at org.opensearch.action.search.AbstractSearchAsyncAction.executeNextPhase(AbstractSearchAsyncAction.java:422)

In a few of the other thread dumps I captured, I do see the same processor running on a search thread.

From BaseFuture:

return Transports.assertNotTransportThread(BLOCKING_OP_REASON)

And from Netty4MessageChannelHandler.channelRead:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext());
    assert Transports.assertTransportThread();
    assert msg instanceof ByteBuf : "Expected message type ByteBuf, found: " + msg.getClass();

I can see that we don't expect a blocking call to happen on transport threads, and there is code specifically in BaseFuture.get to disallow invocations on transport threads, although the assert won't trigger unless the OpenSearch process is run with the -ea JVM flag.

Can we ensure that search processors run on search threads? Or are they really allowed to run on transport threads, which would mean I should not have any blocking calls in my search processor?
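
In the meantime, a cheap way to confirm which pool a processor is executing on (short of taking a full thread dump) is to check the current thread name; node thread names follow the pattern seen in the dumps above. A debugging sketch, not part of any plugin:

// Debugging sketch (not part of any plugin): node thread names encode the pool, e.g.
// "opensearch[opensearch-node1][transport_worker][T#2]" vs. "opensearch[...][search][T#6]".
static void assertNotOnTransportWorker() {
    final String threadName = Thread.currentThread().getName();
    assert threadName.contains("[transport_worker]") == false
        : "blocking processor invoked on a transport thread: " + threadName;
}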


@austintlee added the bug (Something isn't working) and untriaged labels on Sep 27, 2023
@austintlee
Contributor Author

transport_worker thread example full trace:

"opensearch[opensearch-node1][transport_worker][T#2]" #32 daemon prio=5 os_prio=0 cpu=61810.65ms elapsed=43062.56s allocated=2771M defined_classes=251      tid=0x0000fffef4009140 nid=0x13d waiting on condition  [0x0000ffff88cda000]
    java.lang.Thread.State: WAITING (parking)
     at jdk.internal.misc.Unsafe.park(java.base@17.0.8/Native Method)
     - parking to wait for  <0x00000000ee75b640> (a org.opensearch.common.util.concurrent.BaseFuture$Sync)
     at java.util.concurrent.locks.LockSupport.park(java.base@17.0.8/LockSupport.java:211)
     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@17.0.8/AbstractQueuedSynchronizer.java:715)
     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@17.0.8/AbstractQueuedSynchronizer.java:1047)
     at org.opensearch.common.util.concurrent.BaseFuture$Sync.get(BaseFuture.java:272)
     at org.opensearch.common.util.concurrent.BaseFuture.get(BaseFuture.java:104)
     at org.opensearch.common.util.concurrent.FutureUtils.get(FutureUtils.java:74)
     at org.opensearch.action.support.AdapterActionFuture.actionGet(AdapterActionFuture.java:55)
     at org.opensearch.searchpipelines.questionanswering.generative.llm.DefaultLlmImpl.doChatCompletion(DefaultLlmImpl.java:84)
     at org.opensearch.searchpipelines.questionanswering.generative.GenerativeQAResponseProcessor.processResponse(GenerativeQAResponseProcessor.java:109)
     at org.opensearch.search.pipeline.Pipeline.transformResponse(Pipeline.java:177)
     at org.opensearch.search.pipeline.PipelinedRequest.transformResponse(PipelinedRequest.java:31)
     at org.opensearch.action.search.TransportSearchAction.lambda$executeRequest$0(TransportSearchAction.java:398)
     at org.opensearch.action.search.TransportSearchAction$$Lambda$4884/0x0000003001d5d850.accept(Unknown Source)
     at org.opensearch.core.action.ActionListener$1.onResponse(ActionListener.java:82)
     at org.opensearch.core.action.ActionListener$5.onResponse(ActionListener.java:268)
     at org.opensearch.action.search.AbstractSearchAsyncAction.sendSearchResponse(AbstractSearchAsyncAction.java:671)
     at org.opensearch.action.search.ExpandSearchPhase.run(ExpandSearchPhase.java:132)
     at org.opensearch.action.search.AbstractSearchAsyncAction.executePhase(AbstractSearchAsyncAction.java:428)
     at org.opensearch.action.search.AbstractSearchAsyncAction.executeNextPhase(AbstractSearchAsyncAction.java:422)
     at org.opensearch.action.search.FetchSearchPhase.moveToNextPhase(FetchSearchPhase.java:298)
     at org.opensearch.action.search.FetchSearchPhase.lambda$innerRun$1(FetchSearchPhase.java:138)
     at org.opensearch.action.search.FetchSearchPhase$$Lambda$5645/0x0000003001e62400.run(Unknown Source)
     at org.opensearch.action.search.CountedCollector.countDown(CountedCollector.java:66)
     at org.opensearch.action.search.CountedCollector$$Lambda$5941/0x0000003001ee0cc0.run(Unknown Source)
     at org.opensearch.action.search.ArraySearchPhaseResults.consumeResult(ArraySearchPhaseResults.java:61)
     at org.opensearch.action.search.CountedCollector.onResult(CountedCollector.java:74)
     at org.opensearch.action.search.FetchSearchPhase$2.innerOnResponse(FetchSearchPhase.java:243)
     at org.opensearch.action.search.FetchSearchPhase$2.innerOnResponse(FetchSearchPhase.java:238)
     at org.opensearch.action.search.SearchActionListener.onResponse(SearchActionListener.java:59)
     at org.opensearch.action.search.SearchActionListener.onResponse(SearchActionListener.java:44)
     at org.opensearch.action.ActionListenerResponseHandler.handleResponse(ActionListenerResponseHandler.java:70)
     at org.opensearch.action.search.SearchTransportService$ConnectionCountingHandler.handleResponse(SearchTransportService.java:746)
     at org.opensearch.transport.TransportService$6.handleResponse(TransportService.java:880)
     at org.opensearch.transport.TransportService$ContextRestoreResponseHandler.handleResponse(TransportService.java:1496)
     at org.opensearch.transport.InboundHandler.doHandleResponse(InboundHandler.java:394)
     at org.opensearch.transport.InboundHandler.handleResponse(InboundHandler.java:386)
     at org.opensearch.transport.InboundHandler.messageReceived(InboundHandler.java:161)
     at org.opensearch.transport.InboundHandler.inboundMessage(InboundHandler.java:115)
     at org.opensearch.transport.TcpTransport.inboundMessage(TcpTransport.java:767)
     at org.opensearch.transport.netty4.Netty4MessageChannelHandler$$Lambda$4575/0x0000003001cd14b8.accept(Unknown Source)
     at org.opensearch.transport.InboundPipeline.forwardFragments(InboundPipeline.java:175)
     at org.opensearch.transport.InboundPipeline.doHandleBytes(InboundPipeline.java:150)
     at org.opensearch.transport.InboundPipeline.handleBytes(InboundPipeline.java:115)
     at org.opensearch.transport.netty4.Netty4MessageChannelHandler.channelRead(Netty4MessageChannelHandler.java:95)
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
     at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280)
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
     at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
     at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:689)
     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:652)
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
     at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
     at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
     at java.lang.Thread.run(java.base@17.0.8/Thread.java:833)

    Locked ownable synchronizers:
     - None

@austintlee
Contributor Author

search thread example:

"opensearch[demo-node4][search][T#6]@22544" daemon prio=5 tid=0xb0 nid=NA waiting
  java.lang.Thread.State: WAITING
          at jdk.internal.misc.Unsafe.park(Unsafe.java:-1)
          at java.util.concurrent.locks.LockSupport.park(LockSupport.java:211)
          at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:715)
          at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1047)
          at org.opensearch.common.util.concurrent.BaseFuture$Sync.get(BaseFuture.java:272)
          at org.opensearch.common.util.concurrent.BaseFuture.get(BaseFuture.java:104)
          at org.opensearch.common.util.concurrent.FutureUtils.get(FutureUtils.java:74)
          at org.opensearch.action.support.AdapterActionFuture.actionGet(AdapterActionFuture.java:55)
          at org.opensearch.searchpipelines.questionanswering.generative.llm.DefaultLlmImpl.doChatCompletion(DefaultLlmImpl.java:84)
          at org.opensearch.searchpipelines.questionanswering.generative.GenerativeQAResponseProcessor.processResponse(GenerativeQAResponseProcessor.java:102)
          at org.opensearch.search.pipeline.Pipeline.transformResponse(Pipeline.java:177)
          at org.opensearch.search.pipeline.PipelinedRequest.transformResponse(PipelinedRequest.java:31)
          at org.opensearch.action.search.TransportSearchAction.lambda$executeRequest$0(TransportSearchAction.java:398)
          at org.opensearch.action.search.TransportSearchAction$$Lambda$4889/0x00007fb484d4a940.accept(Unknown Source:-1)
          at org.opensearch.core.action.ActionListener$1.onResponse(ActionListener.java:82)
          at org.opensearch.core.action.ActionListener$5.onResponse(ActionListener.java:268)
          at org.opensearch.action.search.AbstractSearchAsyncAction.sendSearchResponse(AbstractSearchAsyncAction.java:671)
          at org.opensearch.action.search.ExpandSearchPhase.run(ExpandSearchPhase.java:132)
          at org.opensearch.action.search.AbstractSearchAsyncAction.executePhase(AbstractSearchAsyncAction.java:428)
          at org.opensearch.action.search.AbstractSearchAsyncAction.executeNextPhase(AbstractSearchAsyncAction.java:422)
          at org.opensearch.action.search.FetchSearchPhase.moveToNextPhase(FetchSearchPhase.java:298)
          at org.opensearch.action.search.FetchSearchPhase.lambda$innerRun$1(FetchSearchPhase.java:138)
          at org.opensearch.action.search.FetchSearchPhase$$Lambda$5039/0x00007fb484d7d8e0.run(Unknown Source:-1)
          at org.opensearch.action.search.CountedCollector.countDown(CountedCollector.java:66)
          at org.opensearch.action.search.CountedCollector$$Lambda$5893/0x00007fb484f172a8.run(Unknown Source:-1)
          at org.opensearch.action.search.ArraySearchPhaseResults.consumeResult(ArraySearchPhaseResults.java:61)
          at org.opensearch.action.search.CountedCollector.onResult(CountedCollector.java:74)
          at org.opensearch.action.search.FetchSearchPhase$2.innerOnResponse(FetchSearchPhase.java:243)
          at org.opensearch.action.search.FetchSearchPhase$2.innerOnResponse(FetchSearchPhase.java:238)
          at org.opensearch.action.search.SearchActionListener.onResponse(SearchActionListener.java:59)
          at org.opensearch.action.search.SearchActionListener.onResponse(SearchActionListener.java:44)
          at org.opensearch.action.ActionListenerResponseHandler.handleResponse(ActionListenerResponseHandler.java:70)
          at org.opensearch.action.search.SearchTransportService$ConnectionCountingHandler.handleResponse(SearchTransportService.java:746)
          at org.opensearch.transport.TransportService$6.handleResponse(TransportService.java:880)
          at org.opensearch.transport.TransportService$ContextRestoreResponseHandler.handleResponse(TransportService.java:1496)
          at org.opensearch.transport.TransportService$DirectResponseChannel.processResponse(TransportService.java:1579)
          at org.opensearch.transport.TransportService$DirectResponseChannel.sendResponse(TransportService.java:1559)
          at org.opensearch.transport.TaskTransportChannel.sendResponse(TaskTransportChannel.java:71)
          at org.opensearch.action.support.ChannelActionListener.onResponse(ChannelActionListener.java:62)
          at org.opensearch.action.support.ChannelActionListener.onResponse(ChannelActionListener.java:45)
          at org.opensearch.search.SearchService$3.onResponse(SearchService.java:1194)
          at org.opensearch.action.ActionRunnable.lambda$supply$0(ActionRunnable.java:74)
          at org.opensearch.action.ActionRunnable$$Lambda$4821/0x00007fb484d29ba8.accept(Unknown Source:-1)
          at org.opensearch.action.ActionRunnable$2.doRun(ActionRunnable.java:89)
          at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
          at org.opensearch.threadpool.TaskAwareRunnable.doRun(TaskAwareRunnable.java:78)
          at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
          at org.opensearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:59)
          at org.opensearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:908)
          at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
          at java.lang.Thread.run(Thread.java:833)

@reta
Collaborator

reta commented Sep 27, 2023

@austintlee the difference between these two flows is that the first one (#10248 (comment)) runs against a remote node (driven by the coordinator), while the latter one (#10248 (comment)) uses local transport (the coordinator node itself)

@RyanL1997 added the Search (Search query, autocomplete ...etc) label on Sep 27, 2023
@msfroh
Collaborator

msfroh commented Sep 27, 2023

Hmm... I had been saying that we will eventually need to support async execution in search pipelines. It sounds like that time may be now. For response processors, it's arguably easier than for request processors (since they already get executed in a callback).

Ingest pipelines assume async execution, but that also leads to #6338.

@heemin32
Contributor

Unless other processors in the search pipeline make an async call to a transport action, the search processor should run on a search thread, I guess? At least that is what I witnessed in the ingest pipeline.

Do you have other search processors in the pipeline that are called before your processor?

@austintlee
Contributor Author

@heemin32 not at the moment, but we plan to introduce a search request processor that is also going to invoke a transport action AND pass state (#9405), and this processor will be used in conjunction with the search response processor (the one mentioned above).

@heemin32
Contributor

heemin32 commented Oct 2, 2023

Okay. It seems like we need to see where the thread is switched from a search thread to a transport thread in the search pipeline implementation. I agree with @austintlee that the search pipeline should run only on search threads, similar to how the ingest pipeline behaves. Also, async support won't do much good here, because we lose the search backpressure mechanism by releasing the search thread before the search is completed.

@austintlee
Contributor Author

@heemin32 Does the ingest pipeline allow invocation of any transport actions? If so, can you provide an example that I can take a look at?

@msfroh
Collaborator

msfroh commented Oct 3, 2023

I cobbled together an implementation of async response processors: msfroh@6b4279c

It's mostly untested (besides unit tests), but it might let you avoid the blocking get() call.
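
For anyone following along, the general shape of a listener-based response processor is sketched below. The interface and method names here are assumptions for illustration, not necessarily what the linked commit defines; a plain search action again stands in for the real ML call.

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.core.action.ActionListener;

// Sketch of the async shape: the processor hands its result to a listener instead of
// blocking on a future. Interface and method names are illustrative only.
interface AsyncResponseProcessorSketch {
    void processResponseAsync(SearchRequest request, SearchResponse response, ActionListener<SearchResponse> listener);
}

final class NonBlockingProcessorSketch implements AsyncResponseProcessorSketch {
    private final Client client;

    NonBlockingProcessorSketch(Client client) {
        this.client = client;
    }

    @Override
    public void processResponseAsync(SearchRequest request, SearchResponse response, ActionListener<SearchResponse> listener) {
        // The listener-based client overload never parks the calling thread; the follow-up
        // work runs whenever the transport layer delivers the inner response.
        client.search(new SearchRequest("my-index"), ActionListener.wrap(
            innerResponse -> listener.onResponse(response),
            listener::onFailure
        ));
    }
}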

@austintlee
Contributor Author

austintlee commented Oct 3, 2023

@heemin32 in both of those examples, I see that you are using .actionGet with a timeout. I also observed that if I simply add a timeout, my test, which reliably hangs without a timeout, runs to completion without actually timing out.
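
For reference, the timed variant being discussed looks roughly like this sketch (class, index name, and the 30-second value are illustrative; a timeout bounds the wait but does not make blocking on a transport_worker thread safe):

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.unit.TimeValue;

final class TimedGetSketch {
    private TimedGetSketch() {}

    // A bounded wait instead of an unbounded one: actionGet(TimeValue) throws if no
    // response arrives in time, so a stuck call cannot park the thread forever.
    static SearchResponse timedCall(Client client) {
        return client.search(new SearchRequest("my-index")).actionGet(TimeValue.timeValueSeconds(30));
    }
}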

@heemin32
Contributor

heemin32 commented Oct 3, 2023

@heemin32 in both of those examples, I see that you are using .actionGet with a timeout. I also observed that if I simply add a timeout, my test, which reliably hangs without a timeout, runs to completion without actually timing out.

It is strange that it does not time out when a timeout value is set, but gets stuck without one..

@msfroh removed the untriaged label on Oct 4, 2023
@msfroh self-assigned this on Oct 4, 2023
@macohen
Contributor

macohen commented Oct 5, 2023

How often does this occur? I'm thinking that this may be worth a patch release.

@austintlee
Contributor Author

@macohen you need a search processor that does something that causes the executing thread to block while that thread is a transport thread. At the moment, the new search response processor that we added to the ml-commons plugin is the only one (that we know of) that does something like that, and it is an experimental feature that is disabled by default.

Once you enable it, it is fairly easy to get it to hang. You would need a cluster with at least two nodes and a query that runs on an index with two shards spread across the two nodes. This setup leads to a condition where the search pipeline execution is almost guaranteed to occur on a transport thread, and when a search processor blocks that transport thread, it seems to make the whole cluster unstable.

@austintlee
Contributor Author

@msfroh I just built OS 2.10 + your fix and tested it on my cluster and it seems to work!

I think we should do a patch release for 2.10. 2.10 has the blocking call without a timeout. This can cause ./gradlew run to fail, as there is an assert that triggers if BaseFuture.get is called without a timeout value (it is not triggered even if the timeout is set to 0, as long as it is set).

I already switched to a blocking call with a timeout which will be in 2.11. I have not been able to reproduce the hang after I introduced a timeout, but if we are doing a patch release, we should go with the async approach that Michael put forth.

By the way, @heemin32 I think the geo processor will also fail on gradle run because of this:

public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException {
    assert timeout <= 0 || blockingAllowed();

@msfroh
Collaborator

msfroh commented Oct 5, 2023

If we're going to support async execution for response processors, I should probably take care of request processors in the same commit.

I might also try to avoid the problem we see with stack overflows on long chains by adding an "isAsync" method to the interface -- a chain of non-async processors should just execute in a loop. Only async processors should need to do the callback dance.
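
A rough sketch of that idea (all names here are made up for illustration; this is not the actual Pipeline code):

import java.util.List;
import java.util.function.BiConsumer;

// Sketch of the "isAsync" idea: synchronous processors are folded in a plain loop, so a
// long chain of them adds no stack frames; only an async processor breaks out into a
// callback that resumes the loop at the next index.
final class ChainSketch {
    interface Processor<T> {
        boolean isAsync();
        T process(T input);                                        // used when isAsync() == false
        void processAsync(T input, BiConsumer<T, Exception> next); // used when isAsync() == true
    }

    static <T> void run(List<Processor<T>> processors, int start, T input, BiConsumer<T, Exception> done) {
        T current = input;
        for (int i = start; i < processors.size(); i++) {
            Processor<T> p = processors.get(i);
            if (p.isAsync() == false) {
                current = p.process(current);                      // same stack frame, no recursion
            } else {
                final int nextIndex = i + 1;
                final T in = current;
                p.processAsync(in, (result, e) -> {
                    if (e != null) {
                        done.accept(null, e);
                    } else {
                        run(processors, nextIndex, result, done);  // resume the loop after the callback
                    }
                });
                return;
            }
        }
        done.accept(current, null);
    }
}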
