Skip to content

Commit

Permalink
When a header frame with an end_stream flag is received, the close me…
Browse files Browse the repository at this point in the history
…thod of the streaming decoder is called (#14313)
  • Loading branch information
finefuture authored Jun 14, 2024
1 parent e4fa369 commit e49b8b2
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ protected void doOnCompleted(Throwable throwable) {
if (httpMetadata == null) {
return;
}
if (!headerSent) {
HttpHeaders headers = httpMetadata.headers();
headers.set(HttpHeaderNames.STATUS.getName(), resolveStatusCode(throwable));
headers.set(HttpHeaderNames.CONTENT_TYPE.getName(), responseEncoder.contentType());
}
trailersCustomizer.accept(httpMetadata.headers(), throwable);
getHttpChannel().writeHeader(httpMetadata);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ protected void onUnary() {}

@Override
protected void onMetadataCompletion(Http2Header metadata) {
super.onMetadataCompletion(metadata);
processGrpcHeaders(metadata);
super.onMetadataCompletion(metadata);
}

private void processGrpcHeaders(Http2Header metadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
import org.apache.dubbo.remoting.http12.HttpMethods;
import org.apache.dubbo.remoting.http12.h2.CancelStreamException;
import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
import org.apache.dubbo.remoting.http12.h2.Http2Header;
import org.apache.dubbo.remoting.http12.h2.Http2InputMessage;
import org.apache.dubbo.remoting.http12.h2.Http2InputMessageFrame;
import org.apache.dubbo.remoting.http12.h2.Http2ServerChannelObserver;
import org.apache.dubbo.remoting.http12.h2.Http2TransportListener;
import org.apache.dubbo.remoting.http12.message.DefaultListeningDecoder;
Expand All @@ -50,15 +48,11 @@
import org.apache.dubbo.rpc.protocol.tri.h12.StreamingHttpMessageListener;
import org.apache.dubbo.rpc.protocol.tri.h12.UnaryServerCallListener;

import java.io.ByteArrayInputStream;
import java.util.concurrent.Executor;

public class GenericHttp2ServerTransportListener extends AbstractServerTransportListener<Http2Header, Http2InputMessage>
implements Http2TransportListener {

private static final Http2InputMessage EMPTY_MESSAGE =
new Http2InputMessageFrame(new ByteArrayInputStream(new byte[0]), true);

private final ExecutorSupport executorSupport;
private final StreamingDecoder streamingDecoder;
private final FrameworkModel frameworkModel;
Expand Down Expand Up @@ -93,18 +87,6 @@ protected Executor initializeExecutor(Http2Header metadata) {
return new SerializingExecutor(executorSupport.getExecutor(metadata));
}

@Override
protected void doOnMetadata(Http2Header metadata) {
if (metadata.isEndStream()) {
if (!HttpMethods.supportBody(metadata.method())) {
super.doOnMetadata(metadata);
doOnData(EMPTY_MESSAGE);
}
return;
}
super.doOnMetadata(metadata);
}

@Override
protected HttpMessageListener buildHttpMessageListener() {
RpcInvocationBuildContext context = getContext();
Expand Down Expand Up @@ -180,6 +162,9 @@ private BiStreamServerCallListener startBiStreaming(
protected void onMetadataCompletion(Http2Header metadata) {
serverChannelObserver.setResponseEncoder(getContext().getHttpMessageEncoder());
serverChannelObserver.request(1);
if (metadata.isEndStream()) {
getStreamingDecoder().close();
}
}

@Override
Expand Down

0 comments on commit e49b8b2

Please sign in to comment.