Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add virtual threads support #224

Merged
merged 35 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
ce2b6c3
(tmp)
kawamuray Oct 6, 2023
ea67465
(tmp)
kawamuray Oct 17, 2023
bf9d68d
tmp
kawamuray Nov 24, 2023
44d1cff
ReentrantLock version
kawamuray Nov 24, 2023
c86c1f9
Use gradle 8.5 and java 21
kawamuray Jan 11, 2024
111c70d
Eliminate synchronized in ProcessingContextImpl
kawamuray Jan 11, 2024
31ad207
Revert "ReentrantLock version"
kawamuray Jan 11, 2024
fa4dae6
Introduce SubPartitionRuntime to enable switching pthread and vthread
kawamuray Jan 17, 2024
d42b7e4
Support SubPartitionRuntime in benchmark mode
kawamuray Jan 17, 2024
77dd57d
(cleanup) no longer used jvm flags
kawamuray Jan 17, 2024
48af2b2
Eliminate use of synchronized
kawamuray Jan 17, 2024
fc8fdd9
more eliminate use of synchronized
kawamuray Jan 17, 2024
57171a0
Add it for vthread mode
kawamuray Jan 17, 2024
e3cf5f7
Revert source/target compatibility to java8
kawamuray Jan 17, 2024
d70a15c
no longer need enable-preview
kawamuray Jan 17, 2024
11de122
garbage
kawamuray Jan 17, 2024
69cf86b
Follow up work
kawamuray Jan 18, 2024
573e79e
Use just Java 21 or higher to build project
kawamuray Jan 18, 2024
f19a258
Handle TODOs
kawamuray Jan 23, 2024
8e56d30
Add integration test for Vthread runtime
kawamuray Jan 23, 2024
7c29d58
Add support to split I/O simulation latency into many in benchmark
kawamuray Jan 25, 2024
409d8af
More understandable API
kawamuray Jan 25, 2024
306eb68
Try loading async-profiler before warmup so that it won't cause deopt…
kawamuray Feb 2, 2024
b2cb330
Preload AP but record only the actual execution part
kawamuray Feb 2, 2024
0b66855
(remove) disable metrics once
kawamuray Feb 2, 2024
cbf9c8a
ThreadPoolSubpartitions should take care of thread_pool runtime speci…
kawamuray Feb 8, 2024
bfe7a38
Run benchmark with fixed heapsize
kawamuray Feb 8, 2024
42199d5
Avoid stream API for better performance
kawamuray Feb 8, 2024
2fb8b21
Benchmark use PROVIDED scope processor
kawamuray Feb 8, 2024
7a2df2d
(fixup) reduce cost of cleanup
kawamuray Feb 8, 2024
d58b4e9
Explicitly set -server option for benchmark
kawamuray Feb 9, 2024
c38afd8
Use ConcurrentHashMap.newKeySet
kawamuray Feb 9, 2024
bf1d293
(fixup) apply feedback
kawamuray Feb 19, 2024
fc55663
(fixup) nit
kawamuray Feb 19, 2024
1ccbae9
(fixiup) apply feedback for scheduled executor
kawamuray Feb 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
strategy:
fail-fast: false
matrix:
java: [8, 11, 17, 20]
java: [21]
steps:
- uses: actions/checkout@v2
- name: Setup java
Expand Down
2 changes: 1 addition & 1 deletion benchmark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ dependencies {
// To serialize java.time.Duration
implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$jacksonVersion"

runtimeOnly "ch.qos.logback:logback-classic:1.2.12"
runtimeOnly "ch.qos.logback:logback-classic:1.4.11"
}
2 changes: 1 addition & 1 deletion benchmark/debm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ if [[ "$*" == *--taskstats* ]] && [[ "$*" != *--taskstats-bin* ]] && ! which jta
extra_opts="$extra_opts --taskstats-bin=$file"
fi

exec java -XX:+UseG1GC -cp "$classpath" com.linecorp.decaton.benchmark.Main $extra_opts "$@"
exec java -server -Xmx8g -Xcomp -XX:+UseG1GC -cp "$classpath" com.linecorp.decaton.benchmark.Main $extra_opts "$@"
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ public class BenchmarkConfig {
* Latency to simulate as processing duration.
*/
int simulateLatencyMs;
/**
* Count to repeat the simulateLatencyMs.
*/
int latencyCount;
/**
* Optional bootstrap.servers to specify the cluster to use for testing. Otherwise local embedded cluster is
* used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@

import org.apache.kafka.clients.consumer.ConsumerConfig;

import com.linecorp.decaton.processor.TaskMetadata;
import com.linecorp.decaton.processor.metrics.Metrics;
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.ProcessorSubscription;
import com.linecorp.decaton.processor.runtime.ProcessorsBuilder;
import com.linecorp.decaton.processor.runtime.Property;
import com.linecorp.decaton.processor.runtime.StaticPropertySupplier;
import com.linecorp.decaton.processor.runtime.SubPartitionRuntime;
import com.linecorp.decaton.processor.runtime.SubscriptionBuilder;
import com.linecorp.decaton.processor.runtime.SubscriptionStateListener;
import com.linecorp.decaton.processor.runtime.TaskExtractor;
import com.linecorp.decaton.processor.TaskMetadata;
import com.linecorp.decaton.processor.runtime.ProcessorScope;
import com.linecorp.decaton.processor.runtime.ProcessorSubscription;
import com.linecorp.decaton.processor.runtime.SubscriptionBuilder;

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.logging.LoggingMeterRegistry;
Expand Down Expand Up @@ -71,13 +71,18 @@ public void init(Config config, Recording recording, ResourceTracker resourceTra
// value than zero with the default "latest" reset policy.
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

SubPartitionRuntime subPartitionRuntime = SubPartitionRuntime.THREAD_POOL;
List<Property<?>> properties = new ArrayList<>();
for (Map.Entry<String, String> entry : config.parameters().entrySet()) {
String name = entry.getKey();
Function<String, Object> ctor = propertyConstructors.get(name);
Object value = ctor.apply(entry.getValue());
Property<?> prop = ProcessorProperties.propertyForName(name, value);
properties.add(prop);
if ("decaton.subpartition.runtime".equals(name)) {
subPartitionRuntime = SubPartitionRuntime.valueOf(entry.getValue());
} else {
Function<String, Object> ctor = propertyConstructors.get(name);
Object value = ctor.apply(entry.getValue());
Property<?> prop = ProcessorProperties.propertyForName(name, value);
properties.add(prop);
}
}

registry = new LoggingMeterRegistry(new LoggingRegistryConfig() {
Expand All @@ -98,6 +103,7 @@ public String get(String key) {
.newBuilder("decaton-benchmark")
.consumerConfig(props)
.addProperties(StaticPropertySupplier.of(properties))
.subPartitionRuntime(subPartitionRuntime)
.processorsBuilder(
ProcessorsBuilder.consuming(config.topic(),
(TaskExtractor<Task>) bytes -> {
Expand All @@ -107,11 +113,10 @@ public String get(String key) {
TaskMetadata.builder().build(), task, bytes);
})
.thenProcess(
() -> (ctx, task) -> {
(ctx, task) -> {
resourceTracker.track(Thread.currentThread().getId());
recording.process(task);
},
ProcessorScope.THREAD))
}))
.stateListener(state -> {
if (state == SubscriptionStateListener.State.RUNNING) {
startLatch.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public BenchmarkResult execute(Config config, Consumer<Stage> stageCallback) thr

log.info("Loading runner {}", bmConfig.runner());
Runner runner = Runner.fromClassName(bmConfig.runner());
Recording recording = new Recording(bmConfig.tasks(), bmConfig.warmupTasks());
Recording recording = new Recording(bmConfig.tasks(), bmConfig.warmupTasks(),
bmConfig.latencyCount());
ResourceTracker resourceTracker = new ResourceTracker();
log.info("Initializing runner {}", bmConfig.runner());

Expand All @@ -68,13 +69,15 @@ public BenchmarkResult execute(Config config, Consumer<Stage> stageCallback) thr
final Optional<Path> profilerOutput;
final Optional<Path> taskstatsOutput;
try {
profiling.start();
stageCallback.accept(Stage.READY_WARMUP);
if (!recording.awaitWarmupComplete(3, TimeUnit.MINUTES)) {
throw new RuntimeException("timeout on awaiting benchmark to complete");
}
if (!bmConfig.skipWaitingJIT()) {
awaitJITGetsSettled();
}
profiling.stop();

profiling.start();
JvmTracker jvmTracker = JvmTracker.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public final class Main implements Callable<Integer> {
defaultValue = "0")
private int simulateLatencyMs;

@Option(names = "--latency-count",
description = "The number of times to sleep for the latency to simulating multiple I/O",
defaultValue = "1")
private int latencyCount;

@Option(names = "--bootstrap-servers",
description = "Optional bootstrap.servers property. if supplied, the specified kafka cluster is used for benchmarking instead of local embedded clusters")
private String bootstrapServers;
Expand Down Expand Up @@ -144,6 +149,7 @@ public Integer call() throws Exception {
.tasks(tasks)
.warmupTasks(warmupTasks)
.simulateLatencyMs(simulateLatencyMs)
.latencyCount(latencyCount)
.bootstrapServers(bootstrapServers)
.params(params)
.skipWaitingJIT(skipWaitingJIT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ static class ExecutionRecord {
private final AtomicLong maxDeliveryTime = new AtomicLong();
private final AtomicLong totalDeliveryTime = new AtomicLong();
private final AtomicLong processCount = new AtomicLong();
private final int latencyCount;

public ExecutionRecord(int latencyCount) {
this.latencyCount = latencyCount;
}

void process(Task task, boolean warmup) {
if (!warmup) {
Expand All @@ -50,7 +55,9 @@ void process(Task task, boolean warmup) {
try {
int latency = task.getProcessLatency();
if (latency > 0) {
Thread.sleep(latency);
for (int i = 0; i < latencyCount; i++) {
Thread.sleep(latency);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -68,10 +75,10 @@ void process(Task task, boolean warmup) {
private volatile long startTimeMs;
private volatile long completeTimeMs;

public Recording(int tasks, int warmupTasks) {
public Recording(int tasks, int warmupTasks, int latencyCount) {
this.tasks = tasks;
this.warmupTasks = warmupTasks;
executionRecord = new ExecutionRecord();
executionRecord = new ExecutionRecord(latencyCount);
processedCount = new AtomicInteger();
warmupLatch = new CountDownLatch(1);
if (warmupTasks == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public void print(BenchmarkConfig config, OutputStream out, BenchmarkResult resu
pw.printf("# Runner: %s\n", config.runner());
pw.printf("# Tasks: %d (warmup: %d)\n", config.tasks(), config.warmupTasks());
pw.printf("# Simulated Latency(ms): %d\n", config.simulateLatencyMs());
pw.printf("# Latency Count: %d\n", config.latencyCount());
pw.printf("# Total Simulated Latency(ms): %d\n", config.simulateLatencyMs() * config.latencyCount());
for (Entry<String, String> e : config.params().entrySet()) {
pw.printf("# Param: %s=%s\n", e.getKey(), e.getValue());
}
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import org.gradle.api.tasks.testing.logging.TestLogEvent

plugins {
id "me.champeau.jmh" version "0.6.6" apply false
id 'com.github.johnrengelman.shadow' version '7.1.1' apply false
id 'com.github.johnrengelman.shadow' version '8.1.1' apply false
id 'io.freefair.lombok' version '8.2.2' apply false
}

Expand Down
1 change: 0 additions & 1 deletion centraldogma/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,4 @@ dependencies {
implementation "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"

testImplementation "com.linecorp.centraldogma:centraldogma-testing-junit4:$centralDogmaVersion"
testRuntimeOnly "ch.qos.logback:logback-classic:1.2.12"
}
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.3-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
Expand Down
14 changes: 7 additions & 7 deletions gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,15 @@ if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
case $MAX_FD in #(
max*)
# In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
# shellcheck disable=SC3045
# shellcheck disable=SC2039,SC3045
MAX_FD=$( ulimit -H -n ) ||
warn "Could not query maximum file descriptor limit"
esac
case $MAX_FD in #(
'' | soft) :;; #(
*)
# In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
# shellcheck disable=SC3045
# shellcheck disable=SC2039,SC3045
ulimit -n "$MAX_FD" ||
warn "Could not set maximum file descriptor limit to $MAX_FD"
esac
Expand Down Expand Up @@ -202,11 +202,11 @@ fi
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'

# Collect all arguments for the java command;
# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of
# shell script including quotes and variable substitutions, so put them in
# double quotes to make sure that they get re-expanded; and
# * put everything else in single quotes, so that it's not re-expanded.
# Collect all arguments for the java command:
# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
# and any embedded shellness will be escaped.
# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
# treated as '${Hostname}' itself on the command line.

set -- \
"-Dorg.gradle.appname=$APP_BASE_NAME" \
Expand Down
1 change: 0 additions & 1 deletion processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ dependencies {
implementation "org.slf4j:slf4j-api:$slf4jVersion"

testImplementation project(":protobuf")
testRuntimeOnly "ch.qos.logback:logback-classic:1.2.12"

testImplementation "org.hamcrest:hamcrest:$hamcrestVersion"
testFixturesImplementation "org.slf4j:slf4j-api:$slf4jVersion"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand All @@ -33,11 +33,11 @@
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;

import com.linecorp.decaton.processor.internal.HashableByteArray;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.ProcessorScope;
import com.linecorp.decaton.processor.runtime.Property;
import com.linecorp.decaton.processor.runtime.StaticPropertySupplier;
import com.linecorp.decaton.processor.internal.HashableByteArray;
import com.linecorp.decaton.testing.KafkaClusterExtension;
import com.linecorp.decaton.testing.RandomExtension;
import com.linecorp.decaton.testing.processor.ProcessedRecord;
Expand Down Expand Up @@ -163,16 +163,16 @@ public void testSingleThreadProcessing() throws Exception {
// Note that this processing semantics is not be considered as Decaton specification which users can rely on.
// Rather, this is just a expected behavior based on current implementation when we set concurrency to 1.
ProcessingGuarantee noDuplicates = new ProcessingGuarantee() {
private final Map<HashableByteArray, List<TestTask>> produced = new HashMap<>();
private final Map<HashableByteArray, List<TestTask>> processed = new HashMap<>();
private final ConcurrentMap<HashableByteArray, List<TestTask>> produced = new ConcurrentHashMap<>();
private final ConcurrentMap<HashableByteArray, List<TestTask>> processed = new ConcurrentHashMap<>();

@Override
public synchronized void onProduce(ProducedRecord record) {
public void onProduce(ProducedRecord record) {
produced.computeIfAbsent(new HashableByteArray(record.key()), key -> new ArrayList<>()).add(record.task());
}

@Override
public synchronized void onProcess(TaskMetadata metadata, ProcessedRecord record) {
public void onProcess(TaskMetadata metadata, ProcessedRecord record) {
processed.computeIfAbsent(new HashableByteArray(record.key()), key -> new ArrayList<>()).add(record.task());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -66,7 +66,7 @@ public void testPropertyDynamicSwitch() throws Exception {
for (int i = 0; i < 10000; i++) {
keys.add("key" + i);
}
Set<HashableByteArray> processedKeys = Collections.synchronizedSet(new HashSet<>());
Set<HashableByteArray> processedKeys = ConcurrentHashMap.newKeySet();
CountDownLatch processLatch = new CountDownLatch(keys.size());

DecatonProcessor<HelloTask> processor = (context, task) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -65,7 +65,7 @@ public void testPropertyDynamicSwitch() throws Exception {
for (int i = 0; i < 10000; i++) {
keys.add("key" + i);
}
Set<HashableByteArray> processedKeys = Collections.synchronizedSet(new HashSet<>());
Set<HashableByteArray> processedKeys = ConcurrentHashMap.newKeySet();
CountDownLatch processLatch = new CountDownLatch(keys.size());

DecatonProcessor<HelloTask> processor = (context, task) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -79,8 +78,8 @@ public void tearDown() {
}

private static class ProcessRetriedTask implements ProcessingGuarantee {
private final Set<String> producedIds = Collections.synchronizedSet(new HashSet<>());
private final Set<String> processedIds = Collections.synchronizedSet(new HashSet<>());
private final Set<String> producedIds = ConcurrentHashMap.newKeySet();
private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

@Override
public void onProduce(ProducedRecord record) {
Expand All @@ -96,8 +95,13 @@ public void onProcess(TaskMetadata metadata, ProcessedRecord record) {

@Override
public void doAssert() {
TestUtils.awaitCondition("all retried tasks must be processed",
() -> producedIds.size() == processedIds.size());
try {
TestUtils.awaitCondition("all retried tasks must be processed",
() -> producedIds.size() == processedIds.size());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}

Expand Down
Loading
Loading