Skip to content

Commit

Permalink
fix chunkoutputstream chunksize exception
Browse files Browse the repository at this point in the history
  • Loading branch information
suncairong163 committed Aug 20, 2023
1 parent 6b617a1 commit 254005f
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultHttpContent;
import org.apache.dubbo.remoting.transport.ExceedPayloadLimitException;

import java.io.IOException;
import java.io.OutputStream;
Expand All @@ -29,27 +30,32 @@ public class ChunkOutputStream extends OutputStream {
final ByteBuf buffer;
final ChannelHandlerContext ctx;
final NettyHttpResponse response;
int chunkSize = 0;

ChunkOutputStream(final NettyHttpResponse response, final ChannelHandlerContext ctx, final int chunksize) {
ChunkOutputStream(final NettyHttpResponse response, final ChannelHandlerContext ctx, final int chunkSize) {
this.response = response;
if (chunksize < 1) {
if (chunkSize < 1) {
throw new IllegalArgumentException();
}
// TODO buffer pool
this.buffer = Unpooled.buffer(0, chunksize);
this.buffer = Unpooled.buffer(0, chunkSize);
this.chunkSize = chunkSize;
this.ctx = ctx;
}

@Override
public void write(int b) throws IOException {
if (buffer.maxWritableBytes() < 1) {
flush();
throwExceedPayloadLimitException(buffer.readableBytes() + 1);
}
buffer.writeByte(b);
}

public void reset()
{
private void throwExceedPayloadLimitException(int dataSize) throws ExceedPayloadLimitException {
throw new ExceedPayloadLimitException(
"Data length too large: " + dataSize + ", max payload: " + chunkSize);
}

public void reset() {
if (response.isCommitted()) throw new IllegalStateException();
buffer.clear();
}
Expand All @@ -65,16 +71,10 @@ public void close() throws IOException {
public void write(byte[] b, int off, int len) throws IOException {
int dataLengthLeftToWrite = len;
int dataToWriteOffset = off;
int spaceLeftInCurrentChunk;
while ((spaceLeftInCurrentChunk = buffer.maxWritableBytes()) < dataLengthLeftToWrite) {
buffer.writeBytes(b, dataToWriteOffset, spaceLeftInCurrentChunk);
dataToWriteOffset = dataToWriteOffset + spaceLeftInCurrentChunk;
dataLengthLeftToWrite = dataLengthLeftToWrite - spaceLeftInCurrentChunk;
flush();
}
if (dataLengthLeftToWrite > 0) {
buffer.writeBytes(b, dataToWriteOffset, dataLengthLeftToWrite);
if (buffer.maxWritableBytes() < dataLengthLeftToWrite) {
throwExceedPayloadLimitException(buffer.readableBytes() + len);
}
buffer.writeBytes(b, dataToWriteOffset, dataLengthLeftToWrite);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.LastHttpContent;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.metadata.rest.media.MediaType;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.protocol.rest.RestHeaderEnum;


Expand Down Expand Up @@ -59,15 +61,14 @@ public class NettyHttpResponse implements HttpResponse {
// raw response class
private Class<?> entityClass;

public NettyHttpResponse(final ChannelHandlerContext ctx, final boolean keepAlive) {
this(ctx, keepAlive, null);
public NettyHttpResponse(final ChannelHandlerContext ctx, final boolean keepAlive, URL url) {
this(ctx, keepAlive, null, url);
}

public NettyHttpResponse(final ChannelHandlerContext ctx, final boolean keepAlive, final HttpMethod method) {
public NettyHttpResponse(final ChannelHandlerContext ctx, final boolean keepAlive, HttpMethod method, URL url) {
outputHeaders = new HashMap<>();
this.method = method;
// TODO chunk size to config
os = new ChunkOutputStream(this, ctx, 1000);
os = new ChunkOutputStream(this, ctx, url.getParameter(Constants.PAYLOAD_KEY, Constants.DEFAULT_PAYLOAD));
this.ctx = ctx;
this.keepAlive = keepAlive;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public RestHttpRequestDecoder(URL url, ServiceDeployer serviceDeployer) {
protected void decode(ChannelHandlerContext ctx, io.netty.handler.codec.http.FullHttpRequest request, List<Object> out) throws Exception {
boolean keepAlive = HttpHeaders.isKeepAlive(request);

NettyHttpResponse nettyHttpResponse = new NettyHttpResponse(ctx, keepAlive);
NettyHttpResponse nettyHttpResponse = new NettyHttpResponse(ctx, keepAlive,url);
NettyRequestFacade requestFacade = new NettyRequestFacade(request, ctx,serviceDeployer);

executor.execute(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import static org.apache.dubbo.remoting.Constants.SERVER_KEY;
Expand Down Expand Up @@ -743,6 +744,35 @@ void testReExport() {
exporter.unexport();
}

@Test
void testBody() {


Assertions.assertThrowsExactly(RpcException.class, () -> {
DemoService server = new DemoServiceImpl();

URL url = this.registerProvider(exportUrl, server, DemoService.class);

URL nettyUrl = url.addParameter(org.apache.dubbo.remoting.Constants.PAYLOAD_KEY, 1024);

Exporter<DemoService> exporter = protocol.export(proxy.getInvoker(server, DemoService.class, nettyUrl));


DemoService demoService = this.proxy.getProxy(protocol.refer(DemoService.class, nettyUrl));

List<User> users = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
users.add(User.getInstance());

}

demoService.list(users);

exporter.unexport();
});

}

private URL registerProvider(URL url, Object impl, Class<?> interfaceClass) {
ServiceDescriptor serviceDescriptor = repository.registerService(interfaceClass);
ProviderModel providerModel = new ProviderModel(
Expand Down

0 comments on commit 254005f

Please sign in to comment.