From 7fc0d467467fea99bf0b49272c912533db252343 Mon Sep 17 00:00:00 2001 From: "shengwei.psw" Date: Mon, 18 Jan 2021 19:56:41 +0800 Subject: [PATCH] #2 --- .../remoting/api/PortUnificationServer.java | 29 +++++++++++++++---- .../protocol/tri/GracefulShutdownHandler.java | 27 ++++++++++++++++- .../rpc/protocol/tri/TripleClientHandler.java | 11 +++++++ .../tri/TripleHttp2FrameServerHandler.java | 2 ++ 4 files changed, 63 insertions(+), 6 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java index fa4e8a82627..0276af933ca 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java @@ -16,6 +16,10 @@ */ package org.apache.dubbo.remoting.api; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.ImmediateEventExecutor; +import io.netty.util.concurrent.Promise; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.common.logger.Logger; @@ -43,6 +47,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.logging.Level; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_KEY; @@ -151,11 +156,25 @@ protected void doClose() throws Throwable { channel.close(); channel = null; } - - channelGroup.close(); - Thread.sleep(15000); - } catch (Throwable e) { - logger.warn(e.getMessage(), e); + long start = System.currentTimeMillis(); + channelGroup.close().addListener(new GenericFutureListener>() { + @Override + public void operationComplete(final Future future) throws Exception { + if (!future.isSuccess()) { + logger.warn("Error shutting down server", future.cause()); + } + System.out.println("shut operationComplete.."); + long now = System.currentTimeMillis(); + while (now - start > 15000 || channelGroup.size() > 0) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } + } + } + }).await(); + } catch (InterruptedException e) { + logger.warn("Interrupted while shutting down", e); } for (WireProtocol protocol : protocols) { diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GracefulShutdownHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GracefulShutdownHandler.java index f397b3fcfcf..86058336a24 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GracefulShutdownHandler.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GracefulShutdownHandler.java @@ -4,12 +4,37 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http2.Http2ChannelDuplexHandler; +import io.netty.handler.codec.http2.Http2PingFrame; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import static org.apache.dubbo.rpc.protocol.tri.GracefulShutdown.GRACEFUL_SHUTDOWN_PING; public class GracefulShutdownHandler extends Http2ChannelDuplexHandler { + private GracefulShutdown gracefulShutdown; + private static final Logger logger = LoggerFactory.getLogger(GracefulShutdownHandler.class); + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + super.channelRead(ctx, msg); + + if (msg instanceof Http2PingFrame) { + if (((Http2PingFrame)msg).content() == GRACEFUL_SHUTDOWN_PING) { + if (gracefulShutdown == null) { + // this should never happen + logger.warn("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null"); + } else { + gracefulShutdown.secondGoAwayAndClose(ctx); + } + } + } + } @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - new GracefulShutdown(ctx,"app_requested", null).gracefulShutdown(); + if (gracefulShutdown == null) { + gracefulShutdown = new GracefulShutdown(ctx,"app_requested", null); + gracefulShutdown.gracefulShutdown(); + } } } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java index 6daeadf2226..d72ba6647b1 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java @@ -22,6 +22,17 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } } + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + if (ctx.channel().isActive()) { // Ignore notification that the socket was closed + final Connection connection = Connection.getConnectionFromChannel(ctx.channel()); + if (connection != null) { + connection.onIdle(); + } + } + super.close(ctx, promise); + } + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof Http2SettingsFrame){ diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java index a71d394cf9a..63fb5cb9c48 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java @@ -1,5 +1,6 @@ package org.apache.dubbo.rpc.protocol.tri; +import io.netty.handler.codec.http2.Http2PingFrame; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.common.logger.Logger; @@ -21,6 +22,7 @@ import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2HeadersFrame; +import static org.apache.dubbo.rpc.protocol.tri.GracefulShutdown.GRACEFUL_SHUTDOWN_PING; import static org.apache.dubbo.rpc.protocol.tri.TripleUtil.responseErr; import static org.apache.dubbo.rpc.protocol.tri.TripleUtil.responsePlainTextError;