Skip to content

Commit

Permalink
fixed bidirectional getting stuck
Browse files Browse the repository at this point in the history
  • Loading branch information
compscidr committed Dec 3, 2024
1 parent 9cdd3af commit b9d987a
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -1,75 +1,69 @@
package com.jasonernst.kanonproxy

import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.runBlocking
import org.slf4j.LoggerFactory
import java.nio.ByteBuffer
import java.nio.channels.ByteChannel
import kotlin.math.min

class BidirectionalByteChannel : ByteChannel {
private val logger = LoggerFactory.getLogger(javaClass)
private val buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE)
private var isOpen = true
private val readyToWrite = MutableStateFlow(true)
private val readyToRead = MutableStateFlow(false)
// private val readyToRead = MutableStateFlow(false)

override fun isOpen(): Boolean = isOpen

override fun close() {
this.isOpen = false
readyToRead.value = true
// readyToRead.value = true
}

override fun write(src: ByteBuffer): Int {
if (readyToWrite.value.not()) {
runBlocking {
readyToWrite.takeWhile { !it }.collect {}
}
logger.debug("Waiting to write: ${src.limit()} bytes")
synchronized(buffer) {
val availableBytes = min(buffer.remaining(), src.remaining())
buffer.put(src.array(), src.position(), availableBytes)
src.position(src.position() + availableBytes)
// readyToRead.value = true
logger.debug("Wrote $availableBytes bytes")
return availableBytes
}
readyToRead.value = false
val availableBytes = min(buffer.remaining(), src.remaining())
buffer.put(src.array(), src.position(), availableBytes)
src.position(src.position() + availableBytes)
readyToRead.value = true
return availableBytes
}

override fun read(dst: ByteBuffer): Int {
// when this function is called, we expect the buffer is pointing to the end of what was written to it
// if its at zero, there is nothing to read
if (buffer.position() == 0) {
runBlocking {
readyToRead.takeWhile { !it }.collect {}
}
}
// if (buffer.position() == 0) {
// runBlocking {
// readyToRead.takeWhile { !it }.collect {}
// }
// }
if (!isOpen) {
return 0
}
readyToWrite.value = false
// just in case its flipped in the time it took us to get here
if (readyToRead.value.not()) {
readyToWrite.value = true
return 0
}

// flip the buffer to get it from write mode to read mode
buffer.flip()
val availableBytes = min(buffer.remaining(), dst.remaining())
dst.put(buffer.array(), buffer.position(), availableBytes)
buffer.position(buffer.position() + availableBytes)
if (!buffer.hasRemaining()) {
readyToRead.value = false
synchronized(buffer) {
// flip the buffer to get it from write mode to read mode
buffer.flip()
val availableBytes = min(buffer.remaining(), dst.remaining())
dst.put(buffer.array(), buffer.position(), availableBytes)
buffer.position(buffer.position() + availableBytes)
// if (!buffer.hasRemaining()) {
// readyToRead.value = false
// }
// compact to get us back into read mode
buffer.compact()
return availableBytes
}
// compact to get us back into read mode
buffer.compact()
readyToWrite.value = true
return availableBytes
}

fun available(): Int =
if (readyToRead.value.not()) {
0
} else {
buffer.position() // because we haven't flipped yet, this will be how many bytes there are to read
// if (readyToRead.value.not()) {
// 0
// } else {
// buffer.position() // because we haven't flipped yet, this will be how many bytes there are to read
// }
synchronized(buffer) {
return buffer.position()
}
}
11 changes: 8 additions & 3 deletions core/src/test/kotlin/com/jasonernst/kanonproxy/tcp/TcpClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,14 @@ class TcpClient(
fun recv(buffer: ByteBuffer) {
logger.debug("Waiting for up to ${buffer.remaining()} bytes")
while (buffer.hasRemaining()) {
val byteRead = channel.read(buffer)
logger.debug("READ: $byteRead")
if (isPsh.get()) {
val bytesRead = channel.read(buffer)
if (bytesRead > 0) {
logger.debug("READ: $bytesRead")
}
if (isPsh.get() && buffer.position() == 0) {
logger.warn("PSH received but haven't received any data yet, still waiting")
}
if (isPsh.get() && buffer.position() > 0) {
isPsh.set(false)
logger.debug("PSH received, returning from read before buffer full")
break
Expand Down

0 comments on commit b9d987a

Please sign in to comment.