Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support QPS throttling in NettyPerfClient #1220

Merged
merged 2 commits into from
Jul 22, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.github.ambry.utils.Utils.*;


/**
* A Netty based client to evaluate performance of the front end(s).
Expand Down Expand Up @@ -110,6 +112,12 @@ public class NettyPerfClient {
private final CountDownLatch channelConnectLatch = new CountDownLatch(1);
private final AtomicLong totalRequestCount = new AtomicLong(0);
private final Map<Channel, String> channelToHost = new HashMap<>();
private final Map<String, AtomicLong> hostToRequestCount = new HashMap<>();
private final Map<String, Long> hostToSleepTime = new HashMap<>();
private final SleepTimeUpdater updater = new SleepTimeUpdater();
private final ScheduledExecutorService backgroundScheduler;
private final int targetQPS;
private long sleepTimeInMs = 0;

private EventLoopGroup group;
private long perfClientStartTime;
Expand All @@ -133,6 +141,7 @@ private static class ClientArgs {
final List<String> customHeaders;
final String serviceId;
final Integer testTime;
final Integer targetQPS;
final String sslPropsFilePath;
private final Logger logger = LoggerFactory.getLogger(getClass());

Expand Down Expand Up @@ -208,6 +217,11 @@ private static class ClientArgs {
.withOptionalArg()
.describedAs("testTime")
.ofType(Integer.class);
ArgumentAcceptingOptionSpec<Integer> targetQPS = parser.accepts("targetQPS", "The target QPS for each frontend.")
.withOptionalArg()
.describedAs("targetQPS")
.ofType(Integer.class)
.defaultsTo(0);
ArgumentAcceptingOptionSpec<String> sslPropsFilePath =
parser.accepts("sslPropsFilePath", "The path to the properties file with SSL settings. Set to enable SSL.")
.withOptionalArg()
Expand All @@ -228,6 +242,7 @@ private static class ClientArgs {
this.customHeaders = options.valuesOf(customHeader);
this.serviceId = options.valueOf(serviceId);
this.testTime = options.valueOf(testTime);
this.targetQPS = options.valueOf(targetQPS);
this.sslPropsFilePath = options.valueOf(sslPropsFilePath);
validateArgs();

Expand All @@ -241,6 +256,7 @@ private static class ClientArgs {
logger.info("Post blob chunk size: {}", this.postBlobChunkSize);
logger.info("SSL properties file path: {}", this.sslPropsFilePath);
logger.info("Custom Headers: {}", this.customHeaders);
logger.info("Target QPS: {}", this.targetQPS);
}

/**
Expand All @@ -262,6 +278,9 @@ private void validateArgs() {
if (serviceId == null || serviceId.isEmpty()) {
throw new IllegalArgumentException("serviceId cannot be empty");
}
if (targetQPS < 0) {
throw new IllegalArgumentException("Target QPS cannot be negative value");
}
}
}

Expand All @@ -276,7 +295,7 @@ public static void main(String[] args) {
new NettyPerfClient(clientArgs.hosts, clientArgs.port, clientArgs.path, clientArgs.pathFileName,
clientArgs.concurrency, clientArgs.postBlobTotalSize, clientArgs.postBlobChunkSize,
clientArgs.sslPropsFilePath, clientArgs.serviceId, clientArgs.targetAccountName,
clientArgs.targetContainerName, clientArgs.customHeaders);
clientArgs.targetContainerName, clientArgs.customHeaders, clientArgs.targetQPS);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("Received shutdown signal. Requesting NettyPerfClient shutdown");
Expand Down Expand Up @@ -310,12 +329,14 @@ public static void main(String[] args) {
* @param targetAccountName target account name for POST
* @param targetContainerName target container name for POST
* @param customHeaders list of http headers name:value to be added.
* @param targetQPS the target QPS expected on single frontend. If not specified, targetQPS uses default value 0, which
* means the client attempts to issue request as fast as it can.
* @throws IOException
* @throws GeneralSecurityException
*/
private NettyPerfClient(List<String> hosts, int port, String path, String pathFileName, int concurrency,
Long totalSize, Integer chunkSize, String sslPropsFilePath, String serviceId, String targetAccountName,
String targetContainerName, List<String> customHeaders) throws Exception {
String targetContainerName, List<String> customHeaders, int targetQPS) throws Exception {
this.hosts = hosts;
this.port = port;
this.path = path;
Expand All @@ -325,6 +346,8 @@ private NettyPerfClient(List<String> hosts, int port, String path, String pathFi
this.pathList = null;
}
this.concurrency = concurrency;
this.targetQPS = targetQPS;
sleepTimeInMs = this.targetQPS > 0 ? 1000L * concurrency / this.targetQPS : 0;
if (chunkSize != null) {
this.totalSize = totalSize;
chunk = new byte[chunkSize];
Expand All @@ -344,6 +367,8 @@ private NettyPerfClient(List<String> hosts, int port, String path, String pathFi
this.customHeaders.add(new Pair<>(customHeaderNameValue[0], customHeaderNameValue[1]));
}
}
// only when target QPS is specified, the client would create background scheduler to update sleep time.
backgroundScheduler = targetQPS > 0 ? Utils.newScheduler(1, false) : null;
logger.info("Instantiated NettyPerfClient which will interact with hosts {}, port {}, path {} with concurrency {}",
this.hosts, this.port, this.pathList == null ? this.path : "has " + this.pathList.size() + "paths",
this.concurrency);
Expand Down Expand Up @@ -380,8 +405,14 @@ public void initChannel(SocketChannel ch) throws Exception {
channelToHost.put(future.channel(), host);
future.addListener(channelConnectListener);
}
hostToRequestCount.put(host, new AtomicLong(0));
hostToSleepTime.put(host, sleepTimeInMs);
}
channelConnectLatch.countDown();
if (backgroundScheduler != null) {
backgroundScheduler.scheduleAtFixedRate(updater, 0, 1, TimeUnit.SECONDS);
logger.info("Background scheduler is instantiated to update sleep time.");
}
isRunning = true;
logger.info("Created {} channel(s) per remote host", concurrency);
logger.info("NettyPerfClient started");
Expand All @@ -402,6 +433,9 @@ protected void shutdown() {
} else {
logger.info("NettyPerfClient shutdown complete");
}
if (backgroundScheduler != null) {
shutDownExecutorService(backgroundScheduler, 5, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
logger.error("NettyPerfClient shutdown interrupted", e);
} finally {
Expand All @@ -424,10 +458,38 @@ protected void shutdown() {
* Blocking function to wait on the NettyPerfClient shutting down.
* @throws InterruptedException
*/
protected void awaitShutdown() throws InterruptedException {
private void awaitShutdown() throws InterruptedException {
shutdownLatch.await();
}

/**
* A periodic updater that adjusts sleep time for connections based on current QPS and target QPS.
*/
private class SleepTimeUpdater implements Runnable {
@Override
public void run() {
for (Map.Entry<String, AtomicLong> hostAndRequestCount : hostToRequestCount.entrySet()) {
String hostname = hostAndRequestCount.getKey();
AtomicLong requestCount = hostAndRequestCount.getValue();
Long hostSleepTime = hostToSleepTime.get(hostname);
long currentQPS = requestCount.get();
logger.info("For host {}, request count per sec is {}, current sleep time is {} ms", hostname, currentQPS,
hostSleepTime);
if (targetQPS > 0 && currentQPS > 0) {
// formula to update sleep time is: new sleep time = (currentQPS / targetQPS) * (current sleep time)
long newSleepTime = currentQPS * hostSleepTime / targetQPS;
// if currentQPS is already higher than target one and new sleep time still equals current sleep time, we
// explicitly plus one more millisecond.
newSleepTime = newSleepTime + ((currentQPS > targetQPS && newSleepTime == hostSleepTime) ? 1 : 0);
hostSleepTime = newSleepTime;
requestCount.set(0);
logger.info("Updated sleep time is {} ms for host {}", hostSleepTime, hostname);
}
hostToSleepTime.put(hostname, hostSleepTime);
}
}
}

/**
* Custom handler that sends out the request and receives and processes the response.
* TODO support GET-after-POST and DELETE-after-POST operations for race condition testing.
Expand Down Expand Up @@ -533,10 +595,17 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
private void sendRequest(ChannelHandlerContext ctx) {
requestId++;
long globalId = totalRequestCount.incrementAndGet();
logger.trace("Sending request with global ID {} and local ID {} on channel {}", globalId, requestId,
ctx.channel());
Channel channel = ctx.channel();
String hostname = ((SocketChannel) channel).remoteAddress().getHostName();
hostToRequestCount.get(hostname).incrementAndGet();
logger.trace("Sending request with global ID {} and local ID {} on channel {}", globalId, requestId, channel);
reset();
perfClientMetrics.requestRate.mark();
try {
Thread.sleep(hostToSleepTime.get(hostname));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Since this function will be called within a nonblocking function, maybe we shouldn't do a thread.sleep here.
Some think like

Channel channel = ctx.Channel();
ctx.channel().eventLoop().schedule(new Runnable() {
    // send the request out.
}, hostToSleepTime.get(hostname), TimeUnit.MILLSECOND);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for letting me know this. Temporarily use current version, will take your suggestion in future PR after several tests.

} catch (InterruptedException e) {
logger.error("Interrupted with exception:", e);
}
ctx.writeAndFlush(request);
if (request.method().equals(HttpMethod.POST)) {
ctx.writeAndFlush(chunkedInput);
Expand Down