Skip to content

Commit

Permalink
Merge pull request quarkusio#40545 from mkouba/issue-40535
Browse files Browse the repository at this point in the history
WebSockets Next: support close status code/reason
  • Loading branch information
mkouba authored May 10, 2024
2 parents 84112bd + 2d5bbf9 commit 33db951
Show file tree
Hide file tree
Showing 15 changed files with 303 additions and 13 deletions.
2 changes: 2 additions & 0 deletions docs/src/main/asciidoc/websockets-next-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,8 @@ Methods annotated with `@OnOpen` and `@OnClose` may accept the following paramet
* `WebSocketConnection`
* `HandshakeRequest`
* `String` parameters annotated with `@PathParam`

An endpoint method annotated with `@OnClose` may also accept the `io.quarkus.websockets.next.CloseReason` parameter that may indicate a reason for closing a connection.

=== Allowed Returned Types

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.quarkus.websockets.next.deployment;

import io.quarkus.gizmo.MethodDescriptor;
import io.quarkus.gizmo.ResultHandle;
import io.quarkus.websockets.next.CloseReason;
import io.quarkus.websockets.next.runtime.WebSocketConnectionBase;

class CloseReasonCallbackArgument implements CallbackArgument {

@Override
public boolean matches(ParameterContext context) {
return context.callbackAnnotation().name().equals(WebSocketDotNames.ON_CLOSE)
&& context.parameter().type().name().equals(WebSocketDotNames.CLOSE_REASON);
}

@Override
public ResultHandle get(InvocationBytecodeContext context) {
return context.bytecode().invokeVirtualMethod(
MethodDescriptor.ofMethod(WebSocketConnectionBase.class, "closeReason", CloseReason.class),
context.getConnection());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import org.jboss.jandex.DotName;

import io.quarkus.websockets.next.CloseReason;
import io.quarkus.websockets.next.HandshakeRequest;
import io.quarkus.websockets.next.OnBinaryMessage;
import io.quarkus.websockets.next.OnClose;
Expand Down Expand Up @@ -52,6 +53,7 @@ final class WebSocketDotNames {
static final DotName PATH_PARAM = DotName.createSimple(PathParam.class);
static final DotName HANDSHAKE_REQUEST = DotName.createSimple(HandshakeRequest.class);
static final DotName THROWABLE = DotName.createSimple(Throwable.class);
static final DotName CLOSE_REASON = DotName.createSimple(CloseReason.class);

static final List<DotName> CALLBACK_ANNOTATIONS = List.of(ON_OPEN, ON_CLOSE, ON_BINARY_MESSAGE, ON_TEXT_MESSAGE,
ON_PONG_MESSAGE, ON_ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ void builtinCallbackArguments(BuildProducer<CallbackArgumentBuildItem> providers
providers.produce(new CallbackArgumentBuildItem(new PathParamCallbackArgument()));
providers.produce(new CallbackArgumentBuildItem(new HandshakeRequestCallbackArgument()));
providers.produce(new CallbackArgumentBuildItem(new ErrorCallbackArgument()));
providers.produce(new CallbackArgumentBuildItem(new CloseReasonCallbackArgument()));
}

@BuildStep
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@

import static org.junit.jupiter.api.Assertions.fail;

import java.util.List;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.websockets.next.CloseReason;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.WebSocketException;
Expand All @@ -30,7 +29,7 @@ void testInvalidArgument() {
public static class Endpoint {

@OnOpen
void open(List<String> unsupported) {
void open(CloseReason unsupported) {
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.quarkus.websockets.next.test.closereason;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.BasicWebSocketConnector;
import io.quarkus.websockets.next.CloseReason;
import io.quarkus.websockets.next.OnClose;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.WebSocketClientConnection;
import io.vertx.core.Vertx;

public class ClientCloseReasonTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(Closing.class);
});

@Inject
Vertx vertx;

@TestHTTPResource("closing")
URI closingUri;

@Test
public void testClosed() throws InterruptedException {
CountDownLatch closedClientLatch = new CountDownLatch(1);
AtomicReference<Integer> closeStatusCode = new AtomicReference<>();
AtomicReference<String> closeMessage = new AtomicReference<>();
WebSocketClientConnection connection = BasicWebSocketConnector
.create()
.baseUri(closingUri)
.onClose((c, cr) -> {
closeStatusCode.set((int) cr.getCode());
closeMessage.set(cr.getMessage());
closedClientLatch.countDown();
})
.connectAndAwait();
connection.closeAndAwait(new CloseReason(4001, "foo"));
assertTrue(Closing.CLOSED.await(5, TimeUnit.SECONDS));
assertTrue(closedClientLatch.await(5, TimeUnit.SECONDS));
assertEquals(4001, closeStatusCode.get());
assertEquals("foo", closeMessage.get());
}

@WebSocket(path = "/closing")
public static class Closing {

static final CountDownLatch CLOSED = new CountDownLatch(1);

@OnOpen
public String open() {
return "ready";
}

@OnClose
void onClose(CloseReason reason) {
assertEquals(4001, reason.getCode());
assertEquals("foo", reason.getMessage());
CLOSED.countDown();
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package io.quarkus.websockets.next.test.closereason;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.BasicWebSocketConnector;
import io.quarkus.websockets.next.CloseReason;
import io.quarkus.websockets.next.OnClose;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.WebSocketClientConnection;
import io.quarkus.websockets.next.WebSocketConnection;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Vertx;

public class ServerCloseReasonTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(Closing.class);
});

@Inject
Vertx vertx;

@TestHTTPResource("closing")
URI closingUri;

@Test
public void testClosed() throws InterruptedException {
CountDownLatch closedClientLatch = new CountDownLatch(1);
AtomicReference<Integer> closeStatusCode = new AtomicReference<>();
AtomicReference<String> closeMessage = new AtomicReference<>();
WebSocketClientConnection connection = BasicWebSocketConnector
.create()
.baseUri(closingUri)
.onClose((c, cr) -> {
closeStatusCode.set((int) cr.getCode());
closeMessage.set(cr.getMessage());
closedClientLatch.countDown();
})
.connectAndAwait();
connection.sendTextAndAwait("foo");
assertTrue(Closing.CLOSED.await(5, TimeUnit.SECONDS));
assertTrue(closedClientLatch.await(5, TimeUnit.SECONDS));
assertEquals(4001, closeStatusCode.get());
assertEquals("foo", closeMessage.get());
}

@WebSocket(path = "/closing")
public static class Closing {

static final CountDownLatch CLOSED = new CountDownLatch(1);

@OnTextMessage
public Uni<Void> onMessage(String message, WebSocketConnection connection) {
return connection.close(new CloseReason(4001, message));
}

@OnClose
void onClose(CloseReason reason) {
assertEquals(4001, reason.getCode());
assertEquals("foo", reason.getMessage());
CLOSED.countDown();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ static BasicWebSocketConnector create() {
* @return self
* @see #executionModel(ExecutionModel)
*/
BasicWebSocketConnector onClose(BiConsumer<WebSocketClientConnection, Short> consumer);
BasicWebSocketConnector onClose(BiConsumer<WebSocketClientConnection, CloseReason> consumer);

/**
* Set a callback to be invoked when an error occurs.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.quarkus.websockets.next;

import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;

/**
* Indicates a reason for closing a connection. See also RFC-6455
* <a href="https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.1">section 5.5.1</a>. The pre-defined status codes are
* listed in <a href="https://datatracker.ietf.org/doc/html/rfc6455#section-7.4.1">section 7.4.1</a>.
*
* @see WebSocketCloseStatus
* @see WebSocketConnection#close(CloseReason)
* @see WebSocketClientConnection#close(CloseReason)
*/
public class CloseReason {

public static final CloseReason NORMAL = new CloseReason(WebSocketCloseStatus.NORMAL_CLOSURE.code());

private final int code;

private final String message;

/**
*
* @param code The status code must comply with RFC-6455
*/
public CloseReason(int code) {
this(code, null);
}

/**
*
* @param code The status code must comply with RFC-6455
* @param message
*/
public CloseReason(int code, String message) {
if (!WebSocketCloseStatus.isValidStatusCode(code)) {
throw new IllegalArgumentException("Invalid status code: " + code);
}
this.code = code;
this.message = message;
}

public int getCode() {
return code;
}

public String getMessage() {
return message;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
* <li>{@link WebSocketConnection}/{@link WebSocketClientConnection}; depending on the endpoint type</li>
* <li>{@link HandshakeRequest}</li>
* <li>{@link String} parameters annotated with {@link PathParam}</li>
* <li>{@link CloseReason}</li>
* </ul>
* Note that it's not possible to send a message to the current connection as the socket is already closed when the method
* invoked. However, it is possible to send messages to other open connections.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,32 @@ default boolean isOpen() {
* @return a new {@link Uni} with a {@code null} item
*/
@CheckReturnValue
Uni<Void> close();
default Uni<Void> close() {
return close(CloseReason.NORMAL);
}

/**
* Close the connection.
* Close the connection with a specific reason.
*
* @param reason
* @return a new {@link Uni} with a {@code null} item
*/
Uni<Void> close(CloseReason reason);

/**
* Close the connection and wait for the completion.
*/
default void closeAndAwait() {
close().await().indefinitely();
}

/**
* Close the connection with a specific reason and wait for the completion.
*/
default void closeAndAwait(CloseReason reason) {
close(reason).await().indefinitely();
}

/**
*
* @return the handshake request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,32 @@ default boolean isOpen() {
* @return a new {@link Uni} with a {@code null} item
*/
@CheckReturnValue
Uni<Void> close();
default Uni<Void> close() {
return close(CloseReason.NORMAL);
}

/**
* Close the connection.
* Close the connection with a specific reason.
*
* @param reason
* @return a new {@link Uni} with a {@code null} item
*/
Uni<Void> close(CloseReason reason);

/**
* Close the connection and wait for the completion.
*/
default void closeAndAwait() {
close().await().indefinitely();
}

/**
* Close the connection and wait for the completion.
*/
default void closeAndAwait(CloseReason reason) {
close(reason).await().indefinitely();
}

/**
*
* @return the handshake request
Expand Down
Loading

0 comments on commit 33db951

Please sign in to comment.