Skip to content

Commit

Permalink
Merge branch 'main' into otel-native-metrics-jfr-docs
Browse files Browse the repository at this point in the history
  • Loading branch information
brunobat authored Nov 13, 2024
2 parents fd94b0b + 7fa4088 commit 55f104c
Show file tree
Hide file tree
Showing 30 changed files with 634 additions and 107 deletions.
62 changes: 57 additions & 5 deletions docs/src/main/asciidoc/websockets-next-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ and pull requests should be submitted there:
https://github.com/quarkusio/quarkus/tree/main/docs/src/main/asciidoc
////
[id="websockets-next-reference-guide"]
= WebSockets Next extension reference guide
= WebSockets Next reference guide
:extension-status: preview
include::_attributes.adoc[]
:numbered:
Expand Down Expand Up @@ -78,7 +78,7 @@ implementation("io.quarkus:quarkus-websockets-next")

== Endpoints

Both the server and client APIs allow you to define _endpoints_ that are used to consume and send messages.
Both the <<server-api>> and <<client-api>> define _endpoints_ that are used to consume and send messages.
The endpoints are implemented as CDI beans and support injection.
Endpoints declare <<callback-methods,_callback methods_>> annotated with `@OnTextMessage`, `@OnBinaryMessage`, `@OnPong`, `@OnOpen`, `@OnClose` and `@OnError`.
These methods are used to handle various WebSocket events.
Expand Down Expand Up @@ -559,6 +559,7 @@ This means that if an endpoint receives events `A` and `B` (in this particular o
However, in some situations it is preferable to process events concurrently, i.e. with no ordering guarantees but also with no concurrency limits.
For this cases, the `InboundProcessingMode#CONCURRENT` should be used.

[[server-api]]
== Server API

=== HTTP server configuration
Expand Down Expand Up @@ -900,14 +901,15 @@ public class CustomTenantResolver implements TenantResolver {
----
For more information on Hibernate multitenancy, refer to the https://quarkus.io/guides/hibernate-orm#multitenancy[hibernate documentation].

[[client-api]]
== Client API

[[client-connectors]]
=== Client connectors

The `io.quarkus.websockets.next.WebSocketConnector<CLIENT>` is used to configure and create new connections for client endpoints.
A CDI bean that implements this interface is provided and can be injected in other beans.
The actual type argument is used to determine the client endpoint.
A connector can be used to configure and open a new client connection backed by a client endpoint that is used to consume and send messages.
Quarkus provides a CDI bean with bean type `io.quarkus.websockets.next.WebSocketConnector<CLIENT>` and default qualifer that can be injected in other beans.
The actual type argument of an injection point is used to determine the client endpoint.
The type is validated during build - if it does not represent a client endpoint the build fails.

Let’s consider the following client endpoint:
Expand Down Expand Up @@ -955,6 +957,31 @@ public class MyBean {

NOTE: If an application attempts to inject a connector for a missing endpoint, an error is thrown.

Connectors are not thread-safe and should not be used concurrently.
Connectors should also not be reused.
If you need to create multiple connections in a row you'll need to obtain a new connetor instance programmatically using `Instance#get()`:

[source, java]
----
import jakarta.enterprise.inject.Instance;
@Singleton
public class MyBean {
@Inject
Instance<WebSocketConnector<MyEndpoint>> connector;
void connect() {
var connection1 = connector.get().baseUri(uri)
.addHeader("Foo", "alpha")
.connectAndAwait();
var connection2 = connector.get().baseUri(uri)
.addHeader("Foo", "bravo")
.connectAndAwait();
}
}
----

==== Basic connector

In the case where the application developer does not need the combination of the client endpoint and the connector, a _basic connector_ can be used.
Expand Down Expand Up @@ -991,6 +1018,31 @@ The basic connector is closer to a low-level API and is reserved for advanced us
However, unlike others low-level WebSocket clients, it is still a CDI bean and can be injected in other beans.
It also provides a way to configure the execution model of the callbacks, ensuring optimal integration with the rest of Quarkus.

Connectors are not thread-safe and should not be used concurrently.
Connectors should also not be reused.
If you need to create multiple connections in a row you'll need to obtain a new connetor instance programmatically using `Instance#get()`:

[source, java]
----
import jakarta.enterprise.inject.Instance;
@Singleton
public class MyBean {
@Inject
Instance<BasicWebSocketConnector> connector;
void connect() {
var connection1 = connector.get().baseUri(uri)
.addHeader("Foo", "alpha")
.connectAndAwait();
var connection2 = connector.get().baseUri(uri)
.addHeader("Foo", "bravo")
.connectAndAwait();
}
}
----

[[ws-client-connection]]
=== WebSocket client connection

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import io.quarkus.micrometer.runtime.MicrometerRecorder;
import io.quarkus.micrometer.runtime.MicrometerTimedInterceptor;
import io.quarkus.micrometer.runtime.config.MicrometerConfig;
import io.quarkus.micrometer.runtime.export.exemplars.NoopOpenTelemetryExemplarContextUnwrapper;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.metrics.MetricsFactory;
import io.quarkus.vertx.http.deployment.NonApplicationRootPathBuildItem;
Expand Down Expand Up @@ -93,6 +94,15 @@ MetricsCapabilityBuildItem metricsCapabilityBuildItem() {
null);
}

@BuildStep(onlyIfNot = PrometheusRegistryProcessor.PrometheusEnabled.class)
void registerEmptyExamplarProvider(
BuildProducer<AdditionalBeanBuildItem> additionalBeans) {
additionalBeans.produce(AdditionalBeanBuildItem.builder()
.addBeanClass(NoopOpenTelemetryExemplarContextUnwrapper.class)
.setUnremovable()
.build());
}

@BuildStep(onlyIf = { PrometheusRegistryProcessor.PrometheusEnabled.class })
MetricsCapabilityBuildItem metricsCapabilityPrometheusBuildItem(
NonApplicationRootPathBuildItem nonApplicationRootPathBuildItem) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
import io.quarkus.micrometer.runtime.MicrometerRecorder;
import io.quarkus.micrometer.runtime.config.MicrometerConfig;
import io.quarkus.micrometer.runtime.config.PrometheusConfigGroup;
import io.quarkus.micrometer.runtime.export.EmptyExemplarSamplerProvider;
import io.quarkus.micrometer.runtime.export.OpentelemetryExemplarSamplerProvider;
import io.quarkus.micrometer.runtime.export.PrometheusRecorder;
import io.quarkus.micrometer.runtime.export.exemplars.EmptyExemplarSamplerProvider;
import io.quarkus.micrometer.runtime.export.exemplars.NoopOpenTelemetryExemplarContextUnwrapper;
import io.quarkus.micrometer.runtime.export.exemplars.OpenTelemetryExemplarContextUnwrapper;
import io.quarkus.micrometer.runtime.export.exemplars.OpentelemetryExemplarSamplerProvider;
import io.quarkus.vertx.http.deployment.NonApplicationRootPathBuildItem;
import io.quarkus.vertx.http.deployment.RouteBuildItem;
import io.quarkus.vertx.http.runtime.management.ManagementInterfaceBuildTimeConfig;
Expand Down Expand Up @@ -73,6 +75,7 @@ void registerOpentelemetryExemplarSamplerProvider(
BuildProducer<AdditionalBeanBuildItem> additionalBeans) {
additionalBeans.produce(AdditionalBeanBuildItem.builder()
.addBeanClass(OpentelemetryExemplarSamplerProvider.class)
.addBeanClass(OpenTelemetryExemplarContextUnwrapper.class)
.setUnremovable()
.build());
}
Expand All @@ -82,6 +85,7 @@ void registerEmptyExamplarProvider(
BuildProducer<AdditionalBeanBuildItem> additionalBeans) {
additionalBeans.produce(AdditionalBeanBuildItem.builder()
.addBeanClass(EmptyExemplarSamplerProvider.class)
.addBeanClass(NoopOpenTelemetryExemplarContextUnwrapper.class)
.setUnremovable()
.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.quarkus.micrometer.runtime.HttpServerMetricsTagsContributor;
import io.quarkus.micrometer.runtime.binder.HttpBinderConfiguration;
import io.quarkus.micrometer.runtime.binder.HttpCommonTags;
import io.quarkus.micrometer.runtime.export.exemplars.OpenTelemetryContextUnwrapper;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.ServerWebSocket;
Expand All @@ -40,6 +41,7 @@ public class VertxHttpServerMetrics extends VertxTcpServerMetrics
static final Logger log = Logger.getLogger(VertxHttpServerMetrics.class);

HttpBinderConfiguration config;
OpenTelemetryContextUnwrapper openTelemetryContextUnwrapper;

final LongAdder activeRequests;

Expand All @@ -49,9 +51,12 @@ public class VertxHttpServerMetrics extends VertxTcpServerMetrics

private final List<HttpServerMetricsTagsContributor> httpServerMetricsTagsContributors;

VertxHttpServerMetrics(MeterRegistry registry, HttpBinderConfiguration config) {
VertxHttpServerMetrics(MeterRegistry registry,
HttpBinderConfiguration config,
OpenTelemetryContextUnwrapper openTelemetryContextUnwrapper) {
super(registry, "http.server", null);
this.config = config;
this.openTelemetryContextUnwrapper = openTelemetryContextUnwrapper;

activeRequests = new LongAdder();
Gauge.builder(config.getHttpServerActiveRequestsName(), activeRequests, LongAdder::doubleValue)
Expand Down Expand Up @@ -164,12 +169,14 @@ public void requestReset(HttpRequestMetric requestMetric) {
if (path != null) {
Timer.Sample sample = requestMetric.getSample();

sample.stop(requestsTimer
.withTags(Tags.of(
openTelemetryContextUnwrapper.executeInContext(
sample::stop,
requestsTimer.withTags(Tags.of(
VertxMetricsTags.method(requestMetric.request().method()),
HttpCommonTags.uri(path, requestMetric.initialPath, 0),
Outcome.CLIENT_ERROR.asTag(),
HttpCommonTags.STATUS_RESET)));
HttpCommonTags.STATUS_RESET)),
requestMetric.request().context());
}
requestMetric.requestEnded();
}
Expand Down Expand Up @@ -207,7 +214,10 @@ public void responseEnd(HttpRequestMetric requestMetric, HttpResponse response,
}
}

sample.stop(requestsTimer.withTags(allTags));
openTelemetryContextUnwrapper.executeInContext(
sample::stop,
requestsTimer.withTags(allTags),
requestMetric.request().context());
}
requestMetric.requestEnded();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.quarkus.micrometer.runtime.binder.HttpBinderConfiguration;
import io.quarkus.micrometer.runtime.export.exemplars.OpenTelemetryContextUnwrapper;
import io.quarkus.vertx.http.runtime.ExtendedQuarkusVertxHttpMetrics;
import io.vertx.core.VertxOptions;
import io.vertx.core.datagram.DatagramSocketOptions;
Expand All @@ -37,11 +38,14 @@ public class VertxMeterBinderAdapter extends MetricsOptions
public static final String METRIC_NAME_SEPARATOR = "|";

private HttpBinderConfiguration httpBinderConfiguration;
private OpenTelemetryContextUnwrapper openTelemetryContextUnwrapper;

public VertxMeterBinderAdapter() {
}

void setHttpConfig(HttpBinderConfiguration httpBinderConfiguration) {
void initBinder(HttpBinderConfiguration httpBinderConfiguration,
OpenTelemetryContextUnwrapper openTelemetryContextUnwrapper) {
this.openTelemetryContextUnwrapper = openTelemetryContextUnwrapper;
this.httpBinderConfiguration = httpBinderConfiguration;
}

Expand Down Expand Up @@ -70,9 +74,12 @@ public MetricsOptions newOptions() {
if (httpBinderConfiguration == null) {
throw new NoStackTraceException("HttpBinderConfiguration was not found");
}
if (openTelemetryContextUnwrapper == null) {
throw new NoStackTraceException("OpenTelemetryContextUnwrapper was not found");
}
if (httpBinderConfiguration.isServerEnabled()) {
log.debugf("Create HttpServerMetrics with options %s and address %s", options, localAddress);
return new VertxHttpServerMetrics(Metrics.globalRegistry, httpBinderConfiguration);
return new VertxHttpServerMetrics(Metrics.globalRegistry, httpBinderConfiguration, openTelemetryContextUnwrapper);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import io.quarkus.arc.Arc;
import io.quarkus.micrometer.runtime.binder.HttpBinderConfiguration;
import io.quarkus.micrometer.runtime.export.exemplars.OpenTelemetryContextUnwrapper;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.annotations.Recorder;
import io.vertx.core.VertxOptions;
Expand All @@ -30,18 +31,20 @@ public void accept(VertxOptions vertxOptions) {
/* RUNTIME_INIT */
public void configureBinderAdapter() {
HttpBinderConfiguration httpConfig = Arc.container().instance(HttpBinderConfiguration.class).get();
OpenTelemetryContextUnwrapper openTelemetryContextUnwrapper = Arc.container()
.instance(OpenTelemetryContextUnwrapper.class).get();
if (LaunchMode.current() == LaunchMode.DEVELOPMENT) {
if (devModeConfig == null) {
// Create an object whose attributes we can update
devModeConfig = httpConfig.unwrap();
binderAdapter.setHttpConfig(devModeConfig);
binderAdapter.initBinder(devModeConfig, openTelemetryContextUnwrapper);
} else {
// update config attributes
devModeConfig.update(httpConfig);
}
} else {
// unwrap the CDI bean (use POJO)
binderAdapter.setHttpConfig(httpConfig.unwrap());
binderAdapter.initBinder(httpConfig.unwrap(), openTelemetryContextUnwrapper);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.quarkus.micrometer.runtime.export;
package io.quarkus.micrometer.runtime.export.exemplars;

import java.util.Optional;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.quarkus.micrometer.runtime.export.exemplars;

import java.util.function.Function;

import jakarta.enterprise.context.Dependent;

import io.vertx.core.Context;

@Dependent
public class NoopOpenTelemetryExemplarContextUnwrapper implements OpenTelemetryContextUnwrapper {

@Override
public <P, R> R executeInContext(Function<P, R> methodReference, P parameter, Context requestContext) {
return methodReference.apply(parameter);// pass through
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.quarkus.micrometer.runtime.export.exemplars;

import java.util.function.Function;

public interface OpenTelemetryContextUnwrapper {
/**
* Called when an HTTP server response has ended.
* Makes sure exemplars are produced because they have an OTel context.
*
* @param methodReference Ex: Sample stop method reference
* @param parameter The parameter to pass to the method
* @param requestContext The request context
* @param <P> The parameter type is a type of metric, ex: Timer
* @param <R> The return type of the method pointed by the methodReference
* @return The result of the method
*/
<P, R> R executeInContext(Function<P, R> methodReference, P parameter, io.vertx.core.Context requestContext);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.quarkus.micrometer.runtime.export.exemplars;

import java.util.function.Function;

import jakarta.enterprise.context.Dependent;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.quarkus.opentelemetry.runtime.QuarkusContextStorage;

@Dependent
public class OpenTelemetryExemplarContextUnwrapper implements OpenTelemetryContextUnwrapper {

@Override
public <P, R> R executeInContext(Function<P, R> methodReference, P parameter, io.vertx.core.Context requestContext) {
if (requestContext == null) {
return methodReference.apply(parameter);
}

Context newContext = QuarkusContextStorage.getContext(requestContext);

if (newContext == null) {
return methodReference.apply(parameter);
}

io.opentelemetry.context.Context oldContext = QuarkusContextStorage.INSTANCE.current();
try (Scope scope = QuarkusContextStorage.INSTANCE.attach(newContext)) {
return methodReference.apply(parameter);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.quarkus.micrometer.runtime.export;
package io.quarkus.micrometer.runtime.export.exemplars;

import java.util.Optional;
import java.util.function.Function;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,12 @@ public void assertCountPointsAtLeast(final String name, final String target, fin
.untilAsserted(() -> {
List<MetricData> metricData = getFinishedMetricItems(name, target);
Assertions.assertTrue(1 <= metricData.size());
Assertions.assertTrue(countPoints <= metricData.get(0).getData().getPoints().size());
Assertions.assertTrue(countPoints <= metricData.stream()
.reduce((first, second) -> second) // get the last received
.orElse(null)
.getData()
.getPoints()
.size());
});
}

Expand Down
Loading

0 comments on commit 55f104c

Please sign in to comment.