-
Notifications
You must be signed in to change notification settings - Fork 275
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support QPS throttling in NettyPerfClient #1220
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,6 +78,8 @@ | |
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import static com.github.ambry.utils.Utils.*; | ||
|
||
|
||
/** | ||
* A Netty based client to evaluate performance of the front end(s). | ||
|
@@ -110,6 +112,12 @@ public class NettyPerfClient { | |
private final CountDownLatch channelConnectLatch = new CountDownLatch(1); | ||
private final AtomicLong totalRequestCount = new AtomicLong(0); | ||
private final Map<Channel, String> channelToHost = new HashMap<>(); | ||
private final Map<String, AtomicLong> hostToRequestCount = new HashMap<>(); | ||
private final Map<String, Long> hostToSleepTime = new HashMap<>(); | ||
private final SleepTimeUpdater updater = new SleepTimeUpdater(); | ||
private final ScheduledExecutorService backgroundScheduler; | ||
private final int targetQPS; | ||
private long sleepTimeInMs = 0; | ||
|
||
private EventLoopGroup group; | ||
private long perfClientStartTime; | ||
|
@@ -133,6 +141,7 @@ private static class ClientArgs { | |
final List<String> customHeaders; | ||
final String serviceId; | ||
final Integer testTime; | ||
final Integer targetQPS; | ||
final String sslPropsFilePath; | ||
private final Logger logger = LoggerFactory.getLogger(getClass()); | ||
|
||
|
@@ -208,6 +217,11 @@ private static class ClientArgs { | |
.withOptionalArg() | ||
.describedAs("testTime") | ||
.ofType(Integer.class); | ||
ArgumentAcceptingOptionSpec<Integer> targetQPS = parser.accepts("targetQPS", "The target QPS for each frontend.") | ||
.withOptionalArg() | ||
.describedAs("targetQPS") | ||
.ofType(Integer.class) | ||
.defaultsTo(0); | ||
ArgumentAcceptingOptionSpec<String> sslPropsFilePath = | ||
parser.accepts("sslPropsFilePath", "The path to the properties file with SSL settings. Set to enable SSL.") | ||
.withOptionalArg() | ||
|
@@ -228,6 +242,7 @@ private static class ClientArgs { | |
this.customHeaders = options.valuesOf(customHeader); | ||
this.serviceId = options.valueOf(serviceId); | ||
this.testTime = options.valueOf(testTime); | ||
this.targetQPS = options.valueOf(targetQPS); | ||
this.sslPropsFilePath = options.valueOf(sslPropsFilePath); | ||
validateArgs(); | ||
|
||
|
@@ -241,6 +256,7 @@ private static class ClientArgs { | |
logger.info("Post blob chunk size: {}", this.postBlobChunkSize); | ||
logger.info("SSL properties file path: {}", this.sslPropsFilePath); | ||
logger.info("Custom Headers: {}", this.customHeaders); | ||
logger.info("Target QPS: {}", this.targetQPS); | ||
} | ||
|
||
/** | ||
|
@@ -262,6 +278,9 @@ private void validateArgs() { | |
if (serviceId == null || serviceId.isEmpty()) { | ||
throw new IllegalArgumentException("serviceId cannot be empty"); | ||
} | ||
if (targetQPS < 0) { | ||
throw new IllegalArgumentException("Target QPS cannot be negative value"); | ||
} | ||
} | ||
} | ||
|
||
|
@@ -276,7 +295,7 @@ public static void main(String[] args) { | |
new NettyPerfClient(clientArgs.hosts, clientArgs.port, clientArgs.path, clientArgs.pathFileName, | ||
clientArgs.concurrency, clientArgs.postBlobTotalSize, clientArgs.postBlobChunkSize, | ||
clientArgs.sslPropsFilePath, clientArgs.serviceId, clientArgs.targetAccountName, | ||
clientArgs.targetContainerName, clientArgs.customHeaders); | ||
clientArgs.targetContainerName, clientArgs.customHeaders, clientArgs.targetQPS); | ||
// attach shutdown handler to catch control-c | ||
Runtime.getRuntime().addShutdownHook(new Thread(() -> { | ||
logger.info("Received shutdown signal. Requesting NettyPerfClient shutdown"); | ||
|
@@ -310,12 +329,14 @@ public static void main(String[] args) { | |
* @param targetAccountName target account name for POST | ||
* @param targetContainerName target container name for POST | ||
* @param customHeaders list of http headers name:value to be added. | ||
* @param targetQPS the target QPS expected on single frontend. If not specified, targetQPS uses default value 0, which | ||
* means the client attempts to issue request as fast as it can. | ||
* @throws IOException | ||
* @throws GeneralSecurityException | ||
*/ | ||
private NettyPerfClient(List<String> hosts, int port, String path, String pathFileName, int concurrency, | ||
Long totalSize, Integer chunkSize, String sslPropsFilePath, String serviceId, String targetAccountName, | ||
String targetContainerName, List<String> customHeaders) throws Exception { | ||
String targetContainerName, List<String> customHeaders, int targetQPS) throws Exception { | ||
this.hosts = hosts; | ||
this.port = port; | ||
this.path = path; | ||
|
@@ -325,6 +346,8 @@ private NettyPerfClient(List<String> hosts, int port, String path, String pathFi | |
this.pathList = null; | ||
} | ||
this.concurrency = concurrency; | ||
this.targetQPS = targetQPS; | ||
sleepTimeInMs = this.targetQPS > 0 ? 1000L * concurrency / this.targetQPS : 0; | ||
if (chunkSize != null) { | ||
this.totalSize = totalSize; | ||
chunk = new byte[chunkSize]; | ||
|
@@ -344,6 +367,8 @@ private NettyPerfClient(List<String> hosts, int port, String path, String pathFi | |
this.customHeaders.add(new Pair<>(customHeaderNameValue[0], customHeaderNameValue[1])); | ||
} | ||
} | ||
// only when target QPS is specified, the client would create background scheduler to update sleep time. | ||
backgroundScheduler = targetQPS > 0 ? Utils.newScheduler(1, false) : null; | ||
logger.info("Instantiated NettyPerfClient which will interact with hosts {}, port {}, path {} with concurrency {}", | ||
this.hosts, this.port, this.pathList == null ? this.path : "has " + this.pathList.size() + "paths", | ||
this.concurrency); | ||
|
@@ -380,8 +405,14 @@ public void initChannel(SocketChannel ch) throws Exception { | |
channelToHost.put(future.channel(), host); | ||
future.addListener(channelConnectListener); | ||
} | ||
hostToRequestCount.put(host, new AtomicLong(0)); | ||
hostToSleepTime.put(host, sleepTimeInMs); | ||
} | ||
channelConnectLatch.countDown(); | ||
if (backgroundScheduler != null) { | ||
backgroundScheduler.scheduleAtFixedRate(updater, 0, 1, TimeUnit.SECONDS); | ||
logger.info("Background scheduler is instantiated to update sleep time."); | ||
} | ||
isRunning = true; | ||
logger.info("Created {} channel(s) per remote host", concurrency); | ||
logger.info("NettyPerfClient started"); | ||
|
@@ -402,6 +433,9 @@ protected void shutdown() { | |
} else { | ||
logger.info("NettyPerfClient shutdown complete"); | ||
} | ||
if (backgroundScheduler != null) { | ||
shutDownExecutorService(backgroundScheduler, 5, TimeUnit.SECONDS); | ||
} | ||
} catch (InterruptedException e) { | ||
logger.error("NettyPerfClient shutdown interrupted", e); | ||
} finally { | ||
|
@@ -424,10 +458,33 @@ protected void shutdown() { | |
* Blocking function to wait on the NettyPerfClient shutting down. | ||
* @throws InterruptedException | ||
*/ | ||
protected void awaitShutdown() throws InterruptedException { | ||
private void awaitShutdown() throws InterruptedException { | ||
shutdownLatch.await(); | ||
} | ||
|
||
/** | ||
* A periodic updater that adjusts sleep time for connections based on current QPS and target QPS. | ||
*/ | ||
private class SleepTimeUpdater implements Runnable { | ||
@Override | ||
public void run() { | ||
for (Map.Entry<String, AtomicLong> hostAndRequestCount : hostToRequestCount.entrySet()) { | ||
String hostname = hostAndRequestCount.getKey(); | ||
AtomicLong requestCount = hostAndRequestCount.getValue(); | ||
Long hostSleepTime = hostToSleepTime.get(hostname); | ||
logger.info("For host {}, request count per sec is {}, current sleep time is {} ms", hostname, | ||
requestCount.get(), hostSleepTime); | ||
if (targetQPS > 0 && requestCount.get() > 0) { | ||
long val = requestCount.get() * 100 * hostSleepTime / targetQPS; | ||
hostSleepTime = val / 100 + (val > 100 && (val % 100 >= 50 || hostSleepTime == 1) ? 1 : 0); | ||
requestCount.set(0); | ||
logger.info("Updated sleep time is {} ms for host {}", hostSleepTime, hostname); | ||
} | ||
hostToSleepTime.put(hostname, hostSleepTime); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Custom handler that sends out the request and receives and processes the response. | ||
* TODO support GET-after-POST and DELETE-after-POST operations for race condition testing. | ||
|
@@ -533,10 +590,17 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { | |
private void sendRequest(ChannelHandlerContext ctx) { | ||
requestId++; | ||
long globalId = totalRequestCount.incrementAndGet(); | ||
logger.trace("Sending request with global ID {} and local ID {} on channel {}", globalId, requestId, | ||
ctx.channel()); | ||
Channel channel = ctx.channel(); | ||
String hostname = ((SocketChannel) channel).remoteAddress().getHostName(); | ||
hostToRequestCount.get(hostname).incrementAndGet(); | ||
logger.trace("Sending request with global ID {} and local ID {} on channel {}", globalId, requestId, channel); | ||
reset(); | ||
perfClientMetrics.requestRate.mark(); | ||
try { | ||
Thread.sleep(hostToSleepTime.get(hostname)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Since this function will be called within a nonblocking function, maybe we shouldn't do a thread.sleep here.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for letting me know this. Temporarily use current version, will take your suggestion in future PR after several tests. |
||
} catch (InterruptedException e) { | ||
logger.error("Interrupted with exception:", e); | ||
} | ||
ctx.writeAndFlush(request); | ||
if (request.method().equals(HttpMethod.POST)) { | ||
ctx.writeAndFlush(chunkedInput); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really understand your calculation here, can you elaborate?
BTW, I think some simple function like
might just do
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed