diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java index dadc40e07d033..c5bce0818abe3 100644 --- a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java +++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java @@ -1,6 +1,7 @@ package io.quarkus.cache.runtime; import java.time.Duration; +import java.util.concurrent.Executor; import java.util.function.Function; import java.util.function.Supplier; @@ -16,6 +17,9 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.TimeoutException; import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; @CacheResult(cacheName = "") // The `cacheName` attribute is @Nonbinding. @Interceptor @@ -53,6 +57,7 @@ public Object intercept(InvocationContext invocationContext) throws Throwable { try { ReturnType returnType = determineReturnType(invocationContext.getMethod().getReturnType()); if (returnType != ReturnType.NonAsync) { + Context context = Vertx.currentContext(); Uni cacheValue = cache.getAsync(key, new Function>() { @SuppressWarnings("unchecked") @Override @@ -65,11 +70,32 @@ public Uni apply(Object key) { throw new CacheException(e); } } + }).emitOn(new Executor() { + // We need make sure we go back to the original context when the cache value is computed. + // Otherwise, we would always emit on the context having computed the value, which could + // break the duplicated context isolation. + @Override + public void execute(Runnable command) { + Context ctx = Vertx.currentContext(); + if (ctx == context) { + // We are already on the right context, execute immediately. + command.run(); + } else { + // Jump back to the captured context. + context.runOnContext(new Handler() { + @Override + public void handle(Void ignored) { + command.run(); + } + }); + } + } }); if (binding.lockTimeout() <= 0) { return createAsyncResult(cacheValue, returnType); } + // IMPORTANT: The item/failure are emitted on the captured context. cacheValue = cacheValue.ifNoItem().after(Duration.ofMillis(binding.lockTimeout())) .recoverWithUni(new Supplier>() { @Override