Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow NettyPerfClient to connect multiple frontends #1217

Merged
merged 1 commit into from
Jul 17, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
Expand All @@ -47,7 +48,6 @@
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
Expand All @@ -61,7 +61,9 @@
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
Expand All @@ -78,15 +80,15 @@


/**
* A Netty based client to evaluate performance of the front end.
* A Netty based client to evaluate performance of the front end(s).
*/
public class NettyPerfClient {
private static final String GET = "GET";
private static final String POST = "POST";
private static final List<String> SUPPORTED_REQUEST_TYPES = Arrays.asList(GET, POST);
private static final Logger logger = LoggerFactory.getLogger(NettyPerfClient.class);

private final String host;
private final List<String> hosts;
private final int port;
private final String path;
private final List<String> pathList;
Expand All @@ -105,7 +107,9 @@ public class NettyPerfClient {
private final JmxReporter reporter = JmxReporter.forRegistry(metricRegistry).build();
private final PerfClientMetrics perfClientMetrics = new PerfClientMetrics(metricRegistry);
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
private final CountDownLatch channelConnectLatch = new CountDownLatch(1);
private final AtomicLong totalRequestCount = new AtomicLong(0);
private final Map<Channel, String> channelToHost = new HashMap<>();

private EventLoopGroup group;
private long perfClientStartTime;
Expand All @@ -116,7 +120,7 @@ public class NettyPerfClient {
* Abstraction class for all the parameters that are expected.
*/
private static class ClientArgs {
final String host;
final List<String> hosts;
final Integer port;
final String path;
final String pathFileName;
Expand All @@ -136,11 +140,12 @@ private static class ClientArgs {
* Parses the arguments provided and extracts them into variables that can be retrieved.
* @param args the command line argument list.
*/
protected ClientArgs(String args[]) {
ClientArgs(String[] args) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: Use 'protected' to be more strict?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please ignore this. Looks good.

OptionParser parser = new OptionParser();
ArgumentAcceptingOptionSpec<String> host = parser.accepts("host", "Front end host to contact")
ArgumentAcceptingOptionSpec<String> hosts = parser.accepts("hosts", "Front end hosts(s) to contact")
.withOptionalArg()
.describedAs("host")
.describedAs("hosts")
.withValuesSeparatedBy(",")
.ofType(String.class)
.defaultsTo("localhost");
ArgumentAcceptingOptionSpec<Integer> port = parser.accepts("port", "Front end port")
Expand Down Expand Up @@ -210,7 +215,7 @@ protected ClientArgs(String args[]) {
.ofType(String.class);

OptionSet options = parser.parse(args);
this.host = options.valueOf(host);
this.hosts = options.valuesOf(hosts);
this.port = options.valueOf(port);
this.path = options.valueOf(path);
this.pathFileName = options.valueOf(pathFileName);
Expand All @@ -226,7 +231,7 @@ protected ClientArgs(String args[]) {
this.sslPropsFilePath = options.valueOf(sslPropsFilePath);
validateArgs();

logger.info("Host: {}", this.host);
logger.info("Hosts: {}", this.hosts);
logger.info("Port: {}", this.port);
logger.info("Path: {}", this.path);
logger.info("Path File Name: {}", this.pathFileName);
Expand Down Expand Up @@ -268,7 +273,7 @@ public static void main(String[] args) {
try {
ClientArgs clientArgs = new ClientArgs(args);
final NettyPerfClient nettyPerfClient =
new NettyPerfClient(clientArgs.host, clientArgs.port, clientArgs.path, clientArgs.pathFileName,
new NettyPerfClient(clientArgs.hosts, clientArgs.port, clientArgs.path, clientArgs.pathFileName,
clientArgs.concurrency, clientArgs.postBlobTotalSize, clientArgs.postBlobChunkSize,
clientArgs.sslPropsFilePath, clientArgs.serviceId, clientArgs.targetAccountName,
clientArgs.targetContainerName, clientArgs.customHeaders);
Expand All @@ -294,7 +299,7 @@ public static void main(String[] args) {

/**
* Creates an instance of NettyPerfClient
* @param host host to contact.
* @param hosts a list of hosts to contact.
* @param port port to contact.
* @param path resource path.
* @param concurrency number of parallel requests.
Expand All @@ -308,10 +313,10 @@ public static void main(String[] args) {
* @throws IOException
* @throws GeneralSecurityException
*/
private NettyPerfClient(String host, int port, String path, String pathFileName, int concurrency, Long totalSize,
Integer chunkSize, String sslPropsFilePath, String serviceId, String targetAccountName,
private NettyPerfClient(List<String> hosts, int port, String path, String pathFileName, int concurrency,
Long totalSize, Integer chunkSize, String sslPropsFilePath, String serviceId, String targetAccountName,
String targetContainerName, List<String> customHeaders) throws Exception {
this.host = host;
this.hosts = hosts;
this.port = port;
this.path = path;
if (pathFileName != null) {
Expand Down Expand Up @@ -339,8 +344,8 @@ private NettyPerfClient(String host, int port, String path, String pathFileName,
this.customHeaders.add(new Pair<>(customHeaderNameValue[0], customHeaderNameValue[1]));
}
}
logger.info("Instantiated NettyPerfClient which will interact with host {}, port {}, path {} with concurrency {}",
this.host, this.port, this.pathList == null ? this.path : "has " + this.pathList.size() + "paths",
logger.info("Instantiated NettyPerfClient which will interact with hosts {}, port {}, path {} with concurrency {}",
this.hosts, this.port, this.pathList == null ? this.path : "has " + this.pathList.size() + "paths",
this.concurrency);
}

Expand All @@ -355,20 +360,30 @@ protected void start() {
b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
channelConnectLatch.await();
if (sslFactory != null) {
String host = channelToHost.get(ch);
if (host == null) {
throw new Exception("Unrecognized host!");
}
logger.info("Initializing the channel to {}", host);
justinlin-linkedin marked this conversation as resolved.
Show resolved Hide resolved
ch.pipeline().addLast(new SslHandler(sslFactory.createSSLEngine(host, port, SSLFactory.Mode.CLIENT)));
}
ch.pipeline().addLast(new HttpClientCodec()).addLast(new ChunkedWriteHandler()).addLast(new ResponseHandler());
}
});
logger.info("Connecting to {}:{}", host, port);
b.remoteAddress(host, port);
perfClientStartTime = System.currentTimeMillis();
for (int i = 0; i < concurrency; i++) {
b.connect().addListener(channelConnectListener);
for (String host : hosts) {
logger.info("Connecting to {}:{}", host, port);
for (int i = 0; i < concurrency; i++) {
ChannelFuture future = b.connect(host, port);
channelToHost.put(future.channel(), host);
future.addListener(channelConnectListener);
}
}
channelConnectLatch.countDown();
isRunning = true;
logger.info("Created {} channel(s)", concurrency);
logger.info("Created {} channel(s) per remote host", concurrency);
logger.info("NettyPerfClient started");
}

Expand Down Expand Up @@ -415,6 +430,7 @@ protected void awaitShutdown() throws InterruptedException {

/**
* Custom handler that sends out the request and receives and processes the response.
* TODO support GET-after-POST and DELETE-after-POST operations for race condition testing.
*/
private class ResponseHandler extends SimpleChannelInboundHandler<HttpObject> {
private final Logger logger = LoggerFactory.getLogger(getClass());
Expand Down Expand Up @@ -446,8 +462,15 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject in) {
perfClientMetrics.timeToFirstResponseChunkInMs.update(responseReceiveStart);
logger.trace("Response receive has started on channel {}. Took {} ms", ctx.channel(), responseReceiveStart);
response = (HttpResponse) in;
if (response.status() != HttpResponseStatus.OK) {
logger.error("Got Response code {} and headers were {}", response.status().code(), response.headers());
if (response.status().code() >= 200 && response.status().code() < 300) {
logger.trace("Request succeeded");
if (response.headers().contains("Location")) {
logger.info(response.headers().get("Location"));
}
} else if (response.status().code() >= 300 && response.status().code() < 400) {
logger.warn("Redirection code {} and headers were {}", response.status().code(), response.headers());
} else {
logger.error("Response error code {} and headers were {}", response.status().code(), response.headers());
justinlin-linkedin marked this conversation as resolved.
Show resolved Hide resolved
}
}
if (in instanceof HttpContent) {
Expand Down Expand Up @@ -634,13 +657,15 @@ private class ChannelConnectListener implements GenericFutureListener<ChannelFut
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
perfClientMetrics.connectError.inc();
logger.error("Channel {} to {}:{} could not be connected.", future.channel(), host, port, future.cause());
Channel channel = future.channel();
logger.error("Channel {} to {}:{} could not be connected.", channel, channelToHost.get(channel), port,
future.cause());
}
}
}

/**
* Metrics that track peformance.
* Metrics that track performance.
*/
private static class PerfClientMetrics {
public final Meter bytesReceiveRate;
Expand Down