From 897d3a3ef23c35e8a5c9dae0d9d31c8fb3da545c Mon Sep 17 00:00:00 2001 From: jansupol Date: Wed, 3 Jul 2024 10:40:10 +0200 Subject: [PATCH] Support Multipart with Buffered Entity and Netty Connector Signed-off-by: jansupol --- .../netty/connector/NettyConnector.java | 16 +++- .../connector/internal/NettyEntityWriter.java | 51 ++++++++--- .../e2e/client/connector/MultiPartTest.java | 86 ++++++++++++++++++- 3 files changed, 137 insertions(+), 16 deletions(-) diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java index afb18ea365..d1de3ac1c0 100644 --- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java @@ -88,6 +88,9 @@ import org.glassfish.jersey.client.spi.AsyncConnectorCallback; import org.glassfish.jersey.client.spi.Connector; import org.glassfish.jersey.innate.VirtualThreadUtil; +import org.glassfish.jersey.internal.util.collection.LazyValue; +import org.glassfish.jersey.internal.util.collection.Value; +import org.glassfish.jersey.internal.util.collection.Values; import org.glassfish.jersey.message.internal.OutboundMessageContext; import org.glassfish.jersey.netty.connector.internal.NettyEntityWriter; @@ -103,6 +106,17 @@ class NettyConnector implements Connector { final Client client; final HashMap> connections = new HashMap<>(); + private static final LazyValue NETTY_VERSION = Values.lazy( + (Value) () -> { + String nettyVersion = null; + try { + nettyVersion = io.netty.util.Version.identify().values().iterator().next().artifactVersion(); + } catch (Throwable t) { + nettyVersion = "4.1.x"; + } + return "Netty " + nettyVersion; + }); + // If HTTP keepalive is enabled the value of "http.maxConnections" determines the maximum number // of idle connections that will be simultaneously kept alive, per destination. private static final String HTTP_KEEPALIVE_STRING = System.getProperty("http.keepAlive"); @@ -524,7 +538,7 @@ private String buildPathWithQueryParameters(URI requestUri) { @Override public String getName() { - return "Netty 4.1.x"; + return NETTY_VERSION.get(); } @Override diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyEntityWriter.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyEntityWriter.java index a9e70409f8..bcd3fd868c 100644 --- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyEntityWriter.java +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyEntityWriter.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -24,8 +24,10 @@ import java.io.IOException; import java.io.OutputStream; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; /** * The Entity Writer is used to write entity in Netty. One implementation is delayed, @@ -196,10 +198,7 @@ private void _flush() throws IOException { for (Runnable runnable : delayedOps) { runnable.run(); } - - if (outputStream.b != null) { - writer.getOutputStream().write(outputStream.b, outputStream.off, outputStream.len); - } + outputStream._flush(); } } @@ -216,7 +215,7 @@ public OutputStream getOutputStream() { @Override public long getLength() { - return outputStream.len - outputStream.off; + return outputStream.writeLen; } @Override @@ -225,9 +224,9 @@ public Type getType() { } private class DelayedOutputStream extends OutputStream { - private byte[] b; - private int off; - private int len; + private final List actions = new ArrayList<>(); + private int writeLen = 0; + private AtomicBoolean streamFlushed = new AtomicBoolean(false); @Override public void write(int b) throws IOException { @@ -241,15 +240,39 @@ public void write(byte[] b) throws IOException { @Override public void write(byte[] b, int off, int len) throws IOException { - if (!flushed && this.b == null) { - this.b = b; - this.off = off; - this.len = len; + if (!flushed) { + actions.add(new WriteAction(b, off, len)); + writeLen += len; } else { - DelayedEntityWriter.this._flush(); + _flush(); writer.getOutputStream().write(b, off, len); + writer.getOutputStream().flush(); + } + } + + public void _flush() throws IOException { + if (streamFlushed.compareAndSet(false, true)) { + DelayedEntityWriter.this._flush(); + for (WriteAction action : actions) { + action.run(); + } + actions.clear(); } } } + + private class WriteAction { + private final byte[] b; + + private WriteAction(byte[] b, int off, int len) { + this.b = new byte[len]; // b passed in can be reused + System.arraycopy(b, off, this.b, 0, len); + } + + public void run() throws IOException { + writer.getOutputStream().write(b, 0, b.length); + writer.getOutputStream().flush(); + } + } } } diff --git a/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/connector/MultiPartTest.java b/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/connector/MultiPartTest.java index 613925cd31..54e70c0a47 100644 --- a/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/connector/MultiPartTest.java +++ b/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/connector/MultiPartTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -16,11 +16,17 @@ package org.glassfish.jersey.tests.e2e.client.connector; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.ClientProperties; import org.glassfish.jersey.client.HttpUrlConnectorProvider; +import org.glassfish.jersey.client.RequestEntityProcessing; import org.glassfish.jersey.client.spi.ConnectorProvider; import org.glassfish.jersey.jdk.connector.JdkConnectorProvider; import org.glassfish.jersey.jetty.connector.JettyConnectorProvider; +import org.glassfish.jersey.media.multipart.FormDataBodyPart; +import org.glassfish.jersey.media.multipart.FormDataMultiPart; import org.glassfish.jersey.netty.connector.NettyConnectorProvider; import org.glassfish.jersey.logging.LoggingFeature; import org.glassfish.jersey.media.multipart.BodyPart; @@ -32,6 +38,8 @@ import org.glassfish.jersey.test.JerseyTest; import org.glassfish.jersey.test.TestProperties; import org.glassfish.jersey.test.spi.TestHelper; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.DynamicContainer; import org.junit.jupiter.api.Test; @@ -40,18 +48,27 @@ import javax.ws.rs.Consumes; import javax.ws.rs.POST; import javax.ws.rs.Path; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.Entity; import javax.ws.rs.core.Application; import javax.ws.rs.core.Context; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; +import javax.ws.rs.ext.Provider; +import javax.ws.rs.ext.WriterInterceptor; +import javax.ws.rs.ext.WriterInterceptorContext; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.logging.Handler; import java.util.logging.Level; +import java.util.logging.LogManager; import java.util.logging.Logger; public class MultiPartTest { @@ -129,5 +146,72 @@ public void testMultipart() { } } } + + @Test + public void testNettyBufferedMultipart() { +// setDebugLevel(Level.FINEST); + ClientConfig config = new ClientConfig(); + + config.connectorProvider(new NettyConnectorProvider()); + config.property(ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.BUFFERED); + config.register(org.glassfish.jersey.media.multipart.MultiPartFeature.class); + config.register(new LoggingHandler(LogLevel.DEBUG)); + config.register(new LoggingInterceptor()); + config.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 10); + config.property("jersey.config.client.logging.verbosity", LoggingFeature.Verbosity.PAYLOAD_TEXT); + config.property("jersey.config.client.logging.logger.level", Level.FINEST.toString()); + + Client client = ClientBuilder.newClient(config); + + FormDataMultiPart formData = new FormDataMultiPart(); + FormDataBodyPart bodyPart1 = new FormDataBodyPart("hello1", "{\"first\":\"firstLine\",\"second\":\"secondLine\"}", + MediaType.APPLICATION_JSON_TYPE); + formData.bodyPart(bodyPart1); + formData.bodyPart(new FormDataBodyPart("hello2", + "{\"first\":\"firstLine\",\"second\":\"secondLine\",\"third\":\"thirdLine\"}", + MediaType.APPLICATION_JSON_TYPE)); + formData.bodyPart(new FormDataBodyPart("hello3", + "{\"first\":\"firstLine\",\"second\":\"secondLine\",\"" + + "second\":\"secondLine\",\"second\":\"secondLine\",\"second\":\"secondLine\"}", + MediaType.APPLICATION_JSON_TYPE)); + formData.bodyPart(new FormDataBodyPart("plaintext", "hello")); + + Response response1 = client.target(target().getUri()).path("upload") + .request() + .post(Entity.entity(formData, formData.getMediaType())); + + MatcherAssert.assertThat(response1.getStatus(), Matchers.is(200)); + MatcherAssert.assertThat(response1.readEntity(String.class), + Matchers.stringContainsInOrder("first", "firstLine", "second", "secondLine")); + response1.close(); + client.close(); + } + + public static void setDebugLevel(Level newLvl) { + Logger rootLogger = LogManager.getLogManager().getLogger(""); + Handler[] handlers = rootLogger.getHandlers(); + rootLogger.setLevel(newLvl); + for (Handler h : handlers) { + h.setLevel(Level.ALL); + } + Logger nettyLogger = Logger.getLogger("io.netty"); + nettyLogger.setLevel(Level.FINEST); + } + + @Provider + public class LoggingInterceptor implements WriterInterceptor { + + @Override + public void aroundWriteTo(WriterInterceptorContext context) + throws IOException, WebApplicationException { + try { + MultivaluedMap headers = context.getHeaders(); + headers.forEach((key, val) -> System.out.println(key + ":" + val)); + context.proceed(); + } catch (Exception e) { + throw e; + } + } + } } }