Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(mockwebserver): DefaultMockServerWebSocketTest uses Vert.x to perform verifications #5663

Merged
merged 1 commit into from
Dec 21, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,153 +15,192 @@
*/
package io.fabric8.mockwebserver

import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import okhttp3.WebSocket
import okhttp3.WebSocketListener
import io.vertx.core.AsyncResult
import io.vertx.core.Handler
import io.vertx.core.Vertx
import io.vertx.core.http.WebSocket
import io.vertx.core.http.WebSocketClient
import io.vertx.core.http.WebSocketConnectOptions
import spock.lang.Shared
import spock.lang.Specification
import spock.util.concurrent.AsyncConditions

import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.stream.Collectors
import java.util.stream.IntStream

class DefaultMockServerWebSocketTest extends Specification {

@Shared
static def vertx = Vertx.vertx()

DefaultMockServer server

@Shared
OkHttpClient client = new OkHttpClient()
WebSocketClient wsClient

def setup() {
server = new DefaultMockServer()
server.start()
wsClient = vertx.createWebSocketClient()
}

def cleanup() {
server.shutdown()
wsClient.close()
}

def "andUpgradeToWebSocket, with configured events, should emit events"() {
given:
server.expect()
.withPath("/websocket")
.andUpgradeToWebSocket().open().waitFor(10L).andEmit("A text message").done().always()
def future = new CompletableFuture()
when:
def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).build(), new WebSocketListener() {
@Override
void onMessage(WebSocket webSocket, String text) {
future.complete(text)
given: "A WebSocket expectation"
server.expect().withPath("/websocket")
.andUpgradeToWebSocket()
.open()
.waitFor(10L).andEmit("A text message")
.done()
.always()
and:
Queue<String> messages = new ArrayBlockingQueue<>(1)
and: "A WebSocket request"
def wsReq = wsClient.webSocket().connect(server.port, server.getHostName(), "/websocket")
and: "A WebSocket listener"
wsReq.onComplete { ws ->
ws.result().textMessageHandler { text ->
messages.add(text)
}
ws.result().closeHandler { _ ->
ws.result().close()
}
}
})
then:
assert future.get(100L, TimeUnit.MILLISECONDS) == "A text message"
cleanup:
ws.close(1000, "Test finished")
and: "An instance of AsyncConditions"
def async = new AsyncConditions(1)

when: "The request is sent and completed"
async.evaluate {
assert messages.poll(10, TimeUnit.SECONDS) == "A text message"
}

then: "Expect the result to be completed in the specified time"
async.await(10)
}

def "andUpgradeToWebSocket, with configured events, should emit onClose when done"() {
given:
server.expect()
.withPath("/websocket")
.andUpgradeToWebSocket().open().immediately().andEmit("event").done().always()
def future = new CompletableFuture()
when:
def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).build(), new WebSocketListener() {
@Override
void onClosing(WebSocket webSocket, int code, String reason) {
future.complete(reason)
given: "A WebSocket expectation"
server.expect()
.withPath("/websocket")
.andUpgradeToWebSocket().open().immediately().andEmit("event").done().always()
and:
def future = new CompletableFuture()
and: "A WebSocket request"
def wsReq = wsClient.webSocket().connect(server.port, server.getHostName(), "/websocket")
and: "A WebSocket listener"
wsReq.onComplete { ws ->
ws.result().closeHandler { _ ->
ws.result().close()
future.complete(ws.result().closeReason())
}
}
and: "An instance of AsyncConditions"
def async = new AsyncConditions(1)

when: "The request is sent and completed"
async.evaluate {
assert future.get(10, TimeUnit.SECONDS) == "Closing..."
}
})
then:
assert future.get(100L, TimeUnit.MILLISECONDS) == "Closing..."

then: "Expect the result to be completed in the specified time"
async.await(10)
}

def "andUpgradeToWebSocket, with no events, should emit onClose"() {
given:
server.expect()
.withPath("/websocket")
.andUpgradeToWebSocket().open().done().always()
def future = new CompletableFuture()
when:
def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).build(), new WebSocketListener() {
@Override
void onClosing(WebSocket webSocket, int code, String reason) {
future.complete(reason)
given: "A WebSocket expectation"
server.expect()
.withPath("/websocket")
.andUpgradeToWebSocket().open().done().always()
and:
def future = new CompletableFuture()
and: "A WebSocket request"
def wsReq = wsClient.webSocket().connect(server.port, server.getHostName(), "/websocket")
and: "A WebSocket listener"
wsReq.onComplete { ws ->
ws.result().closeHandler { _ ->
ws.result().close()
future.complete(ws.result().closeReason())
}
}
})
then:
assert future.get(100L, TimeUnit.MILLISECONDS) == "Closing..."
and: "An instance of AsyncConditions"
def async = new AsyncConditions(1)

when: "The request is sent and completed"
async.evaluate {
assert future.get(10, TimeUnit.SECONDS) == "Closing..."
}

then: "Expect the result to be completed in the specified time"
async.await(10)
}

// https://github.com/fabric8io/mockwebserver/pull/66#issuecomment-944289335
def "andUpgradeToWebSocket, with multiple upgrades, should emit events for all websocket listeners"() {
given:
server.expect()
.withPath("/websocket")
.andUpgradeToWebSocket().open().waitFor(10L).andEmit("A text message").done().always()
def latch = new CountDownLatch(15)
def wsListener = new WebSocketListener() {
@Override
void onMessage(WebSocket webSocket, String text) {
latch.countDown()
given: "A WebSocket expectation"
server.expect()
.withPath("/websocket")
.andUpgradeToWebSocket().open().waitFor(10L).andEmit("A text message").done().always()
and: "A CountDown latch to verify the event count"
def latch = new CountDownLatch(15)
and: "A Vert.x WebSocket completion handler"
Handler<AsyncResult<WebSocket>> completionHandler = { ws ->
ws.result().textMessageHandler { text ->
latch.countDown()
}
ws.result().closeHandler { _ ->
ws.result().close()
}
}
and: "WebSocket requests"
IntStream.range(0, 15).forEach {i ->
def wsReq = wsClient.webSocket().connect(server.port, server.getHostName(), "/websocket")
wsReq.onComplete(completionHandler)
}
and: "An instance of AsyncConditions"
def async = new AsyncConditions(1)

when: "The requests are sent and completed"
async.evaluate {
assert latch.await(10, TimeUnit.SECONDS)
}
}
when:
def wss = IntStream.range(0, 15).mapToObj(i ->
client.newWebSocket(new Request.Builder().url(server.url("/websocket")).build(), wsListener)
).collect(Collectors.toList())
then:
assert latch.await(10000L, TimeUnit.MILLISECONDS)
cleanup:
wss.forEach(ws -> ws.close(1000, "Test finished"))

then: "Expect the result to be completed in the specified time"
async.await(10)
}

// https://github.com/fabric8io/mockwebserver/issues/77
def "andUpgradeToWebSocket, with request header 'sec-websocket-protocol', should create response with matching header"() {
given:
server.expect()
.withPath("/websocket")
.andUpgradeToWebSocket().open().done().always()
def future = new CompletableFuture()
when:
def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).header("sec-websocket-protocol", "v4.channel.k8s.io").build(), new WebSocketListener() {
@Override
void onOpen(WebSocket webSocket, Response response) {
future.complete(response.header("sec-websocket-protocol"))
given: "A WebSocket expectation"
server.expect()
.withPath("/websocket")
.andUpgradeToWebSocket().open().done().always()
and:
def future = new CompletableFuture()
and: "A WebSocket request"
def wsReq = wsClient.webSocket().connect(new WebSocketConnectOptions()
.setPort(server.port)
.setHost(server.getHostName())
.setURI("/websocket")
.addSubProtocol("v4.channel.k8s.io"))
and: "A WebSocket listener"
wsReq.onComplete { ws ->
future.complete(ws.result().headers().get("sec-websocket-protocol"))
}
})
then:
assert future.get(100L, TimeUnit.MILLISECONDS) == "v4.channel.k8s.io"
cleanup:
ws.close(1000, "Test finished")
}
and: "An instance of AsyncConditions"
def async = new AsyncConditions(1)

// https://github.com/fabric8io/mockwebserver/issues/77
def "andUpgradeToWebSocket, with request header 'sec-websocket-protocol', should not change existing response header"() {
given:
server.expect()
.withPath("/websocket")
.andUpgradeToWebSocket()
.open()
.done()
.withHeader("sec-websocket-protocol", "v3.channel.k8s.io,v4.channel.k8s.io")
.always()
def future = new CompletableFuture()
when:
def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).header("sec-websocket-protocol", "v4.channel.k8s.io").build(), new WebSocketListener() {
@Override
void onOpen(WebSocket webSocket, Response response) {
future.complete(response.header("sec-websocket-protocol"))
when: "The request is sent and completed"
async.evaluate {
assert future.get(10, TimeUnit.SECONDS) == "v4.channel.k8s.io"
}
})
then:
assert future.get(100L, TimeUnit.MILLISECONDS) == "v3.channel.k8s.io,v4.channel.k8s.io"
cleanup:
ws.close(1000, "Test finished")

then: "Expect the result to be completed in the specified time"
async.await(10)
}
}
Loading