From 2084972393af8961e060c10a27dd1c293adfa9f9 Mon Sep 17 00:00:00 2001 From: Yingyi Zhang Date: Tue, 9 Jul 2019 14:17:36 -0700 Subject: [PATCH] Allow NettyPerfClient to connect multiple frontends 1. log response based on response code range rather than single response code value. 2. allow perfClient to connect multiple frontends. --- .../tools/perf/rest/NettyPerfClient.java | 75 ++++++++++++------- 1 file changed, 50 insertions(+), 25 deletions(-) diff --git a/ambry-tools/src/main/java/com.github.ambry/tools/perf/rest/NettyPerfClient.java b/ambry-tools/src/main/java/com.github.ambry/tools/perf/rest/NettyPerfClient.java index 45f48b886b..d7a0de1087 100644 --- a/ambry-tools/src/main/java/com.github.ambry/tools/perf/rest/NettyPerfClient.java +++ b/ambry-tools/src/main/java/com.github.ambry/tools/perf/rest/NettyPerfClient.java @@ -30,6 +30,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; @@ -47,7 +48,6 @@ import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; -import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.LastHttpContent; @@ -61,7 +61,9 @@ import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; @@ -78,7 +80,7 @@ /** - * A Netty based client to evaluate performance of the front end. + * A Netty based client to evaluate performance of the front end(s). */ public class NettyPerfClient { private static final String GET = "GET"; @@ -86,7 +88,7 @@ public class NettyPerfClient { private static final List SUPPORTED_REQUEST_TYPES = Arrays.asList(GET, POST); private static final Logger logger = LoggerFactory.getLogger(NettyPerfClient.class); - private final String host; + private final List hosts; private final int port; private final String path; private final List pathList; @@ -105,7 +107,9 @@ public class NettyPerfClient { private final JmxReporter reporter = JmxReporter.forRegistry(metricRegistry).build(); private final PerfClientMetrics perfClientMetrics = new PerfClientMetrics(metricRegistry); private final CountDownLatch shutdownLatch = new CountDownLatch(1); + private final CountDownLatch channelConnectLatch = new CountDownLatch(1); private final AtomicLong totalRequestCount = new AtomicLong(0); + private final Map channelToHost = new HashMap<>(); private EventLoopGroup group; private long perfClientStartTime; @@ -116,7 +120,7 @@ public class NettyPerfClient { * Abstraction class for all the parameters that are expected. */ private static class ClientArgs { - final String host; + final List hosts; final Integer port; final String path; final String pathFileName; @@ -136,11 +140,12 @@ private static class ClientArgs { * Parses the arguments provided and extracts them into variables that can be retrieved. * @param args the command line argument list. */ - protected ClientArgs(String args[]) { + ClientArgs(String[] args) { OptionParser parser = new OptionParser(); - ArgumentAcceptingOptionSpec host = parser.accepts("host", "Front end host to contact") + ArgumentAcceptingOptionSpec hosts = parser.accepts("hosts", "Front end hosts(s) to contact") .withOptionalArg() - .describedAs("host") + .describedAs("hosts") + .withValuesSeparatedBy(",") .ofType(String.class) .defaultsTo("localhost"); ArgumentAcceptingOptionSpec port = parser.accepts("port", "Front end port") @@ -210,7 +215,7 @@ protected ClientArgs(String args[]) { .ofType(String.class); OptionSet options = parser.parse(args); - this.host = options.valueOf(host); + this.hosts = options.valuesOf(hosts); this.port = options.valueOf(port); this.path = options.valueOf(path); this.pathFileName = options.valueOf(pathFileName); @@ -226,7 +231,7 @@ protected ClientArgs(String args[]) { this.sslPropsFilePath = options.valueOf(sslPropsFilePath); validateArgs(); - logger.info("Host: {}", this.host); + logger.info("Hosts: {}", this.hosts); logger.info("Port: {}", this.port); logger.info("Path: {}", this.path); logger.info("Path File Name: {}", this.pathFileName); @@ -268,7 +273,7 @@ public static void main(String[] args) { try { ClientArgs clientArgs = new ClientArgs(args); final NettyPerfClient nettyPerfClient = - new NettyPerfClient(clientArgs.host, clientArgs.port, clientArgs.path, clientArgs.pathFileName, + new NettyPerfClient(clientArgs.hosts, clientArgs.port, clientArgs.path, clientArgs.pathFileName, clientArgs.concurrency, clientArgs.postBlobTotalSize, clientArgs.postBlobChunkSize, clientArgs.sslPropsFilePath, clientArgs.serviceId, clientArgs.targetAccountName, clientArgs.targetContainerName, clientArgs.customHeaders); @@ -294,7 +299,7 @@ public static void main(String[] args) { /** * Creates an instance of NettyPerfClient - * @param host host to contact. + * @param hosts a list of hosts to contact. * @param port port to contact. * @param path resource path. * @param concurrency number of parallel requests. @@ -308,10 +313,10 @@ public static void main(String[] args) { * @throws IOException * @throws GeneralSecurityException */ - private NettyPerfClient(String host, int port, String path, String pathFileName, int concurrency, Long totalSize, - Integer chunkSize, String sslPropsFilePath, String serviceId, String targetAccountName, + private NettyPerfClient(List hosts, int port, String path, String pathFileName, int concurrency, + Long totalSize, Integer chunkSize, String sslPropsFilePath, String serviceId, String targetAccountName, String targetContainerName, List customHeaders) throws Exception { - this.host = host; + this.hosts = hosts; this.port = port; this.path = path; if (pathFileName != null) { @@ -339,8 +344,8 @@ private NettyPerfClient(String host, int port, String path, String pathFileName, this.customHeaders.add(new Pair<>(customHeaderNameValue[0], customHeaderNameValue[1])); } } - logger.info("Instantiated NettyPerfClient which will interact with host {}, port {}, path {} with concurrency {}", - this.host, this.port, this.pathList == null ? this.path : "has " + this.pathList.size() + "paths", + logger.info("Instantiated NettyPerfClient which will interact with hosts {}, port {}, path {} with concurrency {}", + this.hosts, this.port, this.pathList == null ? this.path : "has " + this.pathList.size() + "paths", this.concurrency); } @@ -355,20 +360,30 @@ protected void start() { b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { + channelConnectLatch.await(); if (sslFactory != null) { + String host = channelToHost.get(ch); + if (host == null) { + throw new Exception("Unrecognized host!"); + } + logger.info("Initializing the channel to {}", host); ch.pipeline().addLast(new SslHandler(sslFactory.createSSLEngine(host, port, SSLFactory.Mode.CLIENT))); } ch.pipeline().addLast(new HttpClientCodec()).addLast(new ChunkedWriteHandler()).addLast(new ResponseHandler()); } }); - logger.info("Connecting to {}:{}", host, port); - b.remoteAddress(host, port); perfClientStartTime = System.currentTimeMillis(); - for (int i = 0; i < concurrency; i++) { - b.connect().addListener(channelConnectListener); + for (String host : hosts) { + logger.info("Connecting to {}:{}", host, port); + for (int i = 0; i < concurrency; i++) { + ChannelFuture future = b.connect(host, port); + channelToHost.put(future.channel(), host); + future.addListener(channelConnectListener); + } } + channelConnectLatch.countDown(); isRunning = true; - logger.info("Created {} channel(s)", concurrency); + logger.info("Created {} channel(s) per remote host", concurrency); logger.info("NettyPerfClient started"); } @@ -415,6 +430,7 @@ protected void awaitShutdown() throws InterruptedException { /** * Custom handler that sends out the request and receives and processes the response. + * TODO support GET-after-POST and DELETE-after-POST operations for race condition testing. */ private class ResponseHandler extends SimpleChannelInboundHandler { private final Logger logger = LoggerFactory.getLogger(getClass()); @@ -446,8 +462,15 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject in) { perfClientMetrics.timeToFirstResponseChunkInMs.update(responseReceiveStart); logger.trace("Response receive has started on channel {}. Took {} ms", ctx.channel(), responseReceiveStart); response = (HttpResponse) in; - if (response.status() != HttpResponseStatus.OK) { - logger.error("Got Response code {} and headers were {}", response.status().code(), response.headers()); + if (response.status().code() >= 200 && response.status().code() < 300) { + logger.trace("Request succeeded"); + if (response.headers().contains("Location")) { + logger.info(response.headers().get("Location")); + } + } else if (response.status().code() >= 300 && response.status().code() < 400) { + logger.warn("Redirection code {} and headers were {}", response.status().code(), response.headers()); + } else { + logger.error("Response error code {} and headers were {}", response.status().code(), response.headers()); } } if (in instanceof HttpContent) { @@ -634,13 +657,15 @@ private class ChannelConnectListener implements GenericFutureListener