Skip to content

Commit

Permalink
Perf investigations for async clientcore (#41494)
Browse files Browse the repository at this point in the history
  • Loading branch information
samvaity authored Sep 17, 2024
1 parent 72b06ee commit f72e89d
Show file tree
Hide file tree
Showing 13 changed files with 433 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.http.vertx.VertxAsyncHttpClientBuilder;
import com.azure.core.http.vertx.VertxAsyncHttpClientProvider;
import com.azure.core.util.logging.ClientLogger;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
Expand All @@ -23,12 +24,20 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;
import javax.net.ssl.X509TrustManager;
import java.lang.reflect.Method;
import java.net.URI;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import static com.azure.perf.test.core.PerfStressOptions.HttpClientType.JDK;
import static com.azure.perf.test.core.PerfStressOptions.HttpClientType.NETTY;
Expand All @@ -41,6 +50,7 @@
* @param <TOptions> the performance test options to use while running the test.
*/
public abstract class ApiPerfTestBase<TOptions extends PerfStressOptions> extends PerfTestBase<TOptions> {
ClientLogger LOGGER = new ClientLogger(ApiPerfTestBase.class);
private final reactor.netty.http.client.HttpClient recordPlaybackHttpClient;
private final URI testProxy;
private final TestProxyPolicy testProxyPolicy;
Expand Down Expand Up @@ -225,6 +235,106 @@ public Mono<Void> runAllAsync(long endNanoTime) {
.then();
}

public CompletableFuture<Void> runAllAsyncWithCompletableFuture(long endNanoTime) {
completedOperations = 0;
lastCompletionNanoTime = 0;
long startNanoTime = System.nanoTime();
Semaphore semaphore = new Semaphore(options.getParallel()); // Use configurable limit

List<CompletableFuture<Void>> futures = new LinkedList<>();
while (System.nanoTime() < endNanoTime) {
try {
semaphore.acquire();
// Each runTestAsyncWithCompletableFuture() call runs independently
CompletableFuture<Void> testFuture = runTestAsyncWithCompletableFuture()
.thenAccept(result -> {
completedOperations += result;
lastCompletionNanoTime = System.nanoTime() - startNanoTime;
})
.whenComplete((res, ex) -> semaphore.release());
futures.add(testFuture);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}

// Remove all completed CompletableFutures from the list
futures.removeIf(CompletableFuture::isDone);
// Combine all futures so we can wait for all to complete
return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
}

@Override
public Runnable runAllAsyncWithExecutorService(long endNanoTime) {
completedOperations = 0;
lastCompletionNanoTime = 0;
final ExecutorService executor = Executors.newFixedThreadPool(options.getParallel());

return () -> {
try {
while (System.nanoTime() < endNanoTime) {
long startNanoTime = System.nanoTime();

try {
Runnable task = runTestAsyncWithExecutorService();
executor.submit(() -> {
task.run();
completedOperations++;
lastCompletionNanoTime = System.nanoTime() - startNanoTime;
}).get(); // Wait for the task to complete
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
} finally {
executor.shutdown();
try {
if (!executor.awaitTermination(options.getDuration(), TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
};
}

@Override
public Runnable runAllAsyncWithVirtualThread(long endNanoTime) {
completedOperations = 0;
lastCompletionNanoTime = 0;

ExecutorService virtualThreadExecutor;
try {
Method method = Executors.class.getMethod("newVirtualThreadPerTaskExecutor");
virtualThreadExecutor = (ExecutorService) method.invoke(null);
} catch (Exception e) {
// Skip virtual thread tests and report 0 completed operations rather than fallback
return () -> {
completedOperations = 0;
lastCompletionNanoTime = 0;
};
}

return () -> {
while (System.nanoTime() < endNanoTime) {
long startNanoTime = System.nanoTime();
virtualThreadExecutor.execute(() -> {
try {
runTestAsyncWithVirtualThread();
completedOperations++;
lastCompletionNanoTime = System.nanoTime() - startNanoTime;
} catch (Exception e) {
LOGGER.logThrowableAsError(e);
}
});
}
virtualThreadExecutor.shutdown();
};
}

/**
* Indicates how many operations were completed in a single run of the test. Good to be used for batch operations.
*
Expand All @@ -240,6 +350,31 @@ public Mono<Void> runAllAsync(long endNanoTime) {
*/
abstract Mono<Integer> runTestAsync();

/**
* Indicates how many operations were completed in a single run of the async test using CompletableFuture.
*
* @return the number of successful operations completed.
*/
CompletableFuture<Integer> runTestAsyncWithCompletableFuture() {
throw new UnsupportedOperationException("runAllAsyncWithCompletableFuture is not supported.");
}

/**
* Indicates how many operations were completed in a single run of the async test using ExecutorService.
*
* @return the number of successful operations completed.
*/
Runnable runTestAsyncWithExecutorService() {
throw new UnsupportedOperationException("runAllAsyncWithExecutorService is not supported.");
}

/**
* Indicates how many operations were completed in a single run of the async test using Virtual Threads.
*/
Runnable runTestAsyncWithVirtualThread() {
throw new UnsupportedOperationException("runAllAsyncWithVirtualThread is not supported.");
}

/**
* Stops playback tests.
*
Expand Down Expand Up @@ -327,6 +462,12 @@ private Mono<Void> runSyncOrAsync() {
return Mono.defer(() -> {
if (options.isSync()) {
return Mono.fromFuture(CompletableFuture.supplyAsync(() -> runTest())).then();
} else if (options.isCompletableFuture()) {
return Mono.fromFuture(CompletableFuture.supplyAsync(() -> runTestAsyncWithCompletableFuture())).then();
} else if (options.isExecutorService()) {
return Mono.fromRunnable(runTestAsyncWithExecutorService());
} else if (options.isVirtualThread()) {
return Mono.fromRunnable(this::runTestAsyncWithVirtualThread);
} else {
return runTestAsync().then();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.perf.test.core;

import com.azure.core.util.ExpandableStringEnum;
Expand Down Expand Up @@ -49,6 +46,15 @@ public class PerfStressOptions {
@Parameter(names = { "--http-client" }, description = "The http client to use. Can be netty, okhttp, jdk, vertx or a full name of HttpClientProvider implementation class.")
private String httpClient = HttpClientType.NETTY.toString();

@Parameter(names = { "--completeablefuture" }, help = true, description = "Runs the performance test asynchronously as a CompletableFuture.")
private boolean completeablefuture = false;

@Parameter(names = { "--executorservice" }, help = true, description = "Runs the performance test asynchronously with an ExecutorService.")
private boolean executorservice = false;

@Parameter(names = { "--virtualthread" }, help = true, description = "Runs the performance test asynchronously with a virtual thread.")
private boolean virtualthread = false;

/**
* Get the configured count for performance test.
* @return The count.
Expand Down Expand Up @@ -129,6 +135,30 @@ public boolean isSync() {
return sync;
}

/**
* Get the configured CompletableFuture status for performance test.
* @return The CompletableFuture status.
*/
public boolean isCompletableFuture() {
return completeablefuture;
}

/**
* Get the configured ExecutorService status for performance test.
* @return The ExecutorService status.
*/
public boolean isExecutorService() {
return executorservice;
}

/**
* Get the configured VirtualThread status for performance test.
* @return The VirtualThread status.
*/
public boolean isVirtualThread() {
return virtualthread;
}

/**
* The http client to use. Can be netty, okhttp.
* @return The http client to use.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -156,15 +160,19 @@ public static void run(Class<?> testClass, PerfStressOptions options) {
}

if (options.getWarmup() > 0) {
runTests(tests, options.isSync(), options.getParallel(), options.getWarmup(), "Warmup");
runTests(tests, options.isSync(), options.isCompletableFuture(), options.isExecutorService(),
options.isVirtualThread(),
options.getParallel(),
options.getWarmup(), "Warmup");
}

for (int i = 0; i < options.getIterations(); i++) {
String title = "Test";
if (options.getIterations() > 1) {
title += " " + (i + 1);
}
runTests(tests, options.isSync(), options.getParallel(), options.getDuration(), title);
runTests(tests, options.isSync(), options.isCompletableFuture(), options.isExecutorService(),
startedPlayback, options.getParallel(), options.getDuration(), title);
}
} finally {
try {
Expand Down Expand Up @@ -253,13 +261,19 @@ private static void writeKeyValue(String key, Object value, StringBuilder sb, At
*
* @param tests the performance tests to be executed.
* @param sync indicate if synchronous test should be run.
* @param completableFuture indicate if completable future test should be run.
* @param executorService indicate if executor service test should be run.
* @param virtualThread indicate if virtual thread test should be run.
* @param parallel the number of parallel threads to run the performance test on.
* @param durationSeconds the duration for which performance test should be run on.
* @param title the title of the performance tests.
*
* @throws RuntimeException if the execution fails.
* @throws IllegalStateException if zero operations completed of the performance test.
*/
public static void runTests(PerfTestBase<?>[] tests, boolean sync, int parallel, int durationSeconds, String title) {
public static void runTests(PerfTestBase<?>[] tests, boolean sync, boolean completableFuture,
boolean executorService, boolean virtualThread, int parallel, int durationSeconds,
String title) {

long endNanoTime = System.nanoTime() + ((long) durationSeconds * 1000000000);

Expand Down Expand Up @@ -288,6 +302,37 @@ public static void runTests(PerfTestBase<?>[] tests, boolean sync, int parallel,
forkJoinPool.invokeAll(operations);

forkJoinPool.awaitQuiescence(durationSeconds + 1, TimeUnit.SECONDS);
} else if (completableFuture) {
List<CompletableFuture<Void>> futures = new LinkedList<>();
for (PerfTestBase<?> test : tests) {
futures.add(test.runAllAsyncWithCompletableFuture(endNanoTime));
}
CompletableFuture<Void> allFutures =
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
allFutures.get(); // Wait for all futures to complete
} else if (executorService) {
// when updated to concurrentTaskLimit, the performance drops?
ExecutorService executor = Executors.newFixedThreadPool(tests.length);
try {
for (PerfTestBase<?> test : tests) {
Runnable task = test.runAllAsyncWithExecutorService(endNanoTime);
executor.submit(task);
}
} finally {
executor.shutdown();
try {
if (!executor.awaitTermination(durationSeconds + 1, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
} else if (virtualThread) {
for (PerfTestBase<?> test : tests) {
test.runAllAsyncWithVirtualThread(endNanoTime);
}
} else {
// Exceptions like OutOfMemoryError are handled differently by the default Reactor schedulers. Instead of terminating the
// Flux, the Flux will hang and the exception is only sent to the thread's uncaughtExceptionHandler and the Reactor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.perf.test.core;

import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.Mono;

/**
Expand Down Expand Up @@ -37,6 +38,21 @@ Mono<Integer> runTestAsync() {
return runAsync().then(Mono.just(1));
}

@Override
CompletableFuture<Integer> runTestAsyncWithCompletableFuture() {
return runAsyncWithCompletableFuture().thenApply(unused -> 1);
}

@Override
Runnable runTestAsyncWithExecutorService() {
return runAsyncWithExecutorService();
}

@Override
Runnable runTestAsyncWithVirtualThread() {
return runAsyncWithVirtualThread();
}

/**
* Runs the performance test.
*/
Expand All @@ -47,4 +63,27 @@ Mono<Integer> runTestAsync() {
* @return An empty {@link Mono}
*/
public abstract Mono<Void> runAsync();

/**
* Runs the performance test asynchronously.
* @return An empty {@link CompletableFuture}
*/
public CompletableFuture<Void> runAsyncWithCompletableFuture() {
throw new UnsupportedOperationException("runAsyncWithCompletableFuture is not supported.");
}

/**
* Runs the performance test asynchronously.
* @return An empty {@link Runnable}
*/
public Runnable runAsyncWithExecutorService() {
throw new UnsupportedOperationException("runAsyncWithExecutorService is not supported.");
}

/**
* Runs the performance test asynchronously.
*/
public Runnable runAsyncWithVirtualThread() {
throw new UnsupportedOperationException("runAsyncWithVirtualThread is not supported.");
}
}
Loading

0 comments on commit f72e89d

Please sign in to comment.