Skip to content

Commit

Permalink
remove duplicate flowcontroller listener (#11642)
Browse files Browse the repository at this point in the history
* remove duplicate flowcontroller listener

* remove duplicate flowcontroller listener
  • Loading branch information
icodening authored Feb 25, 2023
1 parent 5d07038 commit 837b9c5
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.rpc.protocol.tri;

import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2LocalFlowController;
import io.netty.handler.codec.http2.Http2RemoteFlowController;
import org.apache.dubbo.common.utils.Assert;

import java.util.function.Consumer;

public class TripleHttp2FrameCodecBuilder extends Http2FrameCodecBuilder {

TripleHttp2FrameCodecBuilder(Http2Connection connection) {
connection(connection);
}

public static TripleHttp2FrameCodecBuilder fromConnection(Http2Connection connection) {
return new TripleHttp2FrameCodecBuilder(connection);
}

public static TripleHttp2FrameCodecBuilder forClient() {
return forClient(Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
}

public static TripleHttp2FrameCodecBuilder forClient(int maxReservedStreams) {
return fromConnection(new DefaultHttp2Connection(false, maxReservedStreams));
}

public static TripleHttp2FrameCodecBuilder forServer() {
return forServer(Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
}

public static TripleHttp2FrameCodecBuilder forServer(int maxReservedStreams) {
return fromConnection(new DefaultHttp2Connection(true, maxReservedStreams));
}

public TripleHttp2FrameCodecBuilder customizeConnection(Consumer<Http2Connection> connectionCustomizer) {
Http2Connection connection = this.connection();
Assert.notNull(connection, "connection cannot be null.");
connectionCustomizer.accept(connection);
return this;
}

public TripleHttp2FrameCodecBuilder remoteFlowController(Http2RemoteFlowController remoteFlowController) {
return this.customizeConnection((connection) -> connection.remote().flowController(remoteFlowController));
}

public TripleHttp2FrameCodecBuilder localFlowController(Http2LocalFlowController localFlowController) {
return this.customizeConnection((connection) -> connection.local().flowController(localFlowController));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import io.netty.handler.codec.http2.Http2Settings;
Expand Down Expand Up @@ -113,7 +112,8 @@ public void configServerProtocolHandler(URL url, ChannelOperator operator) {
} else {
headFilters = Collections.emptyList();
}
final Http2FrameCodec codec = Http2FrameCodecBuilder.forServer()
final Http2FrameCodec codec = TripleHttp2FrameCodecBuilder.forServer()
.customizeConnection((connection) -> connection.remote().flowController(new TriHttp2RemoteFlowController(connection, url.getOrDefaultApplicationModel())))
.gracefulShutdownTimeoutMillis(10000)
.initialSettings(new Http2Settings().headerTableSize(
config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, DEFAULT_SETTING_HEADER_LIST_SIZE))
Expand All @@ -127,7 +127,6 @@ public void configServerProtocolHandler(URL url, ChannelOperator operator) {
.frameLogger(SERVER_LOGGER)
.build();
ExecutorSupport executorSupport = ExecutorRepository.getInstance(url.getOrDefaultApplicationModel()).getExecutorSupport(url);
codec.connection().remote().flowController(new TriHttp2RemoteFlowController(codec.connection(), url.getOrDefaultApplicationModel()));
codec.connection().local().flowController().frameWriter(codec.encoder().frameWriter());
TripleWriteQueue writeQueue = new TripleWriteQueue();
final Http2MultiplexHandler handler = new Http2MultiplexHandler(
Expand All @@ -152,7 +151,8 @@ protected void initChannel(Http2StreamChannel ch) {

@Override
public void configClientPipeline(URL url, ChannelOperator operator, ContextOperator contextOperator) {
final Http2FrameCodec codec = Http2FrameCodecBuilder.forClient()
final Http2FrameCodec codec = TripleHttp2FrameCodecBuilder.forClient()
.customizeConnection((connection) -> connection.remote().flowController(new TriHttp2RemoteFlowController(connection, url.getOrDefaultApplicationModel())))
.gracefulShutdownTimeoutMillis(10000)
.initialSettings(new Http2Settings().headerTableSize(
config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, DEFAULT_SETTING_HEADER_LIST_SIZE))
Expand All @@ -166,7 +166,6 @@ public void configClientPipeline(URL url, ChannelOperator operator, ContextOpera
DEFAULT_MAX_HEADER_LIST_SIZE)))
.frameLogger(CLIENT_LOGGER)
.build();
codec.connection().remote().flowController(new TriHttp2RemoteFlowController(codec.connection(), url.getOrDefaultApplicationModel()));
codec.connection().local().flowController().frameWriter(codec.encoder().frameWriter());
final Http2MultiplexHandler handler = new Http2MultiplexHandler(
new TripleClientHandler(frameworkModel));
Expand Down

0 comments on commit 837b9c5

Please sign in to comment.