Skip to content

Commit

Permalink
netty connector/container modifications (OO)
Browse files Browse the repository at this point in the history
Signed-off-by: Maxim Nesen <maxim.nesen@oracle.com>
  • Loading branch information
senivam committed Feb 18, 2020
1 parent 38d0819 commit aea67d4
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 213 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -20,17 +20,14 @@
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;

import javax.ws.rs.core.Response;

import org.glassfish.jersey.client.ClientRequest;
import org.glassfish.jersey.client.ClientResponse;
import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
import org.glassfish.jersey.netty.connector.internal.NettyInputStream;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpContent;
Expand All @@ -39,8 +36,6 @@
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

/**
* Jersey implementation of Netty channel handler.
Expand All @@ -49,27 +44,46 @@
*/
class JerseyClientHandler extends SimpleChannelInboundHandler<HttpObject> {

private final NettyConnector connector;
private final LinkedBlockingDeque<ByteBuf> isList = new LinkedBlockingDeque<>();

private final AsyncConnectorCallback asyncConnectorCallback;
private final ClientRequest jerseyRequest;
private final CompletableFuture future;
private final CompletableFuture<ClientResponse> responseAvailable;
private final CompletableFuture<?> responseDone;

private NettyInputStream nis;
private ClientResponse jerseyResponse;

JerseyClientHandler(NettyConnector nettyConnector, ClientRequest request,
AsyncConnectorCallback callback, CompletableFuture future) {
this.connector = nettyConnector;
this.asyncConnectorCallback = callback;
JerseyClientHandler(ClientRequest request,
CompletableFuture<ClientResponse> responseAvailable,
CompletableFuture<?> responseDone) {
this.jerseyRequest = request;
this.future = future;
this.responseAvailable = responseAvailable;
this.responseDone = responseDone;
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
notifyResponse();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
// assert: no-op, if channel is closed after LastHttpContent has been consumed
responseDone.completeExceptionally(new IOException("Stream closed"));
}

protected void notifyResponse() {
if (jerseyResponse != null) {
ClientResponse cr = jerseyResponse;
jerseyResponse = null;
responseAvailable.complete(cr);
}
}

@Override
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
if (msg instanceof HttpResponse) {
final HttpResponse response = (HttpResponse) msg;

final ClientResponse jerseyResponse = new ClientResponse(new Response.StatusType() {
jerseyResponse = new ClientResponse(new Response.StatusType() {
@Override
public int getStatusCode() {
return response.status().code();
Expand All @@ -89,19 +103,15 @@ public String getReasonPhrase() {
for (Map.Entry<String, String> entry : response.headers().entries()) {
jerseyResponse.getHeaders().add(entry.getKey(), entry.getValue());
}
isList.clear(); // clearing the content - possible leftover from previous request processing.

// request entity handling.
if ((response.headers().contains(HttpHeaderNames.CONTENT_LENGTH) && HttpUtil.getContentLength(response) > 0)
|| HttpUtil.isTransferEncodingChunked(response)) {

ctx.channel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
isList.add(Unpooled.EMPTY_BUFFER);
}
});
nis = new NettyInputStream();
responseDone.whenComplete((_r, th) -> nis.complete(th));

jerseyResponse.setEntityStream(new NettyInputStream(isList));
jerseyResponse.setEntityStream(nis);
} else {
jerseyResponse.setEntityStream(new InputStream() {
@Override
Expand All @@ -110,44 +120,29 @@ public int read() throws IOException {
}
});
}

if (asyncConnectorCallback != null) {
connector.executorService.execute(new Runnable() {
@Override
public void run() {
asyncConnectorCallback.response(jerseyResponse);
future.complete(jerseyResponse);
}
});
}

}
if (msg instanceof HttpContent) {

HttpContent httpContent = (HttpContent) msg;

ByteBuf content = httpContent.content();

if (content.isReadable()) {
content.retain();
isList.add(content);
nis.publish(content);
}

if (msg instanceof LastHttpContent) {
isList.add(Unpooled.EMPTY_BUFFER);
responseDone.complete(null);
notifyResponse();
}
}
}



@Override
public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) {
if (asyncConnectorCallback != null) {
connector.executorService.execute(new Runnable() {
@Override
public void run() {
asyncConnectorCallback.failure(cause);
}
});
}
future.completeExceptionally(cause);
ctx.close();
responseDone.completeExceptionally(cause);
}
}
Loading

0 comments on commit aea67d4

Please sign in to comment.