diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/netty4/PortUnificationServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/netty4/PortUnificationServer.java index a8eeb3e676b..75dea24437f 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/netty4/PortUnificationServer.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/netty4/PortUnificationServer.java @@ -136,8 +136,7 @@ protected void initChannel(SocketChannel ch) throws Exception { final ChannelPipeline p = ch.pipeline(); PortUnificationServerHandler negotiation = new PortUnificationServerHandler(protocols); p.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)); - p.addLast("negotiation", - new PortUnificationServerHandler(protocols)); + p.addLast("negotiation", negotiation); channelGroup = negotiation.getChannels(); } } @@ -164,6 +163,7 @@ protected void doClose() throws Throwable { } channelGroup.close(); + Thread.sleep(1000000); } catch (Throwable e) { logger.warn(e.getMessage(), e); } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GracefulShutdownHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GracefulShutdownHandler.java new file mode 100644 index 00000000000..a9da650bc6e --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GracefulShutdownHandler.java @@ -0,0 +1,30 @@ +package org.apache.dubbo.rpc.protocol.tri; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http2.Http2ConnectionDecoder; +import io.netty.handler.codec.http2.Http2ConnectionEncoder; +import io.netty.handler.codec.http2.Http2FrameCodec; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.codec.http2.Http2Settings; +import io.netty.handler.ssl.SslContext; +import org.apache.dubbo.common.extension.Activate; +import org.apache.dubbo.remoting.netty4.Http2WireProtocol; + + +public class GracefulShutdownHandler extends Http2FrameCodec { + + public GracefulShutdownHandler() { + } + + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + super.close(ctx, promise); + new GracefulShutdown(ctx,"app_requested", null).gracefulShutdown(); + } +} diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java index cbb768f176f..dc46bb8ac76 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java @@ -37,6 +37,7 @@ public TripleRpcException asException(){ enum Code { OK(0), UNKNOWN(2), + DEADLINE_EXCEEDED(4), NOT_FOUND(5), RESOURCE_EXHAUSTED(8), UNIMPLEMENTED(12), diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java index 7002ca621b7..efa8bc1d95f 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java @@ -16,6 +16,7 @@ import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.serialize.Serialization2; +import org.apache.dubbo.remoting.TimeoutException; import org.apache.dubbo.rpc.AppResponse; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; @@ -32,6 +33,7 @@ import io.netty.handler.codec.http2.DefaultHttp2Headers; import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; import io.netty.handler.codec.http2.Http2Headers; +import org.apache.dubbo.rpc.protocol.tri.GrpcStatus.Code; import java.io.IOException; import java.util.concurrent.CompletionStage; @@ -83,6 +85,9 @@ public void halfClose() { future.whenComplete((appResult, t) -> { try { if (t != null) { + if (t instanceof TimeoutException) { + responseErr(ctx, GrpcStatus.fromCode(Code.DEADLINE_EXCEEDED).withCause(t)); + } responseErr(ctx, GrpcStatus.fromCode(GrpcStatus.Code.UNKNOWN).withCause(t)); return; } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java index d84a9aa6750..19429483208 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java @@ -28,6 +28,7 @@ public void configServerPipeline(ChannelPipeline pipeline) { .frameLogger(SERVER_LOGGER) .build(); final Http2MultiplexHandler handler = new Http2MultiplexHandler(new TripleServerInitializer()); + pipeline.addLast("gracefulShutdown", new GracefulShutdownHandler()); pipeline.addLast(codec, handler); }