From aea67d4f43a6c3d33d52fac0825596f47180df64 Mon Sep 17 00:00:00 2001 From: Maxim Nesen Date: Wed, 5 Feb 2020 10:27:47 +0100 Subject: [PATCH 1/7] netty connector/container modifications (OO) Signed-off-by: Maxim Nesen --- .../netty/connector/JerseyClientHandler.java | 91 ++++----- .../netty/connector/NettyConnector.java | 159 ++++++++------- .../connector/internal/NettyInputStream.java | 186 +++++++++++------- .../netty/connector/HelloWorldTest.java | 3 +- .../netty/connector/HttpHeadersTest.java | 3 +- .../httpserver/JerseyHttp2ServerHandler.java | 15 +- .../netty/httpserver/JerseyServerHandler.java | 14 +- pom.xml | 2 +- 8 files changed, 260 insertions(+), 213 deletions(-) diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java index 5531498358..9ab9745c09 100644 --- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016, 2019 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -20,17 +20,14 @@ import java.io.InputStream; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.LinkedBlockingDeque; import javax.ws.rs.core.Response; import org.glassfish.jersey.client.ClientRequest; import org.glassfish.jersey.client.ClientResponse; -import org.glassfish.jersey.client.spi.AsyncConnectorCallback; import org.glassfish.jersey.netty.connector.internal.NettyInputStream; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.HttpContent; @@ -39,8 +36,6 @@ import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.LastHttpContent; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; /** * Jersey implementation of Netty channel handler. @@ -49,19 +44,38 @@ */ class JerseyClientHandler extends SimpleChannelInboundHandler { - private final NettyConnector connector; - private final LinkedBlockingDeque isList = new LinkedBlockingDeque<>(); - - private final AsyncConnectorCallback asyncConnectorCallback; private final ClientRequest jerseyRequest; - private final CompletableFuture future; + private final CompletableFuture responseAvailable; + private final CompletableFuture responseDone; + + private NettyInputStream nis; + private ClientResponse jerseyResponse; - JerseyClientHandler(NettyConnector nettyConnector, ClientRequest request, - AsyncConnectorCallback callback, CompletableFuture future) { - this.connector = nettyConnector; - this.asyncConnectorCallback = callback; + JerseyClientHandler(ClientRequest request, + CompletableFuture responseAvailable, + CompletableFuture responseDone) { this.jerseyRequest = request; - this.future = future; + this.responseAvailable = responseAvailable; + this.responseDone = responseDone; + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + notifyResponse(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + // assert: no-op, if channel is closed after LastHttpContent has been consumed + responseDone.completeExceptionally(new IOException("Stream closed")); + } + + protected void notifyResponse() { + if (jerseyResponse != null) { + ClientResponse cr = jerseyResponse; + jerseyResponse = null; + responseAvailable.complete(cr); + } } @Override @@ -69,7 +83,7 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) { if (msg instanceof HttpResponse) { final HttpResponse response = (HttpResponse) msg; - final ClientResponse jerseyResponse = new ClientResponse(new Response.StatusType() { + jerseyResponse = new ClientResponse(new Response.StatusType() { @Override public int getStatusCode() { return response.status().code(); @@ -89,19 +103,15 @@ public String getReasonPhrase() { for (Map.Entry entry : response.headers().entries()) { jerseyResponse.getHeaders().add(entry.getKey(), entry.getValue()); } - isList.clear(); // clearing the content - possible leftover from previous request processing. + // request entity handling. if ((response.headers().contains(HttpHeaderNames.CONTENT_LENGTH) && HttpUtil.getContentLength(response) > 0) || HttpUtil.isTransferEncodingChunked(response)) { - ctx.channel().closeFuture().addListener(new GenericFutureListener>() { - @Override - public void operationComplete(Future future) throws Exception { - isList.add(Unpooled.EMPTY_BUFFER); - } - }); + nis = new NettyInputStream(); + responseDone.whenComplete((_r, th) -> nis.complete(th)); - jerseyResponse.setEntityStream(new NettyInputStream(isList)); + jerseyResponse.setEntityStream(nis); } else { jerseyResponse.setEntityStream(new InputStream() { @Override @@ -110,44 +120,29 @@ public int read() throws IOException { } }); } - - if (asyncConnectorCallback != null) { - connector.executorService.execute(new Runnable() { - @Override - public void run() { - asyncConnectorCallback.response(jerseyResponse); - future.complete(jerseyResponse); - } - }); - } - } if (msg instanceof HttpContent) { + HttpContent httpContent = (HttpContent) msg; ByteBuf content = httpContent.content(); + if (content.isReadable()) { content.retain(); - isList.add(content); + nis.publish(content); } if (msg instanceof LastHttpContent) { - isList.add(Unpooled.EMPTY_BUFFER); + responseDone.complete(null); + notifyResponse(); } } } + + @Override public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) { - if (asyncConnectorCallback != null) { - connector.executorService.execute(new Runnable() { - @Override - public void run() { - asyncConnectorCallback.failure(cause); - } - }); - } - future.completeExceptionally(cause); - ctx.close(); + responseDone.completeExceptionally(cause); } } diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java index 2b36006608..373d3c8bf0 100644 --- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java @@ -20,6 +20,8 @@ import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -28,7 +30,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import javax.ws.rs.ProcessingException; import javax.ws.rs.client.Client; @@ -76,6 +77,7 @@ class NettyConnector implements Connector { final ExecutorService executorService; final EventLoopGroup group; final Client client; + final HashMap> connections = new HashMap<>(); NettyConnector(Client client) { @@ -83,72 +85,72 @@ class NettyConnector implements Connector { if (threadPoolSize != null && threadPoolSize instanceof Integer && (Integer) threadPoolSize > 0) { executorService = Executors.newFixedThreadPool((Integer) threadPoolSize); + this.group = new NioEventLoopGroup((Integer) threadPoolSize); } else { executorService = Executors.newCachedThreadPool(); + this.group = new NioEventLoopGroup(); } - this.group = new NioEventLoopGroup(); this.client = client; } @Override public ClientResponse apply(ClientRequest jerseyRequest) { - - final AtomicReference syncResponse = new AtomicReference<>(null); - final AtomicReference syncException = new AtomicReference<>(null); - try { - Future resultFuture = apply(jerseyRequest, new AsyncConnectorCallback() { - @Override - public void response(ClientResponse response) { - syncResponse.set(response); - } - - @Override - public void failure(Throwable failure) { - syncException.set(failure); - } - }); + CompletableFuture resultFuture = execute(jerseyRequest); Integer timeout = ClientProperties.getValue(jerseyRequest.getConfiguration().getProperties(), ClientProperties.READ_TIMEOUT, 0); - if (timeout != null && timeout > 0) { - resultFuture.get(timeout, TimeUnit.MILLISECONDS); - } else { - resultFuture.get(); - } + return (timeout != null && timeout > 0) ? resultFuture.get(timeout, TimeUnit.MILLISECONDS) + : resultFuture.get(); } catch (ExecutionException ex) { Throwable e = ex.getCause() == null ? ex : ex.getCause(); throw new ProcessingException(e.getMessage(), e); } catch (Exception ex) { throw new ProcessingException(ex.getMessage(), ex); } - - Throwable throwable = syncException.get(); - if (throwable == null) { - return syncResponse.get(); - } else { - throw new RuntimeException(throwable); - } } @Override public Future apply(final ClientRequest jerseyRequest, final AsyncConnectorCallback jerseyCallback) { + return execute(jerseyRequest).whenCompleteAsync((r, th) -> { + if (th == null) jerseyCallback.response(r); + else jerseyCallback.failure(th); + }, executorService); + } - final CompletableFuture settableFuture = new CompletableFuture<>(); + protected CompletableFuture execute(final ClientRequest jerseyRequest) { + final CompletableFuture responseAvailable = new CompletableFuture<>(); + final CompletableFuture responseDone = new CompletableFuture<>(); final URI requestUri = jerseyRequest.getUri(); String host = requestUri.getHost(); int port = requestUri.getPort() != -1 ? requestUri.getPort() : "https".equals(requestUri.getScheme()) ? 443 : 80; try { - Bootstrap b = new Bootstrap(); - b.group(group) - .channel(NioSocketChannel.class) - .handler(new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel ch) throws Exception { + String key = requestUri.getScheme() + "://" + host + ":" + port; + ArrayList conns; + synchronized (connections) { + conns = connections.get(key); + if (conns == null) { + conns = new ArrayList<>(0); + connections.put(key, conns); + } + } + + Channel chan; + synchronized (conns) { + chan = conns.size() == 0 ? null : conns.remove(conns.size() - 1); + } + + if (chan == null) { + Bootstrap b = new Bootstrap(); + b.group(group) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); // Enable HTTPS if necessary. @@ -177,32 +179,47 @@ protected void initChannel(SocketChannel ch) throws Exception { p.addLast(new HttpClientCodec()); p.addLast(new ChunkedWriteHandler()); p.addLast(new HttpContentDecompressor()); - p.addLast(new JerseyClientHandler(NettyConnector.this, jerseyRequest, jerseyCallback, settableFuture)); - } - }); - - // connect timeout - Integer connectTimeout = ClientProperties.getValue(jerseyRequest.getConfiguration().getProperties(), - ClientProperties.CONNECT_TIMEOUT, 0); - if (connectTimeout > 0) { - b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout); - } + } + }); - // Make the connection attempt. - final Channel ch = b.connect(host, port).sync().channel(); + // connect timeout + Integer connectTimeout = ClientProperties.getValue(jerseyRequest.getConfiguration().getProperties(), + ClientProperties.CONNECT_TIMEOUT, 0); + if (connectTimeout > 0) { + b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout); + } - // guard against prematurely closed channel - final GenericFutureListener> closeListener = - new GenericFutureListener>() { - @Override - public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { - if (!settableFuture.isDone()) { - settableFuture.completeExceptionally(new IOException("Channel closed.")); - } - } - }; + // Make the connection attempt. + chan = b.connect(host, port).sync().channel(); + } - ch.closeFuture().addListener(closeListener); + // assert: clientHandler will always notify responseDone: either normally, or exceptionally + // assert: clientHandler may notify responseAvailable, if sufficient parts of response are detected to construct + // a valid ClientResponse + // assert: responseAvailable completion may be racing against responseDone completion + // assert: it is ok to abort the entire response, if responseDone is completed exceptionally - in particular, nothing + // will leak + final Channel ch = chan; + JerseyClientHandler clientHandler = new JerseyClientHandler(jerseyRequest, responseAvailable, responseDone); + ch.pipeline().addLast(clientHandler); + + responseDone.whenComplete((_r, th) -> { + ch.pipeline().remove(clientHandler); + + if (th == null) { + synchronized (connections) { + ArrayList conns1 = connections.get(key); + synchronized (conns1) { + conns1.add(ch); + } + } + } else { + ch.close(); + // if responseAvailable has been completed, no-op: jersey will encounter IOException while reading response body + // if responseAvailable has not been completed, abort + responseAvailable.completeExceptionally(th); + } + }); HttpRequest nettyRequest; String pathWithQuery = buildPathWithQueryParameters(requestUri); @@ -226,14 +243,23 @@ public void operationComplete(io.netty.util.concurrent.Future futu nettyRequest.headers().add(HttpHeaderNames.HOST, jerseyRequest.getUri().getHost()); if (jerseyRequest.hasEntity()) { + // guard against prematurely closed channel + final GenericFutureListener> closeListener = + new GenericFutureListener>() { + @Override + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + if (!responseDone.isDone()) { + responseDone.completeExceptionally(new IOException("Channel closed.")); + } + } + }; + ch.closeFuture().addListener(closeListener); if (jerseyRequest.getLengthLong() == -1) { HttpUtil.setTransferEncodingChunked(nettyRequest, true); } else { nettyRequest.headers().add(HttpHeaderNames.CONTENT_LENGTH, jerseyRequest.getLengthLong()); } - } - if (jerseyRequest.hasEntity()) { // Send the HTTP request. ch.writeAndFlush(nettyRequest); @@ -260,27 +286,22 @@ public void run() { try { jerseyRequest.writeEntity(); } catch (IOException e) { - jerseyCallback.failure(e); - settableFuture.completeExceptionally(e); + responseDone.completeExceptionally(e); } } }); ch.flush(); } else { - // close listener is not needed any more. - ch.closeFuture().removeListener(closeListener); - // Send the HTTP request. ch.writeAndFlush(nettyRequest); } } catch (InterruptedException e) { - settableFuture.completeExceptionally(e); - return settableFuture; + responseDone.completeExceptionally(e); } - return settableFuture; + return responseAvailable; } private String buildPathWithQueryParameters(URI requestUri) { diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java index 741121f506..ea8958ff59 100644 --- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016, 2019 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -16,107 +16,141 @@ package org.glassfish.jersey.netty.connector.internal; -import java.io.IOException; import java.io.InputStream; -import java.util.concurrent.LinkedBlockingDeque; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; /** * Input stream which servers as Request entity input. *

- * Consumes a list of pending {@link ByteBuf}s and processes them on request by Jersey + * Converts Netty NIO buffers to an input streams and stores them in the queue, + * waiting for Jersey to process it. + * + * @author Pavel Bucek */ public class NettyInputStream extends InputStream { - private final LinkedBlockingDeque isList; + private volatile boolean end = false; + private Throwable cause; + + private final ArrayDeque isList; + private ByteBuf current; + private ByteBuffer buffer; + + private byte[] ONE_BYTE; + private boolean reading; - public NettyInputStream(LinkedBlockingDeque isList) { - this.isList = isList; + public NettyInputStream() { + this.isList = new ArrayDeque<>(); } @Override public int read(byte[] b, int off, int len) throws IOException { - - ByteBuf take; - try { - take = isList.take(); - boolean isReadable = take.isReadable(); - int read = -1; - if (checkEndOfInputOrError(take)) { - take.release(); + if (current == null) { + buffer = awaitNext(); + if (buffer == null) { + // assert: end is true + if (cause == null) { return -1; - } - - if (isReadable) { - int readableBytes = take.readableBytes(); - read = Math.min(readableBytes, len); - take.readBytes(b, off, read); - if (read < len) { - take.release(); - } else { - isList.addFirst(take); - } - } else { - read = 0; - take.release(); //We don't need `0` - } - - return read; - } catch (InterruptedException e) { - throw new IOException("Interrupted.", e); - } + } + + throw new IOException(cause); + } + } + + int rem = buffer.remaining(); + if (rem < len) { + len = rem; + } + buffer.get(b, off, len); + if (rem == len) { + current.release(); + current = null; + buffer = null; + } + + return len; } @Override public int read() throws IOException { - - ByteBuf take; - try { - take = isList.take(); - boolean isReadable = take.isReadable(); - if (checkEndOfInputOrError(take)) { - take.release(); - return -1; - } - - if (isReadable) { - return take.readInt(); - } else { - take.release(); //We don't need `0` - } - - return 0; - } catch (InterruptedException e) { - throw new IOException("Interrupted.", e); - } + if (ONE_BYTE == null) { + ONE_BYTE = new byte[1]; + } + int r = read(ONE_BYTE, 0, 1); + if (r < 0) { + return r; + } + + return ONE_BYTE[0] & 0xff; } @Override - public void close() throws IOException { - if (isList != null) { - while (!isList.isEmpty()) { - try { - isList.take().release(); - } catch (InterruptedException e) { - throw new IOException("Interrupted. Potential ByteBuf Leak.", e); - } - } - } - super.close(); + public synchronized void close() { + if (current != null) { + current.release(); + } + + current = null; + buffer = null; + cleanup(true); } - @Override - public int available() throws IOException { - ByteBuf peek = isList.peek(); - if (peek != null && peek.isReadable()) { - return peek.readableBytes(); - } - return 0; + protected synchronized ByteBuffer awaitNext() { + while (isList.isEmpty()) { + if (end) { + return null; + } + + try { + reading = true; + wait(); + reading = false; + } catch (InterruptedException ie) { + // waiting uninterruptibly + } + } + + current = isList.poll(); + return current.nioBuffer().asReadOnlyBuffer(); + } + + public synchronized void complete(Throwable cause) { + this.cause = cause; + cleanup(cause != null); + } + + protected void cleanup(boolean drain) { + if (drain) { + while (!isList.isEmpty()) { + isList.poll().release(); + } + } + + end = true; + + if (reading) { + notifyAll(); + } + } + + public synchronized void publish(ByteBuf content) { + if (end || content.nioBuffer().remaining() == 0) { + content.release(); + return; + } + + isList.add(content); + if (reading) { + notifyAll(); + } } - private boolean checkEndOfInputOrError(ByteBuf take) throws IOException { - return take == Unpooled.EMPTY_BUFFER; + public void clear() { + end = false; + isList.clear(); } } diff --git a/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HelloWorldTest.java b/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HelloWorldTest.java index 6716c9be43..20caa533d3 100644 --- a/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HelloWorldTest.java +++ b/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HelloWorldTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016, 2019 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -77,6 +77,7 @@ protected void configureClient(ClientConfig config) { public void testConnection() { Response response = target().path(ROOT_PATH).request("text/plain").get(); assertEquals(200, response.getStatus()); + response.close(); } @Test diff --git a/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HttpHeadersTest.java b/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HttpHeadersTest.java index 10cfa14e95..f5438e5ac6 100644 --- a/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HttpHeadersTest.java +++ b/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HttpHeadersTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016, 2018 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -64,5 +64,6 @@ public void testPost() { assertEquals(200, response.getStatus()); assertTrue(response.hasEntity()); + response.close(); } } diff --git a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java index e64476db71..6eeac1756a 100644 --- a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java +++ b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016, 2019 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -25,11 +25,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.LinkedBlockingDeque; import javax.ws.rs.core.SecurityContext; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; @@ -55,7 +52,7 @@ class JerseyHttp2ServerHandler extends ChannelDuplexHandler { private final URI baseUri; - private final LinkedBlockingDeque isList = new LinkedBlockingDeque<>(); + private final NettyInputStream nettyInputStream = new NettyInputStream(); private final NettyHttpContainer container; private final ResourceConfig resourceConfig; @@ -92,9 +89,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception * Process incoming data. */ private void onDataRead(ChannelHandlerContext ctx, Http2DataFrame data) throws Exception { - isList.add(data.content()); + nettyInputStream.publish(data.content()); if (data.isEndStream()) { - isList.add(Unpooled.EMPTY_BUFFER); + nettyInputStream.complete(null); } } @@ -163,11 +160,11 @@ public void removeProperty(String name) { ctx.channel().closeFuture().addListener(new GenericFutureListener>() { @Override public void operationComplete(Future future) throws Exception { - isList.add(Unpooled.EMPTY_BUFFER); + nettyInputStream.complete(future.cause()); } }); - requestContext.setEntityStream(new NettyInputStream(isList)); + requestContext.setEntityStream(nettyInputStream); } else { requestContext.setEntityStream(new InputStream() { @Override diff --git a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java index 712cb1f531..0f2a7ae8f0 100644 --- a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java +++ b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016, 2019 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -20,13 +20,11 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.LinkedBlockingDeque; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.MediaType; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http.DefaultFullHttpResponse; @@ -53,7 +51,7 @@ class JerseyServerHandler extends ChannelInboundHandlerAdapter { private final URI baseUri; - private final LinkedBlockingDeque isList = new LinkedBlockingDeque<>(); + private final NettyInputStream nettyInputStream = new NettyInputStream(); private final NettyHttpContainer container; private final ResourceConfig resourceConfig; @@ -82,7 +80,7 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) { ctx.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE)); } - isList.clear(); // clearing the content - possible leftover from previous request processing. + nettyInputStream.clear(); // clearing the content - possible leftover from previous request processing. final ContainerRequest requestContext = createContainerRequest(ctx, req); requestContext.setWriter(new NettyResponseWriter(ctx, req, container)); @@ -105,7 +103,7 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) { //Otherwise, it's safe to discard during next processing if ((!isJson && contentLength != -1) || HttpUtil.isTransferEncodingChunked(req) || (isJson && contentLength >= 2)) { - requestContext.setEntityStream(new NettyInputStream(isList)); + requestContext.setEntityStream(nettyInputStream); } } @@ -128,11 +126,11 @@ public void run() { ByteBuf content = httpContent.content(); if (content.isReadable()) { - isList.add(content); + nettyInputStream.publish(content); } if (msg instanceof LastHttpContent) { - isList.add(Unpooled.EMPTY_BUFFER); + nettyInputStream.complete(null); } } } diff --git a/pom.xml b/pom.xml index a157ec49b9..3cad1082dd 100644 --- a/pom.xml +++ b/pom.xml @@ -2141,7 +2141,7 @@ 1.10.19 2.7.4 0.8.17 - 4.1.31.Final + 4.1.43.Final 1.6.7 0.30.0 6.0.0 From 8c7d90cb06e6f5a217c25afd56291dbb7a42092e Mon Sep 17 00:00:00 2001 From: Maxim Nesen Date: Tue, 18 Feb 2020 09:34:07 +0100 Subject: [PATCH 2/7] available and releaseByteBuf methods added Signed-off-by: Maxim Nesen --- .../connector/internal/NettyInputStream.java | 39 ++++++++++++++----- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java index ea8958ff59..2dbb6d6130 100644 --- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java @@ -67,9 +67,7 @@ public int read(byte[] b, int off, int len) throws IOException { } buffer.get(b, off, len); if (rem == len) { - current.release(); - current = null; - buffer = null; + releaseByteBuf(); } return len; @@ -90,13 +88,19 @@ public int read() throws IOException { @Override public synchronized void close() { - if (current != null) { - current.release(); - } - current = null; - buffer = null; - cleanup(true); + releaseByteBuf(); + + cleanup(true); + } + + private void releaseByteBuf() { + if (current != null) { + current.release(); + } + + current = null; + buffer = null; } protected synchronized ByteBuffer awaitNext() { @@ -137,6 +141,23 @@ protected void cleanup(boolean drain) { } } + @Override + public int available() throws IOException { + if (end || reading) { + return 0; + } + if (current != null) { + return current.readableBytes(); + } + if (!isList.isEmpty()) { + final ByteBuf peek = isList.peek(); + if (peek != null && peek.isReadable()) { + return peek.readableBytes(); + } + } + return 0; + } + public synchronized void publish(ByteBuf content) { if (end || content.nioBuffer().remaining() == 0) { content.release(); From df5add259e2f510dfdfe83aeadbb906e0e9b70ab Mon Sep 17 00:00:00 2001 From: Maxim Nesen Date: Tue, 18 Feb 2020 11:40:40 +0100 Subject: [PATCH 3/7] synchronized releaseByteBuf Signed-off-by: Maxim Nesen --- .../jersey/netty/connector/internal/NettyInputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java index 2dbb6d6130..52f795ab3a 100644 --- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java @@ -94,7 +94,7 @@ public synchronized void close() { cleanup(true); } - private void releaseByteBuf() { + private synchronized void releaseByteBuf() { if (current != null) { current.release(); } From 5f6111333ea37c17a7348fca80c31b672b7f23ef Mon Sep 17 00:00:00 2001 From: Maxim Nesen Date: Tue, 18 Feb 2020 11:57:23 +0100 Subject: [PATCH 4/7] available simplified Signed-off-by: Maxim Nesen --- .../netty/connector/internal/NettyInputStream.java | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java index 52f795ab3a..ccfa5ea212 100644 --- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java @@ -143,19 +143,7 @@ protected void cleanup(boolean drain) { @Override public int available() throws IOException { - if (end || reading) { - return 0; - } - if (current != null) { - return current.readableBytes(); - } - if (!isList.isEmpty()) { - final ByteBuf peek = isList.peek(); - if (peek != null && peek.isReadable()) { - return peek.readableBytes(); - } - } - return 0; + return buffer == null ? 0: buffer.remaining(); } public synchronized void publish(ByteBuf content) { From b6daf7ed02532ebe58801da65ccf8cb1812407b2 Mon Sep 17 00:00:00 2001 From: Maxim Nesen Date: Tue, 18 Feb 2020 12:15:27 +0100 Subject: [PATCH 5/7] available simplified (stylecheck) Signed-off-by: Maxim Nesen --- .../jersey/netty/connector/internal/NettyInputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java index ccfa5ea212..e3d98271e7 100644 --- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java @@ -143,7 +143,7 @@ protected void cleanup(boolean drain) { @Override public int available() throws IOException { - return buffer == null ? 0: buffer.remaining(); + return buffer == null ? 0 : buffer.remaining(); } public synchronized void publish(ByteBuf content) { From 55615c8d8105659c46cd48b8f33b6398dd37bf86 Mon Sep 17 00:00:00 2001 From: Maxim Nesen Date: Tue, 18 Feb 2020 13:17:03 +0100 Subject: [PATCH 6/7] available synchronized Signed-off-by: Maxim Nesen --- .../jersey/netty/connector/internal/NettyInputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java index e3d98271e7..1268ddc368 100644 --- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java @@ -142,7 +142,7 @@ protected void cleanup(boolean drain) { } @Override - public int available() throws IOException { + public synchronized int available() throws IOException { return buffer == null ? 0 : buffer.remaining(); } From c7d9ba7b19c54b76cd8c3ee2b684088d3f7bdcb2 Mon Sep 17 00:00:00 2001 From: Maxim Nesen Date: Wed, 19 Feb 2020 07:43:05 +0100 Subject: [PATCH 7/7] complete, close, available, releaseByteBuf are not synchronized, cleanup synchronized Signed-off-by: Maxim Nesen --- .../netty/connector/internal/NettyInputStream.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java index 1268ddc368..3da70198a7 100644 --- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java @@ -87,14 +87,14 @@ public int read() throws IOException { } @Override - public synchronized void close() { + public void close() { releaseByteBuf(); cleanup(true); } - private synchronized void releaseByteBuf() { + private void releaseByteBuf() { if (current != null) { current.release(); } @@ -122,12 +122,12 @@ protected synchronized ByteBuffer awaitNext() { return current.nioBuffer().asReadOnlyBuffer(); } - public synchronized void complete(Throwable cause) { + public void complete(Throwable cause) { this.cause = cause; cleanup(cause != null); } - protected void cleanup(boolean drain) { + protected synchronized void cleanup(boolean drain) { if (drain) { while (!isList.isEmpty()) { isList.poll().release(); @@ -142,7 +142,7 @@ protected void cleanup(boolean drain) { } @Override - public synchronized int available() throws IOException { + public int available() throws IOException { return buffer == null ? 0 : buffer.remaining(); }