Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: rest protocol support keep-alive timeout header config #14560

Merged
merged 4 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dubbo.metadata.rest.media.MediaType;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.protocol.rest.RestHeaderEnum;
import org.apache.dubbo.rpc.protocol.rest.constans.RestConstant;

import java.io.IOException;
import java.io.OutputStream;
Expand All @@ -36,10 +37,9 @@
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpHeaders.Names;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;

import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
Expand All @@ -51,11 +51,13 @@ public class NettyHttpResponse implements HttpResponse {
private static final int EMPTY_CONTENT_LENGTH = 0;
private int status = 200;
private OutputStream os;
private Map<String, List<String>> outputHeaders;
private final Map<String, List<String>> outputHeaders;
private final ChannelHandlerContext ctx;
private boolean committed;
private boolean keepAlive;
private HttpMethod method;
private final boolean keepAlive;

private final int idleTimeout;
private final HttpMethod method;
// raw response body
private Object responseBody;
// raw response class
Expand All @@ -69,6 +71,7 @@ public NettyHttpResponse(final ChannelHandlerContext ctx, final boolean keepAliv
outputHeaders = new HashMap<>();
this.method = method;
os = new ChunkOutputStream(this, ctx, url.getParameter(Constants.PAYLOAD_KEY, Constants.DEFAULT_PAYLOAD));
this.idleTimeout = url.getParameter(RestConstant.IDLE_TIMEOUT_PARAM, RestConstant.IDLE_TIMEOUT);
this.ctx = ctx;
this.keepAlive = keepAlive;
}
Expand Down Expand Up @@ -125,7 +128,6 @@ public void reset() {
throw new IllegalStateException("Messages.MESSAGES.alreadyCommitted()");
}
outputHeaders.clear();
outputHeaders.clear();
}

public boolean isKeepAlive() {
Expand All @@ -141,7 +143,7 @@ public DefaultHttpResponse getDefaultHttpResponse() {
public DefaultHttpResponse getEmptyHttpResponse() {
DefaultFullHttpResponse res = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(getStatus()));
if (method == null || !method.equals(HttpMethod.HEAD)) {
res.headers().add(Names.CONTENT_LENGTH, EMPTY_CONTENT_LENGTH);
res.headers().add(HttpHeaderNames.CONTENT_LENGTH, EMPTY_CONTENT_LENGTH);
}
transformResponseHeaders(res);

Expand All @@ -155,7 +157,7 @@ private void transformResponseHeaders(io.netty.handler.codec.http.HttpResponse r
public void prepareChunkStream() {
committed = true;
DefaultHttpResponse response = getDefaultHttpResponse();
HttpHeaders.setTransferEncodingChunked(response);
HttpUtil.setTransferEncodingChunked(response, true);
ctx.write(response);
}

Expand Down Expand Up @@ -185,12 +187,7 @@ public void flushBuffer() throws IOException {
@Override
public void addOutputHeaders(String name, String value) {

List<String> values = outputHeaders.get(name);

if (values == null) {
values = new ArrayList<>();
outputHeaders.put(name, values);
}
List<String> values = outputHeaders.computeIfAbsent(name, k -> new ArrayList<>());

if (values.contains(value)) {
return;
Expand All @@ -200,10 +197,12 @@ public void addOutputHeaders(String name, String value) {
}

@SuppressWarnings({"rawtypes", "unchecked"})
public static void transformHeaders(
NettyHttpResponse nettyResponse, io.netty.handler.codec.http.HttpResponse response) {
public void transformHeaders(NettyHttpResponse nettyResponse, io.netty.handler.codec.http.HttpResponse response) {
if (nettyResponse.isKeepAlive()) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
if (idleTimeout > 0) {
response.headers().set(HttpHeaderNames.KEEP_ALIVE, "timeout=" + idleTimeout);
}
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpUtil;

import static org.apache.dubbo.config.Constants.SERVER_THREAD_POOL_NAME;

Expand All @@ -56,8 +56,7 @@ public RestHttpRequestDecoder(URL url, ServiceDeployer serviceDeployer) {
protected void decode(
ChannelHandlerContext ctx, io.netty.handler.codec.http.FullHttpRequest request, List<Object> out)
throws Exception {
boolean keepAlive = HttpHeaders.isKeepAlive(request);

boolean keepAlive = HttpUtil.isKeepAlive(request);
NettyHttpResponse nettyHttpResponse = new NettyHttpResponse(ctx, keepAlive, url);
NettyRequestFacade requestFacade = new NettyRequestFacade(request, ctx, serviceDeployer);

Expand Down
Loading