Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support async invocations using optional synthetic SimplyTimed behavior #2745

Merged
merged 4 commits into from
Feb 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,15 +17,18 @@
package io.helidon.microprofile.metrics;

import java.lang.reflect.Method;
import java.time.Duration;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.annotation.Priority;
import javax.inject.Inject;
import javax.interceptor.AroundInvoke;
import javax.interceptor.Interceptor;
import javax.interceptor.InvocationContext;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.CompletionCallback;

import org.eclipse.microprofile.metrics.MetricRegistry;
import org.eclipse.microprofile.metrics.SimpleTimer;

/**
Expand All @@ -42,12 +45,12 @@ final class InterceptorSyntheticSimplyTimed {

private static final Logger LOGGER = Logger.getLogger(InterceptorSyntheticSimplyTimed.class.getName());

private final MetricRegistry metricRegistry;
private final boolean isEnabled;
private final RestEndpointMetricsInfo restEndpointMetricsInfo;

@Inject
InterceptorSyntheticSimplyTimed(MetricRegistry registry, RestEndpointMetricsInfo restEndpointMetricsInfo) {
metricRegistry = registry;
InterceptorSyntheticSimplyTimed(RestEndpointMetricsInfo restEndpointMetricsInfo) {
this.restEndpointMetricsInfo = restEndpointMetricsInfo;
isEnabled = restEndpointMetricsInfo.isEnabled();
}

Expand All @@ -63,16 +66,45 @@ public Object interceptRestEndpoint(InvocationContext context) throws Throwable
if (!isEnabled) {
return context.proceed();
}
long startNanos = System.nanoTime();
try {
LOGGER.fine("Interceptor of SyntheticSimplyTimed called for '" + context.getTarget().getClass()
+ "::" + context.getMethod().getName() + "'");

Method timedMethod = context.getMethod();
SimpleTimer simpleTimer = MetricsCdiExtension.syntheticSimpleTimer(timedMethod);
AsyncResponse asyncResponse = restEndpointMetricsInfo.asyncResponse(context);
if (asyncResponse != null) {
asyncResponse.register(new FinishCallback(startNanos, simpleTimer));
return context.proceed();
}
return simpleTimer.time(context::proceed);
} catch (Throwable t) {
LOGGER.fine("Throwable caught by interceptor '" + t.getMessage() + "'");
LOGGER.log(Level.FINE, "Throwable caught by interceptor", t);
throw t;
}
}

/**
* Async callback which updates a {@code SimpleTimer} associated with the REST endpoint.
*/
static class FinishCallback implements CompletionCallback {

private final long startTimeNanos;
private final SimpleTimer simpleTimer;

private FinishCallback(long startTimeNanos, SimpleTimer simpleTimer) {
this.simpleTimer = simpleTimer;
this.startTimeNanos = startTimeNanos;
}

@Override
public void onComplete(Throwable throwable) {
long nowNanos = System.nanoTime();
simpleTimer.update(Duration.ofNanos(nowNanos - startTimeNanos));
if (throwable != null) {
LOGGER.log(Level.FINE, "Throwable detected by interceptor callback", throwable);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2020 Oracle and/or its affiliates.
* Copyright (c) 2018, 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,13 +61,16 @@
import javax.inject.Qualifier;
import javax.inject.Singleton;
import javax.interceptor.Interceptor;
import javax.interceptor.InvocationContext;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.HEAD;
import javax.ws.rs.OPTIONS;
import javax.ws.rs.PATCH;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;

import io.helidon.common.Errors;
import io.helidon.common.context.Contexts;
Expand Down Expand Up @@ -142,6 +145,7 @@ public class MetricsCdiExtension implements Extension {
private final Map<Class<?>, Set<Method>> methodsWithSyntheticSimpleTimer = new HashMap<>();
private final Set<Class<?>> syntheticSimpleTimerClassesProcessed = new HashSet<>();
private final Set<Method> syntheticSimpleTimersToRegister = new HashSet<>();
private final Map<Method, AsyncResponseInfo> asyncSyntheticSimpleTimerInfo = new HashMap<>();

@SuppressWarnings("unchecked")
private static <T> T getReference(BeanManager bm, Type type, Bean<?> bean) {
Expand Down Expand Up @@ -312,6 +316,12 @@ void before(@Observes BeforeBeanDiscovery discovery) {
// Config might disable the MP synthetic SimpleTimer feature for JAX-RS endpoints.
// For efficiency, prepare to consult config only once rather than from each interceptor instance.
discovery.addAnnotatedType(RestEndpointMetricsInfo.class, RestEndpointMetricsInfo.class.getSimpleName());

asyncSyntheticSimpleTimerInfo.clear();
}

Map<Method, AsyncResponseInfo> asyncResponseInfo() {
return asyncSyntheticSimpleTimerInfo;
}

private void clearAnnotationInfo(@Observes AfterDeploymentValidation adv) {
Expand Down Expand Up @@ -517,6 +527,25 @@ static SimpleTimer syntheticSimpleTimer(Method method) {
.simpleTimer(SYNTHETIC_SIMPLE_TIMER_METADATA, syntheticSimpleTimerMetricTags(method));
}

private SimpleTimer registerAndSaveAsyncSyntheticSimpleTimer(Method method) {
SimpleTimer result = syntheticSimpleTimer(method);
asyncSyntheticSimpleTimerInfo.computeIfAbsent(method, this::asyncResponse);
return result;
}

private AsyncResponseInfo asyncResponse(Method m) {
int candidateAsyncResponseParameterSlot = 0;

for (Parameter p : m.getParameters()) {
if (AsyncResponse.class.isAssignableFrom(p.getType()) && p.getAnnotation(Suspended.class) != null) {
return new AsyncResponseInfo(candidateAsyncResponseParameterSlot);
}
candidateAsyncResponseParameterSlot++;

}
return null;
}

/**
* Creates the {@link MetricID} for the synthetic {@link SimplyTimed} annotation we add to each JAX-RS method.
*
Expand Down Expand Up @@ -656,7 +685,7 @@ private void collectSyntheticSimpleTimerMetric(@Observes ProcessManagedBean<?> p
}

private void registerSyntheticSimpleTimerMetrics(@Observes @RuntimeStart Object event) {
syntheticSimpleTimersToRegister.forEach(MetricsCdiExtension::syntheticSimpleTimer);
syntheticSimpleTimersToRegister.forEach(this::registerAndSaveAsyncSyntheticSimpleTimer);
if (LOGGER.isLoggable(Level.FINE)) {
Set<Class<?>> syntheticSimpleTimerAnnotatedClassesIgnored = new HashSet<>(methodsWithSyntheticSimpleTimer.keySet());
syntheticSimpleTimerAnnotatedClassesIgnored.removeAll(syntheticSimpleTimerClassesProcessed);
Expand All @@ -670,7 +699,7 @@ private void registerSyntheticSimpleTimerMetrics(@Observes @RuntimeStart Object
syntheticSimpleTimersToRegister.clear();
}

static boolean restEndpointsMetricEnabledFromConfig() {
boolean restEndpointsMetricEnabledFromConfig() {
try {
return ((Config) (ConfigProvider.getConfig()))
.get("metrics")
Expand Down Expand Up @@ -934,4 +963,28 @@ public boolean isSynthetic() {
return annotatedMember.getJavaMember().isSynthetic();
}
}

/**
* A {@code AsyncResponse} parameter annotated with {@code Suspended} in a JAX-RS method subject to inferred
* {@code SimplyTimed} behavior.
*/
static class AsyncResponseInfo {

// which parameter slot in the method the AsyncResponse is
private final int parameterSlot;

AsyncResponseInfo(int parameterSlot) {
this.parameterSlot = parameterSlot;
}

/**
* Returns the {@code AsyncResponse} argument object in the given invocation.
*
* @param context the {@code InvocationContext} representing the call with an {@code AsyncResponse} parameter
* @return the {@code AsyncResponse} instance
*/
AsyncResponse asyncResponse(InvocationContext context) {
return AsyncResponse.class.cast(context.getParameters()[parameterSlot]);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,25 +16,43 @@
*/
package io.helidon.microprofile.metrics;

import javax.annotation.PostConstruct;
import java.lang.reflect.Method;
import java.util.Map;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.spi.BeanManager;
import javax.inject.Inject;
import javax.interceptor.InvocationContext;
import javax.ws.rs.container.AsyncResponse;

import io.helidon.microprofile.metrics.MetricsCdiExtension.AsyncResponseInfo;

/**
* Captures whether configuration enables or disables synthetic {@code SimplyMetric} annotation
* behavior efficiently so interceptor instances know efficiently whether to find and update
* the corresponding metrics or not.
* Captures information about REST endpoint synthetic annotations so interceptors can be quicker. Includes:
* <ul>
* <li>whether configuration enables or disables synthetic {@code SimplyMetric} annotation behavior</li>
* <li>which JAX-RS endpoint methods (if any) are asynchronous</li>
* </ul>
*/
@ApplicationScoped
class RestEndpointMetricsInfo {

private boolean isEnabled;
private Map<Method, AsyncResponseInfo> asyncResponseInfo;

@PostConstruct
void setup() {
isEnabled = MetricsCdiExtension.restEndpointsMetricEnabledFromConfig();
@Inject
RestEndpointMetricsInfo(BeanManager beanManager) {
MetricsCdiExtension metricsCdiExtension = beanManager.getExtension(MetricsCdiExtension.class);
isEnabled = metricsCdiExtension.restEndpointsMetricEnabledFromConfig();
asyncResponseInfo = metricsCdiExtension.asyncResponseInfo();
}

public boolean isEnabled() {
return isEnabled;
}

public AsyncResponse asyncResponse(InvocationContext context) {
AsyncResponseInfo info = asyncResponseInfo.get(context.getMethod());
return info == null ? null : info.asyncResponse(context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.helidon.microprofile.metrics;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import javax.ws.rs.client.AsyncInvoker;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.MediaType;

import io.helidon.microprofile.metrics.InterceptorSyntheticSimplyTimed.FinishCallback;
import io.helidon.microprofile.tests.junit5.HelidonTest;

import org.eclipse.microprofile.metrics.MetricID;
import org.eclipse.microprofile.metrics.SimpleTimer;
import org.eclipse.microprofile.metrics.Tag;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

@HelidonTest
public class HelloWorldAsyncResponseTest extends HelloWorldTest {

@Test
public void test() {
String result = webTarget
.path("helloworld/slow")
.request()
.accept(MediaType.TEXT_PLAIN)
.get(String.class);

assertThat("Mismatched string result", result, is(HelloWorldResource.SLOW_RESPONSE));

Tag[] tags = {
new Tag("class", HelloWorldResource.class.getName()),
new Tag("method", "slowMessage_" + AsyncResponse.class.getName())
};

SimpleTimer simpleTimer = syntheticSimpleTimerRegistry().simpleTimer("REST.request", tags);
assertThat(simpleTimer, is(notNullValue()));
Duration minDuration = Duration.ofSeconds(HelloWorldResource.SLOW_DELAY_SECS);
assertThat(simpleTimer.getElapsedTime().compareTo(minDuration), is(greaterThan(0)));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2020 Oracle and/or its affiliates.
* Copyright (c) 2018, 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,7 +25,13 @@
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* HelloWorldResource class.
Expand All @@ -34,6 +40,13 @@
@RequestScoped
public class HelloWorldResource {

static final String SLOW_RESPONSE = "At last";

// In case pipeline runs need a different time
static final int SLOW_DELAY_SECS = Integer.getInteger("helidon.asyncSimplyTimedDelaySeconds", 2);

private static ExecutorService executorService = Executors.newSingleThreadExecutor();

@Inject
MetricRegistry metricRegistry;

Expand All @@ -51,4 +64,18 @@ public String message() {
public String messageWithArg(String input){
return "Hello World, " + input;
}

@GET
@Path("/slow")
@Produces(MediaType.TEXT_PLAIN)
public void slowMessage(@Suspended AsyncResponse ar) {
executorService.execute(() -> {
try {
TimeUnit.SECONDS.sleep(SLOW_DELAY_SECS);
} catch (InterruptedException e) {
// absorb silently
}
ar.resume(SLOW_RESPONSE);
});
}
}