Skip to content

Commit

Permalink
Support QPS throttling in NettyPerfClient (#1220)
Browse files Browse the repository at this point in the history
Added some dynamic throttling logic in NettyPerfClient to issue POST
requests at given QPS.
  • Loading branch information
jsjtzyy authored and justinlin-linkedin committed Jul 22, 2019
1 parent 5437b5f commit 379d881
Showing 1 changed file with 74 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.github.ambry.utils.Utils.*;


/**
* A Netty based client to evaluate performance of the front end(s).
Expand Down Expand Up @@ -110,6 +112,12 @@ public class NettyPerfClient {
private final CountDownLatch channelConnectLatch = new CountDownLatch(1);
private final AtomicLong totalRequestCount = new AtomicLong(0);
private final Map<Channel, String> channelToHost = new HashMap<>();
private final Map<String, AtomicLong> hostToRequestCount = new HashMap<>();
private final Map<String, Long> hostToSleepTime = new HashMap<>();
private final SleepTimeUpdater updater = new SleepTimeUpdater();
private final ScheduledExecutorService backgroundScheduler;
private final int targetQPS;
private long sleepTimeInMs = 0;

private EventLoopGroup group;
private long perfClientStartTime;
Expand All @@ -133,6 +141,7 @@ private static class ClientArgs {
final List<String> customHeaders;
final String serviceId;
final Integer testTime;
final Integer targetQPS;
final String sslPropsFilePath;
private final Logger logger = LoggerFactory.getLogger(getClass());

Expand Down Expand Up @@ -208,6 +217,11 @@ private static class ClientArgs {
.withOptionalArg()
.describedAs("testTime")
.ofType(Integer.class);
ArgumentAcceptingOptionSpec<Integer> targetQPS = parser.accepts("targetQPS", "The target QPS for each frontend.")
.withOptionalArg()
.describedAs("targetQPS")
.ofType(Integer.class)
.defaultsTo(0);
ArgumentAcceptingOptionSpec<String> sslPropsFilePath =
parser.accepts("sslPropsFilePath", "The path to the properties file with SSL settings. Set to enable SSL.")
.withOptionalArg()
Expand All @@ -228,6 +242,7 @@ private static class ClientArgs {
this.customHeaders = options.valuesOf(customHeader);
this.serviceId = options.valueOf(serviceId);
this.testTime = options.valueOf(testTime);
this.targetQPS = options.valueOf(targetQPS);
this.sslPropsFilePath = options.valueOf(sslPropsFilePath);
validateArgs();

Expand All @@ -241,6 +256,7 @@ private static class ClientArgs {
logger.info("Post blob chunk size: {}", this.postBlobChunkSize);
logger.info("SSL properties file path: {}", this.sslPropsFilePath);
logger.info("Custom Headers: {}", this.customHeaders);
logger.info("Target QPS: {}", this.targetQPS);
}

/**
Expand All @@ -262,6 +278,9 @@ private void validateArgs() {
if (serviceId == null || serviceId.isEmpty()) {
throw new IllegalArgumentException("serviceId cannot be empty");
}
if (targetQPS < 0) {
throw new IllegalArgumentException("Target QPS cannot be negative value");
}
}
}

Expand All @@ -276,7 +295,7 @@ public static void main(String[] args) {
new NettyPerfClient(clientArgs.hosts, clientArgs.port, clientArgs.path, clientArgs.pathFileName,
clientArgs.concurrency, clientArgs.postBlobTotalSize, clientArgs.postBlobChunkSize,
clientArgs.sslPropsFilePath, clientArgs.serviceId, clientArgs.targetAccountName,
clientArgs.targetContainerName, clientArgs.customHeaders);
clientArgs.targetContainerName, clientArgs.customHeaders, clientArgs.targetQPS);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("Received shutdown signal. Requesting NettyPerfClient shutdown");
Expand Down Expand Up @@ -310,12 +329,14 @@ public static void main(String[] args) {
* @param targetAccountName target account name for POST
* @param targetContainerName target container name for POST
* @param customHeaders list of http headers name:value to be added.
* @param targetQPS the target QPS expected on single frontend. If not specified, targetQPS uses default value 0, which
* means the client attempts to issue request as fast as it can.
* @throws IOException
* @throws GeneralSecurityException
*/
private NettyPerfClient(List<String> hosts, int port, String path, String pathFileName, int concurrency,
Long totalSize, Integer chunkSize, String sslPropsFilePath, String serviceId, String targetAccountName,
String targetContainerName, List<String> customHeaders) throws Exception {
String targetContainerName, List<String> customHeaders, int targetQPS) throws Exception {
this.hosts = hosts;
this.port = port;
this.path = path;
Expand All @@ -325,6 +346,8 @@ private NettyPerfClient(List<String> hosts, int port, String path, String pathFi
this.pathList = null;
}
this.concurrency = concurrency;
this.targetQPS = targetQPS;
sleepTimeInMs = this.targetQPS > 0 ? 1000L * concurrency / this.targetQPS : 0;
if (chunkSize != null) {
this.totalSize = totalSize;
chunk = new byte[chunkSize];
Expand All @@ -344,6 +367,8 @@ private NettyPerfClient(List<String> hosts, int port, String path, String pathFi
this.customHeaders.add(new Pair<>(customHeaderNameValue[0], customHeaderNameValue[1]));
}
}
// only when target QPS is specified, the client would create background scheduler to update sleep time.
backgroundScheduler = targetQPS > 0 ? Utils.newScheduler(1, false) : null;
logger.info("Instantiated NettyPerfClient which will interact with hosts {}, port {}, path {} with concurrency {}",
this.hosts, this.port, this.pathList == null ? this.path : "has " + this.pathList.size() + "paths",
this.concurrency);
Expand Down Expand Up @@ -380,8 +405,14 @@ public void initChannel(SocketChannel ch) throws Exception {
channelToHost.put(future.channel(), host);
future.addListener(channelConnectListener);
}
hostToRequestCount.put(host, new AtomicLong(0));
hostToSleepTime.put(host, sleepTimeInMs);
}
channelConnectLatch.countDown();
if (backgroundScheduler != null) {
backgroundScheduler.scheduleAtFixedRate(updater, 0, 1, TimeUnit.SECONDS);
logger.info("Background scheduler is instantiated to update sleep time.");
}
isRunning = true;
logger.info("Created {} channel(s) per remote host", concurrency);
logger.info("NettyPerfClient started");
Expand All @@ -402,6 +433,9 @@ protected void shutdown() {
} else {
logger.info("NettyPerfClient shutdown complete");
}
if (backgroundScheduler != null) {
shutDownExecutorService(backgroundScheduler, 5, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
logger.error("NettyPerfClient shutdown interrupted", e);
} finally {
Expand All @@ -424,10 +458,38 @@ protected void shutdown() {
* Blocking function to wait on the NettyPerfClient shutting down.
* @throws InterruptedException
*/
protected void awaitShutdown() throws InterruptedException {
private void awaitShutdown() throws InterruptedException {
shutdownLatch.await();
}

/**
* A periodic updater that adjusts sleep time for connections based on current QPS and target QPS.
*/
private class SleepTimeUpdater implements Runnable {
@Override
public void run() {
for (Map.Entry<String, AtomicLong> hostAndRequestCount : hostToRequestCount.entrySet()) {
String hostname = hostAndRequestCount.getKey();
AtomicLong requestCount = hostAndRequestCount.getValue();
Long hostSleepTime = hostToSleepTime.get(hostname);
long currentQPS = requestCount.get();
logger.info("For host {}, request count per sec is {}, current sleep time is {} ms", hostname, currentQPS,
hostSleepTime);
if (targetQPS > 0 && currentQPS > 0) {
// formula to update sleep time is: new sleep time = (currentQPS / targetQPS) * (current sleep time)
long newSleepTime = currentQPS * hostSleepTime / targetQPS;
// if currentQPS is already higher than target one and new sleep time still equals current sleep time, we
// explicitly plus one more millisecond.
newSleepTime = newSleepTime + ((currentQPS > targetQPS && newSleepTime == hostSleepTime) ? 1 : 0);
hostSleepTime = newSleepTime;
requestCount.set(0);
logger.info("Updated sleep time is {} ms for host {}", hostSleepTime, hostname);
}
hostToSleepTime.put(hostname, hostSleepTime);
}
}
}

/**
* Custom handler that sends out the request and receives and processes the response.
* TODO support GET-after-POST and DELETE-after-POST operations for race condition testing.
Expand Down Expand Up @@ -533,10 +595,17 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
private void sendRequest(ChannelHandlerContext ctx) {
requestId++;
long globalId = totalRequestCount.incrementAndGet();
logger.trace("Sending request with global ID {} and local ID {} on channel {}", globalId, requestId,
ctx.channel());
Channel channel = ctx.channel();
String hostname = ((SocketChannel) channel).remoteAddress().getHostName();
hostToRequestCount.get(hostname).incrementAndGet();
logger.trace("Sending request with global ID {} and local ID {} on channel {}", globalId, requestId, channel);
reset();
perfClientMetrics.requestRate.mark();
try {
Thread.sleep(hostToSleepTime.get(hostname));
} catch (InterruptedException e) {
logger.error("Interrupted with exception:", e);
}
ctx.writeAndFlush(request);
if (request.method().equals(HttpMethod.POST)) {
ctx.writeAndFlush(chunkedInput);
Expand Down

0 comments on commit 379d881

Please sign in to comment.