Skip to content

Commit

Permalink
Fix channel close event cause hanging thread (#12503)
Browse files Browse the repository at this point in the history
* Fix channel close event cause hanging thread

* Fix check

* fix npe

* fix ut

* fix check

* fix uts

* fix testing
  • Loading branch information
AlbumenJ authored Jun 13, 2023
1 parent 83b4114 commit e6e68f6
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

/**
Expand All @@ -46,7 +47,7 @@ public class ThreadlessExecutor extends AbstractExecutorService {
/**
* Wait thread. It must be visible to other threads and does not need to be thread-safe
*/
private volatile Object waiter;
private final AtomicReference<Object> waiter = new AtomicReference<>();

/**
* Waits until there is a task, executes the task and all queued tasks (if there're any). The task is either a normal
Expand All @@ -56,22 +57,25 @@ public void waitAndDrain(long deadline) throws InterruptedException {
throwIfInterrupted();
Runnable runnable = queue.poll();
if (runnable == null) {
waiter = Thread.currentThread();
try {
while ((runnable = queue.poll()) == null) {
long restTime = deadline - System.nanoTime();
if (restTime <= 0) {
return;
if (waiter.compareAndSet(null, Thread.currentThread())) {
try {
while ((runnable = queue.poll()) == null && waiter.get() == Thread.currentThread()) {
long restTime = deadline - System.nanoTime();
if (restTime <= 0) {
return;
}
LockSupport.parkNanos(this, restTime);
throwIfInterrupted();
}
LockSupport.parkNanos(this, restTime);
throwIfInterrupted();
} finally {
waiter.compareAndSet(Thread.currentThread(), null);
}
} finally {
waiter = null;
}
}
do {
runnable.run();
if (runnable != null) {
runnable.run();
}
} while ((runnable = queue.poll()) != null);
}

Expand All @@ -91,8 +95,8 @@ private static void throwIfInterrupted() throws InterruptedException {
public void execute(Runnable runnable) {
RunnableWrapper run = new RunnableWrapper(runnable);
queue.add(run);
if (waiter != SHUTDOWN) {
LockSupport.unpark((Thread) waiter);
if (waiter.get() != SHUTDOWN) {
LockSupport.unpark((Thread) waiter.get());
} else if (queue.remove(run)) {
throw new RejectedExecutionException();
}
Expand All @@ -109,7 +113,10 @@ public void shutdown() {

@Override
public List<Runnable> shutdownNow() {
waiter = SHUTDOWN;
if (waiter.get() != SHUTDOWN) {
LockSupport.unpark((Thread) waiter.get());
}
waiter.set(SHUTDOWN);
Runnable runnable;
while ((runnable = queue.poll()) != null) {
runnable.run();
Expand All @@ -119,7 +126,7 @@ public List<Runnable> shutdownNow() {

@Override
public boolean isShutdown() {
return waiter == SHUTDOWN;
return waiter.get() == SHUTDOWN;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.resource.GlobalResourceInitializer;
import org.apache.dubbo.common.serialize.SerializationException;
import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
import org.apache.dubbo.common.timer.HashedWheelTimer;
import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.common.timer.Timer;
Expand Down Expand Up @@ -197,6 +198,7 @@ public static void received(Channel channel, Response response, boolean timeout)
t.cancel();
}
future.doReceived(response);
shutdownExecutorIfNeeded(future);
} else {
logger.warn(PROTOCOL_TIMEOUT_SERVER, "", "", "The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
Expand All @@ -215,12 +217,20 @@ public boolean cancel(boolean mayInterruptIfRunning) {
errorResult.setStatus(Response.CLIENT_ERROR);
errorResult.setErrorMessage("request future has been canceled.");
this.doReceived(errorResult);
FUTURES.remove(id);
DefaultFuture future = FUTURES.remove(id);
shutdownExecutorIfNeeded(future);
CHANNELS.remove(id);
timeoutCheckTask.cancel();
return true;
}

private static void shutdownExecutorIfNeeded(DefaultFuture future) {
ExecutorService executor = future.getExecutor();
if (executor instanceof ThreadlessExecutor && !executor.isShutdown()) {
executor.shutdownNow();
}
}

public void cancel() {
this.cancel(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ void interruptSend() throws Exception {
}

@Test
void testClose() {
void testClose1() {
Channel channel = new MockedChannel();
Request request = new Request(123);
ExecutorService executor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class)
Expand All @@ -171,6 +171,16 @@ void testClose() {
Assertions.assertFalse(executor.isTerminated());
}

@Test
void testClose2() {
Channel channel = new MockedChannel();
Request request = new Request(123);
ThreadlessExecutor threadlessExecutor = new ThreadlessExecutor();
DefaultFuture.newFuture(channel, request, 1000, threadlessExecutor);
DefaultFuture.closeChannel(channel, 0);
Assertions.assertTrue(threadlessExecutor.isTerminated());
}

/**
* mock a default future
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public Result get() throws InterruptedException, ExecutionException {
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
try {
while (!responseFuture.isDone()) {
while (!responseFuture.isDone() && !threadlessExecutor.isShutdown()) {
threadlessExecutor.waitAndDrain(Long.MAX_VALUE);
}
} finally {
Expand All @@ -199,7 +199,7 @@ public Result get(long timeout, TimeUnit unit) throws InterruptedException, Exec
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
try {
while (!responseFuture.isDone()) {
while (!responseFuture.isDone() && !threadlessExecutor.isShutdown()) {
long restTime = deadline - System.nanoTime();
if (restTime > 0) {
threadlessExecutor.waitAndDrain(deadline);
Expand Down

0 comments on commit e6e68f6

Please sign in to comment.