Skip to content

Commit

Permalink
WebSockets NEXT: add ability to inspect/reject HTTP upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
michalvavrik committed May 30, 2024
1 parent f695e86 commit 6caf91c
Show file tree
Hide file tree
Showing 12 changed files with 839 additions and 3 deletions.
41 changes: 39 additions & 2 deletions docs/src/main/asciidoc/websockets-next-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ Uni<Void> consumeAsync(Message m) {
}
@OnTextMessage
ReponseMessage process(Message m) {
ResponseMessage process(Message m) {
// Process the incoming message and send a response to the client.
// The method is called for each incoming message.
// Note that if the method returns `null`, no response will be sent to the client.
Expand All @@ -287,7 +287,7 @@ Uni<ResponseMessage> processAsync(Message m) {
// Note that if the method returns `null`, no response will be sent to the client. The method completes when the returned Uni emits its item.
}
OnTextMessage
@OnTextMessage
Multi<ResponseMessage> stream(Message m) {
// Process the incoming message and send multiple responses to the client.
// The method is called for each incoming message.
Expand Down Expand Up @@ -643,6 +643,43 @@ Other options for securing HTTP upgrade requests, such as using the security ann

NOTE: When OpenID Connect extension is used and token expires, Quarkus automatically closes connection.

== Inspect and/or reject HTTP upgrade

Check warning on line 646 in docs/src/main/asciidoc/websockets-next-reference.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsWarnings] Consider using 'a or b' or 'a, b, or both' rather than 'and/or' unless updating existing content that uses the term. Raw Output: {"message": "[Quarkus.TermsWarnings] Consider using 'a or b' or 'a, b, or both' rather than 'and/or' unless updating existing content that uses the term.", "location": {"path": "docs/src/main/asciidoc/websockets-next-reference.adoc", "range": {"start": {"line": 646, "column": 1}}}, "severity": "WARNING"}

To inspect an HTTP upgrade, you must provide a CDI bean implementing the `io.quarkus.websockets.next.HttpUpgradeCheck` interface.
Quarkus calls the `HttpUpgradeCheck#perform` method on every HTTP request that should be upgraded to a WebSocket connection.
Inside this method, you can perform any business logic and/or reject the HTTP upgrade.

Check warning on line 650 in docs/src/main/asciidoc/websockets-next-reference.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsWarnings] Consider using 'a or b' or 'a, b, or both' rather than 'and/or' unless updating existing content that uses the term. Raw Output: {"message": "[Quarkus.TermsWarnings] Consider using 'a or b' or 'a, b, or both' rather than 'and/or' unless updating existing content that uses the term.", "location": {"path": "docs/src/main/asciidoc/websockets-next-reference.adoc", "range": {"start": {"line": 650, "column": 45}}}, "severity": "WARNING"}

.Example HttpUpgradeCheck
[source, java]
----
package io.quarkus.websockets.next.test;
import io.quarkus.websockets.next.HttpUpgradeCheck;
import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped <1>
public class ExampleHttpUpgradeCheck implements HttpUpgradeCheck {
@Override
public Uni<CheckResult> perform(HttpUpgradeContext ctx) {
if (rejectUpgrade(ctx)) {
return CheckResult.rejectUpgrade(400); <2>
}
return CheckResult.permitUpgrade();
}
private boolean rejectUpgrade(HttpUpgradeContext ctx) {
var headers = ctx.httpRequest().headers();
// implement your business logic in here
}
}
----
<1> The CDI beans implementing `HttpUpgradeCheck` interface can be either `@ApplicationScoped`, `@Singleton` or `@Dependent` beans, but never the `@RequestScoped` beans.
<2> Reject the HTTP upgrade. Initial HTTP handshake ends with the 400 Bad Request response status code.

TIP: You can choose WebSocket endpoints to which the `HttpUpgradeCheck` is applied with the `HttpUpgradeCheck#appliesTo` method.

Check warning on line 681 in docs/src/main/asciidoc/websockets-next-reference.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsSuggestions] Depending on the context, consider using ', which (non restrictive clause preceded by a comma)' or 'that (restrictive clause without a comma)' rather than 'which'. Raw Output: {"message": "[Quarkus.TermsSuggestions] Depending on the context, consider using ', which (non restrictive clause preceded by a comma)' or 'that (restrictive clause without a comma)' rather than 'which'.", "location": {"path": "docs/src/main/asciidoc/websockets-next-reference.adoc", "range": {"start": {"line": 681, "column": 43}}}, "severity": "INFO"}

[[websocket-next-configuration-reference]]
== Configuration reference

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.quarkus.arc.deployment.CustomScopeBuildItem;
import io.quarkus.arc.deployment.SyntheticBeanBuildItem;
import io.quarkus.arc.deployment.TransformedAnnotationsBuildItem;
import io.quarkus.arc.deployment.UnremovableBeanBuildItem;
import io.quarkus.arc.deployment.ValidationPhaseBuildItem;
import io.quarkus.arc.deployment.ValidationPhaseBuildItem.ValidationErrorBuildItem;
import io.quarkus.arc.processor.Annotations;
Expand Down Expand Up @@ -68,6 +69,7 @@
import io.quarkus.vertx.http.deployment.RouteBuildItem;
import io.quarkus.vertx.http.runtime.HandlerType;
import io.quarkus.vertx.http.runtime.HttpBuildTimeConfig;
import io.quarkus.websockets.next.HttpUpgradeCheck;
import io.quarkus.websockets.next.InboundProcessingMode;
import io.quarkus.websockets.next.WebSocketClientConnection;
import io.quarkus.websockets.next.WebSocketClientException;
Expand Down Expand Up @@ -106,6 +108,7 @@ public class WebSocketProcessor {
static final String SERVER_ENDPOINT_SUFFIX = "_WebSocketServerEndpoint";
static final String CLIENT_ENDPOINT_SUFFIX = "_WebSocketClientEndpoint";
static final String NESTED_SEPARATOR = "$_";
static final DotName HTTP_UPGRADE_CHECK_NAME = DotName.createSimple(HttpUpgradeCheck.class);

// Parameter names consist of alphanumeric characters and underscore
private static final Pattern PATH_PARAM_PATTERN = Pattern.compile("\\{[a-zA-Z0-9_]+\\}");
Expand Down Expand Up @@ -424,6 +427,32 @@ public void registerRoutes(WebSocketServerRecorder recorder, HttpRootPathBuildIt
}
}

@BuildStep
UnremovableBeanBuildItem makeHttpUpgradeChecksUnremovable() {
// we access the checks programmatically
return UnremovableBeanBuildItem.beanTypes(HTTP_UPGRADE_CHECK_NAME);
}

@BuildStep
List<ValidationPhaseBuildItem.ValidationErrorBuildItem> validateHttpUpgradeCheckNotRequestScoped(
ValidationPhaseBuildItem validationPhase) {
return validationPhase
.getContext()
.beans()
.withBeanType(HTTP_UPGRADE_CHECK_NAME)
.filter(b -> {
var targetScope = BuiltinScope.from(b.getScope().getDotName());
return BuiltinScope.APPLICATION != targetScope
&& BuiltinScope.SINGLETON != targetScope
&& BuiltinScope.DEPENDENT != targetScope;
})
.stream()
.map(b -> new ValidationErrorBuildItem(new RuntimeException(("Bean '%s' scope is '%s', but the '%s' "
+ "implementors must be one either `@ApplicationScoped', '@Singleton' or '@Dependent' beans")
.formatted(b.getBeanClass(), b.getScope().getDotName(), HTTP_UPGRADE_CHECK_NAME))))
.toList();
}

@BuildStep
@Record(RUNTIME_INIT)
void serverSyntheticBeans(WebSocketServerRecorder recorder, List<GeneratedEndpointBuildItem> generatedEndpoints,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ public void testConnectionClosedWhenAuthExpires() {
} else if (System.currentTimeMillis() > threeSecondsFromNow) {
Assertions.fail("Authentication expired, therefore connection should had been closed");
}
client.sendAndAwaitReply("Hello #" + i + " from ");
try {
client.sendAndAwaitReply("Hello #" + i + " from ");
} catch (RuntimeException e) {
// this sometimes fails as connection is closed when waiting for the reply
break;
}
}

var receivedMessages = client.getMessages().stream().map(Buffer::toString).toList();
Expand All @@ -82,6 +87,8 @@ public void testConnectionClosedWhenAuthExpires() {
.atMost(Duration.ofSeconds(1))
.untilAsserted(() -> assertTrue(Endpoint.CLOSED_MESSAGE.get()
.startsWith("Connection closed with reason 'Authentication expired'")));

assertTrue(client.isClosed());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package io.quarkus.websockets.next.test.upgrade;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

import jakarta.inject.Inject;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.quarkus.runtime.util.ExceptionUtil;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.vertx.core.Vertx;
import io.vertx.core.http.UpgradeRejectedException;
import io.vertx.core.http.WebSocketConnectOptions;

public abstract class AbstractHttpUpgradeCheckTestBase {

@Inject
Vertx vertx;

@TestHTTPResource("opening")
URI openingUri;

@TestHTTPResource("responding")
URI respondingUri;

@TestHTTPResource("rejecting")
URI rejectingUri;

@BeforeEach
public void cleanUp() {
Opening.OPENED.set(false);
OpeningHttpUpgradeCheck.INVOKED.set(0);
}

@Test
public void testHttpUpgradeRejected() {
try (WSClient client = new WSClient(vertx)) {
CompletionException ce = assertThrows(CompletionException.class,
() -> client.connect(
new WebSocketConnectOptions().addHeader(RejectingHttpUpgradeCheck.REJECT_HEADER, "ignored"),
rejectingUri));
Throwable root = ExceptionUtil.getRootCause(ce);
assertInstanceOf(UpgradeRejectedException.class, root);
assertTrue(root.getMessage().contains("403"), root.getMessage());
}
}

@Test
public void testHttpUpgradePermitted() {
try (WSClient client = new WSClient(vertx)) {
client.connect(openingUri);
Awaitility.await().atMost(Duration.ofSeconds(2)).until(() -> OpeningHttpUpgradeCheck.INVOKED.get() == 1);
}
}

@Test
public void testHttpUpgradeOkAndResponding() {
// test no HTTP Upgrade check rejected the upgrade or recorded value
try (WSClient client = new WSClient(vertx)) {
client.connect(new WebSocketConnectOptions(), respondingUri);
var response = client.sendAndAwaitReply("Ho").toString();
assertEquals("Ho Hey", response);
assertEquals(0, OpeningHttpUpgradeCheck.INVOKED.get());
}
}

@WebSocket(path = "/rejecting", endpointId = "rejecting-id")
public static class Rejecting {

@OnTextMessage
public void onMessage(String message) {
// do nothing
}

}

@WebSocket(path = "/opening", endpointId = "opening-id")
public static class Opening {

static final AtomicBoolean OPENED = new AtomicBoolean(false);

@OnTextMessage
public void onMessage(String message) {
// do nothing
}

@OnOpen
void onOpen() {
OPENED.set(true);
}

}

@WebSocket(path = "/responding", endpointId = "closing-id")
public static class Responding {

@OnTextMessage
public String onMessage(String message) {
return message + " Hey";
}

}
}
Loading

0 comments on commit 6caf91c

Please sign in to comment.