Skip to content

Commit

Permalink
feat: netty instrumentation placeholder and netty server get, post tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Eugene Orlovsky committed Oct 28, 2024
1 parent 155fb8d commit 1102f5b
Show file tree
Hide file tree
Showing 9 changed files with 315 additions and 1 deletion.
1 change: 1 addition & 0 deletions agent/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ dependencies {
javaagentLibs(project(":instrumentation:servlet:servlet-3.0:javaagent"))
javaagentLibs(project(":instrumentation:spring:spring-webmvc:spring-webmvc-3.1:javaagent"))
javaagentLibs(project(":instrumentation:spring:spring-webflux:spring-webflux-5.0:javaagent"))
javaagentLibs(project(":instrumentation:netty:netty-4.0:javaagent"))
javaagentLibs(project(":instrumentation:storm:javaagent"))
javaagentLibs(project(":instrumentation:lettuce:lettuce-5.1:javaagent"))

Expand Down
34 changes: 34 additions & 0 deletions instrumentation/netty/netty-4.0/javaagent/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
apply from: "$rootDir/gradle/instrumentation.gradle"

muzzle {
pass {
group.set("io.netty")
module.set("netty-codec-http")
versions.set("[4.0.0.Final,4.1.0.Final)")
assertInverse.set(true)
}
pass {
group.set("io.netty")
module.set("netty-all")
versions.set("[4.0.0.Final,4.1.0.Final)")
excludeDependency("io.netty:netty-tcnative")
assertInverse.set(true)
}
fail {
group.set("io.netty")
module.set("netty")
versions.set("[,]")
}
}

dependencies {
implementation("io.netty:netty-codec-http:4.0.0.Final")
// implementation(project(":instrumentation:netty:netty-4-common:javaagent"))
// implementation(project(":instrumentation:netty:netty-4-common:library"))
// implementation(project(":instrumentation:netty:netty-common:library"))

// testInstrumentation(project(":instrumentation:netty:netty-3.8:javaagent"))
// testInstrumentation(project(":instrumentation:netty:netty-4.1:javaagent"))

// latestDepTestLibrary("io.netty:netty-codec-http:4.0.+") // see netty-4.1 module
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.lumigo.javaagent.instrumentation.netty.v4_0;

public final class HttpHeaderNames {
public static final String HOST = "Host";
public static final String CONTENT_LENGTH = "Content-Length";
public static final String CONTENT_TYPE = "Content-Type";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package io.lumigo.javaagent.instrumentation.netty.v4_0;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.*;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

public class NettyHttpClient {

private final String host;
private final int port;
private final NioEventLoopGroup clientGroup;
private FullHttpResponse response;

public NettyHttpClient(String host, int port) {
this.host = host;
this.port = port;
this.clientGroup = new NioEventLoopGroup();
}

public FullHttpResponse sendGetRequest() throws InterruptedException {
return sendRequest(HttpMethod.GET, null);
}

public FullHttpResponse sendPostRequest(String content) throws InterruptedException {
return sendRequest(HttpMethod.POST, content);
}

private FullHttpResponse sendRequest(HttpMethod method, String content) throws InterruptedException {
ChannelFuture future = null;
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(clientGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new HttpClientCodec());
ch.pipeline().addLast(new HttpObjectAggregator(8192));
ch.pipeline().addLast(new SimpleChannelInboundHandler<FullHttpResponse>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) {
response = msg.retain(); // Retain a copy of the response
ctx.close();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace(); // Log the exception for debugging
ctx.close(); // Close the context on error to free resources
}
});
}
});

future = bootstrap.connect(host, port).sync();
System.out.println("Client connected to server at " + host + ":" + port);

HttpRequest request;
if (method == HttpMethod.POST && content != null) {
byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8);
request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, "/", Unpooled.copiedBuffer(contentBytes));
request.headers().set(HttpHeaderNames.CONTENT_LENGTH, contentBytes.length);
} else {
request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, "/");
}
request.headers().set(HttpHeaderNames.HOST, host);

// Send the request and wait for the response
future.channel().writeAndFlush(request).sync();

// Set a timeout for awaiting channel closure
if (!future.channel().closeFuture().await(30, TimeUnit.SECONDS)) {
System.err.println("Channel did not close in 30 seconds, timing out.");
}

return response;
} finally {
if (future != null && future.channel().isOpen()) {
future.channel().close();
}
clientGroup.shutdownGracefully();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.lumigo.javaagent.instrumentation.netty.v4_0;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;

public class NettyServer {
private final int port;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ChannelFuture channelFuture;

public NettyServer(int port) {
this.port = port;
}

public void start() throws InterruptedException {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
System.out.println("Initializing channel pipeline");
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new HttpObjectAggregator(8192)); // Add this line
ch.pipeline().addLast(new RequestsHandler());
}
});

channelFuture = serverBootstrap.bind(port).sync(); // Start the server
System.out.println("Server started on port: " + port);
}

public void stop() {
if (channelFuture != null) {
channelFuture.channel().close();
}
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.lumigo.javaagent.instrumentation.netty.v4_0;

import io.opentelemetry.instrumentation.test.utils.PortUtils;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.nio.charset.StandardCharsets;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class NettyTest {
@RegisterExtension
static final AgentInstrumentationExtension instrumentation =
AgentInstrumentationExtension.create();

private static NettyServer server;
private static int serverPort;

@BeforeAll
public static void setup() throws InterruptedException {
serverPort = PortUtils.findOpenPort();
server = new NettyServer(serverPort);
server.start();
Thread.sleep(1000); // Wait briefly to ensure the server is fully initialized
}

@Test
public void testHelloWorldGetResponse() throws InterruptedException {
NettyHttpClient client = new NettyHttpClient("localhost", serverPort);
var response = client.sendGetRequest(); // New GET request method

// Validate the HTTP response
assertEquals(HttpResponseStatus.OK, response.getStatus());
String responseBody = response.content().toString(StandardCharsets.UTF_8);
assertEquals("Hello, World!", responseBody);
}

@Test
public void testEchoPostResponse() throws InterruptedException {
NettyHttpClient client = new NettyHttpClient("localhost", serverPort);
String testContent = "This is a test message";
var response = client.sendPostRequest(testContent); // New POST request method

// Validate the HTTP response
assertEquals(HttpResponseStatus.OK, response.getStatus());
String responseBody = response.content().toString(StandardCharsets.UTF_8);
assertEquals(testContent, responseBody); // Should echo back the posted content
}

@AfterAll
public static void teardown() {
server.stop();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.lumigo.javaagent.instrumentation.netty.v4_0;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;

import java.nio.charset.StandardCharsets;

public class RequestsHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
System.out.println("Received request: " + request.getMethod() + " " + request.getUri());
FullHttpResponse response;

if (request.getMethod() == HttpMethod.GET) {
System.out.println("Handling GET request");
response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
Unpooled.copiedBuffer("Hello, World!", StandardCharsets.UTF_8)
);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());

} else if (request.getMethod() == HttpMethod.POST) {
System.out.println("Handling POST request");
String requestBody = request.content().toString(StandardCharsets.UTF_8);
response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
Unpooled.copiedBuffer(requestBody, StandardCharsets.UTF_8)
);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());

} else {
System.out.println("Unsupported method: " + request.getMethod());
response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.METHOD_NOT_ALLOWED,
Unpooled.copiedBuffer("Method Not Allowed", StandardCharsets.UTF_8)
);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
}

// Write the response and close the connection
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Connection opened: " + ctx.channel().remoteAddress());
super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Connection closed: " + ctx.channel().remoteAddress());
super.channelInactive(ctx);
}

}


Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

class TestServlets {
class TestServlets {
@WebServlet
public static class EchoStream_single_byte_print extends HttpServlet {
@Override
Expand Down
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ include "instrumentation:spring:spring-webmvc:spring-webmvc-3.1:javaagent"
include "instrumentation:spring:spring-webmvc:testing"
include "instrumentation:spring:spring-webflux:spring-webflux-5.0:javaagent"
include "instrumentation:spring:spring-webflux:testing"
include "instrumentation:netty:netty-4.0:javaagent"
include "instrumentation:storm:javaagent"
include 'instrumentation:storm:testing'
include 'instrumentation:lettuce:lettuce-5.1:library'
Expand Down

0 comments on commit 1102f5b

Please sign in to comment.