diff --git a/dubbo-rpc/dubbo-rpc-rest/src/main/java/org/apache/dubbo/rpc/protocol/rest/netty/ChunkOutputStream.java b/dubbo-rpc/dubbo-rpc-rest/src/main/java/org/apache/dubbo/rpc/protocol/rest/netty/ChunkOutputStream.java index 53db275b7dc..0e9d9bb9043 100644 --- a/dubbo-rpc/dubbo-rpc-rest/src/main/java/org/apache/dubbo/rpc/protocol/rest/netty/ChunkOutputStream.java +++ b/dubbo-rpc/dubbo-rpc-rest/src/main/java/org/apache/dubbo/rpc/protocol/rest/netty/ChunkOutputStream.java @@ -21,6 +21,7 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.DefaultHttpContent; +import org.apache.dubbo.remoting.transport.ExceedPayloadLimitException; import java.io.IOException; import java.io.OutputStream; @@ -29,27 +30,32 @@ public class ChunkOutputStream extends OutputStream { final ByteBuf buffer; final ChannelHandlerContext ctx; final NettyHttpResponse response; + int chunkSize = 0; - ChunkOutputStream(final NettyHttpResponse response, final ChannelHandlerContext ctx, final int chunksize) { + ChunkOutputStream(final NettyHttpResponse response, final ChannelHandlerContext ctx, final int chunkSize) { this.response = response; - if (chunksize < 1) { + if (chunkSize < 1) { throw new IllegalArgumentException(); } - // TODO buffer pool - this.buffer = Unpooled.buffer(0, chunksize); + this.buffer = Unpooled.buffer(0, chunkSize); + this.chunkSize = chunkSize; this.ctx = ctx; } @Override public void write(int b) throws IOException { if (buffer.maxWritableBytes() < 1) { - flush(); + throwExceedPayloadLimitException(buffer.readableBytes() + 1); } buffer.writeByte(b); } - public void reset() - { + private void throwExceedPayloadLimitException(int dataSize) throws ExceedPayloadLimitException { + throw new ExceedPayloadLimitException( + "Data length too large: " + dataSize + ", max payload: " + chunkSize); + } + + public void reset() { if (response.isCommitted()) throw new IllegalStateException(); buffer.clear(); } @@ -65,16 +71,10 @@ public void close() throws IOException { public void write(byte[] b, int off, int len) throws IOException { int dataLengthLeftToWrite = len; int dataToWriteOffset = off; - int spaceLeftInCurrentChunk; - while ((spaceLeftInCurrentChunk = buffer.maxWritableBytes()) < dataLengthLeftToWrite) { - buffer.writeBytes(b, dataToWriteOffset, spaceLeftInCurrentChunk); - dataToWriteOffset = dataToWriteOffset + spaceLeftInCurrentChunk; - dataLengthLeftToWrite = dataLengthLeftToWrite - spaceLeftInCurrentChunk; - flush(); - } - if (dataLengthLeftToWrite > 0) { - buffer.writeBytes(b, dataToWriteOffset, dataLengthLeftToWrite); + if (buffer.maxWritableBytes() < dataLengthLeftToWrite) { + throwExceedPayloadLimitException(buffer.readableBytes() + len); } + buffer.writeBytes(b, dataToWriteOffset, dataLengthLeftToWrite); } @Override diff --git a/dubbo-rpc/dubbo-rpc-rest/src/main/java/org/apache/dubbo/rpc/protocol/rest/netty/NettyHttpResponse.java b/dubbo-rpc/dubbo-rpc-rest/src/main/java/org/apache/dubbo/rpc/protocol/rest/netty/NettyHttpResponse.java index 9cb6a38f186..4ed2c5525b5 100644 --- a/dubbo-rpc/dubbo-rpc-rest/src/main/java/org/apache/dubbo/rpc/protocol/rest/netty/NettyHttpResponse.java +++ b/dubbo-rpc/dubbo-rpc-rest/src/main/java/org/apache/dubbo/rpc/protocol/rest/netty/NettyHttpResponse.java @@ -27,7 +27,9 @@ import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.LastHttpContent; +import org.apache.dubbo.common.URL; import org.apache.dubbo.metadata.rest.media.MediaType; +import org.apache.dubbo.remoting.Constants; import org.apache.dubbo.rpc.protocol.rest.RestHeaderEnum; @@ -59,15 +61,14 @@ public class NettyHttpResponse implements HttpResponse { // raw response class private Class entityClass; - public NettyHttpResponse(final ChannelHandlerContext ctx, final boolean keepAlive) { - this(ctx, keepAlive, null); + public NettyHttpResponse(final ChannelHandlerContext ctx, final boolean keepAlive, URL url) { + this(ctx, keepAlive, null, url); } - public NettyHttpResponse(final ChannelHandlerContext ctx, final boolean keepAlive, final HttpMethod method) { + public NettyHttpResponse(final ChannelHandlerContext ctx, final boolean keepAlive, HttpMethod method, URL url) { outputHeaders = new HashMap<>(); this.method = method; - // TODO chunk size to config - os = new ChunkOutputStream(this, ctx, 1000); + os = new ChunkOutputStream(this, ctx, url.getParameter(Constants.PAYLOAD_KEY, Constants.DEFAULT_PAYLOAD)); this.ctx = ctx; this.keepAlive = keepAlive; } diff --git a/dubbo-rpc/dubbo-rpc-rest/src/main/java/org/apache/dubbo/rpc/protocol/rest/netty/RestHttpRequestDecoder.java b/dubbo-rpc/dubbo-rpc-rest/src/main/java/org/apache/dubbo/rpc/protocol/rest/netty/RestHttpRequestDecoder.java index 7a1aa1bad3b..d5233c3e10f 100644 --- a/dubbo-rpc/dubbo-rpc-rest/src/main/java/org/apache/dubbo/rpc/protocol/rest/netty/RestHttpRequestDecoder.java +++ b/dubbo-rpc/dubbo-rpc-rest/src/main/java/org/apache/dubbo/rpc/protocol/rest/netty/RestHttpRequestDecoder.java @@ -60,7 +60,7 @@ public RestHttpRequestDecoder(URL url, ServiceDeployer serviceDeployer) { protected void decode(ChannelHandlerContext ctx, io.netty.handler.codec.http.FullHttpRequest request, List out) throws Exception { boolean keepAlive = HttpHeaders.isKeepAlive(request); - NettyHttpResponse nettyHttpResponse = new NettyHttpResponse(ctx, keepAlive); + NettyHttpResponse nettyHttpResponse = new NettyHttpResponse(ctx, keepAlive,url); NettyRequestFacade requestFacade = new NettyRequestFacade(request, ctx,serviceDeployer); executor.execute(() -> { diff --git a/dubbo-rpc/dubbo-rpc-rest/src/test/java/org/apache/dubbo/rpc/protocol/rest/JaxrsRestProtocolTest.java b/dubbo-rpc/dubbo-rpc-rest/src/test/java/org/apache/dubbo/rpc/protocol/rest/JaxrsRestProtocolTest.java index 21bb14b9113..080e92a0e0f 100644 --- a/dubbo-rpc/dubbo-rpc-rest/src/test/java/org/apache/dubbo/rpc/protocol/rest/JaxrsRestProtocolTest.java +++ b/dubbo-rpc/dubbo-rpc-rest/src/test/java/org/apache/dubbo/rpc/protocol/rest/JaxrsRestProtocolTest.java @@ -63,6 +63,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import static org.apache.dubbo.remoting.Constants.SERVER_KEY; @@ -743,6 +744,35 @@ void testReExport() { exporter.unexport(); } + @Test + void testBody() { + + + Assertions.assertThrowsExactly(RpcException.class, () -> { + DemoService server = new DemoServiceImpl(); + + URL url = this.registerProvider(exportUrl, server, DemoService.class); + + URL nettyUrl = url.addParameter(org.apache.dubbo.remoting.Constants.PAYLOAD_KEY, 1024); + + Exporter exporter = protocol.export(proxy.getInvoker(server, DemoService.class, nettyUrl)); + + + DemoService demoService = this.proxy.getProxy(protocol.refer(DemoService.class, nettyUrl)); + + List users = new ArrayList<>(); + for (int i = 0; i < 10000; i++) { + users.add(User.getInstance()); + + } + + demoService.list(users); + + exporter.unexport(); + }); + + } + private URL registerProvider(URL url, Object impl, Class interfaceClass) { ServiceDescriptor serviceDescriptor = repository.registerService(interfaceClass); ProviderModel providerModel = new ProviderModel(