Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Multipart with Buffered Entity and Netty Connector #5689

Merged
merged 1 commit into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@
import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
import org.glassfish.jersey.client.spi.Connector;
import org.glassfish.jersey.innate.VirtualThreadUtil;
import org.glassfish.jersey.internal.util.collection.LazyValue;
import org.glassfish.jersey.internal.util.collection.Value;
import org.glassfish.jersey.internal.util.collection.Values;
import org.glassfish.jersey.message.internal.OutboundMessageContext;
import org.glassfish.jersey.netty.connector.internal.NettyEntityWriter;

Expand All @@ -103,6 +106,17 @@ class NettyConnector implements Connector {
final Client client;
final HashMap<String, ArrayList<Channel>> connections = new HashMap<>();

private static final LazyValue<String> NETTY_VERSION = Values.lazy(
(Value<String>) () -> {
String nettyVersion = null;
try {
nettyVersion = io.netty.util.Version.identify().values().iterator().next().artifactVersion();
} catch (Throwable t) {
nettyVersion = "4.1.x";
}
return "Netty " + nettyVersion;
});

// If HTTP keepalive is enabled the value of "http.maxConnections" determines the maximum number
// of idle connections that will be simultaneously kept alive, per destination.
private static final String HTTP_KEEPALIVE_STRING = System.getProperty("http.keepAlive");
Expand Down Expand Up @@ -524,7 +538,7 @@ private String buildPathWithQueryParameters(URI requestUri) {

@Override
public String getName() {
return "Netty 4.1.x";
return NETTY_VERSION.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2023, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -24,8 +24,10 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* The Entity Writer is used to write entity in Netty. One implementation is delayed,
Expand Down Expand Up @@ -196,10 +198,7 @@ private void _flush() throws IOException {
for (Runnable runnable : delayedOps) {
runnable.run();
}

if (outputStream.b != null) {
writer.getOutputStream().write(outputStream.b, outputStream.off, outputStream.len);
}
outputStream._flush();
}
}

Expand All @@ -216,7 +215,7 @@ public OutputStream getOutputStream() {

@Override
public long getLength() {
return outputStream.len - outputStream.off;
return outputStream.writeLen;
}

@Override
Expand All @@ -225,9 +224,9 @@ public Type getType() {
}

private class DelayedOutputStream extends OutputStream {
private byte[] b;
private int off;
private int len;
private final List<WriteAction> actions = new ArrayList<>();
private int writeLen = 0;
private AtomicBoolean streamFlushed = new AtomicBoolean(false);

@Override
public void write(int b) throws IOException {
Expand All @@ -241,15 +240,39 @@ public void write(byte[] b) throws IOException {

@Override
public void write(byte[] b, int off, int len) throws IOException {
if (!flushed && this.b == null) {
this.b = b;
this.off = off;
this.len = len;
if (!flushed) {
actions.add(new WriteAction(b, off, len));
writeLen += len;
} else {
DelayedEntityWriter.this._flush();
_flush();
writer.getOutputStream().write(b, off, len);
writer.getOutputStream().flush();
}
}

public void _flush() throws IOException {
if (streamFlushed.compareAndSet(false, true)) {
DelayedEntityWriter.this._flush();
for (WriteAction action : actions) {
action.run();
}
actions.clear();
}
}
}

private class WriteAction {
private final byte[] b;

private WriteAction(byte[] b, int off, int len) {
this.b = new byte[len]; // b passed in can be reused
System.arraycopy(b, off, this.b, 0, len);
}

public void run() throws IOException {
writer.getOutputStream().write(b, 0, b.length);
writer.getOutputStream().flush();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2023, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -16,11 +16,17 @@

package org.glassfish.jersey.tests.e2e.client.connector;

import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.client.HttpUrlConnectorProvider;
import org.glassfish.jersey.client.RequestEntityProcessing;
import org.glassfish.jersey.client.spi.ConnectorProvider;
import org.glassfish.jersey.jdk.connector.JdkConnectorProvider;
import org.glassfish.jersey.jetty.connector.JettyConnectorProvider;
import org.glassfish.jersey.media.multipart.FormDataBodyPart;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
import org.glassfish.jersey.netty.connector.NettyConnectorProvider;
import org.glassfish.jersey.logging.LoggingFeature;
import org.glassfish.jersey.media.multipart.BodyPart;
Expand All @@ -32,6 +38,8 @@
import org.glassfish.jersey.test.JerseyTest;
import org.glassfish.jersey.test.TestProperties;
import org.glassfish.jersey.test.spi.TestHelper;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DynamicContainer;
import org.junit.jupiter.api.Test;
Expand All @@ -40,18 +48,27 @@
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.Provider;
import javax.ws.rs.ext.WriterInterceptor;
import javax.ws.rs.ext.WriterInterceptorContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.Logger;

public class MultiPartTest {
Expand Down Expand Up @@ -129,5 +146,72 @@ public void testMultipart() {
}
}
}

@Test
public void testNettyBufferedMultipart() {
// setDebugLevel(Level.FINEST);
ClientConfig config = new ClientConfig();

config.connectorProvider(new NettyConnectorProvider());
config.property(ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.BUFFERED);
config.register(org.glassfish.jersey.media.multipart.MultiPartFeature.class);
config.register(new LoggingHandler(LogLevel.DEBUG));
config.register(new LoggingInterceptor());
config.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 10);
config.property("jersey.config.client.logging.verbosity", LoggingFeature.Verbosity.PAYLOAD_TEXT);
config.property("jersey.config.client.logging.logger.level", Level.FINEST.toString());

Client client = ClientBuilder.newClient(config);

FormDataMultiPart formData = new FormDataMultiPart();
FormDataBodyPart bodyPart1 = new FormDataBodyPart("hello1", "{\"first\":\"firstLine\",\"second\":\"secondLine\"}",
MediaType.APPLICATION_JSON_TYPE);
formData.bodyPart(bodyPart1);
formData.bodyPart(new FormDataBodyPart("hello2",
"{\"first\":\"firstLine\",\"second\":\"secondLine\",\"third\":\"thirdLine\"}",
MediaType.APPLICATION_JSON_TYPE));
formData.bodyPart(new FormDataBodyPart("hello3",
"{\"first\":\"firstLine\",\"second\":\"secondLine\",\""
+ "second\":\"secondLine\",\"second\":\"secondLine\",\"second\":\"secondLine\"}",
MediaType.APPLICATION_JSON_TYPE));
formData.bodyPart(new FormDataBodyPart("plaintext", "hello"));

Response response1 = client.target(target().getUri()).path("upload")
.request()
.post(Entity.entity(formData, formData.getMediaType()));

MatcherAssert.assertThat(response1.getStatus(), Matchers.is(200));
MatcherAssert.assertThat(response1.readEntity(String.class),
Matchers.stringContainsInOrder("first", "firstLine", "second", "secondLine"));
response1.close();
client.close();
}

public static void setDebugLevel(Level newLvl) {
Logger rootLogger = LogManager.getLogManager().getLogger("");
Handler[] handlers = rootLogger.getHandlers();
rootLogger.setLevel(newLvl);
for (Handler h : handlers) {
h.setLevel(Level.ALL);
}
Logger nettyLogger = Logger.getLogger("io.netty");
nettyLogger.setLevel(Level.FINEST);
}

@Provider
public class LoggingInterceptor implements WriterInterceptor {

@Override
public void aroundWriteTo(WriterInterceptorContext context)
throws IOException, WebApplicationException {
try {
MultivaluedMap<String, Object> headers = context.getHeaders();
headers.forEach((key, val) -> System.out.println(key + ":" + val));
context.proceed();
} catch (Exception e) {
throw e;
}
}
}
}
}