Skip to content

Commit

Permalink
WebSockets Next: configuration updates
Browse files Browse the repository at this point in the history
- remove unimplemented timeout
- add compression support/level
- add max message size
- also invoke error handlers for connection exceptions
- resolves quarkusio#39590
  • Loading branch information
mkouba committed Apr 19, 2024
1 parent 572f286 commit c70c45e
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package io.quarkus.websockets.next.test.errors;

import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

import jakarta.inject.Inject;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.OnBinaryMessage;
import io.quarkus.websockets.next.OnError;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.WebSocketConnection;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;

public class WriteErrorClosedConnectionTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(Echo.class, WSClient.class);
});

@Inject
Vertx vertx;

@TestHTTPResource("echo")
URI testUri;

@Test
void testError() {
WSClient client = WSClient.create(vertx).connect(testUri);
client.sendAndAwait(Buffer.buffer("1"));
Awaitility.await().atMost(Duration.ofSeconds(5)).until(() -> client.isClosed());
assertTrue(Echo.ERROR_HANDLER_CALLED.get());
}

@WebSocket(path = "/echo")
public static class Echo {

static final AtomicBoolean ERROR_HANDLER_CALLED = new AtomicBoolean();

@OnBinaryMessage
Uni<Buffer> process(Buffer message, WebSocketConnection connection) {
// This should result in a failure because the connection is closed
// but we still try to write a binary message
return connection.close().replaceWith(message);
}

@OnError
void runtimeProblem(Throwable t, WebSocketConnection connection) {
if (connection.isOpen()) {
throw new IllegalStateException();
}
ERROR_HANDLER_CALLED.set(true);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package io.quarkus.websockets.next.test.maxmessagesize;

import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.atomic.AtomicBoolean;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.OnError;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.vertx.core.Vertx;

public class MaxMessageSizeTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(Echo.class, WSClient.class);
}).overrideConfigKey("quarkus.websockets-next.max-message-size", "10");

@Inject
Vertx vertx;

@TestHTTPResource("/echo")
URI echoUri;

@Test
void testMaxMessageSize() {
WSClient client = WSClient.create(vertx).connect(echoUri);
String msg = "foo".repeat(10);
String reply = client.sendAndAwaitReply(msg).toString();
assertNotEquals(msg, reply);
assertTrue(Echo.ISE_THROWN.get());
}

@WebSocket(path = "/echo")
public static class Echo {

static final AtomicBoolean ISE_THROWN = new AtomicBoolean();

@OnTextMessage
String process(String message) {
return message;
}

@OnError
String onError(IllegalStateException ise) {
ISE_THROWN.set(true);
return ise.getMessage();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

import jakarta.enterprise.context.Destroyed;
import jakarta.enterprise.context.SessionScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

Expand Down Expand Up @@ -43,18 +48,26 @@ void testConnectionRejected() {
Throwable cause = e.getCause();
assertTrue(cause instanceof WebSocketClientHandshakeException);
assertFalse(Endpoint.OPEN_CALLED.get());
// Wait until the CDI singleton context is destroyed
// Otherwise the test app is shut down before the WebSocketSessionContext is ended properly
Awaitility.await().atMost(Duration.ofSeconds(5)).until(() -> Endpoint.SESSION_CONTEXT_DESTROYED.get());
}

@WebSocket(path = "/endpoint")
public static class Endpoint {

static final AtomicBoolean OPEN_CALLED = new AtomicBoolean();
static final AtomicBoolean SESSION_CONTEXT_DESTROYED = new AtomicBoolean();

@OnOpen
void open() {
OPEN_CALLED.set(true);
}

static void sessionContextDestroyed(@Observes @Destroyed(SessionScoped.class) Object event) {
SESSION_CONTEXT_DESTROYED.set(true);
}

}

}
Original file line number Diff line number Diff line change
@@ -1,29 +1,42 @@
package io.quarkus.websockets.next;

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;

import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;
import io.smallrye.config.ConfigMapping;
import io.smallrye.config.WithDefault;
import io.vertx.core.http.HttpServerOptions;

@ConfigMapping(prefix = "quarkus.websockets-next")
@ConfigRoot(phase = ConfigPhase.RUN_TIME)
public interface WebSocketsRuntimeConfig {

/**
* See <a href="https://datatracker.ietf.org/doc/html/rfc6455#page-12">The WebSocket Protocol</a>
*
* @return the supported subprotocols
*/
Optional<List<String>> supportedSubprotocols();

/**
* TODO Not implemented yet.
*
* The default timeout to complete processing of a message.
* Compression Extensions for WebSocket are supported by default.
* <p>
* See also <a href="https://datatracker.ietf.org/doc/html/rfc7692">RFC 7692</a>
*/
Optional<Duration> timeout();
@WithDefault("true")
boolean perMessageCompressionSupported();

/**
* The compression level must be a value between 0 and 9. The default value is
* {@value HttpServerOptions#DEFAULT_WEBSOCKET_COMPRESSION_LEVEL}.
*/
OptionalInt compressionLevel();

/**
* The maximum size of a message in bytes. The default values is
* {@value HttpServerOptions#DEFAULT_MAX_WEBSOCKET_MESSAGE_SIZE}.
*/
OptionalInt maxMessageSize();

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ void endSession() {
}

ContextState currentRequestContextState() {
return requestContext.getState();
return requestContext.getStateIfActive();
}

static Context createNewDuplicatedContext(Context context, WebSocketConnection connection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,23 @@ public class WebSocketHttpServerOptionsCustomizer implements HttpServerOptionsCu

@Override
public void customizeHttpServer(HttpServerOptions options) {
config.supportedSubprotocols().orElse(List.of()).forEach(options::addWebSocketSubProtocol);
customize(options);
}

@Override
public void customizeHttpsServer(HttpServerOptions options) {
customize(options);
}

private void customize(HttpServerOptions options) {
config.supportedSubprotocols().orElse(List.of()).forEach(options::addWebSocketSubProtocol);
options.setPerMessageWebSocketCompressionSupported(config.perMessageCompressionSupported());
if (config.compressionLevel().isPresent()) {
options.setWebSocketCompressionLevel(config.compressionLevel().getAsInt());
}
if (config.maxMessageSize().isPresent()) {
options.setMaxWebSocketMessageSize(config.maxMessageSize().getAsInt());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,21 @@ public void handle(Void event) {
});
}
});

ws.exceptionHandler(new Handler<Throwable>() {
@Override
public void handle(Throwable t) {
ContextSupport.createNewDuplicatedContext(context, connection).runOnContext(new Handler<Void>() {
@Override
public void handle(Void event) {
endpoint.doOnError(t).subscribe().with(
v -> LOG.debugf("Error [%s] processed: %s", t.getClass(), connection),
t -> LOG.errorf(t, "Unhandled error occured: %s", t.toString(),
connection));
}
});
}
});
});
}
};
Expand Down

0 comments on commit c70c45e

Please sign in to comment.