Skip to content

Commit

Permalink
Introduce type-safe handlers for on events
Browse files Browse the repository at this point in the history
  • Loading branch information
whyoleg committed Oct 29, 2024
1 parent 99dbdeb commit a73601b
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.io.*
import kotlin.coroutines.*
import io.ktor.network.sockets.nodejs.Socket as NodejsSocket

internal class ServerSocketContext(
private val server: Server,
Expand All @@ -27,31 +26,28 @@ internal class ServerSocketContext(
incomingSockets.cancel()
}

server.on("connection", fun(socket: NodejsSocket) {
server.onConnection { socket ->
val context = SocketContext(socket, localAddress, serverContext)
context.initiate(null)
incomingSockets.trySend(context.createSocket())
})
server.on("close", fun() {
}
server.onClose {
if (cont.isActive) {
cont.resumeWithException(IOException("Failed to bind"))
} else {
serverContext.job.cancel("Server closed")
}
})
server.on("error", fun(error: JsError) {
}
server.onError { error ->
if (cont.isActive) {
cont.resumeWithException(IOException("Failed to bind", error.toThrowable()))
} else {
serverContext.job.cancel("Server failed", error.toThrowable())
}
})
server.on("drop", fun(_: ServerConnectionDrop) {
// TODO: handle drop?
})
server.on("listening", fun() {
}
server.onListening {
cont.resume(ServerSocketImpl(server.address()!!.toSocketAddress(), serverContext, incomingSockets, server))
})
}
server.listen(ServerListenOptions(localAddress))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,32 +27,32 @@ internal class SocketContext(
socketContext.cancel()
incomingFrames.cancel()
}
socket.on("error", fun(error: JsError) {
socket.onError { error ->
when (connectCont?.isActive) {
true -> connectCont.resumeWithException(IOException("Failed to connect", error.toThrowable()))
else -> socketContext.job.cancel("Socket error", error.toThrowable())
}
})
socket.on("timeout", fun() {
}
socket.onTimeout {
when (connectCont?.isActive) {
true -> connectCont.resumeWithException(SocketTimeoutException("timeout"))
else -> socketContext.job.cancel("Socket timeout", SocketTimeoutException("timeout"))
}
})
socket.on("end", fun() {
}
socket.onEnd {
incomingFrames.close()
})
socket.on("close", fun(_: Boolean) {
}
socket.onClose {
socketContext.job.cancel("Socket closed")
})
socket.on("data", fun(data: JsBuffer) {
}
socket.onData { data ->
incomingFrames.trySend(data)
})
}

if (connectCont != null) {
socket.on("connect", fun() {
socket.onConnect {
connectCont.resume(createSocket())
})
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,18 @@ internal external interface Socket {
fun end()

fun on(event: String /* "close" */, listener: (hadError: Boolean) -> Unit)
fun on(event: String /* "connect", "drain", "end", "timeout", */, listener: () -> Unit)
fun on(event: String /* "connect", "end", "timeout", */, listener: () -> Unit)
fun on(event: String /* "data" */, listener: (data: JsBuffer) -> Unit)
fun on(event: String /* "error" */, listener: (error: JsError) -> Unit)
}

internal fun Socket.onClose(block: (hadError: Boolean) -> Unit): Unit = on("close", block)
internal fun Socket.onConnect(block: () -> Unit): Unit = on("connect", block)
internal fun Socket.onEnd(block: () -> Unit): Unit = on("end", block)
internal fun Socket.onTimeout(block: () -> Unit): Unit = on("timeout", block)
internal fun Socket.onData(block: (data: JsBuffer) -> Unit): Unit = on("data", block)
internal fun Socket.onError(block: (error: JsError) -> Unit): Unit = on("error", block)

internal expect fun CreateServerOptions(
block: CreateServerOptions.() -> Unit
): CreateServerOptions
Expand All @@ -124,10 +131,12 @@ internal external interface Server {
fun on(event: String /* "close", "listening" */, listener: () -> Unit)
fun on(event: String /* "connection" */, listener: (socket: Socket) -> Unit)
fun on(event: String /* "error" */, listener: (error: JsError) -> Unit)
fun on(event: String /* "drop" */, listener: (drop: ServerConnectionDrop) -> Unit)
}

internal external interface ServerConnectionDrop
internal fun Server.onClose(block: () -> Unit): Unit = on("close", block)
internal fun Server.onListening(block: () -> Unit): Unit = on("listening", block)
internal fun Server.onConnection(block: (socket: Socket) -> Unit): Unit = on("connection", block)
internal fun Server.onError(block: (error: JsError) -> Unit): Unit = on("error", block)

internal fun ServerListenOptions(localAddress: SocketAddress?): ServerListenOptions = ServerListenOptions {
when (localAddress) {
Expand Down

0 comments on commit a73601b

Please sign in to comment.