diff --git a/junit/mockwebserver/src/test/groovy/io/fabric8/mockwebserver/DefaultMockServerWebSocketTest.groovy b/junit/mockwebserver/src/test/groovy/io/fabric8/mockwebserver/DefaultMockServerWebSocketTest.groovy index 6433be20d2b..ac596b9cefe 100644 --- a/junit/mockwebserver/src/test/groovy/io/fabric8/mockwebserver/DefaultMockServerWebSocketTest.groovy +++ b/junit/mockwebserver/src/test/groovy/io/fabric8/mockwebserver/DefaultMockServerWebSocketTest.groovy @@ -15,153 +15,192 @@ */ package io.fabric8.mockwebserver -import okhttp3.OkHttpClient -import okhttp3.Request -import okhttp3.Response -import okhttp3.WebSocket -import okhttp3.WebSocketListener +import io.vertx.core.AsyncResult +import io.vertx.core.Handler +import io.vertx.core.Vertx +import io.vertx.core.http.WebSocket +import io.vertx.core.http.WebSocketClient +import io.vertx.core.http.WebSocketConnectOptions import spock.lang.Shared import spock.lang.Specification +import spock.util.concurrent.AsyncConditions +import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.CompletableFuture import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit -import java.util.stream.Collectors import java.util.stream.IntStream class DefaultMockServerWebSocketTest extends Specification { + @Shared + static def vertx = Vertx.vertx() + DefaultMockServer server - @Shared - OkHttpClient client = new OkHttpClient() + WebSocketClient wsClient def setup() { server = new DefaultMockServer() server.start() + wsClient = vertx.createWebSocketClient() } def cleanup() { server.shutdown() + wsClient.close() } def "andUpgradeToWebSocket, with configured events, should emit events"() { - given: - server.expect() - .withPath("/websocket") - .andUpgradeToWebSocket().open().waitFor(10L).andEmit("A text message").done().always() - def future = new CompletableFuture() - when: - def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).build(), new WebSocketListener() { - @Override - void onMessage(WebSocket webSocket, String text) { - future.complete(text) + given: "A WebSocket expectation" + server.expect().withPath("/websocket") + .andUpgradeToWebSocket() + .open() + .waitFor(10L).andEmit("A text message") + .done() + .always() + and: + Queue messages = new ArrayBlockingQueue<>(1) + and: "A WebSocket request" + def wsReq = wsClient.webSocket().connect(server.port, server.getHostName(), "/websocket") + and: "A WebSocket listener" + wsReq.onComplete { ws -> + ws.result().textMessageHandler { text -> + messages.add(text) + } + ws.result().closeHandler { _ -> + ws.result().close() + } } - }) - then: - assert future.get(100L, TimeUnit.MILLISECONDS) == "A text message" - cleanup: - ws.close(1000, "Test finished") + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent and completed" + async.evaluate { + assert messages.poll(10, TimeUnit.SECONDS) == "A text message" + } + + then: "Expect the result to be completed in the specified time" + async.await(10) } def "andUpgradeToWebSocket, with configured events, should emit onClose when done"() { - given: - server.expect() - .withPath("/websocket") - .andUpgradeToWebSocket().open().immediately().andEmit("event").done().always() - def future = new CompletableFuture() - when: - def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).build(), new WebSocketListener() { - @Override - void onClosing(WebSocket webSocket, int code, String reason) { - future.complete(reason) + given: "A WebSocket expectation" + server.expect() + .withPath("/websocket") + .andUpgradeToWebSocket().open().immediately().andEmit("event").done().always() + and: + def future = new CompletableFuture() + and: "A WebSocket request" + def wsReq = wsClient.webSocket().connect(server.port, server.getHostName(), "/websocket") + and: "A WebSocket listener" + wsReq.onComplete { ws -> + ws.result().closeHandler { _ -> + ws.result().close() + future.complete(ws.result().closeReason()) + } + } + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent and completed" + async.evaluate { + assert future.get(10, TimeUnit.SECONDS) == "Closing..." } - }) - then: - assert future.get(100L, TimeUnit.MILLISECONDS) == "Closing..." + + then: "Expect the result to be completed in the specified time" + async.await(10) } def "andUpgradeToWebSocket, with no events, should emit onClose"() { - given: - server.expect() - .withPath("/websocket") - .andUpgradeToWebSocket().open().done().always() - def future = new CompletableFuture() - when: - def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).build(), new WebSocketListener() { - @Override - void onClosing(WebSocket webSocket, int code, String reason) { - future.complete(reason) + given: "A WebSocket expectation" + server.expect() + .withPath("/websocket") + .andUpgradeToWebSocket().open().done().always() + and: + def future = new CompletableFuture() + and: "A WebSocket request" + def wsReq = wsClient.webSocket().connect(server.port, server.getHostName(), "/websocket") + and: "A WebSocket listener" + wsReq.onComplete { ws -> + ws.result().closeHandler { _ -> + ws.result().close() + future.complete(ws.result().closeReason()) + } } - }) - then: - assert future.get(100L, TimeUnit.MILLISECONDS) == "Closing..." + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent and completed" + async.evaluate { + assert future.get(10, TimeUnit.SECONDS) == "Closing..." + } + + then: "Expect the result to be completed in the specified time" + async.await(10) } // https://github.com/fabric8io/mockwebserver/pull/66#issuecomment-944289335 def "andUpgradeToWebSocket, with multiple upgrades, should emit events for all websocket listeners"() { - given: - server.expect() - .withPath("/websocket") - .andUpgradeToWebSocket().open().waitFor(10L).andEmit("A text message").done().always() - def latch = new CountDownLatch(15) - def wsListener = new WebSocketListener() { - @Override - void onMessage(WebSocket webSocket, String text) { - latch.countDown() + given: "A WebSocket expectation" + server.expect() + .withPath("/websocket") + .andUpgradeToWebSocket().open().waitFor(10L).andEmit("A text message").done().always() + and: "A CountDown latch to verify the event count" + def latch = new CountDownLatch(15) + and: "A Vert.x WebSocket completion handler" + Handler> completionHandler = { ws -> + ws.result().textMessageHandler { text -> + latch.countDown() + } + ws.result().closeHandler { _ -> + ws.result().close() + } + } + and: "WebSocket requests" + IntStream.range(0, 15).forEach {i -> + def wsReq = wsClient.webSocket().connect(server.port, server.getHostName(), "/websocket") + wsReq.onComplete(completionHandler) + } + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The requests are sent and completed" + async.evaluate { + assert latch.await(10, TimeUnit.SECONDS) } - } - when: - def wss = IntStream.range(0, 15).mapToObj(i -> - client.newWebSocket(new Request.Builder().url(server.url("/websocket")).build(), wsListener) - ).collect(Collectors.toList()) - then: - assert latch.await(10000L, TimeUnit.MILLISECONDS) - cleanup: - wss.forEach(ws -> ws.close(1000, "Test finished")) + + then: "Expect the result to be completed in the specified time" + async.await(10) } // https://github.com/fabric8io/mockwebserver/issues/77 def "andUpgradeToWebSocket, with request header 'sec-websocket-protocol', should create response with matching header"() { - given: - server.expect() - .withPath("/websocket") - .andUpgradeToWebSocket().open().done().always() - def future = new CompletableFuture() - when: - def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).header("sec-websocket-protocol", "v4.channel.k8s.io").build(), new WebSocketListener() { - @Override - void onOpen(WebSocket webSocket, Response response) { - future.complete(response.header("sec-websocket-protocol")) + given: "A WebSocket expectation" + server.expect() + .withPath("/websocket") + .andUpgradeToWebSocket().open().done().always() + and: + def future = new CompletableFuture() + and: "A WebSocket request" + def wsReq = wsClient.webSocket().connect(new WebSocketConnectOptions() + .setPort(server.port) + .setHost(server.getHostName()) + .setURI("/websocket") + .addSubProtocol("v4.channel.k8s.io")) + and: "A WebSocket listener" + wsReq.onComplete { ws -> + future.complete(ws.result().headers().get("sec-websocket-protocol")) } - }) - then: - assert future.get(100L, TimeUnit.MILLISECONDS) == "v4.channel.k8s.io" - cleanup: - ws.close(1000, "Test finished") - } + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) - // https://github.com/fabric8io/mockwebserver/issues/77 - def "andUpgradeToWebSocket, with request header 'sec-websocket-protocol', should not change existing response header"() { - given: - server.expect() - .withPath("/websocket") - .andUpgradeToWebSocket() - .open() - .done() - .withHeader("sec-websocket-protocol", "v3.channel.k8s.io,v4.channel.k8s.io") - .always() - def future = new CompletableFuture() - when: - def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).header("sec-websocket-protocol", "v4.channel.k8s.io").build(), new WebSocketListener() { - @Override - void onOpen(WebSocket webSocket, Response response) { - future.complete(response.header("sec-websocket-protocol")) + when: "The request is sent and completed" + async.evaluate { + assert future.get(10, TimeUnit.SECONDS) == "v4.channel.k8s.io" } - }) - then: - assert future.get(100L, TimeUnit.MILLISECONDS) == "v3.channel.k8s.io,v4.channel.k8s.io" - cleanup: - ws.close(1000, "Test finished") + + then: "Expect the result to be completed in the specified time" + async.await(10) } }