Skip to content

Commit

Permalink
Handle duplicated context in the CacheResultInterceptor
Browse files Browse the repository at this point in the history
- Capture the context when calling the interceptor
- Make sure the item is emitted on the captured context

(cherry picked from commit f781b7f)
  • Loading branch information
cescoffier authored and gsmet committed Nov 15, 2023
1 parent d6e7fcd commit 87d5728
Showing 1 changed file with 49 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.cache.runtime;

import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;

Expand All @@ -16,6 +17,10 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.TimeoutException;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

@CacheResult(cacheName = "") // The `cacheName` attribute is @Nonbinding.
@Interceptor
Expand Down Expand Up @@ -53,6 +58,7 @@ public Object intercept(InvocationContext invocationContext) throws Throwable {
try {
ReturnType returnType = determineReturnType(invocationContext.getMethod().getReturnType());
if (returnType != ReturnType.NonAsync) {
Context context = Vertx.currentContext();
Uni<Object> cacheValue = cache.getAsync(key, new Function<Object, Uni<Object>>() {
@SuppressWarnings("unchecked")
@Override
Expand All @@ -65,11 +71,54 @@ public Uni<Object> apply(Object key) {
throw new CacheException(e);
}
}
}).emitOn(new Executor() {
// We need make sure we go back to the original context when the cache value is computed.
// Otherwise, we would always emit on the context having computed the value, which could
// break the duplicated context isolation.
@Override
public void execute(Runnable command) {
Context ctx = Vertx.currentContext();
if (context == null) {
// We didn't capture a context
if (ctx == null) {
// We are not on a context => we can execute immediately.
command.run();
} else {
// We are on a context.
// We cannot continue on the current context as we may share a duplicated context.
// We need a new one. Note that duplicate() does not duplicate the duplicated context,
// but the root context.
((ContextInternal) ctx).duplicate()
.runOnContext(new Handler<Void>() {
@Override
public void handle(Void ignored) {
command.run();
}
});
}
} else {
// We captured a context.
if (ctx == context) {
// We are on the same context => we can execute immediately
command.run();
} else {
// 1) We are not on a context (ctx == null) => we need to switch to the captured context.
// 2) We are on a different context (ctx != null) => we need to switch to the captured context.
context.runOnContext(new Handler<Void>() {
@Override
public void handle(Void ignored) {
command.run();
}
});
}
}
}
});

if (binding.lockTimeout() <= 0) {
return createAsyncResult(cacheValue, returnType);
}
// IMPORTANT: The item/failure are emitted on the captured context.
cacheValue = cacheValue.ifNoItem().after(Duration.ofMillis(binding.lockTimeout()))
.recoverWithUni(new Supplier<Uni<?>>() {
@Override
Expand Down

0 comments on commit 87d5728

Please sign in to comment.