Skip to content

Commit

Permalink
KTOR-6004 Support TCP and Unix socket for wasm-js and js via NodeJS (#…
Browse files Browse the repository at this point in the history
…4411)

* Drop `jvmAndNix` shared source set
* Commonize `ktor-network` and `ktor-network-tls`
* Support TCP and Unix sockets for wasm-js and js on Node
* Move `supportsUnixDomainSockets` to posix and use Platform instead of expect/actual
  • Loading branch information
whyoleg authored and osipxd committed Jan 9, 2025
1 parent 42954ea commit 239a7f0
Show file tree
Hide file tree
Showing 63 changed files with 785 additions and 239 deletions.
26 changes: 16 additions & 10 deletions ktor-network/api/ktor-network.klib.api
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Klib ABI Dump
// Targets: [androidNativeArm32, androidNativeArm64, androidNativeX64, androidNativeX86, iosArm64, iosSimulatorArm64, iosX64, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, watchosArm32, watchosArm64, watchosDeviceArm64, watchosSimulatorArm64, watchosX64]
// Targets: [androidNativeArm32, androidNativeArm64, androidNativeX64, androidNativeX86, iosArm64, iosSimulatorArm64, iosX64, js, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, wasmJs, watchosArm32, watchosArm64, watchosDeviceArm64, watchosSimulatorArm64, watchosX64]
// Alias: native => [androidNativeArm32, androidNativeArm64, androidNativeX64, androidNativeX86, iosArm64, iosSimulatorArm64, iosX64, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, watchosArm32, watchosArm64, watchosDeviceArm64, watchosSimulatorArm64, watchosX64]
// Rendering settings:
// - Signature version: 2
// - Show manifest properties: true
Expand Down Expand Up @@ -36,11 +37,6 @@ abstract interface <#A: out io.ktor.network.sockets/Configurable<#A, #B>, #B: io
open fun configure(kotlin/Function1<#B, kotlin/Unit>): #A // io.ktor.network.sockets/Configurable.configure|configure(kotlin.Function1<1:1,kotlin.Unit>){}[0]
}

abstract interface io.ktor.network.selector/Selectable { // io.ktor.network.selector/Selectable|null[0]
abstract val descriptor // io.ktor.network.selector/Selectable.descriptor|{}descriptor[0]
abstract fun <get-descriptor>(): kotlin/Int // io.ktor.network.selector/Selectable.descriptor.<get-descriptor>|<get-descriptor>(){}[0]
}

abstract interface io.ktor.network.selector/SelectorManager : io.ktor.utils.io.core/Closeable, kotlinx.coroutines/CoroutineScope { // io.ktor.network.selector/SelectorManager|null[0]
abstract fun notifyClosed(io.ktor.network.selector/Selectable) // io.ktor.network.selector/SelectorManager.notifyClosed|notifyClosed(io.ktor.network.selector.Selectable){}[0]
abstract suspend fun select(io.ktor.network.selector/Selectable, io.ktor.network.selector/SelectInterest) // io.ktor.network.selector/SelectorManager.select|select(io.ktor.network.selector.Selectable;io.ktor.network.selector.SelectInterest){}[0]
Expand Down Expand Up @@ -103,10 +99,6 @@ final class io.ktor.network.selector/ClosedChannelCancellationException : kotlin
constructor <init>() // io.ktor.network.selector/ClosedChannelCancellationException.<init>|<init>(){}[0]
}

final class io.ktor.network.selector/SocketError : kotlin/IllegalStateException { // io.ktor.network.selector/SocketError|null[0]
constructor <init>() // io.ktor.network.selector/SocketError.<init>|<init>(){}[0]
}

final class io.ktor.network.sockets/Connection { // io.ktor.network.sockets/Connection|null[0]
constructor <init>(io.ktor.network.sockets/Socket, io.ktor.utils.io/ByteReadChannel, io.ktor.utils.io/ByteWriteChannel) // io.ktor.network.sockets/Connection.<init>|<init>(io.ktor.network.sockets.Socket;io.ktor.utils.io.ByteReadChannel;io.ktor.utils.io.ByteWriteChannel){}[0]

Expand Down Expand Up @@ -284,3 +276,17 @@ final fun <#A: io.ktor.network.sockets/Configurable<#A, *>> (#A).io.ktor.network
final fun io.ktor.network.selector/SelectorManager(kotlin.coroutines/CoroutineContext = ...): io.ktor.network.selector/SelectorManager // io.ktor.network.selector/SelectorManager|SelectorManager(kotlin.coroutines.CoroutineContext){}[0]
final fun io.ktor.network.sockets/aSocket(io.ktor.network.selector/SelectorManager): io.ktor.network.sockets/SocketBuilder // io.ktor.network.sockets/aSocket|aSocket(io.ktor.network.selector.SelectorManager){}[0]
final suspend fun (io.ktor.network.sockets/ASocket).io.ktor.network.sockets/awaitClosed() // io.ktor.network.sockets/awaitClosed|awaitClosed@io.ktor.network.sockets.ASocket(){}[0]

// Targets: [native]
abstract interface io.ktor.network.selector/Selectable { // io.ktor.network.selector/Selectable|null[0]
abstract val descriptor // io.ktor.network.selector/Selectable.descriptor|{}descriptor[0]
abstract fun <get-descriptor>(): kotlin/Int // io.ktor.network.selector/Selectable.descriptor.<get-descriptor>|<get-descriptor>(){}[0]
}

// Targets: [native]
final class io.ktor.network.selector/SocketError : kotlin/IllegalStateException { // io.ktor.network.selector/SocketError|null[0]
constructor <init>() // io.ktor.network.selector/SocketError.<init>|<init>(){}[0]
}

// Targets: [js, wasmJs]
abstract interface io.ktor.network.selector/Selectable // io.ktor.network.selector/Selectable|null[0]
4 changes: 2 additions & 2 deletions ktor-network/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ kotlin {
}

sourceSets {
jvmAndPosixMain {
commonMain {
dependencies {
api(project(":ktor-utils"))
}
}

jvmAndPosixTest {
commonTest {
dependencies {
api(project(":ktor-test-dispatcher"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import kotlin.coroutines.*
/**
* Creates the selector manager for current platform.
*/
@Suppress("FunctionName")
public expect fun SelectorManager(
dispatcher: CoroutineContext = EmptyCoroutineContext
): SelectorManager
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright 2014-2021 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.sockets

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2021 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.sockets
Expand Down
32 changes: 32 additions & 0 deletions ktor-network/common/src/io/ktor/network/sockets/SocketEngine.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.sockets

import io.ktor.network.selector.*

internal expect suspend fun tcpConnect(
selector: SelectorManager,
remoteAddress: SocketAddress,
socketOptions: SocketOptions.TCPClientSocketOptions
): Socket

internal expect suspend fun tcpBind(
selector: SelectorManager,
localAddress: SocketAddress?,
socketOptions: SocketOptions.AcceptorOptions
): ServerSocket

internal expect suspend fun udpConnect(
selector: SelectorManager,
remoteAddress: SocketAddress,
localAddress: SocketAddress?,
options: SocketOptions.UDPSocketOptions
): ConnectedDatagramSocket

internal expect suspend fun udpBind(
selector: SelectorManager,
localAddress: SocketAddress?,
options: SocketOptions.UDPSocketOptions
): BoundDatagramSocket
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package io.ktor.network.sockets

internal const val INFINITE_TIMEOUT_MS = Long.MAX_VALUE
private const val INFINITE_TIMEOUT_MS = Long.MAX_VALUE

/**
* Socket options builder
Expand All @@ -29,7 +29,7 @@ public sealed class SocketOptions(
}
}

internal fun acceptor(): AcceptorOptions {
internal fun tcpAccept(): AcceptorOptions {
return AcceptorOptions(HashMap(customOptions)).apply {
copyCommon(this@SocketOptions)
}
Expand Down Expand Up @@ -112,7 +112,7 @@ public sealed class SocketOptions(
}
}

internal fun tcp(): TCPClientSocketOptions {
internal fun tcpConnect(): TCPClientSocketOptions {
return TCPClientSocketOptions(HashMap(customOptions)).apply {
copyCommon(this@PeerSocketOptions)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright 2014-2021 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.sockets

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ public class TcpSocketBuilder internal constructor(
public suspend fun connect(
remoteAddress: SocketAddress,
configure: SocketOptions.TCPClientSocketOptions.() -> Unit = {}
): Socket = connect(selector, remoteAddress, options.tcp().apply(configure))
): Socket = tcpConnect(selector, remoteAddress, options.tcpConnect().apply(configure))

/**
* Bind server socket to listen to [localAddress].
*/
public suspend fun bind(
localAddress: SocketAddress? = null,
configure: SocketOptions.AcceptorOptions.() -> Unit = {}
): ServerSocket = bind(selector, localAddress, options.acceptor().apply(configure))
): ServerSocket = tcpBind(selector, localAddress, options.tcpAccept().apply(configure))
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright 2014-2021 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.sockets

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class UDPSocketBuilder internal constructor(
public suspend fun bind(
localAddress: SocketAddress? = null,
configure: SocketOptions.UDPSocketOptions.() -> Unit = {}
): BoundDatagramSocket = bindUDP(selector, localAddress, options.udp().apply(configure))
): BoundDatagramSocket = udpBind(selector, localAddress, options.udp().apply(configure))

/**
* Create a datagram socket to listen datagrams at [localAddress] and set to [remoteAddress].
Expand All @@ -28,18 +28,5 @@ public class UDPSocketBuilder internal constructor(
remoteAddress: SocketAddress,
localAddress: SocketAddress? = null,
configure: SocketOptions.UDPSocketOptions.() -> Unit = {}
): ConnectedDatagramSocket = connectUDP(selector, remoteAddress, localAddress, options.udp().apply(configure))
): ConnectedDatagramSocket = udpConnect(selector, remoteAddress, localAddress, options.udp().apply(configure))
}

internal expect fun connectUDP(
selector: SelectorManager,
remoteAddress: SocketAddress,
localAddress: SocketAddress?,
options: SocketOptions.UDPSocketOptions
): ConnectedDatagramSocket

internal expect fun bindUDP(
selector: SelectorManager,
localAddress: SocketAddress?,
options: SocketOptions.UDPSocketOptions
): BoundDatagramSocket
Original file line number Diff line number Diff line change
Expand Up @@ -62,48 +62,49 @@ class TCPSocketTest {
if (!supportsUnixDomainSockets()) return@testSockets

val socketPath = createTempFilePath("ktor-echo-test")
try {
val tcp = aSocket(selector).tcp()
val server = tcp.bind(UnixSocketAddress(socketPath))

val tcp = aSocket(selector).tcp()
val server = tcp.bind(UnixSocketAddress(socketPath))
val serverConnectionPromise = async {
server.accept()
}

val serverConnectionPromise = async {
server.accept()
}
val clientConnection = tcp.connect(UnixSocketAddress(socketPath))
val serverConnection = serverConnectionPromise.await()

val clientConnection = tcp.connect(UnixSocketAddress(socketPath))
val serverConnection = serverConnectionPromise.await()
val clientOutput = clientConnection.openWriteChannel()
try {
clientOutput.writeStringUtf8("Hello, world\n")
clientOutput.flush()
} finally {
clientOutput.flushAndClose()
}

val clientOutput = clientConnection.openWriteChannel()
try {
clientOutput.writeStringUtf8("Hello, world\n")
clientOutput.flush()
} finally {
clientOutput.flushAndClose()
}
val serverInput = serverConnection.openReadChannel()
val message = serverInput.readUTF8Line()
assertEquals("Hello, world", message)

val serverInput = serverConnection.openReadChannel()
val message = serverInput.readUTF8Line()
assertEquals("Hello, world", message)
val serverOutput = serverConnection.openWriteChannel()
try {
serverOutput.writeStringUtf8("Hello From Server\n")
serverOutput.flush()

val serverOutput = serverConnection.openWriteChannel()
try {
serverOutput.writeStringUtf8("Hello From Server\n")
serverOutput.flush()
val clientInput = clientConnection.openReadChannel()
val echo = clientInput.readUTF8Line()

val clientInput = clientConnection.openReadChannel()
val echo = clientInput.readUTF8Line()
assertEquals("Hello From Server", echo)
} finally {
serverOutput.flushAndClose()
}

assertEquals("Hello From Server", echo)
serverConnection.close()
clientConnection.close()

server.close()
} finally {
serverOutput.flushAndClose()
removeFile(socketPath)
}

serverConnection.close()
clientConnection.close()

server.close()

removeFile(socketPath)
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.sockets.tests

import io.ktor.network.selector.*
import io.ktor.test.dispatcher.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.test.TestResult
import kotlinx.io.files.Path
import kotlinx.io.files.SystemFileSystem
import kotlinx.io.files.SystemTemporaryDirectory
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
import kotlin.uuid.ExperimentalUuidApi
import kotlin.uuid.Uuid

internal fun testSockets(
timeout: Duration = 1.minutes,
block: suspend CoroutineScope.(SelectorManager) -> Unit
): TestResult = runTestWithRealTime(timeout = timeout) {
SelectorManager().use { selector ->
block(selector)
}
}

internal expect fun Any.supportsUnixDomainSockets(): Boolean

@OptIn(ExperimentalUuidApi::class)
internal fun createTempFilePath(basename: String): String {
return Path(SystemTemporaryDirectory, "$basename-${Uuid.random()}").toString()
}

internal fun removeFile(path: String) {
SystemFileSystem.delete(Path(path), mustExist = false)
}
5 changes: 5 additions & 0 deletions ktor-network/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#
# Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
#
target.js.browser=false
target.wasmJs.browser=false

This file was deleted.

45 changes: 45 additions & 0 deletions ktor-network/js/src/io/ktor/network/sockets/nodejs/node.net.js.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.sockets.nodejs

import io.ktor.network.sockets.*
import org.khronos.webgl.*

internal actual fun nodeNet(): NodeNet? = js("eval('require')('node:net')").unsafeCast<NodeNet?>()

internal actual fun TcpCreateConnectionOptions(
block: TcpCreateConnectionOptions.() -> Unit
): TcpCreateConnectionOptions = createObject(block)

internal actual fun IpcCreateConnectionOptions(
block: IpcCreateConnectionOptions.() -> Unit
): IpcCreateConnectionOptions = createObject(block)

internal actual fun CreateServerOptions(
block: CreateServerOptions.() -> Unit
): CreateServerOptions = createObject(block)

internal actual fun ServerListenOptions(
block: ServerListenOptions.() -> Unit
): ServerListenOptions = createObject(block)

private fun <T> createObject(block: T.() -> Unit): T = js("{}").unsafeCast<T>().apply(block)

internal actual fun JsError.toThrowable(): Throwable = unsafeCast<Throwable>()
internal actual fun Throwable.toJsError(): JsError? = unsafeCast<JsError>()

internal actual fun ByteArray.toJsBuffer(fromIndex: Int, toIndex: Int): JsBuffer {
return unsafeCast<Int8Array>().subarray(fromIndex, toIndex).unsafeCast<JsBuffer>()
}

internal actual fun JsBuffer.toByteArray(): ByteArray {
return Int8Array(unsafeCast<ArrayBuffer>()).unsafeCast<ByteArray>()
}

internal actual fun ServerLocalAddressInfo.toSocketAddress(): SocketAddress {
if (jsTypeOf(this) == "string") return UnixSocketAddress(unsafeCast<String>())
val info = unsafeCast<TcpServerLocalAddressInfo>()
return InetSocketAddress(info.address, info.port)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/*
* Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.selector

public actual interface Selectable
Loading

0 comments on commit 239a7f0

Please sign in to comment.