Skip to content

Commit

Permalink
refactor(mockwebserver): DefaultMockServerWebSocketTest uses Vert.x t…
Browse files Browse the repository at this point in the history
…o perform verifications

Signed-off-by: Marc Nuri <marc@marcnuri.com>
  • Loading branch information
manusa committed Dec 20, 2023
1 parent 64735c1 commit a5d5049
Showing 1 changed file with 143 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,153 +15,192 @@
*/
package io.fabric8.mockwebserver

import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import okhttp3.WebSocket
import okhttp3.WebSocketListener
import io.vertx.core.AsyncResult
import io.vertx.core.Handler
import io.vertx.core.Vertx
import io.vertx.core.http.HttpClient
import io.vertx.core.http.WebSocket
import io.vertx.core.http.WebSocketConnectOptions
import spock.lang.Shared
import spock.lang.Specification
import spock.util.concurrent.AsyncConditions

import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.stream.Collectors
import java.util.stream.IntStream

class DefaultMockServerWebSocketTest extends Specification {

@Shared
static def vertx = Vertx.vertx()

DefaultMockServer server

@Shared
OkHttpClient client = new OkHttpClient()
HttpClient httpClient

def setup() {
server = new DefaultMockServer()
server.start()
httpClient = vertx.createHttpClient()
}

def cleanup() {
server.shutdown()
httpClient.close()
}

def "andUpgradeToWebSocket, with configured events, should emit events"() {
given:
server.expect()
.withPath("/websocket")
.andUpgradeToWebSocket().open().waitFor(10L).andEmit("A text message").done().always()
def future = new CompletableFuture()
when:
def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).build(), new WebSocketListener() {
@Override
void onMessage(WebSocket webSocket, String text) {
future.complete(text)
given: "A WebSocket expectation"
server.expect().withPath("/websocket")
.andUpgradeToWebSocket()
.open()
.waitFor(10L).andEmit("A text message")
.done()
.always()
and:
Queue<String> messages = new ArrayBlockingQueue<>(1)
and: "A WebSocket request"
def wsReq = httpClient.webSocket(server.port, server.getHostName(), "/websocket")
and: "A WebSocket listener"
wsReq.onComplete { ws ->
ws.result().textMessageHandler { text ->
messages.add(text)
}
ws.result().closeHandler { _ ->
ws.result().close()
}
}
})
then:
assert future.get(100L, TimeUnit.MILLISECONDS) == "A text message"
cleanup:
ws.close(1000, "Test finished")
and: "An instance of AsyncConditions"
def async = new AsyncConditions(1)

when: "The request is sent and completed"
async.evaluate {
assert messages.poll(10, TimeUnit.SECONDS) == "A text message"
}

then: "Expect the result to be completed in the specified time"
async.await(10)
}

def "andUpgradeToWebSocket, with configured events, should emit onClose when done"() {
given:
server.expect()
.withPath("/websocket")
.andUpgradeToWebSocket().open().immediately().andEmit("event").done().always()
def future = new CompletableFuture()
when:
def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).build(), new WebSocketListener() {
@Override
void onClosing(WebSocket webSocket, int code, String reason) {
future.complete(reason)
given: "A WebSocket expectation"
server.expect()
.withPath("/websocket")
.andUpgradeToWebSocket().open().immediately().andEmit("event").done().always()
and:
def future = new CompletableFuture()
and: "A WebSocket request"
def wsReq = httpClient.webSocket(server.port, server.getHostName(), "/websocket")
and: "A WebSocket listener"
wsReq.onComplete { ws ->
ws.result().closeHandler { _ ->
ws.result().close()
future.complete(ws.result().closeReason())
}
}
and: "An instance of AsyncConditions"
def async = new AsyncConditions(1)

when: "The request is sent and completed"
async.evaluate {
assert future.get(10, TimeUnit.SECONDS) == "Closing..."
}
})
then:
assert future.get(100L, TimeUnit.MILLISECONDS) == "Closing..."

then: "Expect the result to be completed in the specified time"
async.await(10)
}

def "andUpgradeToWebSocket, with no events, should emit onClose"() {
given:
server.expect()
.withPath("/websocket")
.andUpgradeToWebSocket().open().done().always()
def future = new CompletableFuture()
when:
def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).build(), new WebSocketListener() {
@Override
void onClosing(WebSocket webSocket, int code, String reason) {
future.complete(reason)
given: "A WebSocket expectation"
server.expect()
.withPath("/websocket")
.andUpgradeToWebSocket().open().done().always()
and:
def future = new CompletableFuture()
and: "A WebSocket request"
def wsReq = httpClient.webSocket(server.port, server.getHostName(), "/websocket")
and: "A WebSocket listener"
wsReq.onComplete { ws ->
ws.result().closeHandler { _ ->
ws.result().close()
future.complete(ws.result().closeReason())
}
}
})
then:
assert future.get(100L, TimeUnit.MILLISECONDS) == "Closing..."
and: "An instance of AsyncConditions"
def async = new AsyncConditions(1)

when: "The request is sent and completed"
async.evaluate {
assert future.get(10, TimeUnit.SECONDS) == "Closing..."
}

then: "Expect the result to be completed in the specified time"
async.await(10)
}

// https://github.com/fabric8io/mockwebserver/pull/66#issuecomment-944289335
def "andUpgradeToWebSocket, with multiple upgrades, should emit events for all websocket listeners"() {
given:
server.expect()
.withPath("/websocket")
.andUpgradeToWebSocket().open().waitFor(10L).andEmit("A text message").done().always()
def latch = new CountDownLatch(15)
def wsListener = new WebSocketListener() {
@Override
void onMessage(WebSocket webSocket, String text) {
latch.countDown()
given: "A WebSocket expectation"
server.expect()
.withPath("/websocket")
.andUpgradeToWebSocket().open().waitFor(10L).andEmit("A text message").done().always()
and: "A CountDown latch to verify the event count"
def latch = new CountDownLatch(15)
and: "A Vert.x WebSocket completion handler"
Handler<AsyncResult<WebSocket>> completionHandler = { ws ->
ws.result().textMessageHandler { text ->
latch.countDown()
}
ws.result().closeHandler { _ ->
ws.result().close()
}
}
and: "WebSocket requests"
IntStream.range(0, 15).forEach {i ->
def wsReq = httpClient.webSocket(server.port, server.getHostName(), "/websocket")
wsReq.onComplete(completionHandler)
}
and: "An instance of AsyncConditions"
def async = new AsyncConditions(1)

when: "The requests are sent and completed"
async.evaluate {
assert latch.await(10, TimeUnit.SECONDS)
}
}
when:
def wss = IntStream.range(0, 15).mapToObj(i ->
client.newWebSocket(new Request.Builder().url(server.url("/websocket")).build(), wsListener)
).collect(Collectors.toList())
then:
assert latch.await(10000L, TimeUnit.MILLISECONDS)
cleanup:
wss.forEach(ws -> ws.close(1000, "Test finished"))

then: "Expect the result to be completed in the specified time"
async.await(10)
}

// https://github.com/fabric8io/mockwebserver/issues/77
def "andUpgradeToWebSocket, with request header 'sec-websocket-protocol', should create response with matching header"() {
given:
server.expect()
.withPath("/websocket")
.andUpgradeToWebSocket().open().done().always()
def future = new CompletableFuture()
when:
def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).header("sec-websocket-protocol", "v4.channel.k8s.io").build(), new WebSocketListener() {
@Override
void onOpen(WebSocket webSocket, Response response) {
future.complete(response.header("sec-websocket-protocol"))
given: "A WebSocket expectation"
server.expect()
.withPath("/websocket")
.andUpgradeToWebSocket().open().done().always()
and:
def future = new CompletableFuture()
and: "A WebSocket request"
def wsReq = httpClient.webSocket(new WebSocketConnectOptions()
.setPort(server.port)
.setHost(server.getHostName())
.setURI("/websocket")
.addSubProtocol("v4.channel.k8s.io"))
and: "A WebSocket listener"
wsReq.onComplete { ws ->
future.complete(ws.result().headers().get("sec-websocket-protocol"))
}
})
then:
assert future.get(100L, TimeUnit.MILLISECONDS) == "v4.channel.k8s.io"
cleanup:
ws.close(1000, "Test finished")
}
and: "An instance of AsyncConditions"
def async = new AsyncConditions(1)

// https://github.com/fabric8io/mockwebserver/issues/77
def "andUpgradeToWebSocket, with request header 'sec-websocket-protocol', should not change existing response header"() {
given:
server.expect()
.withPath("/websocket")
.andUpgradeToWebSocket()
.open()
.done()
.withHeader("sec-websocket-protocol", "v3.channel.k8s.io,v4.channel.k8s.io")
.always()
def future = new CompletableFuture()
when:
def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).header("sec-websocket-protocol", "v4.channel.k8s.io").build(), new WebSocketListener() {
@Override
void onOpen(WebSocket webSocket, Response response) {
future.complete(response.header("sec-websocket-protocol"))
when: "The request is sent and completed"
async.evaluate {
assert future.get(10, TimeUnit.SECONDS) == "v4.channel.k8s.io"
}
})
then:
assert future.get(100L, TimeUnit.MILLISECONDS) == "v3.channel.k8s.io,v4.channel.k8s.io"
cleanup:
ws.close(1000, "Test finished")

then: "Expect the result to be completed in the specified time"
async.await(10)
}
}

0 comments on commit a5d5049

Please sign in to comment.