From 59663c8da5e1be85ca758e4688fc2bdda878dae5 Mon Sep 17 00:00:00 2001 From: TomlongTK Date: Wed, 13 Nov 2024 10:47:07 +0800 Subject: [PATCH] WebSocket on Triple (#14390) * WebSocket on Triple * Resolve conflict * Fix issues Fix issues * Merge latest code * Change package * Merge latest code * Merge latest code * Use Filter instead of GenericFilter --- .artifacts | 2 + .../dubbo/config/nested/TripleConfig.java | 11 ++ .../dubbo/config/nested/WebSocketConfig.java | 66 ++++++++ dubbo-dependencies-bom/pom.xml | 13 ++ dubbo-distribution/dubbo-all-shaded/pom.xml | 19 +++ dubbo-distribution/dubbo-all/pom.xml | 19 +++ dubbo-distribution/dubbo-bom/pom.xml | 10 ++ dubbo-distribution/dubbo-core-spi/pom.xml | 3 + .../protocol/tri/servlet/TripleFilter.java | 8 +- dubbo-plugin/dubbo-triple-websocket/pom.xml | 140 +++++++++++++++++ .../websocket/TripleBinaryMessageHandler.java | 42 ++++++ .../tri/websocket/TripleEndpoint.java | 93 ++++++++++++ .../websocket/TripleTextMessageHandler.java | 40 +++++ .../tri/websocket/TripleWebSocketFilter.java | 127 ++++++++++++++++ .../tri/websocket/WebSocketConstants.java | 26 ++++ .../tri/websocket/WebSocketStreamChannel.java | 142 ++++++++++++++++++ dubbo-plugin/pom.xml | 1 + .../remoting/http12/HttpHeaderNames.java | 2 + .../http12/LimitedByteArrayOutputStream.java | 61 ++++++++ .../dubbo-remoting-websocket/pom.xml | 41 +++++ .../remoting/websocket/FinalFragment.java | 22 +++ .../FinalFragmentByteArrayInputStream.java | 46 ++++++ .../FinalFragmentByteBufInputStream.java | 55 +++++++ .../FinalFragmentStreamingDecoder.java | 123 +++++++++++++++ .../websocket/WebSocketHeaderNames.java | 31 ++++ ...bSocketServerTransportListenerFactory.java | 29 ++++ .../websocket/WebSocketTransportListener.java | 21 +++ .../netty4/NettyWebSocketChannel.java | 87 +++++++++++ .../websocket/netty4/WebSocketFrameCodec.java | 140 +++++++++++++++++ .../WebSocketProtocolSelectorHandler.java | 71 +++++++++ .../netty4/WebSocketServerUpgradeCodec.java | 64 ++++++++ dubbo-remoting/pom.xml | 1 + .../java/org/apache/dubbo/rpc/Constants.java | 2 + dubbo-rpc/dubbo-rpc-triple/pom.xml | 5 + .../rpc/protocol/tri/TripleConstants.java | 2 + .../rpc/protocol/tri/TripleHttp2Protocol.java | 35 +++++ .../tri/rest/mapping/meta/MethodMeta.java | 2 +- .../basic/FallbackArgumentResolver.java | 4 + ...faultWebSocketServerTransportListener.java | 107 +++++++++++++ ...bSocketServerTransportListenerFactory.java | 35 +++++ .../WebSocketServerChannelObserver.java | 39 +++++ ...et.WebSocketServerTransportListenerFactory | 1 + .../dubbo-spring-boot-3-autoconfigure/pom.xml | 19 +++ .../DubboTriple3AutoConfiguration.java | 35 ++++- .../dubbo-spring-boot-autoconfigure/pom.xml | 19 +++ .../DubboTripleAutoConfiguration.java | 35 ++++- .../pom.xml | 6 + dubbo-test/dubbo-dependencies-all/pom.xml | 10 ++ 48 files changed, 1898 insertions(+), 14 deletions(-) create mode 100644 dubbo-common/src/main/java/org/apache/dubbo/config/nested/WebSocketConfig.java create mode 100644 dubbo-plugin/dubbo-triple-websocket/pom.xml create mode 100644 dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/TripleBinaryMessageHandler.java create mode 100644 dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/TripleEndpoint.java create mode 100644 dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/TripleTextMessageHandler.java create mode 100644 dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/TripleWebSocketFilter.java create mode 100644 dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/WebSocketConstants.java create mode 100644 dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/WebSocketStreamChannel.java create mode 100644 dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/LimitedByteArrayOutputStream.java create mode 100644 dubbo-remoting/dubbo-remoting-websocket/pom.xml create mode 100644 dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/FinalFragment.java create mode 100644 dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/FinalFragmentByteArrayInputStream.java create mode 100644 dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/FinalFragmentByteBufInputStream.java create mode 100644 dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/FinalFragmentStreamingDecoder.java create mode 100644 dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/WebSocketHeaderNames.java create mode 100644 dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/WebSocketServerTransportListenerFactory.java create mode 100644 dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/WebSocketTransportListener.java create mode 100644 dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/netty4/NettyWebSocketChannel.java create mode 100644 dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/netty4/WebSocketFrameCodec.java create mode 100644 dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/netty4/WebSocketProtocolSelectorHandler.java create mode 100644 dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/netty4/WebSocketServerUpgradeCodec.java create mode 100644 dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/DefaultWebSocketServerTransportListener.java create mode 100644 dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/DefaultWebSocketServerTransportListenerFactory.java create mode 100644 dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/WebSocketServerChannelObserver.java create mode 100644 dubbo-rpc/dubbo-rpc-triple/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.websocket.WebSocketServerTransportListenerFactory diff --git a/.artifacts b/.artifacts index ef7445d6111..ef2d5e8ece8 100644 --- a/.artifacts +++ b/.artifacts @@ -75,6 +75,7 @@ dubbo-remoting dubbo-remoting-api dubbo-remoting-http12 dubbo-remoting-http3 +dubbo-remoting-websocket dubbo-remoting-netty dubbo-remoting-netty4 dubbo-remoting-zookeeper-curator5 @@ -116,3 +117,4 @@ dubbo-plugin-loom dubbo-rest-jaxrs dubbo-rest-spring dubbo-triple-servlet +dubbo-triple-websocket diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/nested/TripleConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/nested/TripleConfig.java index 0e81f6f334d..854cb2d0518 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/config/nested/TripleConfig.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/config/nested/TripleConfig.java @@ -152,6 +152,9 @@ public class TripleConfig implements Serializable { @Nested private ServletConfig servlet; + @Nested + private WebSocketConfig websocket; + public Boolean getVerbose() { return verbose; } @@ -370,4 +373,12 @@ public ServletConfig getServlet() { public void setServlet(ServletConfig servlet) { this.servlet = servlet; } + + public WebSocketConfig getWebsocket() { + return websocket; + } + + public void setWebsocket(WebSocketConfig websocket) { + this.websocket = websocket; + } } diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/nested/WebSocketConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/nested/WebSocketConfig.java new file mode 100644 index 00000000000..e2600f775b6 --- /dev/null +++ b/dubbo-common/src/main/java/org/apache/dubbo/config/nested/WebSocketConfig.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.config.nested; + +import java.io.Serializable; + +public class WebSocketConfig implements Serializable { + + private static final long serialVersionUID = -2504271061733141988L; + + /** + * Whether to enable websocket support, requests are transport through the websocket container + *

The default value is false. + */ + private Boolean enabled; + + /** + * The URL patterns that the websocket filter will be registered for. + *

The default value is '/*'. + */ + private String[] filterUrlPatterns; + + /** + * The order of the websocket filter. + *

The default value is -1000000. + */ + private Integer filterOrder; + + public Boolean getEnabled() { + return enabled; + } + + public void setEnabled(Boolean enabled) { + this.enabled = enabled; + } + + public String[] getFilterUrlPatterns() { + return filterUrlPatterns; + } + + public void setFilterUrlPatterns(String[] filterUrlPatterns) { + this.filterUrlPatterns = filterUrlPatterns; + } + + public Integer getFilterOrder() { + return filterOrder; + } + + public void setFilterOrder(Integer filterOrder) { + this.filterOrder = filterOrder; + } +} diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml index 7bf5f82a3db..024759105e8 100644 --- a/dubbo-dependencies-bom/pom.xml +++ b/dubbo-dependencies-bom/pom.xml @@ -108,6 +108,7 @@ 3.25.5 1.3.2 3.1.0 + 2.2.0 6.1.0 9.4.56.v20240826 3.1.0 @@ -462,6 +463,18 @@ ${jakarta_servlet_version} provided + + jakarta.websocket + jakarta.websocket-api + ${jakarta_websocket_version} + provided + + + jakarta.websocket + jakarta.websocket-client-api + ${jakarta_websocket_version} + provided + com.squareup.okhttp3 okhttp diff --git a/dubbo-distribution/dubbo-all-shaded/pom.xml b/dubbo-distribution/dubbo-all-shaded/pom.xml index 734d1e11488..6066525a2a9 100644 --- a/dubbo-distribution/dubbo-all-shaded/pom.xml +++ b/dubbo-distribution/dubbo-all-shaded/pom.xml @@ -283,6 +283,13 @@ compile true + + org.apache.dubbo + dubbo-triple-websocket + ${project.version} + compile + true + @@ -343,6 +350,13 @@ compile true + + org.apache.dubbo + dubbo-remoting-websocket + ${project.version} + compile + true + org.apache.dubbo dubbo-remoting-netty @@ -500,6 +514,7 @@ org.apache.dubbo:dubbo-remoting-api org.apache.dubbo:dubbo-remoting-http12 org.apache.dubbo:dubbo-remoting-http3 + org.apache.dubbo:dubbo-remoting-websocket org.apache.dubbo:dubbo-remoting-netty4 org.apache.dubbo:dubbo-remoting-netty org.apache.dubbo:dubbo-remoting-zookeeper-curator5 @@ -510,6 +525,7 @@ org.apache.dubbo:dubbo-rest-jaxrs org.apache.dubbo:dubbo-rest-spring org.apache.dubbo:dubbo-triple-servlet + org.apache.dubbo:dubbo-triple-websocket org.apache.dubbo:dubbo-serialization-api org.apache.dubbo:dubbo-serialization-hessian2 org.apache.dubbo:dubbo-serialization-fastjson2 @@ -745,6 +761,9 @@ META-INF/dubbo/internal/org.apache.dubbo.remoting.http3.Http3ServerTransportListenerFactory + + META-INF/dubbo/internal/org.apache.dubbo.remoting.websocket.WebSocketServerTransportListenerFactory + META-INF/dubbo/internal/org.apache.dubbo.remoting.telnet.TelnetHandler diff --git a/dubbo-distribution/dubbo-all/pom.xml b/dubbo-distribution/dubbo-all/pom.xml index 02b928d5ed1..7d6b9f81c37 100644 --- a/dubbo-distribution/dubbo-all/pom.xml +++ b/dubbo-distribution/dubbo-all/pom.xml @@ -283,6 +283,13 @@ compile true + + org.apache.dubbo + dubbo-triple-websocket + ${project.version} + compile + true + @@ -343,6 +350,13 @@ compile true + + org.apache.dubbo + dubbo-remoting-websocket + ${project.version} + compile + true + org.apache.dubbo dubbo-remoting-netty @@ -499,6 +513,7 @@ org.apache.dubbo:dubbo-remoting-api org.apache.dubbo:dubbo-remoting-http12 org.apache.dubbo:dubbo-remoting-http3 + org.apache.dubbo:dubbo-remoting-websocket org.apache.dubbo:dubbo-remoting-netty4 org.apache.dubbo:dubbo-remoting-netty org.apache.dubbo:dubbo-remoting-zookeeper-curator5 @@ -509,6 +524,7 @@ org.apache.dubbo:dubbo-rest-jaxrs org.apache.dubbo:dubbo-rest-spring org.apache.dubbo:dubbo-triple-servlet + org.apache.dubbo:dubbo-triple-websocket org.apache.dubbo:dubbo-serialization-api org.apache.dubbo:dubbo-serialization-hessian2 org.apache.dubbo:dubbo-serialization-fastjson2 @@ -743,6 +759,9 @@ META-INF/dubbo/internal/org.apache.dubbo.remoting.http3.Http3ServerTransportListenerFactory + + META-INF/dubbo/internal/org.apache.dubbo.remoting.websocket.WebSocketServerTransportListenerFactory + META-INF/dubbo/internal/org.apache.dubbo.remoting.telnet.TelnetHandler diff --git a/dubbo-distribution/dubbo-bom/pom.xml b/dubbo-distribution/dubbo-bom/pom.xml index 9a93aac9fc0..cf82bbd2ee9 100644 --- a/dubbo-distribution/dubbo-bom/pom.xml +++ b/dubbo-distribution/dubbo-bom/pom.xml @@ -339,6 +339,11 @@ dubbo-triple-servlet ${project.version} + + org.apache.dubbo + dubbo-triple-websocket + ${project.version} + @@ -395,6 +400,11 @@ dubbo-remoting-http3 ${project.version} + + org.apache.dubbo + dubbo-remoting-websocket + ${project.version} + org.apache.dubbo dubbo-remoting-netty diff --git a/dubbo-distribution/dubbo-core-spi/pom.xml b/dubbo-distribution/dubbo-core-spi/pom.xml index 431959407dc..e535005b44a 100644 --- a/dubbo-distribution/dubbo-core-spi/pom.xml +++ b/dubbo-distribution/dubbo-core-spi/pom.xml @@ -363,6 +363,9 @@ META-INF/dubbo/internal/org.apache.dubbo.remoting.http3.Http3ServerTransportListenerFactory + + META-INF/dubbo/internal/org.apache.dubbo.remoting.websocket.WebSocketServerTransportListenerFactory + META-INF/dubbo/internal/org.apache.dubbo.remoting.telnet.TelnetHandler diff --git a/dubbo-plugin/dubbo-triple-servlet/src/main/java/org/apache/dubbo/rpc/protocol/tri/servlet/TripleFilter.java b/dubbo-plugin/dubbo-triple-servlet/src/main/java/org/apache/dubbo/rpc/protocol/tri/servlet/TripleFilter.java index 891cc244409..e93ed1cd09c 100644 --- a/dubbo-plugin/dubbo-triple-servlet/src/main/java/org/apache/dubbo/rpc/protocol/tri/servlet/TripleFilter.java +++ b/dubbo-plugin/dubbo-triple-servlet/src/main/java/org/apache/dubbo/rpc/protocol/tri/servlet/TripleFilter.java @@ -59,6 +59,8 @@ import java.util.Arrays; import java.util.Set; +import static org.apache.dubbo.rpc.protocol.tri.TripleConstants.UPGRADE_HEADER_KEY; + public class TripleFilter implements Filter { private static final Logger LOGGER = LoggerFactory.getLogger(TripleFilter.class); @@ -86,7 +88,7 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo return; } } else { - if (mappingRegistry.exists(request.getRequestURI(), request.getMethod())) { + if (!isUpgradeRequest(request) && mappingRegistry.exists(request.getRequestURI(), request.getMethod())) { handleHttp1(request, response); return; } @@ -188,6 +190,10 @@ private static int resolveTimeout(HttpServletRequest request, boolean isGrpc) { return 0; } + private boolean isUpgradeRequest(HttpServletRequest request) { + return request.getHeader(UPGRADE_HEADER_KEY) != null; + } + private static final class TripleAsyncListener implements AsyncListener { private final ServletStreamChannel streamChannel; diff --git a/dubbo-plugin/dubbo-triple-websocket/pom.xml b/dubbo-plugin/dubbo-triple-websocket/pom.xml new file mode 100644 index 00000000000..c7f465a66ad --- /dev/null +++ b/dubbo-plugin/dubbo-triple-websocket/pom.xml @@ -0,0 +1,140 @@ + + + + 4.0.0 + + org.apache.dubbo + dubbo-plugin + ${revision} + ../pom.xml + + + dubbo-triple-websocket + + + 4.0.1 + 1.1 + ${project.build.directory}/generated-sources/java/org/apache/dubbo/rpc/protocol/tri + + + + + org.apache.dubbo + dubbo-rpc-triple + ${project.version} + + + javax.servlet + javax.servlet-api + ${servlet4_version} + provided + + + jakarta.servlet + jakarta.servlet-api + provided + + + javax.websocket + javax.websocket-api + ${websocket_version} + provided + + + jakarta.websocket + jakarta.websocket-api + provided + + + jakarta.websocket + jakarta.websocket-client-api + provided + + + org.apache.dubbo + dubbo-remoting-netty4 + ${project.version} + test + + + + + + jdk-version-ge-17 + + [17,) + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + copy-sources + + run + + generate-sources + + + + + + + + + + + + + + + + import org.apache.dubbo.rpc.protocol.tri.ServletExchanger; + + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-sources + + add-source + + generate-sources + + + ${project.build.directory}/generated-sources/java + + + + + + + + + + diff --git a/dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/TripleBinaryMessageHandler.java b/dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/TripleBinaryMessageHandler.java new file mode 100644 index 00000000000..a507f6f1e42 --- /dev/null +++ b/dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/TripleBinaryMessageHandler.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.tri.websocket; + +import org.apache.dubbo.remoting.http12.h2.Http2InputMessage; +import org.apache.dubbo.remoting.http12.h2.Http2InputMessageFrame; +import org.apache.dubbo.remoting.websocket.FinalFragmentByteArrayInputStream; +import org.apache.dubbo.remoting.websocket.WebSocketTransportListener; + +import javax.websocket.MessageHandler; + +import java.nio.ByteBuffer; + +public class TripleBinaryMessageHandler implements MessageHandler.Partial { + + private final WebSocketTransportListener webSocketTransportListener; + + public TripleBinaryMessageHandler(WebSocketTransportListener webSocketTransportListener) { + this.webSocketTransportListener = webSocketTransportListener; + } + + @Override + public void onMessage(ByteBuffer messagePart, boolean last) { + Http2InputMessage http2InputMessage = + new Http2InputMessageFrame(new FinalFragmentByteArrayInputStream(messagePart.array(), last), false); + webSocketTransportListener.onData(http2InputMessage); + } +} diff --git a/dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/TripleEndpoint.java b/dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/TripleEndpoint.java new file mode 100644 index 00000000000..54eb85fb6be --- /dev/null +++ b/dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/TripleEndpoint.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.tri.websocket; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.io.StreamUtils; +import org.apache.dubbo.config.context.ConfigManager; +import org.apache.dubbo.config.nested.TripleConfig; +import org.apache.dubbo.remoting.http12.HttpHeaderNames; +import org.apache.dubbo.remoting.http12.HttpHeaders; +import org.apache.dubbo.remoting.http12.HttpMethods; +import org.apache.dubbo.remoting.http12.HttpStatus; +import org.apache.dubbo.remoting.http12.h2.Http2Header; +import org.apache.dubbo.remoting.http12.h2.Http2InputMessage; +import org.apache.dubbo.remoting.http12.h2.Http2InputMessageFrame; +import org.apache.dubbo.remoting.http12.h2.Http2MetadataFrame; +import org.apache.dubbo.remoting.http12.message.DefaultHttpHeaders; +import org.apache.dubbo.remoting.websocket.WebSocketTransportListener; +import org.apache.dubbo.rpc.model.FrameworkModel; +import org.apache.dubbo.rpc.protocol.tri.ServletExchanger; + +import javax.websocket.CloseReason; +import javax.websocket.CloseReason.CloseCodes; +import javax.websocket.Endpoint; +import javax.websocket.EndpointConfig; +import javax.websocket.Session; + +import static org.apache.dubbo.rpc.protocol.tri.websocket.WebSocketConstants.TRIPLE_WEBSOCKET_LISTENER; + +public class TripleEndpoint extends Endpoint { + + @Override + public void onOpen(Session session, EndpointConfig config) { + String path = session.getRequestURI().getPath(); + HttpHeaders httpHeaders = new DefaultHttpHeaders(); + httpHeaders.set(HttpHeaderNames.PATH.getName(), path); + httpHeaders.set(HttpHeaderNames.METHOD.getName(), HttpMethods.POST.name()); + Http2Header http2Header = new Http2MetadataFrame(httpHeaders); + + URL url = ServletExchanger.getUrl(); + TripleConfig tripleConfig = ConfigManager.getProtocolOrDefault(url).getTripleOrDefault(); + + WebSocketStreamChannel webSocketStreamChannel = new WebSocketStreamChannel(session, tripleConfig); + WebSocketTransportListener webSocketTransportListener = + DefaultWebSocketServerTransportListenerFactory.INSTANCE.newInstance( + webSocketStreamChannel, url, FrameworkModel.defaultModel()); + webSocketTransportListener.onMetadata(http2Header); + session.addMessageHandler(new TripleTextMessageHandler(webSocketTransportListener)); + session.addMessageHandler(new TripleBinaryMessageHandler(webSocketTransportListener)); + session.getUserProperties().put(TRIPLE_WEBSOCKET_LISTENER, webSocketTransportListener); + } + + @Override + public void onClose(Session session, CloseReason closeReason) { + super.onClose(session, closeReason); + WebSocketTransportListener webSocketTransportListener = + (WebSocketTransportListener) session.getUserProperties().get(TRIPLE_WEBSOCKET_LISTENER); + if (webSocketTransportListener == null) { + return; + } + if (closeReason.getCloseCode().getCode() == CloseCodes.NORMAL_CLOSURE.getCode()) { + Http2InputMessage http2InputMessage = new Http2InputMessageFrame(StreamUtils.EMPTY, true); + webSocketTransportListener.onData(http2InputMessage); + return; + } + webSocketTransportListener.cancelByRemote(closeReason.getCloseCode().getCode()); + } + + @Override + public void onError(Session session, Throwable thr) { + super.onError(session, thr); + WebSocketTransportListener webSocketTransportListener = + (WebSocketTransportListener) session.getUserProperties().get(TRIPLE_WEBSOCKET_LISTENER); + if (webSocketTransportListener == null) { + return; + } + webSocketTransportListener.cancelByRemote(HttpStatus.INTERNAL_SERVER_ERROR.getCode()); + } +} diff --git a/dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/TripleTextMessageHandler.java b/dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/TripleTextMessageHandler.java new file mode 100644 index 00000000000..2aa962d6d18 --- /dev/null +++ b/dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/TripleTextMessageHandler.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.tri.websocket; + +import org.apache.dubbo.remoting.http12.h2.Http2InputMessage; +import org.apache.dubbo.remoting.http12.h2.Http2InputMessageFrame; +import org.apache.dubbo.remoting.websocket.FinalFragmentByteArrayInputStream; +import org.apache.dubbo.remoting.websocket.WebSocketTransportListener; + +import javax.websocket.MessageHandler; + +public class TripleTextMessageHandler implements MessageHandler.Partial { + + private final WebSocketTransportListener webSocketTransportListener; + + public TripleTextMessageHandler(WebSocketTransportListener webSocketTransportListener) { + this.webSocketTransportListener = webSocketTransportListener; + } + + @Override + public void onMessage(String messagePart, boolean last) { + Http2InputMessage http2InputMessage = + new Http2InputMessageFrame(new FinalFragmentByteArrayInputStream(messagePart.getBytes(), last), false); + webSocketTransportListener.onData(http2InputMessage); + } +} diff --git a/dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/TripleWebSocketFilter.java b/dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/TripleWebSocketFilter.java new file mode 100644 index 00000000000..90391a38141 --- /dev/null +++ b/dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/TripleWebSocketFilter.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.tri.websocket; + +import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.ConcurrentHashSet; +import org.apache.dubbo.remoting.http12.HttpMethods; + +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletRequestWrapper; +import javax.servlet.http.HttpServletResponse; +import javax.websocket.server.ServerContainer; +import javax.websocket.server.ServerEndpointConfig; + +import java.io.IOException; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_REQUEST; +import static org.apache.dubbo.rpc.protocol.tri.TripleConstants.UPGRADE_HEADER_KEY; +import static org.apache.dubbo.rpc.protocol.tri.websocket.WebSocketConstants.TRIPLE_WEBSOCKET_REMOTE_ADDRESS; +import static org.apache.dubbo.rpc.protocol.tri.websocket.WebSocketConstants.TRIPLE_WEBSOCKET_UPGRADE_HEADER_VALUE; + +public class TripleWebSocketFilter implements Filter { + + private static final ErrorTypeAwareLogger LOG = LoggerFactory.getErrorTypeAwareLogger(TripleWebSocketFilter.class); + + private transient ServerContainer sc; + + private final Set existed = new ConcurrentHashSet<>(); + + @Override + public void init(FilterConfig filterConfig) { + sc = (ServerContainer) filterConfig.getServletContext().getAttribute(ServerContainer.class.getName()); + } + + @Override + public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) + throws IOException, ServletException { + if (!isWebSocketUpgradeRequest(request, response)) { + chain.doFilter(request, response); + return; + } + HttpServletRequest hRequest = (HttpServletRequest) request; + HttpServletResponse hResponse = (HttpServletResponse) response; + String path; + String pathInfo = hRequest.getPathInfo(); + if (pathInfo == null) { + path = hRequest.getServletPath(); + } else { + path = hRequest.getServletPath() + pathInfo; + } + Map copiedMap = new HashMap<>(hRequest.getParameterMap()); + copiedMap.put( + TRIPLE_WEBSOCKET_REMOTE_ADDRESS, + new String[] {hRequest.getRemoteHost(), String.valueOf(hRequest.getRemotePort())}); + HttpServletRequestWrapper wrappedRequest = new HttpServletRequestWrapper(hRequest) { + @Override + public Map getParameterMap() { + return copiedMap; + } + }; + if (existed.contains(path)) { + chain.doFilter(wrappedRequest, hResponse); + return; + } + ServerEndpointConfig serverEndpointConfig = + ServerEndpointConfig.Builder.create(TripleEndpoint.class, path).build(); + try { + sc.addEndpoint(serverEndpointConfig); + existed.add(path); + } catch (Exception e) { + LOG.error(PROTOCOL_FAILED_REQUEST, "", "", "Failed to add endpoint", e); + hResponse.sendError(HttpServletResponse.SC_BAD_REQUEST); + return; + } + chain.doFilter(wrappedRequest, hResponse); + } + + @Override + public void destroy() {} + + public boolean isWebSocketUpgradeRequest(ServletRequest request, ServletResponse response) { + return ((request instanceof HttpServletRequest) + && (response instanceof HttpServletResponse) + && headerContainsToken( + (HttpServletRequest) request, UPGRADE_HEADER_KEY, TRIPLE_WEBSOCKET_UPGRADE_HEADER_VALUE) + && HttpMethods.GET.name().equals(((HttpServletRequest) request).getMethod())); + } + + private boolean headerContainsToken(HttpServletRequest req, String headerName, String target) { + Enumeration headers = req.getHeaders(headerName); + while (headers.hasMoreElements()) { + String header = headers.nextElement(); + String[] tokens = header.split(","); + for (String token : tokens) { + if (target.equalsIgnoreCase(token.trim())) { + return true; + } + } + } + return false; + } +} diff --git a/dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/WebSocketConstants.java b/dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/WebSocketConstants.java new file mode 100644 index 00000000000..467e5ae318b --- /dev/null +++ b/dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/WebSocketConstants.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.tri.websocket; + +public interface WebSocketConstants { + + String TRIPLE_WEBSOCKET_UPGRADE_HEADER_VALUE = "websocket"; + + String TRIPLE_WEBSOCKET_REMOTE_ADDRESS = "tri.websocket.remote.address"; + + String TRIPLE_WEBSOCKET_LISTENER = "tri.websocket.listener"; +} diff --git a/dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/WebSocketStreamChannel.java b/dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/WebSocketStreamChannel.java new file mode 100644 index 00000000000..a92235883a6 --- /dev/null +++ b/dubbo-plugin/dubbo-triple-websocket/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/WebSocketStreamChannel.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.tri.websocket; + +import org.apache.dubbo.common.utils.CollectionUtils; +import org.apache.dubbo.config.nested.TripleConfig; +import org.apache.dubbo.remoting.http12.HttpHeaderNames; +import org.apache.dubbo.remoting.http12.HttpHeaders; +import org.apache.dubbo.remoting.http12.HttpMetadata; +import org.apache.dubbo.remoting.http12.HttpOutputMessage; +import org.apache.dubbo.remoting.http12.HttpStatus; +import org.apache.dubbo.remoting.http12.LimitedByteArrayOutputStream; +import org.apache.dubbo.remoting.http12.h2.H2StreamChannel; +import org.apache.dubbo.remoting.http12.h2.Http2Header; +import org.apache.dubbo.remoting.http12.h2.Http2OutputMessage; +import org.apache.dubbo.remoting.http12.h2.Http2OutputMessageFrame; +import org.apache.dubbo.remoting.websocket.WebSocketHeaderNames; + +import javax.websocket.CloseReason; +import javax.websocket.Session; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static org.apache.dubbo.rpc.protocol.tri.websocket.WebSocketConstants.TRIPLE_WEBSOCKET_REMOTE_ADDRESS; + +public class WebSocketStreamChannel implements H2StreamChannel { + + private final Session session; + + private final TripleConfig tripleConfig; + + private final InetSocketAddress remoteAddress; + + private final InetSocketAddress localAddress; + + public WebSocketStreamChannel(Session session, TripleConfig tripleConfig) { + this.session = session; + this.tripleConfig = tripleConfig; + Map> requestParameterMap = session.getRequestParameterMap(); + List remoteAddressData = requestParameterMap.get(TRIPLE_WEBSOCKET_REMOTE_ADDRESS); + this.remoteAddress = InetSocketAddress.createUnresolved( + remoteAddressData.get(0), Integer.parseInt(remoteAddressData.get(1))); + this.localAddress = InetSocketAddress.createUnresolved( + session.getRequestURI().getHost(), session.getRequestURI().getPort()); + } + + @Override + public CompletableFuture writeResetFrame(long errorCode) { + CompletableFuture completableFuture = new CompletableFuture<>(); + try { + session.close(); + completableFuture.complete(null); + } catch (IOException e) { + completableFuture.completeExceptionally(e); + } + return completableFuture; + } + + @Override + public Http2OutputMessage newOutputMessage(boolean endStream) { + return new Http2OutputMessageFrame( + new LimitedByteArrayOutputStream(256, tripleConfig.getMaxResponseBodySizeOrDefault()), endStream); + } + + @Override + public CompletableFuture writeHeader(HttpMetadata httpMetadata) { + Http2Header http2Header = (Http2Header) httpMetadata; + CompletableFuture completableFuture = new CompletableFuture<>(); + if (http2Header.isEndStream()) { + try { + session.close(encodeCloseReason(http2Header)); + completableFuture.complete(null); + } catch (IOException e) { + completableFuture.completeExceptionally(e); + } + } + return completableFuture; + } + + @Override + public CompletableFuture writeMessage(HttpOutputMessage httpOutputMessage) { + ByteArrayOutputStream body = (ByteArrayOutputStream) httpOutputMessage.getBody(); + CompletableFuture completableFuture = new CompletableFuture<>(); + try { + session.getBasicRemote().sendBinary(ByteBuffer.wrap(body.toByteArray())); + completableFuture.complete(null); + } catch (IOException e) { + completableFuture.completeExceptionally(e); + } + return completableFuture; + } + + @Override + public SocketAddress remoteAddress() { + return remoteAddress; + } + + @Override + public SocketAddress localAddress() { + return localAddress; + } + + @Override + public void flush() {} + + private CloseReason encodeCloseReason(Http2Header http2Header) { + HttpHeaders headers = http2Header.headers(); + List statusHeaders = headers.remove(HttpHeaderNames.STATUS.getName()); + CloseReason closeReason; + if (CollectionUtils.isNotEmpty(statusHeaders) + && !HttpStatus.OK.getStatusString().equals(statusHeaders.get(0))) { + List messageHeaders = headers.remove(WebSocketHeaderNames.WEBSOCKET_MESSAGE.getName()); + closeReason = new CloseReason( + CloseReason.CloseCodes.UNEXPECTED_CONDITION, + CollectionUtils.isNotEmpty(messageHeaders) ? messageHeaders.get(0) : "Internal server error"); + } else { + closeReason = new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "Bye"); + } + return closeReason; + } +} diff --git a/dubbo-plugin/pom.xml b/dubbo-plugin/pom.xml index 1cfb0efb141..8a54f6b958d 100644 --- a/dubbo-plugin/pom.xml +++ b/dubbo-plugin/pom.xml @@ -43,6 +43,7 @@ dubbo-rest-jaxrs dubbo-rest-spring dubbo-triple-servlet + dubbo-triple-websocket false diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpHeaderNames.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpHeaderNames.java index 5272913d3ee..a701870f73d 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpHeaderNames.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpHeaderNames.java @@ -24,6 +24,8 @@ public enum HttpHeaderNames { PATH(PseudoHeaderName.PATH.value()), + METHOD(PseudoHeaderName.METHOD.value()), + ACCEPT(io.netty.handler.codec.http.HttpHeaderNames.ACCEPT), CONTENT_TYPE(io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE), diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/LimitedByteArrayOutputStream.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/LimitedByteArrayOutputStream.java new file mode 100644 index 00000000000..7596f332ded --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/LimitedByteArrayOutputStream.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.http12; + +import org.apache.dubbo.remoting.http12.exception.HttpOverPayloadException; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +public class LimitedByteArrayOutputStream extends ByteArrayOutputStream { + + private final int capacity; + + public LimitedByteArrayOutputStream(int capacity) { + super(); + this.capacity = capacity == 0 ? Integer.MAX_VALUE : capacity; + } + + public LimitedByteArrayOutputStream(int size, int capacity) { + super(size); + this.capacity = capacity == 0 ? Integer.MAX_VALUE : capacity; + } + + @Override + public void write(int b) { + ensureCapacity(1); + super.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + ensureCapacity(b.length); + super.write(b); + } + + @Override + public void write(byte[] b, int off, int len) { + ensureCapacity(len); + super.write(b, off, len); + } + + private void ensureCapacity(int len) { + if (size() + len > capacity) { + throw new HttpOverPayloadException("Response Entity Too Large"); + } + } +} diff --git a/dubbo-remoting/dubbo-remoting-websocket/pom.xml b/dubbo-remoting/dubbo-remoting-websocket/pom.xml new file mode 100644 index 00000000000..b27869d056a --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-websocket/pom.xml @@ -0,0 +1,41 @@ + + + + 4.0.0 + + org.apache.dubbo + dubbo-remoting + ${revision} + ../pom.xml + + dubbo-remoting-websocket + jar + ${project.artifactId} + The websocket remoting module of dubbo project + + false + + + + + org.apache.dubbo + dubbo-remoting-http12 + ${project.parent.version} + + + diff --git a/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/FinalFragment.java b/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/FinalFragment.java new file mode 100644 index 00000000000..c3028962df7 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/FinalFragment.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.websocket; + +public interface FinalFragment { + + boolean isFinalFragment(); +} diff --git a/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/FinalFragmentByteArrayInputStream.java b/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/FinalFragmentByteArrayInputStream.java new file mode 100644 index 00000000000..0176e352656 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/FinalFragmentByteArrayInputStream.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.websocket; + +import java.io.ByteArrayInputStream; + +public class FinalFragmentByteArrayInputStream extends ByteArrayInputStream implements FinalFragment { + + private final boolean finalFragment; + + public FinalFragmentByteArrayInputStream(byte[] buf) { + this(buf, 0, buf.length); + } + + public FinalFragmentByteArrayInputStream(byte[] buf, boolean finalFragment) { + this(buf, 0, buf.length, finalFragment); + } + + public FinalFragmentByteArrayInputStream(byte[] buf, int offset, int length) { + this(buf, offset, length, false); + } + + public FinalFragmentByteArrayInputStream(byte[] buf, int offset, int length, boolean finalFragment) { + super(buf, offset, length); + this.finalFragment = finalFragment; + } + + @Override + public boolean isFinalFragment() { + return finalFragment; + } +} diff --git a/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/FinalFragmentByteBufInputStream.java b/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/FinalFragmentByteBufInputStream.java new file mode 100644 index 00000000000..5bd5e1fe944 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/FinalFragmentByteBufInputStream.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.websocket; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; + +public class FinalFragmentByteBufInputStream extends ByteBufInputStream implements FinalFragment { + + private final boolean finalFragment; + + public FinalFragmentByteBufInputStream(ByteBuf buffer) { + this(buffer, buffer.readableBytes()); + } + + public FinalFragmentByteBufInputStream(ByteBuf buffer, int length) { + this(buffer, length, false); + } + + public FinalFragmentByteBufInputStream(ByteBuf buffer, boolean releaseOnClose) { + this(buffer, buffer.readableBytes(), releaseOnClose); + } + + public FinalFragmentByteBufInputStream(ByteBuf buffer, boolean releaseOnClose, boolean finalFragment) { + this(buffer, buffer.readableBytes(), releaseOnClose, finalFragment); + } + + public FinalFragmentByteBufInputStream(ByteBuf buffer, int length, boolean releaseOnClose) { + this(buffer, length, releaseOnClose, false); + } + + public FinalFragmentByteBufInputStream(ByteBuf buffer, int length, boolean releaseOnClose, boolean finalFragment) { + super(buffer, length, releaseOnClose); + this.finalFragment = finalFragment; + } + + @Override + public boolean isFinalFragment() { + return finalFragment; + } +} diff --git a/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/FinalFragmentStreamingDecoder.java b/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/FinalFragmentStreamingDecoder.java new file mode 100644 index 00000000000..d5530171186 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/FinalFragmentStreamingDecoder.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.websocket; + +import org.apache.dubbo.remoting.http12.CompositeInputStream; +import org.apache.dubbo.remoting.http12.exception.DecodeException; +import org.apache.dubbo.remoting.http12.message.StreamingDecoder; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +public class FinalFragmentStreamingDecoder implements StreamingDecoder { + + private boolean inDelivery; + + private boolean pendingDelivery; + + private boolean closed; + + private boolean closing; + + protected final CompositeInputStream accumulate = new CompositeInputStream(); + + protected FragmentListener listener; + + @Override + public void request(int numMessages) {} + + @Override + public void decode(InputStream inputStream) throws DecodeException { + if (closing || closed) { + // ignored + return; + } + accumulate.addInputStream(inputStream); + if (inputStream instanceof FinalFragment && ((FinalFragment) inputStream).isFinalFragment()) { + pendingDelivery = true; + deliver(); + } + } + + @Override + public void close() { + closing = true; + deliver(); + } + + @Override + public void onStreamClosed() { + if (closed) { + return; + } + closed = true; + try { + accumulate.close(); + } catch (IOException e) { + throw new DecodeException(e); + } + } + + @Override + public void setFragmentListener(FragmentListener listener) { + this.listener = listener; + } + + private void deliver() { + if (inDelivery) { + return; + } + if (closed) { + return; + } + inDelivery = true; + try { + if (pendingDelivery) { + processBody(); + pendingDelivery = false; + } + if (closing) { + if (!closed) { + closed = true; + accumulate.close(); + listener.onClose(); + } + } + } catch (IOException e) { + throw new DecodeException(e); + } finally { + inDelivery = false; + } + } + + private void processBody() throws IOException { + byte[] rawMessage = readRawMessage(accumulate, accumulate.available()); + InputStream inputStream = new ByteArrayInputStream(rawMessage); + invokeListener(inputStream); + } + + protected void invokeListener(InputStream inputStream) { + this.listener.onFragmentMessage(inputStream); + } + + protected byte[] readRawMessage(InputStream inputStream, int length) throws IOException { + byte[] data = new byte[length]; + inputStream.read(data, 0, length); + return data; + } +} diff --git a/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/WebSocketHeaderNames.java b/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/WebSocketHeaderNames.java new file mode 100644 index 00000000000..b5a1d87f9a0 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/WebSocketHeaderNames.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.websocket; + +public enum WebSocketHeaderNames { + WEBSOCKET_MESSAGE("websocket-message"); + + private final String name; + + WebSocketHeaderNames(String name) { + this.name = name; + } + + public String getName() { + return name; + } +} diff --git a/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/WebSocketServerTransportListenerFactory.java b/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/WebSocketServerTransportListenerFactory.java new file mode 100644 index 00000000000..94ea29f9a3c --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/WebSocketServerTransportListenerFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.websocket; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.extension.ExtensionScope; +import org.apache.dubbo.common.extension.SPI; +import org.apache.dubbo.remoting.http12.h2.H2StreamChannel; +import org.apache.dubbo.rpc.model.FrameworkModel; + +@SPI(scope = ExtensionScope.FRAMEWORK) +public interface WebSocketServerTransportListenerFactory { + + WebSocketTransportListener newInstance(H2StreamChannel streamChannel, URL url, FrameworkModel frameworkModel); +} diff --git a/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/WebSocketTransportListener.java b/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/WebSocketTransportListener.java new file mode 100644 index 00000000000..09fb6ba47cf --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/WebSocketTransportListener.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.websocket; + +import org.apache.dubbo.remoting.http12.h2.Http2TransportListener; + +public interface WebSocketTransportListener extends Http2TransportListener {} diff --git a/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/netty4/NettyWebSocketChannel.java b/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/netty4/NettyWebSocketChannel.java new file mode 100644 index 00000000000..261687562c8 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/netty4/NettyWebSocketChannel.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.websocket.netty4; + +import org.apache.dubbo.config.nested.TripleConfig; +import org.apache.dubbo.remoting.http12.HttpMetadata; +import org.apache.dubbo.remoting.http12.HttpOutputMessage; +import org.apache.dubbo.remoting.http12.LimitedByteBufOutputStream; +import org.apache.dubbo.remoting.http12.h2.H2StreamChannel; +import org.apache.dubbo.remoting.http12.h2.Http2OutputMessage; +import org.apache.dubbo.remoting.http12.h2.Http2OutputMessageFrame; +import org.apache.dubbo.remoting.http12.netty4.NettyHttpChannelFutureListener; + +import java.net.SocketAddress; +import java.util.concurrent.CompletableFuture; + +import io.netty.channel.Channel; + +public class NettyWebSocketChannel implements H2StreamChannel { + + private final Channel channel; + + private final TripleConfig tripleConfig; + + public NettyWebSocketChannel(Channel channel, TripleConfig tripleConfig) { + this.channel = channel; + this.tripleConfig = tripleConfig; + } + + @Override + public CompletableFuture writeResetFrame(long errorCode) { + NettyHttpChannelFutureListener futureListener = new NettyHttpChannelFutureListener(); + channel.close().addListener(futureListener); + return futureListener; + } + + @Override + public Http2OutputMessage newOutputMessage(boolean endStream) { + return new Http2OutputMessageFrame( + new LimitedByteBufOutputStream( + channel.alloc().buffer(), tripleConfig.getMaxResponseBodySizeOrDefault()), + endStream); + } + + @Override + public CompletableFuture writeHeader(HttpMetadata httpMetadata) { + NettyHttpChannelFutureListener futureListener = new NettyHttpChannelFutureListener(); + channel.write(httpMetadata).addListener(futureListener); + return futureListener; + } + + @Override + public CompletableFuture writeMessage(HttpOutputMessage httpOutputMessage) { + NettyHttpChannelFutureListener futureListener = new NettyHttpChannelFutureListener(); + channel.write(httpOutputMessage).addListener(futureListener); + return futureListener; + } + + @Override + public SocketAddress remoteAddress() { + return channel.remoteAddress(); + } + + @Override + public SocketAddress localAddress() { + return channel.localAddress(); + } + + @Override + public void flush() { + channel.flush(); + } +} diff --git a/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/netty4/WebSocketFrameCodec.java b/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/netty4/WebSocketFrameCodec.java new file mode 100644 index 00000000000..0bf25d1dac4 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/netty4/WebSocketFrameCodec.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.websocket.netty4; + +import org.apache.dubbo.common.io.StreamUtils; +import org.apache.dubbo.common.utils.CollectionUtils; +import org.apache.dubbo.remoting.http12.HttpHeaderNames; +import org.apache.dubbo.remoting.http12.HttpHeaders; +import org.apache.dubbo.remoting.http12.HttpMethods; +import org.apache.dubbo.remoting.http12.HttpStatus; +import org.apache.dubbo.remoting.http12.h2.Http2Header; +import org.apache.dubbo.remoting.http12.h2.Http2InputMessage; +import org.apache.dubbo.remoting.http12.h2.Http2InputMessageFrame; +import org.apache.dubbo.remoting.http12.h2.Http2MetadataFrame; +import org.apache.dubbo.remoting.http12.h2.Http2OutputMessage; +import org.apache.dubbo.remoting.http12.netty4.h1.NettyHttp1HttpHeaders; +import org.apache.dubbo.remoting.websocket.FinalFragmentByteBufInputStream; +import org.apache.dubbo.remoting.websocket.WebSocketHeaderNames; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; +import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import io.netty.handler.codec.http2.DefaultHttp2ResetFrame; + +public class WebSocketFrameCodec extends ChannelDuplexHandler { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof BinaryWebSocketFrame || msg instanceof TextWebSocketFrame) { + Http2InputMessage http2InputMessage = onDataFrame((WebSocketFrame) msg); + super.channelRead(ctx, http2InputMessage); + } else if (msg instanceof CloseWebSocketFrame) { + Object closeMessage = onCloseFrame((CloseWebSocketFrame) msg); + super.channelRead(ctx, closeMessage); + } else { + super.channelRead(ctx, msg); + } + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (msg instanceof Http2OutputMessage) { + WebSocketFrame webSocketFrame = encodeWebSocketFrame(ctx, (Http2OutputMessage) msg); + super.write(ctx, webSocketFrame, promise); + } else if (msg instanceof Http2Header) { + Http2Header http2Header = (Http2Header) msg; + if (http2Header.isEndStream()) { + CloseWebSocketFrame closeWebSocketFrame = encodeCloseWebSocketFrame(http2Header); + super.write(ctx, closeWebSocketFrame, promise); + } + } else { + super.write(ctx, msg, promise); + } + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) { + Http2Header http2Header = onHandshakeComplete((WebSocketServerProtocolHandler.HandshakeComplete) evt); + super.channelRead(ctx, http2Header); + } else { + super.userEventTriggered(ctx, evt); + } + } + + private Http2Header onHandshakeComplete(WebSocketServerProtocolHandler.HandshakeComplete evt) { + HttpHeaders httpHeaders = new NettyHttp1HttpHeaders(evt.requestHeaders()); + httpHeaders.set(HttpHeaderNames.PATH.getName(), evt.requestUri()); + httpHeaders.set(HttpHeaderNames.METHOD.getName(), HttpMethods.POST.name()); + return new Http2MetadataFrame(httpHeaders); + } + + private Http2InputMessageFrame onDataFrame(WebSocketFrame webSocketFrame) { + ByteBuf data = webSocketFrame.content(); + return new Http2InputMessageFrame( + new FinalFragmentByteBufInputStream(data, true, webSocketFrame.isFinalFragment()), false); + } + + private Object onCloseFrame(CloseWebSocketFrame closeWebSocketFrame) { + if (closeWebSocketFrame.statusCode() != WebSocketCloseStatus.NORMAL_CLOSURE.code()) { + return new DefaultHttp2ResetFrame(closeWebSocketFrame.statusCode()); + } + return new Http2InputMessageFrame(StreamUtils.EMPTY, true); + } + + private CloseWebSocketFrame encodeCloseWebSocketFrame(Http2Header http2Header) { + HttpHeaders headers = http2Header.headers(); + List statusHeaders = headers.remove(HttpHeaderNames.STATUS.getName()); + WebSocketCloseStatus status = WebSocketCloseStatus.NORMAL_CLOSURE; + if (CollectionUtils.isNotEmpty(statusHeaders) + && !HttpStatus.OK.getStatusString().equals(statusHeaders.get(0))) { + List messageHeaders = headers.remove(WebSocketHeaderNames.WEBSOCKET_MESSAGE.getName()); + status = new WebSocketCloseStatus( + WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), + CollectionUtils.isNotEmpty(messageHeaders) + ? messageHeaders.get(0) + : WebSocketCloseStatus.INTERNAL_SERVER_ERROR.reasonText()); + } + return new CloseWebSocketFrame(status); + } + + private WebSocketFrame encodeWebSocketFrame(ChannelHandlerContext ctx, Http2OutputMessage outputMessage) + throws IOException { + OutputStream body = outputMessage.getBody(); + if (body == null) { + return new BinaryWebSocketFrame(); + } + if (body instanceof ByteBufOutputStream) { + ByteBuf buffer = ((ByteBufOutputStream) body).buffer(); + return new BinaryWebSocketFrame(buffer); + } + throw new IllegalArgumentException("Http2OutputMessage body must be ByteBufOutputStream"); + } +} diff --git a/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/netty4/WebSocketProtocolSelectorHandler.java b/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/netty4/WebSocketProtocolSelectorHandler.java new file mode 100644 index 00000000000..1ac8bfbdb98 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/netty4/WebSocketProtocolSelectorHandler.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.websocket.netty4; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.config.nested.TripleConfig; +import org.apache.dubbo.remoting.http12.command.HttpWriteQueue; +import org.apache.dubbo.remoting.http12.h2.H2StreamChannel; +import org.apache.dubbo.remoting.http12.h2.command.Http2WriteQueueChannel; +import org.apache.dubbo.remoting.http12.netty4.HttpWriteQueueHandler; +import org.apache.dubbo.remoting.http12.netty4.h2.NettyHttp2FrameHandler; +import org.apache.dubbo.remoting.websocket.WebSocketServerTransportListenerFactory; +import org.apache.dubbo.remoting.websocket.WebSocketTransportListener; +import org.apache.dubbo.rpc.model.FrameworkModel; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.FullHttpRequest; + +public class WebSocketProtocolSelectorHandler extends SimpleChannelInboundHandler { + + private final URL url; + + private final FrameworkModel frameworkModel; + + private final TripleConfig tripleConfig; + + private final WebSocketServerTransportListenerFactory defaultWebSocketServerTransportListenerFactory; + + public WebSocketProtocolSelectorHandler( + URL url, + FrameworkModel frameworkModel, + TripleConfig tripleConfig, + WebSocketServerTransportListenerFactory defaultWebSocketServerTransportListenerFactory) { + this.url = url; + this.frameworkModel = frameworkModel; + this.tripleConfig = tripleConfig; + this.defaultWebSocketServerTransportListenerFactory = defaultWebSocketServerTransportListenerFactory; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) { + H2StreamChannel streamChannel = new NettyWebSocketChannel(ctx.channel(), tripleConfig); + HttpWriteQueueHandler writeQueueHandler = ctx.channel().pipeline().get(HttpWriteQueueHandler.class); + if (writeQueueHandler != null) { + HttpWriteQueue writeQueue = writeQueueHandler.getWriteQueue(); + streamChannel = new Http2WriteQueueChannel(streamChannel, writeQueue); + } + WebSocketTransportListener webSocketTransportListener = + defaultWebSocketServerTransportListenerFactory.newInstance(streamChannel, url, frameworkModel); + ctx.channel().closeFuture().addListener(future -> webSocketTransportListener.close()); + ctx.pipeline() + .addLast(new NettyHttp2FrameHandler(streamChannel, webSocketTransportListener)) + .remove(this); + ctx.fireChannelRead(msg.retain()); + } +} diff --git a/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/netty4/WebSocketServerUpgradeCodec.java b/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/netty4/WebSocketServerUpgradeCodec.java new file mode 100644 index 00000000000..1970d2162a3 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-websocket/src/main/java/org/apache/dubbo/remoting/websocket/netty4/WebSocketServerUpgradeCodec.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.websocket.netty4; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpServerUpgradeHandler; + +public class WebSocketServerUpgradeCodec implements HttpServerUpgradeHandler.UpgradeCodec { + + private final List> shouldRemoveChannelHandlers; + + private final ChannelHandler[] channelHandlers; + + public WebSocketServerUpgradeCodec( + List> shouldRemoveChannelHandlers, ChannelHandler... channelHandlers) { + this.shouldRemoveChannelHandlers = shouldRemoveChannelHandlers; + this.channelHandlers = channelHandlers; + } + + @Override + public Collection requiredUpgradeHeaders() { + return Collections.emptyList(); + } + + @Override + public boolean prepareUpgradeResponse( + ChannelHandlerContext ctx, FullHttpRequest upgradeRequest, HttpHeaders upgradeHeaders) { + if (shouldRemoveChannelHandlers != null) { + for (Class shouldRemoveChannelHandler : shouldRemoveChannelHandlers) { + ctx.pipeline().remove(shouldRemoveChannelHandler); + } + } + if (channelHandlers != null) { + for (ChannelHandler channelHandler : channelHandlers) { + ctx.pipeline().addLast(channelHandler); + } + } + return false; + } + + @Override + public void upgradeTo(ChannelHandlerContext ctx, FullHttpRequest upgradeRequest) {} +} diff --git a/dubbo-remoting/pom.xml b/dubbo-remoting/pom.xml index 2384d0c81ad..2546529bda7 100644 --- a/dubbo-remoting/pom.xml +++ b/dubbo-remoting/pom.xml @@ -34,6 +34,7 @@ dubbo-remoting-netty4 dubbo-remoting-http12 dubbo-remoting-http3 + dubbo-remoting-websocket false diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java index d7512eeb88d..59f0235de85 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java @@ -103,6 +103,8 @@ public interface Constants { String HTTP3_KEY = "http3"; + String TRIPLE_SERVLET_KEY = "triple.servlet"; + String H2_SETTINGS_SUPPORT_NO_LOWER_HEADER_KEY = "dubbo.rpc.tri.support-no-lower-header"; String H2_SETTINGS_IGNORE_1_0_0_KEY = "dubbo.rpc.tri.ignore-1.0.0-version"; String H2_SETTINGS_RESOLVE_FALLBACK_TO_DEFAULT_KEY = "dubbo.rpc.tri.resolve-fallback-to-default"; diff --git a/dubbo-rpc/dubbo-rpc-triple/pom.xml b/dubbo-rpc/dubbo-rpc-triple/pom.xml index 2e7085832f9..929b7944441 100644 --- a/dubbo-rpc/dubbo-rpc-triple/pom.xml +++ b/dubbo-rpc/dubbo-rpc-triple/pom.xml @@ -47,6 +47,11 @@ ${project.parent.version} true + + org.apache.dubbo + dubbo-remoting-websocket + ${project.parent.version} + org.apache.dubbo dubbo-remoting-netty4 diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstants.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstants.java index 0635bcbef3f..3cf37a4b120 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstants.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstants.java @@ -40,5 +40,7 @@ public final class TripleConstants { public static final String TRIPLE_HANDLER_TYPE_REST = "rest"; public static final String TRIPLE_HANDLER_TYPE_GRPC = "grpc"; + public static final String UPGRADE_HEADER_KEY = "Upgrade"; + private TripleConstants() {} } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java index a04f4a9d217..bbd53f4ab15 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java @@ -32,6 +32,9 @@ import org.apache.dubbo.remoting.http12.netty4.h2.NettyHttp2FrameCodec; import org.apache.dubbo.remoting.http12.netty4.h2.NettyHttp2ProtocolSelectorHandler; import org.apache.dubbo.remoting.utils.UrlUtils; +import org.apache.dubbo.remoting.websocket.netty4.WebSocketFrameCodec; +import org.apache.dubbo.remoting.websocket.netty4.WebSocketProtocolSelectorHandler; +import org.apache.dubbo.remoting.websocket.netty4.WebSocketServerUpgradeCodec; import org.apache.dubbo.rpc.model.FrameworkModel; import org.apache.dubbo.rpc.model.ScopeModelAware; import org.apache.dubbo.rpc.protocol.tri.h12.TripleProtocolDetector; @@ -40,16 +43,23 @@ import org.apache.dubbo.rpc.protocol.tri.transport.TripleGoAwayHandler; import org.apache.dubbo.rpc.protocol.tri.transport.TripleServerConnectionHandler; import org.apache.dubbo.rpc.protocol.tri.transport.TripleTailHandler; +import org.apache.dubbo.rpc.protocol.tri.websocket.DefaultWebSocketServerTransportListenerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.HttpServerUpgradeHandler; +import io.netty.handler.codec.http.websocketx.WebSocketDecoderConfig; +import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolConfig; +import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; import io.netty.handler.codec.http2.Http2CodecUtil; import io.netty.handler.codec.http2.Http2FrameCodec; import io.netty.handler.codec.http2.Http2FrameCodecBuilder; @@ -152,6 +162,21 @@ private void configurerHttp1Handlers(URL url, List handlers) { new TripleServerConnectionHandler(), buildHttp2MultiplexHandler(url, tripleConfig), new TripleTailHandler()); + } else if (AsciiString.contentEquals(HttpHeaderValues.WEBSOCKET, protocol)) { + return new WebSocketServerUpgradeCodec( + Arrays.asList( + HttpObjectAggregator.class, + NettyHttp1Codec.class, + NettyHttp1ConnectionHandler.class), + new WebSocketServerCompressionHandler(), + new HttpWriteQueueHandler(), + new WebSocketProtocolSelectorHandler( + url, + frameworkModel, + tripleConfig, + DefaultWebSocketServerTransportListenerFactory.INSTANCE), + buildWebSocketServerProtocolHandler(tripleConfig), + new WebSocketFrameCodec()); } // Not upgrade request return null; @@ -205,4 +230,14 @@ private Http2FrameCodec buildHttp2FrameCodec(TripleConfig tripleConfig) { .validateHeaders(false) .build(); } + + private WebSocketServerProtocolHandler buildWebSocketServerProtocolHandler(TripleConfig tripleConfig) { + return new WebSocketServerProtocolHandler(WebSocketServerProtocolConfig.newBuilder() + .checkStartsWith(true) + .handleCloseFrames(false) + .decoderConfig(WebSocketDecoderConfig.newBuilder() + .maxFramePayloadLength(tripleConfig.getMaxBodySizeOrDefault()) + .build()) + .build()); + } } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/meta/MethodMeta.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/meta/MethodMeta.java index 5e3f7c2eac7..87667690738 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/meta/MethodMeta.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/meta/MethodMeta.java @@ -153,7 +153,7 @@ public String toShortString() { return MethodUtils.toShortString(method); } - private static final class StreamParameterMeta extends ParameterMeta { + public static final class StreamParameterMeta extends ParameterMeta { private final Class type; private final Type genericType; diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/basic/FallbackArgumentResolver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/basic/FallbackArgumentResolver.java index 35c40c881e9..92735f485b0 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/basic/FallbackArgumentResolver.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/basic/FallbackArgumentResolver.java @@ -28,6 +28,7 @@ import org.apache.dubbo.rpc.protocol.tri.rest.argument.AbstractArgumentResolver; import org.apache.dubbo.rpc.protocol.tri.rest.mapping.meta.AnnotationMeta; import org.apache.dubbo.rpc.protocol.tri.rest.mapping.meta.MethodMeta; +import org.apache.dubbo.rpc.protocol.tri.rest.mapping.meta.MethodMeta.StreamParameterMeta; import org.apache.dubbo.rpc.protocol.tri.rest.mapping.meta.MethodParameterMeta; import org.apache.dubbo.rpc.protocol.tri.rest.mapping.meta.NamedValueMeta; import org.apache.dubbo.rpc.protocol.tri.rest.mapping.meta.ParameterMeta; @@ -60,6 +61,9 @@ protected NamedValueMeta createNamedValueMeta(ParameterMeta param) { } } paramCount = methodMeta.getMethodDescriptor().getRpcType() != RpcType.UNARY ? 1 : paramMetas.length; + } else if (param instanceof StreamParameterMeta) { + paramCount = 1; + noBodyParam = false; } return new FallbackNamedValueMeta(param.isAnnotated(Annotations.Nonnull), noBodyParam, paramCount); } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/DefaultWebSocketServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/DefaultWebSocketServerTransportListener.java new file mode 100644 index 00000000000..e43367d7b79 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/DefaultWebSocketServerTransportListener.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.tri.websocket; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.remoting.http12.HttpHeaders; +import org.apache.dubbo.remoting.http12.h2.H2StreamChannel; +import org.apache.dubbo.remoting.http12.h2.Http2Header; +import org.apache.dubbo.remoting.http12.h2.Http2InputMessage; +import org.apache.dubbo.remoting.http12.h2.Http2ServerChannelObserver; +import org.apache.dubbo.remoting.http12.message.StreamingDecoder; +import org.apache.dubbo.remoting.websocket.FinalFragmentStreamingDecoder; +import org.apache.dubbo.remoting.websocket.WebSocketHeaderNames; +import org.apache.dubbo.remoting.websocket.WebSocketTransportListener; +import org.apache.dubbo.rpc.model.FrameworkModel; +import org.apache.dubbo.rpc.model.MethodDescriptor.RpcType; +import org.apache.dubbo.rpc.protocol.tri.h12.http2.GenericHttp2ServerTransportListener; + +import java.util.concurrent.Executor; + +public class DefaultWebSocketServerTransportListener extends GenericHttp2ServerTransportListener + implements WebSocketTransportListener { + + private boolean autoClose = false; + + public DefaultWebSocketServerTransportListener( + H2StreamChannel h2StreamChannel, URL url, FrameworkModel frameworkModel) { + super(h2StreamChannel, url, frameworkModel); + } + + @Override + protected void onBeforeMetadata(Http2Header metadata) {} + + @Override + protected Executor initializeExecutor(URL url, Http2Header metadata) { + return getExecutor(url, metadata); + } + + @Override + protected void onPrepareMetadata(Http2Header metadata) { + doRoute(metadata); + } + + @Override + protected StreamingDecoder newStreamingDecoder() { + return new FinalFragmentStreamingDecoder(); + } + + @Override + protected Http2ServerChannelObserver newResponseObserver(H2StreamChannel h2StreamChannel) { + return new WebSocketServerChannelObserver(getFrameworkModel(), h2StreamChannel); + } + + @Override + protected Http2ServerChannelObserver newStreamResponseObserver(H2StreamChannel h2StreamChannel) { + return new WebSocketServerChannelObserver(getFrameworkModel(), h2StreamChannel); + } + + @Override + protected Http2ServerChannelObserver prepareResponseObserver(Http2ServerChannelObserver responseObserver) { + responseObserver.addTrailersCustomizer(this::customizeWebSocketStatus); + return super.prepareResponseObserver(responseObserver); + } + + @Override + protected void prepareUnaryServerCall() { + autoClose = true; + super.prepareUnaryServerCall(); + } + + @Override + protected void prepareStreamServerCall() { + if (getContext().getMethodDescriptor().getRpcType().equals(RpcType.SERVER_STREAM)) { + autoClose = true; + } + super.prepareStreamServerCall(); + } + + @Override + protected void onDataCompletion(Http2InputMessage message) { + if (autoClose) { + getStreamingDecoder().close(); + return; + } + super.onDataCompletion(message); + } + + private void customizeWebSocketStatus(HttpHeaders httpHeaders, Throwable throwable) { + if (throwable != null) { + httpHeaders.set(WebSocketHeaderNames.WEBSOCKET_MESSAGE.getName(), throwable.getMessage()); + } + } +} diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/DefaultWebSocketServerTransportListenerFactory.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/DefaultWebSocketServerTransportListenerFactory.java new file mode 100644 index 00000000000..fd5c331f5e6 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/DefaultWebSocketServerTransportListenerFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.tri.websocket; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.remoting.http12.h2.H2StreamChannel; +import org.apache.dubbo.remoting.websocket.WebSocketServerTransportListenerFactory; +import org.apache.dubbo.remoting.websocket.WebSocketTransportListener; +import org.apache.dubbo.rpc.model.FrameworkModel; + +public class DefaultWebSocketServerTransportListenerFactory implements WebSocketServerTransportListenerFactory { + + public static final WebSocketServerTransportListenerFactory INSTANCE = + new DefaultWebSocketServerTransportListenerFactory(); + + @Override + public WebSocketTransportListener newInstance( + H2StreamChannel streamChannel, URL url, FrameworkModel frameworkModel) { + return new DefaultWebSocketServerTransportListener(streamChannel, url, frameworkModel); + } +} diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/WebSocketServerChannelObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/WebSocketServerChannelObserver.java new file mode 100644 index 00000000000..3590dbc0269 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/websocket/WebSocketServerChannelObserver.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.tri.websocket; + +import org.apache.dubbo.remoting.http12.HttpOutputMessage; +import org.apache.dubbo.remoting.http12.h2.H2StreamChannel; +import org.apache.dubbo.rpc.model.FrameworkModel; +import org.apache.dubbo.rpc.protocol.tri.h12.http2.Http2StreamServerChannelObserver; + +public class WebSocketServerChannelObserver extends Http2StreamServerChannelObserver { + + protected WebSocketServerChannelObserver(FrameworkModel frameworkModel, H2StreamChannel h2StreamChannel) { + super(frameworkModel, h2StreamChannel); + } + + @Override + protected void doOnNext(Object data) throws Throwable { + int statusCode = resolveStatusCode(data); + HttpOutputMessage httpOutputMessage = buildMessage(statusCode, data); + sendMessage(httpOutputMessage); + } + + @Override + protected void doOnError(Throwable throwable) {} +} diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.websocket.WebSocketServerTransportListenerFactory b/dubbo-rpc/dubbo-rpc-triple/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.websocket.WebSocketServerTransportListenerFactory new file mode 100644 index 00000000000..eb51344a149 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.websocket.WebSocketServerTransportListenerFactory @@ -0,0 +1 @@ +default=org.apache.dubbo.rpc.protocol.tri.websocket.DefaultWebSocketServerTransportListenerFactory diff --git a/dubbo-spring-boot-project/dubbo-spring-boot-3-autoconfigure/pom.xml b/dubbo-spring-boot-project/dubbo-spring-boot-3-autoconfigure/pom.xml index 07ec5b92a4a..01830da529c 100644 --- a/dubbo-spring-boot-project/dubbo-spring-boot-3-autoconfigure/pom.xml +++ b/dubbo-spring-boot-project/dubbo-spring-boot-3-autoconfigure/pom.xml @@ -67,6 +67,13 @@ true + + org.apache.dubbo + dubbo-triple-websocket + ${project.version} + true + + jakarta.servlet jakarta.servlet-api @@ -77,5 +84,17 @@ tomcat-embed-core provided + + + jakarta.websocket + jakarta.websocket-api + provided + + + + jakarta.websocket + jakarta.websocket-client-api + provided + diff --git a/dubbo-spring-boot-project/dubbo-spring-boot-3-autoconfigure/src/main/java/org/apache/dubbo/spring/boot/autoconfigure/DubboTriple3AutoConfiguration.java b/dubbo-spring-boot-project/dubbo-spring-boot-3-autoconfigure/src/main/java/org/apache/dubbo/spring/boot/autoconfigure/DubboTriple3AutoConfiguration.java index 8caac7a29a7..08ab713a552 100644 --- a/dubbo-spring-boot-project/dubbo-spring-boot-3-autoconfigure/src/main/java/org/apache/dubbo/spring/boot/autoconfigure/DubboTriple3AutoConfiguration.java +++ b/dubbo-spring-boot-project/dubbo-spring-boot-3-autoconfigure/src/main/java/org/apache/dubbo/spring/boot/autoconfigure/DubboTriple3AutoConfiguration.java @@ -18,6 +18,7 @@ import org.apache.dubbo.rpc.protocol.tri.ServletExchanger; import org.apache.dubbo.rpc.protocol.tri.servlet.jakarta.TripleFilter; +import org.apache.dubbo.rpc.protocol.tri.websocket.jakarta.TripleWebSocketFilter; import jakarta.servlet.Filter; import org.apache.coyote.ProtocolHandler; @@ -39,18 +40,20 @@ @Conditional(SpringBoot3Condition.class) public class DubboTriple3AutoConfiguration { - public static final String PREFIX = "dubbo.protocol.triple.servlet"; + public static final String SERVLET_PREFIX = "dubbo.protocol.triple.servlet"; + + public static final String WEBSOCKET_PREFIX = "dubbo.protocol.triple.websocket"; @Configuration(proxyBeanMethods = false) @ConditionalOnClass(Filter.class) @ConditionalOnWebApplication(type = Type.SERVLET) - @ConditionalOnProperty(prefix = PREFIX, name = "enabled", havingValue = "true") + @ConditionalOnProperty(prefix = SERVLET_PREFIX, name = "enabled", havingValue = "true") public static class TripleServletConfiguration { @Bean public FilterRegistrationBean tripleProtocolFilter( - @Value("${" + PREFIX + ".filter-url-patterns:/*}") String[] urlPatterns, - @Value("${" + PREFIX + ".filter-order:-1000000}") int order, + @Value("${" + SERVLET_PREFIX + ".filter-url-patterns:/*}") String[] urlPatterns, + @Value("${" + SERVLET_PREFIX + ".filter-order:-1000000}") int order, @Value("${server.port:8080}") int serverPort) { ServletExchanger.bindServerPort(serverPort); FilterRegistrationBean registrationBean = new FilterRegistrationBean<>(); @@ -62,9 +65,9 @@ public FilterRegistrationBean tripleProtocolFilter( @Bean @ConditionalOnClass(Http2Protocol.class) - @ConditionalOnProperty(prefix = PREFIX, name = "max-concurrent-streams") + @ConditionalOnProperty(prefix = SERVLET_PREFIX, name = "max-concurrent-streams") public WebServerFactoryCustomizer tripleTomcatHttp2Customizer( - @Value("${" + PREFIX + ".max-concurrent-streams}") int maxConcurrentStreams) { + @Value("${" + SERVLET_PREFIX + ".max-concurrent-streams}") int maxConcurrentStreams) { return factory -> factory.addConnectorCustomizers(connector -> { ProtocolHandler handler = connector.getProtocolHandler(); for (UpgradeProtocol upgradeProtocol : handler.findUpgradeProtocols()) { @@ -78,4 +81,24 @@ public WebServerFactoryCustomizer tripleTomc }); } } + + @Configuration(proxyBeanMethods = false) + @ConditionalOnClass(Filter.class) + @ConditionalOnWebApplication(type = Type.SERVLET) + @ConditionalOnProperty(prefix = WEBSOCKET_PREFIX, name = "enabled", havingValue = "true") + public static class TripleWebSocketConfiguration { + + @Bean + public FilterRegistrationBean tripleWebSocketFilter( + @Value("${" + WEBSOCKET_PREFIX + ".filter-url-patterns:/*}") String[] urlPatterns, + @Value("${" + WEBSOCKET_PREFIX + ".filter-order:-1000000}") int order, + @Value("${server.port:8080}") int serverPort) { + ServletExchanger.bindServerPort(serverPort); + FilterRegistrationBean registrationBean = new FilterRegistrationBean<>(); + registrationBean.setFilter(new TripleWebSocketFilter()); + registrationBean.addUrlPatterns(urlPatterns); + registrationBean.setOrder(order); + return registrationBean; + } + } } diff --git a/dubbo-spring-boot-project/dubbo-spring-boot-autoconfigure/pom.xml b/dubbo-spring-boot-project/dubbo-spring-boot-autoconfigure/pom.xml index 665f791a606..d8eeab98f43 100644 --- a/dubbo-spring-boot-project/dubbo-spring-boot-autoconfigure/pom.xml +++ b/dubbo-spring-boot-project/dubbo-spring-boot-autoconfigure/pom.xml @@ -82,6 +82,13 @@ true + + org.apache.dubbo + dubbo-triple-websocket + ${project.version} + true + + javax.servlet javax.servlet-api @@ -93,6 +100,18 @@ provided + + jakarta.websocket + jakarta.websocket-api + provided + + + + jakarta.websocket + jakarta.websocket-client-api + provided + + io.micrometer diff --git a/dubbo-spring-boot-project/dubbo-spring-boot-autoconfigure/src/main/java/org/apache/dubbo/spring/boot/autoconfigure/DubboTripleAutoConfiguration.java b/dubbo-spring-boot-project/dubbo-spring-boot-autoconfigure/src/main/java/org/apache/dubbo/spring/boot/autoconfigure/DubboTripleAutoConfiguration.java index 3dfecdb9529..c238865a0ff 100644 --- a/dubbo-spring-boot-project/dubbo-spring-boot-autoconfigure/src/main/java/org/apache/dubbo/spring/boot/autoconfigure/DubboTripleAutoConfiguration.java +++ b/dubbo-spring-boot-project/dubbo-spring-boot-autoconfigure/src/main/java/org/apache/dubbo/spring/boot/autoconfigure/DubboTripleAutoConfiguration.java @@ -18,6 +18,7 @@ import org.apache.dubbo.rpc.protocol.tri.ServletExchanger; import org.apache.dubbo.rpc.protocol.tri.servlet.TripleFilter; +import org.apache.dubbo.rpc.protocol.tri.websocket.TripleWebSocketFilter; import javax.servlet.Filter; @@ -40,18 +41,20 @@ @Conditional(SpringBoot12Condition.class) public class DubboTripleAutoConfiguration { - public static final String PREFIX = "dubbo.protocol.triple.servlet"; + public static final String SERVLET_PREFIX = "dubbo.protocol.triple.servlet"; + + public static final String WEBSOCKET_PREFIX = "dubbo.protocol.triple.websocket"; @Configuration(proxyBeanMethods = false) @ConditionalOnClass(Filter.class) @ConditionalOnWebApplication(type = Type.SERVLET) - @ConditionalOnProperty(prefix = PREFIX, name = "enabled", havingValue = "true") + @ConditionalOnProperty(prefix = SERVLET_PREFIX, name = "enabled", havingValue = "true") public static class TripleServletConfiguration { @Bean public FilterRegistrationBean tripleProtocolFilter( - @Value("${" + PREFIX + ".filter-url-patterns:/*}") String[] urlPatterns, - @Value("${" + PREFIX + ".filter-order:-1000000}") int order, + @Value("${" + SERVLET_PREFIX + ".filter-url-patterns:/*}") String[] urlPatterns, + @Value("${" + SERVLET_PREFIX + ".filter-order:-1000000}") int order, @Value("${server.port:8080}") int serverPort) { ServletExchanger.bindServerPort(serverPort); FilterRegistrationBean registrationBean = new FilterRegistrationBean<>(); @@ -63,9 +66,9 @@ public FilterRegistrationBean tripleProtocolFilter( @Bean @ConditionalOnClass(Http2Protocol.class) - @ConditionalOnProperty(prefix = PREFIX, name = "max-concurrent-streams") + @ConditionalOnProperty(prefix = SERVLET_PREFIX, name = "max-concurrent-streams") public WebServerFactoryCustomizer tripleTomcatHttp2Customizer( - @Value("${" + PREFIX + ".max-concurrent-streams}") int maxConcurrentStreams) { + @Value("${" + SERVLET_PREFIX + ".max-concurrent-streams}") int maxConcurrentStreams) { return factory -> factory.addConnectorCustomizers(connector -> { ProtocolHandler handler = connector.getProtocolHandler(); for (UpgradeProtocol upgradeProtocol : handler.findUpgradeProtocols()) { @@ -79,4 +82,24 @@ public WebServerFactoryCustomizer tripleTomc }); } } + + @Configuration(proxyBeanMethods = false) + @ConditionalOnClass(Filter.class) + @ConditionalOnWebApplication(type = Type.SERVLET) + @ConditionalOnProperty(prefix = WEBSOCKET_PREFIX, name = "enabled", havingValue = "true") + public static class TripleWebSocketConfiguration { + + @Bean + public FilterRegistrationBean tripleWebSocketFilter( + @Value("${" + WEBSOCKET_PREFIX + ".filter-url-patterns:/*}") String[] urlPatterns, + @Value("${" + WEBSOCKET_PREFIX + ".filter-order:-1000000}") int order, + @Value("${server.port:8080}") int serverPort) { + ServletExchanger.bindServerPort(serverPort); + FilterRegistrationBean registrationBean = new FilterRegistrationBean<>(); + registrationBean.setFilter(new TripleWebSocketFilter()); + registrationBean.addUrlPatterns(urlPatterns); + registrationBean.setOrder(order); + return registrationBean; + } + } } diff --git a/dubbo-spring-boot-project/dubbo-spring-boot-compatible/dubbo-spring-boot-autoconfigure-compatible/pom.xml b/dubbo-spring-boot-project/dubbo-spring-boot-compatible/dubbo-spring-boot-autoconfigure-compatible/pom.xml index d45aceca74b..455533ad504 100644 --- a/dubbo-spring-boot-project/dubbo-spring-boot-compatible/dubbo-spring-boot-autoconfigure-compatible/pom.xml +++ b/dubbo-spring-boot-project/dubbo-spring-boot-compatible/dubbo-spring-boot-autoconfigure-compatible/pom.xml @@ -73,6 +73,12 @@ ${project.version} + + javax.websocket + javax.websocket-api + provided + + org.springframework.boot diff --git a/dubbo-test/dubbo-dependencies-all/pom.xml b/dubbo-test/dubbo-dependencies-all/pom.xml index 83c80c43eb3..ddd9b6b6eb9 100644 --- a/dubbo-test/dubbo-dependencies-all/pom.xml +++ b/dubbo-test/dubbo-dependencies-all/pom.xml @@ -256,6 +256,11 @@ dubbo-triple-servlet ${project.version} + + org.apache.dubbo + dubbo-triple-websocket + ${project.version} + @@ -300,6 +305,11 @@ dubbo-remoting-http3 ${project.version} + + org.apache.dubbo + dubbo-remoting-websocket + ${project.version} + org.apache.dubbo dubbo-remoting-netty