From ea3960acdcc6b2abcfe889c361eec76df06d7b34 Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Tue, 9 Apr 2024 13:43:40 -0500 Subject: [PATCH] PrometheusHttpServer prevent concurrent reads when reusable memory mode --- .../prometheus/PrometheusHttpServer.java | 7 ++ .../PrometheusHttpServerBuilder.java | 9 ++- .../prometheus/PrometheusHttpServerTest.java | 65 +++++++++++++++++++ 3 files changed, 80 insertions(+), 1 deletion(-) diff --git a/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServer.java b/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServer.java index e4a15612856..f815ea0ac54 100644 --- a/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServer.java +++ b/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServer.java @@ -23,6 +23,7 @@ import java.io.UncheckedIOException; import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import javax.annotation.Nullable; @@ -70,6 +71,12 @@ public static PrometheusHttpServerBuilder builder() { this.memoryMode = memoryMode; this.prometheusRegistry = prometheusRegistry; prometheusRegistry.register(prometheusMetricReader); + // When memory mode is REUSABLE_DATA, concurrent reads lead to data corruption. To prevent this, + // we configure prometheus with a single thread executor such that requests are handled + // sequentially. + if (memoryMode == MemoryMode.REUSABLE_DATA) { + executor = Executors.newSingleThreadExecutor(); + } try { this.httpServer = HTTPServer.builder() diff --git a/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServerBuilder.java b/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServerBuilder.java index 3c20eea3a2f..2be7dee6215 100644 --- a/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServerBuilder.java +++ b/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServerBuilder.java @@ -11,6 +11,7 @@ import io.opentelemetry.sdk.common.export.MemoryMode; import io.prometheus.metrics.model.registry.PrometheusRegistry; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Predicate; import javax.annotation.Nullable; @@ -93,7 +94,13 @@ public PrometheusHttpServerBuilder setAllowedResourceAttributesFilter( return this; } - /** Set the {@link MemoryMode}. */ + /** + * Set the {@link MemoryMode}. + * + *

If set to {@link MemoryMode#REUSABLE_DATA}, requests are served sequentially which is + * accomplished by overriding {@link #setExecutor(ExecutorService)} to {@link + * Executors#newSingleThreadExecutor()}. + */ public PrometheusHttpServerBuilder setMemoryMode(MemoryMode memoryMode) { requireNonNull(memoryMode, "memoryMode"); this.memoryMode = memoryMode; diff --git a/exporters/prometheus/src/test/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServerTest.java b/exporters/prometheus/src/test/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServerTest.java index 3fc114c1ed3..449cc4c5c4c 100644 --- a/exporters/prometheus/src/test/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServerTest.java +++ b/exporters/prometheus/src/test/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServerTest.java @@ -24,6 +24,7 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.CollectionRegistration; @@ -39,6 +40,7 @@ import java.io.IOException; import java.net.ServerSocket; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -47,6 +49,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.zip.GZIPInputStream; @@ -129,6 +132,68 @@ void fetchPrometheus() { + "target_info{kr=\"vr\"} 1\n"); } + @Test + void fetch_ReusableMemoryMode() throws InterruptedException { + try (PrometheusHttpServer prometheusServer = + PrometheusHttpServer.builder() + .setHost("localhost") + .setPort(0) + .setMemoryMode(MemoryMode.REUSABLE_DATA) + .build()) { + AtomicBoolean collectInProgress = new AtomicBoolean(); + AtomicBoolean concurrentRead = new AtomicBoolean(); + prometheusServer.register( + new CollectionRegistration() { + @Override + public Collection collectAllMetrics() { + if (!collectInProgress.compareAndSet(false, true)) { + concurrentRead.set(true); + } + Collection response = metricData.get(); + try { + Thread.sleep(1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (!collectInProgress.compareAndSet(true, false)) { + concurrentRead.set(true); + } + return response; + } + }); + + WebClient client = + WebClient.builder("http://localhost:" + prometheusServer.getAddress().getPort()) + .decorator(RetryingClient.newDecorator(RetryRule.failsafe())) + .build(); + + // Spin up 4 threads calling /metrics simultaneously. If concurrent read happens, + // collectAllMetrics will set concurrentRead to true and the test will fail. + List threads = new ArrayList<>(); + for (int i = 0; i < 4; i++) { + Thread thread = + new Thread( + () -> { + for (int j = 0; j < 10; j++) { + AggregatedHttpResponse response = client.get("/metrics").aggregate().join(); + assertThat(response.status()).isEqualTo(HttpStatus.OK); + } + }); + thread.setDaemon(true); + thread.start(); + threads.add(thread); + } + + // Wait for threads to complete + for (Thread thread : threads) { + thread.join(); + } + + // Confirm no concurrent reads took place + assertThat(concurrentRead.get()).isFalse(); + } + } + @Test void fetchOpenMetrics() { AggregatedHttpResponse response =