diff --git a/ambry-tools/src/main/java/com.github.ambry/tools/perf/rest/NettyPerfClient.java b/ambry-tools/src/main/java/com.github.ambry/tools/perf/rest/NettyPerfClient.java index d7a0de1087..c03abec50a 100644 --- a/ambry-tools/src/main/java/com.github.ambry/tools/perf/rest/NettyPerfClient.java +++ b/ambry-tools/src/main/java/com.github.ambry/tools/perf/rest/NettyPerfClient.java @@ -78,6 +78,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.github.ambry.utils.Utils.*; + /** * A Netty based client to evaluate performance of the front end(s). @@ -110,6 +112,12 @@ public class NettyPerfClient { private final CountDownLatch channelConnectLatch = new CountDownLatch(1); private final AtomicLong totalRequestCount = new AtomicLong(0); private final Map channelToHost = new HashMap<>(); + private final Map hostToRequestCount = new HashMap<>(); + private final Map hostToSleepTime = new HashMap<>(); + private final SleepTimeUpdater updater = new SleepTimeUpdater(); + private final ScheduledExecutorService backgroundScheduler; + private final int targetQPS; + private long sleepTimeInMs = 0; private EventLoopGroup group; private long perfClientStartTime; @@ -133,6 +141,7 @@ private static class ClientArgs { final List customHeaders; final String serviceId; final Integer testTime; + final Integer targetQPS; final String sslPropsFilePath; private final Logger logger = LoggerFactory.getLogger(getClass()); @@ -208,6 +217,11 @@ private static class ClientArgs { .withOptionalArg() .describedAs("testTime") .ofType(Integer.class); + ArgumentAcceptingOptionSpec targetQPS = parser.accepts("targetQPS", "The target QPS for each frontend.") + .withOptionalArg() + .describedAs("targetQPS") + .ofType(Integer.class) + .defaultsTo(0); ArgumentAcceptingOptionSpec sslPropsFilePath = parser.accepts("sslPropsFilePath", "The path to the properties file with SSL settings. Set to enable SSL.") .withOptionalArg() @@ -228,6 +242,7 @@ private static class ClientArgs { this.customHeaders = options.valuesOf(customHeader); this.serviceId = options.valueOf(serviceId); this.testTime = options.valueOf(testTime); + this.targetQPS = options.valueOf(targetQPS); this.sslPropsFilePath = options.valueOf(sslPropsFilePath); validateArgs(); @@ -241,6 +256,7 @@ private static class ClientArgs { logger.info("Post blob chunk size: {}", this.postBlobChunkSize); logger.info("SSL properties file path: {}", this.sslPropsFilePath); logger.info("Custom Headers: {}", this.customHeaders); + logger.info("Target QPS: {}", this.targetQPS); } /** @@ -262,6 +278,9 @@ private void validateArgs() { if (serviceId == null || serviceId.isEmpty()) { throw new IllegalArgumentException("serviceId cannot be empty"); } + if (targetQPS < 0) { + throw new IllegalArgumentException("Target QPS cannot be negative value"); + } } } @@ -276,7 +295,7 @@ public static void main(String[] args) { new NettyPerfClient(clientArgs.hosts, clientArgs.port, clientArgs.path, clientArgs.pathFileName, clientArgs.concurrency, clientArgs.postBlobTotalSize, clientArgs.postBlobChunkSize, clientArgs.sslPropsFilePath, clientArgs.serviceId, clientArgs.targetAccountName, - clientArgs.targetContainerName, clientArgs.customHeaders); + clientArgs.targetContainerName, clientArgs.customHeaders, clientArgs.targetQPS); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread(() -> { logger.info("Received shutdown signal. Requesting NettyPerfClient shutdown"); @@ -310,12 +329,14 @@ public static void main(String[] args) { * @param targetAccountName target account name for POST * @param targetContainerName target container name for POST * @param customHeaders list of http headers name:value to be added. + * @param targetQPS the target QPS expected on single frontend. If not specified, targetQPS uses default value 0, which + * means the client attempts to issue request as fast as it can. * @throws IOException * @throws GeneralSecurityException */ private NettyPerfClient(List hosts, int port, String path, String pathFileName, int concurrency, Long totalSize, Integer chunkSize, String sslPropsFilePath, String serviceId, String targetAccountName, - String targetContainerName, List customHeaders) throws Exception { + String targetContainerName, List customHeaders, int targetQPS) throws Exception { this.hosts = hosts; this.port = port; this.path = path; @@ -325,6 +346,8 @@ private NettyPerfClient(List hosts, int port, String path, String pathFi this.pathList = null; } this.concurrency = concurrency; + this.targetQPS = targetQPS; + sleepTimeInMs = this.targetQPS > 0 ? 1000L * concurrency / this.targetQPS : 0; if (chunkSize != null) { this.totalSize = totalSize; chunk = new byte[chunkSize]; @@ -344,6 +367,8 @@ private NettyPerfClient(List hosts, int port, String path, String pathFi this.customHeaders.add(new Pair<>(customHeaderNameValue[0], customHeaderNameValue[1])); } } + // only when target QPS is specified, the client would create background scheduler to update sleep time. + backgroundScheduler = targetQPS > 0 ? Utils.newScheduler(1, false) : null; logger.info("Instantiated NettyPerfClient which will interact with hosts {}, port {}, path {} with concurrency {}", this.hosts, this.port, this.pathList == null ? this.path : "has " + this.pathList.size() + "paths", this.concurrency); @@ -380,8 +405,14 @@ public void initChannel(SocketChannel ch) throws Exception { channelToHost.put(future.channel(), host); future.addListener(channelConnectListener); } + hostToRequestCount.put(host, new AtomicLong(0)); + hostToSleepTime.put(host, sleepTimeInMs); } channelConnectLatch.countDown(); + if (backgroundScheduler != null) { + backgroundScheduler.scheduleAtFixedRate(updater, 0, 1, TimeUnit.SECONDS); + logger.info("Background scheduler is instantiated to update sleep time."); + } isRunning = true; logger.info("Created {} channel(s) per remote host", concurrency); logger.info("NettyPerfClient started"); @@ -402,6 +433,9 @@ protected void shutdown() { } else { logger.info("NettyPerfClient shutdown complete"); } + if (backgroundScheduler != null) { + shutDownExecutorService(backgroundScheduler, 5, TimeUnit.SECONDS); + } } catch (InterruptedException e) { logger.error("NettyPerfClient shutdown interrupted", e); } finally { @@ -424,10 +458,38 @@ protected void shutdown() { * Blocking function to wait on the NettyPerfClient shutting down. * @throws InterruptedException */ - protected void awaitShutdown() throws InterruptedException { + private void awaitShutdown() throws InterruptedException { shutdownLatch.await(); } + /** + * A periodic updater that adjusts sleep time for connections based on current QPS and target QPS. + */ + private class SleepTimeUpdater implements Runnable { + @Override + public void run() { + for (Map.Entry hostAndRequestCount : hostToRequestCount.entrySet()) { + String hostname = hostAndRequestCount.getKey(); + AtomicLong requestCount = hostAndRequestCount.getValue(); + Long hostSleepTime = hostToSleepTime.get(hostname); + long currentQPS = requestCount.get(); + logger.info("For host {}, request count per sec is {}, current sleep time is {} ms", hostname, currentQPS, + hostSleepTime); + if (targetQPS > 0 && currentQPS > 0) { + // formula to update sleep time is: new sleep time = (currentQPS / targetQPS) * (current sleep time) + long newSleepTime = currentQPS * hostSleepTime / targetQPS; + // if currentQPS is already higher than target one and new sleep time still equals current sleep time, we + // explicitly plus one more millisecond. + newSleepTime = newSleepTime + ((currentQPS > targetQPS && newSleepTime == hostSleepTime) ? 1 : 0); + hostSleepTime = newSleepTime; + requestCount.set(0); + logger.info("Updated sleep time is {} ms for host {}", hostSleepTime, hostname); + } + hostToSleepTime.put(hostname, hostSleepTime); + } + } + } + /** * Custom handler that sends out the request and receives and processes the response. * TODO support GET-after-POST and DELETE-after-POST operations for race condition testing. @@ -533,10 +595,17 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { private void sendRequest(ChannelHandlerContext ctx) { requestId++; long globalId = totalRequestCount.incrementAndGet(); - logger.trace("Sending request with global ID {} and local ID {} on channel {}", globalId, requestId, - ctx.channel()); + Channel channel = ctx.channel(); + String hostname = ((SocketChannel) channel).remoteAddress().getHostName(); + hostToRequestCount.get(hostname).incrementAndGet(); + logger.trace("Sending request with global ID {} and local ID {} on channel {}", globalId, requestId, channel); reset(); perfClientMetrics.requestRate.mark(); + try { + Thread.sleep(hostToSleepTime.get(hostname)); + } catch (InterruptedException e) { + logger.error("Interrupted with exception:", e); + } ctx.writeAndFlush(request); if (request.method().equals(HttpMethod.POST)) { ctx.writeAndFlush(chunkedInput);