From 5a190b3628b3ce47e9be666c3d078574bd8fc4b9 Mon Sep 17 00:00:00 2001 From: Jason Ernst Date: Fri, 13 Dec 2024 14:06:12 -0800 Subject: [PATCH 1/4] channels instead of sockets --- .../jasonernst/kanonproxy/AndroidClient.kt | 7 +++-- client/build.gradle.kts | 1 + .../com/jasonernst/kanonproxy/LinuxClient.kt | 27 ++++++++++++------- .../com/jasonernst/kanonproxy/KAnonProxy.kt | 1 + .../com/jasonernst/kanonproxy/Server.kt | 7 +++-- 5 files changed, 26 insertions(+), 17 deletions(-) diff --git a/android/src/main/kotlin/com/jasonernst/kanonproxy/AndroidClient.kt b/android/src/main/kotlin/com/jasonernst/kanonproxy/AndroidClient.kt index 5eb7fa9..73bb943 100644 --- a/android/src/main/kotlin/com/jasonernst/kanonproxy/AndroidClient.kt +++ b/android/src/main/kotlin/com/jasonernst/kanonproxy/AndroidClient.kt @@ -6,15 +6,15 @@ import android.os.ParcelFileDescriptor.AutoCloseOutputStream import com.jasonernst.packetdumper.AbstractPacketDumper import com.jasonernst.packetdumper.DummyPacketDumper import java.net.InetAddress -import java.net.InetSocketAddress +import java.nio.channels.DatagramChannel class AndroidClient( - socketAddress: InetSocketAddress = InetSocketAddress("127.0.0.1", 8080), + datagramChannel: DatagramChannel, packetDumper: AbstractPacketDumper = DummyPacketDumper, vpnFileDescriptor: ParcelFileDescriptor, onlyDestinations: List = emptyList(), onlyProtocols: List = emptyList() -) : Client(socketAddress, packetDumper, onlyDestinations, onlyProtocols) { +) : Client(datagramChannel, packetDumper, onlyDestinations, onlyProtocols) { private val inputStream = AutoCloseInputStream(vpnFileDescriptor) private val outputStream = AutoCloseOutputStream(vpnFileDescriptor) @@ -27,5 +27,4 @@ class AndroidClient( outputStream.write(writeBytes) outputStream.flush() } - } \ No newline at end of file diff --git a/client/build.gradle.kts b/client/build.gradle.kts index d1651f7..827d9f3 100644 --- a/client/build.gradle.kts +++ b/client/build.gradle.kts @@ -34,6 +34,7 @@ jacoco { } dependencies { + implementation(project(":core")) // only really for the DEFAULT_PORT implementation(libs.jna) implementation(libs.jnr.enxio) implementation(libs.knet) diff --git a/client/src/main/kotlin/com/jasonernst/kanonproxy/LinuxClient.kt b/client/src/main/kotlin/com/jasonernst/kanonproxy/LinuxClient.kt index 41bf74b..d2bc790 100644 --- a/client/src/main/kotlin/com/jasonernst/kanonproxy/LinuxClient.kt +++ b/client/src/main/kotlin/com/jasonernst/kanonproxy/LinuxClient.kt @@ -1,16 +1,18 @@ package com.jasonernst.kanonproxy +import com.jasonernst.kanonproxy.KAnonProxy.Companion.DEFAULT_PORT import com.jasonernst.kanonproxy.tuntap.TunTapDevice import com.jasonernst.packetdumper.AbstractPacketDumper import com.jasonernst.packetdumper.DummyPacketDumper import com.jasonernst.packetdumper.serverdumper.PcapNgTcpServerPacketDumper import sun.misc.Signal import java.net.InetSocketAddress +import java.nio.channels.DatagramChannel class LinuxClient( - socketAddress: InetSocketAddress = InetSocketAddress("127.0.0.1", 8080), + datagramChannel: DatagramChannel, packetDumper: AbstractPacketDumper = DummyPacketDumper, -) : Client(socketAddress, packetDumper) { +) : Client(datagramChannel, packetDumper) { private val tunTapDevice = TunTapDevice() init { @@ -22,10 +24,14 @@ class LinuxClient( fun main(args: Array) { val packetDumper = PcapNgTcpServerPacketDumper() packetDumper.start() + val client = if (args.isEmpty()) { - println("Using default server: 127.0.0.1 8080") - LinuxClient(packetDumper = packetDumper) + println("Using default server: 127.0.0.1 $DEFAULT_PORT") + val datagramChannel = DatagramChannel.open() + datagramChannel.configureBlocking(false) + datagramChannel.connect(InetSocketAddress("127.0.0.1", DEFAULT_PORT)) + LinuxClient(datagramChannel = datagramChannel, packetDumper = packetDumper) } else { if (args.size != 2) { println("Usage: Client ") @@ -33,12 +39,15 @@ class LinuxClient( } val server = args[0] val port = args[1].toInt() - LinuxClient(socketAddress = InetSocketAddress(server, port), packetDumper = packetDumper) + val datagramChannel = DatagramChannel.open() + datagramChannel.configureBlocking(false) + datagramChannel.connect(InetSocketAddress(server, port)) + LinuxClient(datagramChannel = datagramChannel, packetDumper = packetDumper) } - client.connect() + client.start() Signal.handle(Signal("INT")) { - client.close() + client.stop() packetDumper.stop() } @@ -55,8 +64,8 @@ class LinuxClient( tunTapDevice.write(writeBytes) } - override fun close() { + override fun stop() { tunTapDevice.close() - super.close() + super.stop() } } diff --git a/core/src/main/kotlin/com/jasonernst/kanonproxy/KAnonProxy.kt b/core/src/main/kotlin/com/jasonernst/kanonproxy/KAnonProxy.kt index 787826e..ada7a9e 100644 --- a/core/src/main/kotlin/com/jasonernst/kanonproxy/KAnonProxy.kt +++ b/core/src/main/kotlin/com/jasonernst/kanonproxy/KAnonProxy.kt @@ -65,6 +65,7 @@ class KAnonProxy( companion object { const val STALE_SESSION_MS = 5000L + const val DEFAULT_PORT = 8080 } fun start() { diff --git a/server/src/main/kotlin/com/jasonernst/kanonproxy/Server.kt b/server/src/main/kotlin/com/jasonernst/kanonproxy/Server.kt index 9c6cb02..da3a34e 100644 --- a/server/src/main/kotlin/com/jasonernst/kanonproxy/Server.kt +++ b/server/src/main/kotlin/com/jasonernst/kanonproxy/Server.kt @@ -6,7 +6,6 @@ import com.jasonernst.knet.Packet import com.jasonernst.packetdumper.AbstractPacketDumper import com.jasonernst.packetdumper.DummyPacketDumper import com.jasonernst.packetdumper.serverdumper.PcapNgTcpServerPacketDumper -import com.jasonernst.packetdumper.serverdumper.PcapNgTcpServerPacketDumper.Companion.DEFAULT_PORT import kotlinx.coroutines.CompletableJob import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers @@ -24,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean class Server( icmp: Icmp, - private val port: Int = 8080, + private val port: Int = KAnonProxy.DEFAULT_PORT, private val packetDumper: AbstractPacketDumper = DummyPacketDumper, protector: VpnProtector = DummyProtector, ) : ProxySessionManager { @@ -44,10 +43,10 @@ class Server( @JvmStatic fun main(args: Array) { // listen on one port higher so we don't conflict with the client - val packetDumper = PcapNgTcpServerPacketDumper(listenPort = DEFAULT_PORT + 1) + val packetDumper = PcapNgTcpServerPacketDumper(listenPort = PcapNgTcpServerPacketDumper.DEFAULT_PORT + 1) val server = if (args.isEmpty()) { - println("Using default port: 8080") + println("Using default port: ${KAnonProxy.DEFAULT_PORT}") Server(IcmpLinux) } else { if (args.size != 1) { From df40008e34c43d5e789677ef4f92f87a7f33083f Mon Sep 17 00:00:00 2001 From: Jason Ernst Date: Fri, 13 Dec 2024 14:06:50 -0800 Subject: [PATCH 2/4] non-blocking client + tun-tap device --- .../com/jasonernst/kanonproxy/Client.kt | 207 +++++++++++------- .../kanonproxy/tuntap/TunTapDevice.kt | 142 ++++++++++-- 2 files changed, 257 insertions(+), 92 deletions(-) diff --git a/client/src/main/kotlin/com/jasonernst/kanonproxy/Client.kt b/client/src/main/kotlin/com/jasonernst/kanonproxy/Client.kt index d80768f..a6c2d27 100644 --- a/client/src/main/kotlin/com/jasonernst/kanonproxy/Client.kt +++ b/client/src/main/kotlin/com/jasonernst/kanonproxy/Client.kt @@ -1,6 +1,7 @@ package com.jasonernst.kanonproxy -import com.jasonernst.knet.Packet +import com.jasonernst.kanonproxy.ChangeRequest.Companion.CHANGE_OPS +import com.jasonernst.kanonproxy.ChangeRequest.Companion.REGISTER import com.jasonernst.knet.Packet.Companion.parseStream import com.jasonernst.knet.SentinelPacket import com.jasonernst.packetdumper.AbstractPacketDumper @@ -13,71 +14,74 @@ import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.slf4j.LoggerFactory -import java.net.DatagramPacket -import java.net.DatagramSocket import java.net.InetAddress -import java.net.InetSocketAddress import java.nio.ByteBuffer +import java.nio.channels.DatagramChannel +import java.nio.channels.SelectionKey.OP_READ +import java.nio.channels.SelectionKey.OP_WRITE +import java.nio.channels.Selector +import java.util.LinkedList +import java.util.concurrent.LinkedBlockingDeque import java.util.concurrent.atomic.AtomicBoolean import kotlin.math.min /** * Abstract client that can support Linux, Android, etc implementations that are specific to their * tun/tap device. + * + * @param datagramChannel - A datagram channel which has already been set into non-blocking mode + * and connected to the server (ie, just have the server destination addressed associated with + * the channel since UDP sockets can't "connect"). */ abstract class Client( - private val socketAddress: InetSocketAddress = InetSocketAddress("127.0.0.1", 8080), + private val datagramChannel: DatagramChannel, private val packetDumper: AbstractPacketDumper = DummyPacketDumper, private val onlyDestinations: List = emptyList(), private val onlyProtocols: List = emptyList(), ) { private val logger = LoggerFactory.getLogger(javaClass) - private val socket = DatagramSocket() - private val isConnected = AtomicBoolean(false) + private lateinit var selector: Selector + private lateinit var selectorJob: CompletableJob + private lateinit var selectorScope: CoroutineScope + private val outgoingQueue = LinkedBlockingDeque() // queue of data for the server + private val changeRequests = LinkedList() + private val fromProxyStream: ByteBuffer = ByteBuffer.allocate(MAX_STREAM_BUFFER_SIZE) + + private val isRunning = AtomicBoolean(false) private lateinit var readFromTunJob: CompletableJob private lateinit var readFromTunJobScope: CoroutineScope - private lateinit var readFromProxyJob: CompletableJob - private lateinit var readFromProxyJobScope: CoroutineScope companion object { private const val MAX_STREAM_BUFFER_SIZE = 1048576 // max we can write into the stream without parsing private const val MAX_RECEIVE_BUFFER_SIZE = 1500 // max amount we can recv in one read (should be the MTU or bigger probably) } - fun connect() { - if (isConnected.get()) { - logger.debug("Client is already connected") + fun start() { + if (isRunning.get()) { + logger.warn("Already running") return } + isRunning.set(true) + selector = Selector.open() - readFromProxyJob = SupervisorJob() - readFromProxyJobScope = CoroutineScope(Dispatchers.IO + readFromProxyJob) - readFromProxyJobScope.launch { - logger.debug("Connecting to server: {}", socketAddress) - try { - socket.connect(socketAddress) - logger.debug("Connected to server: {}", socketAddress) - isConnected.set(true) - - readFromTunJob = SupervisorJob() - readFromTunJobScope = CoroutineScope(Dispatchers.IO + readFromTunJob) - readFromTunJobScope.launch { - readFromTunWriteToProxy() - } + selectorJob = SupervisorJob() + selectorScope = CoroutineScope(Dispatchers.IO + selectorJob) + selectorScope.launch { + selectorLoop() + } - readFromProxyWriteToTun() - } catch (e: Exception) { - logger.error("Failed to connect to server") - } + readFromTunJob = SupervisorJob() + readFromTunJobScope = CoroutineScope(Dispatchers.IO + readFromTunJob) + readFromTunJobScope.launch { + readFromTunWriteToProxy() } - readFromProxyJob.complete() } fun waitUntilShutdown() { // block until the read jobs are finished runBlocking { - readFromProxyJob.join() + selectorJob.join() readFromTunJob.join() } } @@ -89,47 +93,76 @@ abstract class Client( abstract fun tunWrite(writeBytes: ByteArray) - private fun readFromProxyWriteToTun() { - val buffer = ByteArray(MAX_RECEIVE_BUFFER_SIZE) - val datagram = DatagramPacket(buffer, buffer.size) - val stream = ByteBuffer.allocate(MAX_STREAM_BUFFER_SIZE) + private fun selectorLoop() { + datagramChannel.register(selector, OP_READ) + + while (isRunning.get()) { + synchronized(changeRequests) { + for (changeRequest in changeRequests) { + when (changeRequest.type) { + REGISTER -> { + logger.debug("Processing REGISTER: ${changeRequest.ops}") + changeRequest.channel.register(selector, changeRequest.ops) + } + CHANGE_OPS -> { + logger.debug("Processing CHANGE_OPS: ${changeRequest.ops}") + val key = changeRequest.channel.keyFor(selector) + key.interestOps(changeRequest.ops) + } + } + } + changeRequests.clear() + } - while (isConnected.get()) { - // logger.debug("Waiting for response from server") try { - socket.receive(datagram) + val numKeys = selector.select() + // we won't get any keys if we wakeup the selector before we select + // (ie, when we make changes to the keys or interest-ops) + if (numKeys > 0) { + val selectedKeys = selector.selectedKeys() + val keyStream = selectedKeys.parallelStream() + keyStream.forEach { + if (it.isReadable && it.isValid) { + readFromProxy() + } + if (it.isWritable && it.isValid) { + if (outgoingQueue.isNotEmpty()) { + val buffer = outgoingQueue.take() + while (buffer.hasRemaining()) { + datagramChannel.write(buffer) + } + } else { + it.interestOps(OP_READ) + } + } + } + selectedKeys.clear() + } } catch (e: Exception) { - logger.error("Error receiving from server: $e") + logger.warn("Exception on select, probably shutting down: $e") break } - stream.put(buffer, 0, datagram.length) - stream.flip() - val packets = parseStream(stream) - for (packet in packets) { - if (packet is SentinelPacket) { - logger.debug("Sentinel packet, skip") - continue - } - logger.debug("From proxy: $packet") - packetDumper.dumpBuffer(ByteBuffer.wrap(packet.toByteArray()), etherType = EtherType.DETECT) - tunWrite(packet.toByteArray()) - } } - logger.warn("No longer reading from server") + selectorJob.complete() } - private fun writePackets(packets: List) { - packets.forEach { packet -> - val buffer = packet.toByteArray() - val datagramPacket = DatagramPacket(buffer, buffer.size, socketAddress) - packetDumper.dumpBuffer(ByteBuffer.wrap(buffer), etherType = EtherType.DETECT) - try { - socket.send(datagramPacket) - } catch (e: Exception) { - logger.warn("IO error writing to proxy, probably shutting down") - return@forEach + private fun readFromProxy() { + val recvBuffer = ByteBuffer.allocate(MAX_RECEIVE_BUFFER_SIZE) + datagramChannel.read(recvBuffer) + recvBuffer.flip() + + fromProxyStream.put(recvBuffer) + fromProxyStream.flip() + + val packets = parseStream(fromProxyStream) + for (packet in packets) { + if (packet is SentinelPacket) { + logger.debug("Sentinel packet, skip") + continue } - // logger.debug("From OS: $packet") + logger.debug("From proxy: $packet") + packetDumper.dumpBuffer(ByteBuffer.wrap(packet.toByteArray()), etherType = EtherType.DETECT) + tunWrite(packet.toByteArray()) } } @@ -142,7 +175,7 @@ abstract class Client( logger.warn("Filters enabled, not sending all packets to proxy") } - while (isConnected.get()) { + while (isRunning.get()) { val bytesToRead = min(MAX_RECEIVE_BUFFER_SIZE, stream.remaining()) val bytesRead = try { @@ -162,49 +195,71 @@ abstract class Client( // logger.debug("After flip: position: {} remaining {}", stream.position(), stream.remaining()) val packets = parseStream(stream) + var numPackets = 0 if (filters) { - val packetsToForward: MutableList = mutableListOf() for (packet in packets) { if (onlyDestinations.isNotEmpty()) { if (packet.ipHeader?.destinationAddress in onlyDestinations) { if (onlyProtocols.isNotEmpty()) { if (packet.ipHeader?.protocol in onlyProtocols) { - packetsToForward.add(packet) + outgoingQueue.add(ByteBuffer.wrap(packet.toByteArray())) + numPackets++ // logger.debug("To proxy: $packet") } } else { - packetsToForward.add(packet) + outgoingQueue.add(ByteBuffer.wrap(packet.toByteArray())) + numPackets++ // logger.debug("To proxy: $packet") } } } else { if (onlyProtocols.isNotEmpty()) { if (packet.ipHeader?.protocol in onlyProtocols) { - packetsToForward.add(packet) + outgoingQueue.add(ByteBuffer.wrap(packet.toByteArray())) + numPackets++ // logger.debug("To proxy: $packet") } } } } - writePackets(packetsToForward) } else { - writePackets(packets) + for (packet in packets) { + outgoingQueue.add(ByteBuffer.wrap(packet.toByteArray())) + numPackets++ + } + } + if (numPackets > 0) { + logger.debug("Added packets, switching to WRITE mode") + synchronized(changeRequests) { + changeRequests.add(ChangeRequest(datagramChannel, CHANGE_OPS, OP_WRITE)) + } + selector.wakeup() } } } + logger.warn("No longer reading from TUN adapter") readFromTunJob.complete() } - open fun close() { + open fun stop() { + if (isRunning.get().not()) { + logger.warn("Trying to stop when we're not running") + return + } logger.debug("Stopping client") - isConnected.set(false) - socket.close() + isRunning.set(false) + selector.close() + try { + datagramChannel.close() + } catch (e: Exception) { + logger.warn("Error closing datagram channel: $e") + } runBlocking { logger.debug("Waiting for tun reader to stop") readFromTunJob.join() - logger.debug("Stopped, waiting for proxy reader to stop") - readFromProxyJob.join() + logger.debug("Stopped, waiting for selector job to stop") + selectorJob.join() logger.debug("Stopped") } logger.debug("Client stopped") diff --git a/client/src/main/kotlin/com/jasonernst/kanonproxy/tuntap/TunTapDevice.kt b/client/src/main/kotlin/com/jasonernst/kanonproxy/tuntap/TunTapDevice.kt index 28b663f..a6c47eb 100644 --- a/client/src/main/kotlin/com/jasonernst/kanonproxy/tuntap/TunTapDevice.kt +++ b/client/src/main/kotlin/com/jasonernst/kanonproxy/tuntap/TunTapDevice.kt @@ -1,12 +1,34 @@ package com.jasonernst.kanonproxy.tuntap +import com.jasonernst.kanonproxy.ChangeRequest +import com.jasonernst.kanonproxy.ChangeRequest.Companion.CHANGE_OPS +import com.jasonernst.kanonproxy.ChangeRequest.Companion.REGISTER import com.sun.jna.Native import com.sun.jna.NativeLong +import jnr.enxio.channels.NativeSelectorProvider import jnr.enxio.channels.NativeSocketChannel +import kotlinx.coroutines.CompletableJob +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking import org.slf4j.LoggerFactory import java.nio.ByteBuffer +import java.nio.channels.SelectionKey.OP_READ +import java.nio.channels.SelectionKey.OP_WRITE +import java.nio.channels.Selector +import java.util.LinkedList +import java.util.concurrent.LinkedBlockingDeque +import java.util.concurrent.atomic.AtomicBoolean import kotlin.experimental.or +import kotlin.math.min +/** + * Note: with the linux tun/tap device, we could probably add the channel to the selector directly + * in the client and use a single thread, however, the Android tun/tap doesn't easily adapt to a + * SelectableChannel which is why I'm doing things this way. + */ class TunTapDevice { companion object { // the function call for configuring a TUN interface in ioctl from if_tun.h @@ -17,12 +39,26 @@ class TunTapDevice { // from if_tun.h - tells the kernel to not add packet information header before the packet private const val FLAGS_IFF_NO_PI: Short = 0x1000 + + private const val MAX_RECEIVE_BUFFER_SIZE = 1500 // max amount we can recv in one read (should be the MTU or bigger probably) } private val logger = LoggerFactory.getLogger(javaClass) private lateinit var nativeSocketChannel: NativeSocketChannel + private val isRunning = AtomicBoolean(false) + private lateinit var selector: Selector + private lateinit var selectorJob: CompletableJob + private lateinit var selectorScope: CoroutineScope + private val changeRequests = LinkedList() + private val outgoingQueue = LinkedBlockingDeque() // queue of data to be read + private val incomingQueue = LinkedBlockingDeque() // queue of data to be written + fun open() { + if (isRunning.get()) { + logger.error("Already opened") + return + } val fd = LibC.open("/dev/net/tun", O_RDWR) if (fd < 0) { throw RuntimeException("Error opening TUN/TAP device: $fd ${Native.getLastError()}") @@ -39,36 +75,110 @@ class TunTapDevice { throw RuntimeException("Error creating TUN/TAP device: $tunCreateResult ${Native.getLastError()}") } logger.debug("Created TUN/TAP device") + isRunning.set(true) nativeSocketChannel = NativeSocketChannel(fd) + + // if we don't put this into non-blocking, it gets stuck on the read even when we try to + // close, so we use a selector instead + nativeSocketChannel.configureBlocking(false) + + selector = NativeSelectorProvider.getInstance().openSelector() + selectorJob = SupervisorJob() + selectorScope = CoroutineScope(Dispatchers.IO + selectorJob) + selectorScope.launch { + selectorLoop() + } + } + + private fun selectorLoop() { + nativeSocketChannel.register(selector, OP_READ) + + while (isRunning.get()) { + synchronized(changeRequests) { + for (changeRequest in changeRequests) { + when (changeRequest.type) { + REGISTER -> { + logger.debug("Processing REGISTER: ${changeRequest.ops}") + changeRequest.channel.register(selector, changeRequest.ops) + } + CHANGE_OPS -> { + logger.debug("Processing CHANGE_OPS: ${changeRequest.ops}") + val key = changeRequest.channel.keyFor(selector) + key.interestOps(changeRequest.ops) + } + } + } + changeRequests.clear() + } + + try { + val numKeys = selector.select() + // we won't get any keys if we wakeup the selector before we select + // (ie, when we make changes to the keys or interest-ops) + if (numKeys > 0) { + val selectedKeys = selector.selectedKeys() + val keyStream = selectedKeys.parallelStream() + keyStream.forEach { + if (it.isReadable && it.isValid) { + val recvBuffer = ByteBuffer.allocate(MAX_RECEIVE_BUFFER_SIZE) + nativeSocketChannel.read(recvBuffer) + recvBuffer.flip() + outgoingQueue.add(recvBuffer) + } + if (it.isWritable && it.isValid) { + if (incomingQueue.isNotEmpty()) { + val buffer = incomingQueue.take() + while (buffer.hasRemaining()) { + nativeSocketChannel.write(buffer) + } + } else { + it.interestOps(OP_READ) + } + } + } + selectedKeys.clear() + } + } catch (e: Exception) { + logger.warn("Exception on select, probably shutting down: $e") + break + } + } + selectorJob.complete() } + /** + * This should not be called from multiple threads, or each thread will get different data. + */ fun read( readBytes: ByteArray, bytesToRead: Int, ): Int { - // why are we doing this and not using nativeSocketChannel read? - // return LibC.read(nativeSocketChannel.fd, readBytes, NativeLong(bytesToRead.toLong())) - val buffer = ByteBuffer.allocate(bytesToRead) - val result = nativeSocketChannel.read(buffer) - logger.debug("Read $result bytes from nativesocket") - if (result > 0) { - buffer.rewind() - buffer.get(readBytes) - buffer.clear() - } else { - logger.debug("Read $result bytes") - } - return result + // this will block until the selector puts something here, to unblock when we're shutting + // down, just stick an empty buffer in the outgoing queue + val buffer = outgoingQueue.take() + val bytesToTake = min(bytesToRead, buffer.remaining()) + logger.debug( + "About to read: $bytesToTake bytes from buffer, position: ${buffer.position()}, limit: ${buffer.limit()}, remaining: ${buffer.remaining()}", + ) + buffer.get(readBytes, 0, bytesToTake) + return bytesToTake } fun write(writeBytes: ByteArray) { - val writeBuffer = ByteBuffer.wrap(writeBytes) - while (writeBuffer.hasRemaining()) { - nativeSocketChannel.write(writeBuffer) + incomingQueue.add(ByteBuffer.wrap(writeBytes)) + synchronized(changeRequests) { + changeRequests.add(ChangeRequest(nativeSocketChannel, CHANGE_OPS, OP_WRITE)) } + selector.wakeup() } fun close() { + isRunning.set(false) + selector.close() nativeSocketChannel.close() + outgoingQueue.put(ByteBuffer.allocate(0)) // unstick any blocking reads + runBlocking { + selectorJob.join() + } } } From e56da3555102841d4c2ba72dbe2dfcc01789f989 Mon Sep 17 00:00:00 2001 From: Jason Ernst Date: Fri, 13 Dec 2024 14:06:57 -0800 Subject: [PATCH 3/4] cleanup --- settings.gradle.kts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/settings.gradle.kts b/settings.gradle.kts index d663960..0df81de 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -16,7 +16,7 @@ dependencyResolutionManagement { } } rootProject.name = "kanonproxy" -include("core") -include("server") -include("client") +include(":core") +include(":server") +include(":client") include(":android") From 5f89dac400c7ca71d2b75dd77197d914877a1d48 Mon Sep 17 00:00:00 2001 From: Jason Ernst Date: Fri, 13 Dec 2024 14:15:34 -0800 Subject: [PATCH 4/4] added the server packet dumper --- .../kotlin/com/jasonernst/kanonproxy/ProxySession.kt | 6 ++++++ .../src/main/kotlin/com/jasonernst/kanonproxy/Server.kt | 9 +++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/server/src/main/kotlin/com/jasonernst/kanonproxy/ProxySession.kt b/server/src/main/kotlin/com/jasonernst/kanonproxy/ProxySession.kt index 85c2754..2833de6 100644 --- a/server/src/main/kotlin/com/jasonernst/kanonproxy/ProxySession.kt +++ b/server/src/main/kotlin/com/jasonernst/kanonproxy/ProxySession.kt @@ -1,6 +1,9 @@ package com.jasonernst.kanonproxy import com.jasonernst.knet.SentinelPacket +import com.jasonernst.packetdumper.AbstractPacketDumper +import com.jasonernst.packetdumper.DummyPacketDumper +import com.jasonernst.packetdumper.ethernet.EtherType import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob @@ -11,6 +14,7 @@ import org.slf4j.LoggerFactory import java.net.DatagramPacket import java.net.DatagramSocket import java.net.InetSocketAddress +import java.nio.ByteBuffer import java.util.concurrent.atomic.AtomicBoolean class ProxySession( @@ -18,6 +22,7 @@ class ProxySession( private val kAnonProxy: KAnonProxy, private val socket: DatagramSocket, private val sessionManager: ProxySessionManager, + private val packetDumper: AbstractPacketDumper = DummyPacketDumper, ) { private val logger = LoggerFactory.getLogger(javaClass) private val readFromProxyJob = SupervisorJob() @@ -46,6 +51,7 @@ class ProxySession( } // logger.debug("Received response from proxy for client: $clientAddress, sending datagram back") val buffer = response.toByteArray() + packetDumper.dumpBuffer(ByteBuffer.wrap(buffer), etherType = EtherType.DETECT) val datagramPacket = DatagramPacket(buffer, buffer.size, clientAddress) try { socket.send(datagramPacket) diff --git a/server/src/main/kotlin/com/jasonernst/kanonproxy/Server.kt b/server/src/main/kotlin/com/jasonernst/kanonproxy/Server.kt index da3a34e..2b06d6b 100644 --- a/server/src/main/kotlin/com/jasonernst/kanonproxy/Server.kt +++ b/server/src/main/kotlin/com/jasonernst/kanonproxy/Server.kt @@ -5,6 +5,7 @@ import com.jasonernst.icmp.linux.IcmpLinux import com.jasonernst.knet.Packet import com.jasonernst.packetdumper.AbstractPacketDumper import com.jasonernst.packetdumper.DummyPacketDumper +import com.jasonernst.packetdumper.ethernet.EtherType import com.jasonernst.packetdumper.serverdumper.PcapNgTcpServerPacketDumper import kotlinx.coroutines.CompletableJob import kotlinx.coroutines.CoroutineScope @@ -107,15 +108,15 @@ class Server( stream.put(buffer, 0, packet.length) stream.flip() val packets = Packet.parseStream(stream) -// for (packet in packets) { -// logger.debug("From Client: packet $packet") -// } + for (p in packets) { + packetDumper.dumpBuffer(ByteBuffer.wrap(p.toByteArray()), etherType = EtherType.DETECT) + } val clientAddress = InetSocketAddress(packet.address, packet.port) kAnonProxy.handlePackets(packets, clientAddress) var newSession = false sessions.getOrPut(clientAddress) { newSession = true - val session = ProxySession(clientAddress, kAnonProxy, socket, this) + val session = ProxySession(clientAddress, kAnonProxy, socket, this, packetDumper) session.start() session }