
Thread blocked on get key inside a watcher #1089

Closed
q12321q opened this issue Aug 18, 2022 · 27 comments

Comments

@q12321q

q12321q commented Aug 18, 2022

Versions

  • etcd: 3.5.1
  • jetcd: 0.7.3
  • java: 11.0.8

Describe the bug
I don't know whether this is a bug or a misuse of the jetcd SDK, but since we migrated from jetcd 0.5.7 to 0.7.3, doing a key get inside a watcher notification raises io.vertx.core.VertxException: Thread blocked, and the get is indeed stuck.

To Reproduce
Here is how I reproduce it:

    @Test
    public void testWatcherAndGet() throws Exception {
        final Client client = Client.builder().endpoints("http://localhost:32784").build();
        final ByteSequence keyToGet = ByteSequence.from("/keytoGet", StandardCharsets.UTF_8);
        final ByteSequence keyToWatch = ByteSequence.from("/keytoWatch", StandardCharsets.UTF_8);
        final ByteSequence keyValueToWatch = ByteSequence.from("keyValuetoWatch", StandardCharsets.UTF_8);

        CountDownLatch countDownLatch = new CountDownLatch(1);
        client.getWatchClient().watch(keyToWatch, WatchOption.DEFAULT, new Watch.Listener() {

            @Override
            public void onNext(WatchResponse watchResponse) {
                try {
                    client.getKVClient().get(keyToGet).get(); // stuck there with 0.7.3 but ok with 0.5.7
                    countDownLatch.countDown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void onError(Throwable throwable) { }

            @Override
            public void onCompleted() { }
        });

        client.getKVClient().put(keyToWatch, keyValueToWatch).get();
        countDownLatch.await();
    }

Here is the full exception:

2022-08-18T16:36:31.680Z dev WARN  vertx-blocked-thread-checker i.v.c.i.BlockedThreadChecker Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 2413 ms, time limit is 2000 ms
2022-08-18T16:36:32.681Z dev WARN  vertx-blocked-thread-checker i.v.c.i.BlockedThreadChecker Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 3415 ms, time limit is 2000 ms
2022-08-18T16:36:33.682Z dev WARN  vertx-blocked-thread-checker i.v.c.i.BlockedThreadChecker Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 4415 ms, time limit is 2000 ms
2022-08-18T16:36:34.688Z dev WARN  vertx-blocked-thread-checker i.v.c.i.BlockedThreadChecker Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 5419 ms, time limit is 2000 ms
io.vertx.core.VertxException: Thread blocked
        at java.base@11.0.8/jdk.internal.misc.Unsafe.park(Native Method)
        at java.base@11.0.8/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
        at java.base@11.0.8/java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1796)
        at java.base@11.0.8/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3128)
        at java.base@11.0.8/java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1823)
        at java.base@11.0.8/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1998)
        at app//TestEtcdClient$4.onNext(TestEtcdClient.java:571)
        at app//io.etcd.jetcd.impl.WatchImpl$WatcherImpl.onNext(WatchImpl.java:310)
        at app//io.etcd.jetcd.impl.WatchImpl$WatcherImpl$$Lambda$183/0x00000008004af840.handle(Unknown Source)
        at app//io.vertx.grpc.stub.StreamObserverReadStream.onNext(StreamObserverReadStream.java:37)
        at app//io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:474)
        at app//io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:661)
        at app//io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:646)
        at app//io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at app//io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at app//io.vertx.grpc.VertxChannelBuilder.lambda$null$0(VertxChannelBuilder.java:298)
        at app//io.vertx.grpc.VertxChannelBuilder$$Lambda$175/0x00000008003db840.handle(Unknown Source)
        at app//io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
        at app//io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:246)
        at app//io.vertx.grpc.VertxChannelBuilder.lambda$build$1(VertxChannelBuilder.java:298)
        at app//io.vertx.grpc.VertxChannelBuilder$$Lambda$139/0x0000000800375c40.execute(Unknown Source)
        at app//io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:102)
        at app//io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:95)
        at app//io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.messagesAvailable(ClientCallImpl.java:677)
        at app//io.grpc.internal.RetriableStream$Sublistener$6.run(RetriableStream.java:1065)
        at app//io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
        at app//io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
        at app//io.grpc.internal.RetriableStream$Sublistener.messagesAvailable(RetriableStream.java:1061)
        at app//io.grpc.internal.ForwardingClientStreamListener.messagesAvailable(ForwardingClientStreamListener.java:39)
        at app//io.grpc.internal.AbstractStream$TransportState.messagesAvailable(AbstractStream.java:182)
        at app//io.grpc.internal.MessageDeframer.processBody(MessageDeframer.java:412)
        at app//io.grpc.internal.MessageDeframer.deliver(MessageDeframer.java:275)
        at app//io.grpc.internal.MessageDeframer.deframe(MessageDeframer.java:177)
        at app//io.grpc.internal.AbstractStream$TransportState.deframe(AbstractStream.java:210)
        at app//io.grpc.internal.AbstractClientStream$TransportState.inboundDataReceived(AbstractClientStream.java:361)
        at app//io.grpc.internal.Http2ClientStreamTransportState.transportDataReceived(Http2ClientStreamTransportState.java:148)
        at app//io.grpc.netty.NettyClientStream$TransportState.transportDataReceived(NettyClientStream.java:348)
        at app//io.grpc.netty.NettyClientHandler.onDataRead(NettyClientHandler.java:387)
        at app//io.grpc.netty.NettyClientHandler.access$1100(NettyClientHandler.java:91)
        at app//io.grpc.netty.NettyClientHandler$FrameListener.onDataRead(NettyClientHandler.java:927)
        at app//io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:307)
        at app//io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onDataRead(Http2InboundFrameLogger.java:48)
        at app//io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:415)
        at app//io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:250)
        at app//io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:159)
        at app//io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
        at app//io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:173)
        at app//io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
        at app//io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
        at app//io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
        at app//io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
        at app//io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
        at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at app//io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at app//io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1373)
        at app//io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1236)
        at app//io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1285)
        at app//io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
        at app//io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
        at app//io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
        at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at app//io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at app//io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at app//io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at app//io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at app//io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
        at app//io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
        at app//io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
        at app//io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
        at app//io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at app//io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at app//io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base@11.0.8/java.lang.Thread.run(Thread.java:834)

Expected behavior
I expect the test not to be stuck and the get key return its result.

Additional context
NA

@lburgazzoli
Collaborator

@q12321q do you have any time to investigate further?

@q12321q
Author

q12321q commented Aug 19, 2022

Hello @lburgazzoli,
Are you able to reproduce it?
I tested it with version 0.6.1 as well, and there is no issue there either.
If I wrap

client.getKVClient().get(keyToGet).get();
countDownLatch.countDown();

in a separate, non-blocking thread, the test works in 0.7.3.

My guess is that the switch to Vert.x produced this regression. The watcher notification and the key get both run on the event loop (i.e. a single thread), and the first waits on the second, hence the blocked thread. This is just a supposition, as I'm not very familiar with Vert.x.
I can wrap all my watch notifications in separate threads, but that is not an impact-free change, so first I would like to understand whether this is expected behavior from jetcd's point of view and where it should be fixed: on the jetcd library side or on the user's side?
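Such wrapping can be sketched like this; a plain CompletableFuture stands in for the jetcd call, and the worker pool is a hypothetical choice, since no etcd server is assumed here:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OffloadDemo {
    // Inside Watch.Listener.onNext, submit the blocking part to a worker
    // pool instead of calling .get() on the notification (event-loop) thread.
    static String fetchOnWorker(CompletableFuture<String> kvFuture) throws Exception {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        try {
            // The worker thread may park on .get(); the notification thread never does.
            // The demo waits for the result here; a real onNext would fire-and-forget.
            return workers.submit(() -> kvFuture.get()).get();
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for client.getKVClient().get(keyToGet).
        System.out.println(fetchOnWorker(CompletableFuture.completedFuture("value")));
    }
}
```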

Thanks a lot for your help

@lburgazzoli
Collaborator

I can't look at the issue now, and probably not for a few days, so I would love it if you could help me with this.
Yes, this is a regression, and your analysis seems reasonable to me.

@q12321q
Author

q12321q commented Aug 19, 2022

Thanks a lot for having a look at this issue.
Sure, I'll do my best to help you.

@misaya295

misaya295 commented Sep 30, 2022

@q12321q May I ask how you managed to avoid this problem? I am also encountering it now.

@lburgazzoli
Collaborator

I guess you should use an async call: don't use client.getKVClient().get(keyToGet).get() but rather e.g. client.getKVClient().get(keyToGet).thenAccept(...)
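The non-blocking pattern can be sketched with a plain CompletableFuture standing in for the jetcd response (no etcd server is assumed here); note that the CompletableFuture method is thenAccept, not accept:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class AsyncGetDemo {
    // Non-blocking pattern: register a continuation with thenAccept instead
    // of parking the calling thread with .get().
    static String handleAsync(CompletableFuture<String> getFuture) {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<Void> done =
            getFuture.thenAccept(response -> seen.set("received: " + response));
        done.join(); // only the demo waits; a watcher callback would just return
        return seen.get();
    }

    public static void main(String[] args) {
        // Stand-in for client.getKVClient().get(keyToGet).
        System.out.println(handleAsync(CompletableFuture.completedFuture("payload")));
    }
}
```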

@moremind

I also have the same problem in Reactor, but Reactor throws a BlockingException. I have an idea: could an async method execute it?

@moremind

I don't know the details of the get method in jetcd, but I guess it's blocking? 👂

@lburgazzoli
Collaborator

Try with something like this:

    public void testWatchAndGet(final Client client) throws Exception {
        final ByteSequence key = randomByteSequence();
        final ByteSequence value = randomByteSequence();
        final AtomicReference<KeyValue> ref = new AtomicReference<>();

        final Consumer<WatchResponse> consumer = response -> {
            for (WatchEvent event : response.getEvents()) {
                if (event.getEventType() == EventType.PUT) {
                    ByteSequence key1 = event.getKeyValue().getKey();

                    client.getKVClient().get(key1).whenComplete((r, t) -> {
                        // guard against a failed get: r is null when t is non-null
                        if (t == null && !r.getKvs().isEmpty()) {
                            ref.set(r.getKvs().get(0));
                        }
                    });
                }
            }
        };

        try (Watcher watcher = client.getWatchClient().watch(key, consumer)) {
            client.getKVClient().put(key, value).get();

            await().atMost(TIME_OUT_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> assertThat(ref.get()).isNotNull());

            assertThat(ref.get()).isNotNull();
            assertThat(ref.get().getKey()).isEqualTo(key);
            assertThat(ref.get().getValue()).isEqualTo(value);
        }
    }

@moremind

Try with something like this: (example code quoted above)

I have tested this code; the result is as follows (screenshot omitted).
The vert.x event-loop thread is always a single one. Why does jetcd have a single consumer thread?

@lburgazzoli
Collaborator

@moremind, @q12321q does my example solve the issue? If so, please close it.

@lburgazzoli
Collaborator

The vert.x event-loop thread is always a single one. Why does jetcd have a single consumer thread?

What is the issue?

@moremind

@moremind, @q12321q does my example solve the issue? If so, please close it.

I think it solves the bug. Thanks for your answer!

@moremind

The vert.x event-loop thread is always a single one. Why does jetcd have a single consumer thread?

What is the issue?

I think the thread 'vert.x-eventloop-thread-0' should not be the only one, but I tested it, and it is indeed single.

@moremind

This is a new test run (screenshot omitted).

@lburgazzoli
Collaborator

I still don't get what the issue is: Vert.x uses event loops, so things may end up on the same thread.
You are free to open another issue and investigate further, but jetcd does not do anything in particular other than delegate to Vert.x.

@moremind

I still don't get what the issue is: Vert.x uses event loops, so things may end up on the same thread.
You are free to open another issue and investigate further, but jetcd does not do anything in particular other than delegate to Vert.x.

Yes, I understand; I will look into Vert.x.

@q12321q
Author

q12321q commented Oct 12, 2022

@lburgazzoli for me the issue is that the migration to Vert.x forces us not to block in certain callbacks. It's a new requirement and an undocumented breaking change.
What will happen if we block the event loop for some time? My guess is that it will also block all the other etcd requests in the meantime?
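That guess can be illustrated with a plain single-threaded executor standing in for the Vert.x event loop (a sketch, not jetcd's actual wiring):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BlockedLoopDemo {
    // A single-threaded executor plays the role of the event loop:
    // while one task blocks it, every other queued task must wait.
    static long secondTaskDelayMs(long blockMs) throws Exception {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        long start = System.nanoTime();
        // Task 1 blocks the loop, like .get() inside a watcher callback.
        loop.submit(() -> {
            try { Thread.sleep(blockMs); } catch (InterruptedException ignored) {}
        });
        // Task 2 stands in for "all the other etcd responses": it cannot
        // start until task 1 releases the single thread.
        long delay = loop.submit(
            () -> (System.nanoTime() - start) / 1_000_000).get();
        loop.shutdown();
        loop.awaitTermination(5, TimeUnit.SECONDS);
        return delay;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("second task delayed by ~" + secondTaskDelayMs(300) + " ms");
    }
}
```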

@lburgazzoli
Collaborator

@lburgazzoli for me the issue is that the migration to Vert.x forces us not to block in certain callbacks. It's a new requirement and an undocumented breaking change.

Unfortunately I'm the only maintainer of the project. I have asked many times for some help, but no one has stepped up, so I do what I think is right and what helps me reduce the maintenance cost. Every time I've opened issues related to similar changes for discussion, I have received no feedback, so I assume people do not care or do not use jetcd.

So I'm sorry for the breaking change, but given that I'm alone and don't have much time, I can't do much.

What will happen if we block the event loop for some time? My guess is that it will also block all the other etcd requests in the meantime?

I think this is something you have to take into account anyway, as the general expectation is that you use the callback methods to handle responses. That is why we return a CompletableFuture and not a blocking result.

@q12321q
Author

q12321q commented Oct 12, 2022

Oh, sorry if my tone was too direct! I didn't open the issue to assign blame. Thanks a lot for your work, and my apologies for not having enough time to help you more with this issue.
My goal here is to clearly understand the implications of the Vert.x migration and find the best way to remediate.

My feeling is that we may have to wrap the watcher's callbacks in a thread pool. whenComplete alone won't entirely solve the issue, as our application can block for several seconds.

Maybe we should document this behavior, as I feel we won't be the only ones to hit it. What do you think?

@moremind

Thanks for your response. If possible, I'd be glad to dig into the question.

@lburgazzoli
Collaborator

Oh, sorry if my tone was too direct! I didn't open the issue to assign blame. Thanks a lot for your work, and my apologies for not having enough time to help you more with this issue. My goal here is to clearly understand the implications of the Vert.x migration and find the best way to remediate.

No worries at all.

My feeling is that we may have to wrap the watcher's callbacks in a thread pool. whenComplete alone won't entirely solve the issue, as our application can block for several seconds.

Maybe we should document this behavior, as I feel we won't be the only ones to hit it. What do you think?

I don't think we should wrap the watcher's callbacks in a thread pool: if the pool becomes full, you may end up blocking anyway, so that should be handled by the application code (i.e. put responses in a queue). Honestly, the watch should be implemented using the Flow APIs, but I had to hold back on this because people are asking to keep supporting Java 8 ...
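The queue idea can be sketched as follows; the event strings and the sentinel value are made up for illustration, and the enqueue loop stands in for what a Watch.Listener.onNext would do:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class WatchQueueDemo {
    // The watcher callback only enqueues; a dedicated consumer thread does
    // the slow work, so the event loop is never blocked and back-pressure
    // shows up as queue depth rather than a frozen loop.
    static List<String> drain(List<String> events) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String e = queue.take();
                    if (e.equals("<poison>")) return; // sentinel stops the worker
                    processed.add(e); // slow handling would happen here
                }
            } catch (InterruptedException ignored) {}
        });
        consumer.start();

        // This loop stands in for onNext: enqueue and return immediately.
        for (String e : events) queue.put(e);
        queue.put("<poison>");
        consumer.join();
        return processed;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(drain(List.of("put:/a", "put:/b")));
    }
}
```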

@shawven

shawven commented Dec 3, 2022

I also encountered this problem. I think callback-based processing is a good solution.

@github-actions

github-actions bot commented Feb 2, 2023

This issue is stale because it has been open 60 days with no activity.
Remove stale label or comment or this will be closed in 7 days.

@github-actions github-actions bot added the stale label Feb 2, 2023
@github-actions github-actions bot closed this as not planned (won't fix, can't repro, duplicate, stale) Feb 9, 2023
@kylooh

kylooh commented Dec 13, 2023

I have a similar problem: my etcd client sometimes gets stuck and never comes back, while the network is working perfectly. In some cases, for example, I used 4 threads to submit get requests through the client, and 2 of them were blocked for no reason while the rest were still working. I am still trying to figure out what happens, but it seems hard to reproduce. It happens frequently on our production machine, which is in a private environment, but we are not able to reproduce it on our local machines.

@Wackerle

Wackerle commented Jan 8, 2024

I still don't get what the issue is: Vert.x uses event loops, so things may end up on the same thread.
You are free to open another issue and investigate further, but jetcd does not do anything in particular other than delegate to Vert.x.

Yes, I understand; I will look into Vert.x.

Is there any new progress on this issue? @moremind @lburgazzoli
We have encountered a performance issue, initially suspected to be related to this thread, because it is always a single one.

@lburgazzoli
Collaborator

@Wackerle if you think this is an issue, please open a new one or it will be forgotten. More importantly, any help would really be appreciated, as unfortunately I don't have much time to dedicate to troubleshooting issues like this one.

Development

No branches or pull requests

7 participants