Skip to content

Commit

Permalink
Merge branch '1.13.x' into 1.14.x
Browse files Browse the repository at this point in the history
  • Loading branch information
shakuzen committed Dec 5, 2024
2 parents c95db08 + 8cd92d9 commit a7d7488
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -406,9 +406,11 @@ private void monitor(MeterRegistry registry, ForkJoinPool fj) {
+ "underestimates the actual total number of steals when the pool " + "is not quiescent")
.register(registry),

Gauge.builder(metricPrefix + "executor.queued", fj, ForkJoinPool::getQueuedTaskCount)
Gauge
.builder(metricPrefix + "executor.queued", fj,
pool -> pool.getQueuedTaskCount() + pool.getQueuedSubmissionCount())
.tags(tags)
.description("An estimate of the total number of tasks currently held in queues by worker threads")
.description("The approximate number of tasks that are queued for execution")
.register(registry),

Gauge.builder(metricPrefix + "executor.active", fj, ForkJoinPool::getActiveThreadCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.junit.jupiter.params.provider.CsvSource;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.assertj.core.api.AssertionsForClassTypes.*;
import static org.awaitility.Awaitility.await;
Expand Down Expand Up @@ -302,6 +303,32 @@ void monitorScheduledExecutorServiceWithRepetitiveTasks(String metricPrefix, Str
assertThat(registry.get(expectedMetricPrefix + "executor.idle").tags(userTags).timer().count()).isEqualTo(0L);
}

@Test
@Issue("#5650")
void queuedSubmissionsAreIncludedInExecutorQueuedMetric() {
ForkJoinPool pool = new ForkJoinPool(1, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, false, 1, 1, 1,
a -> true, 555, TimeUnit.MILLISECONDS);
ExecutorServiceMetrics.monitor(registry, pool, "myForkJoinPool");
AtomicBoolean busy = new AtomicBoolean(true);

// will be an active task
pool.execute(() -> {
while (busy.get()) {
}
});

// will be queued for submission
pool.execute(() -> {
});
pool.execute(() -> {
});

double queued = registry.get("executor.queued").tag("name", "myForkJoinPool").gauge().value();
busy.set(false);

assertThat(queued).isEqualTo(2.0);
}

@SuppressWarnings("unchecked")
private <T extends Executor> T monitorExecutorService(String executorName, String metricPrefix, T exec) {
if (metricPrefix == null) {
Expand Down

0 comments on commit a7d7488

Please sign in to comment.