Skip to content

Commit

Permalink
Merge pull request #52 from compscidr/jason/mutex-bidirectionalbytech…
Browse files Browse the repository at this point in the history
…annel

Added mutex in bidrectional channel
  • Loading branch information
compscidr authored Dec 2, 2024
2 parents 900726b + c9cf30e commit 9cdd3af
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import kotlin.math.min
class BidirectionalByteChannel : ByteChannel {
private val buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE)
private var isOpen = true

private val readyToWrite = MutableStateFlow(true)
private val readyToRead = MutableStateFlow(false)

override fun isOpen(): Boolean = isOpen
Expand All @@ -21,6 +21,12 @@ class BidirectionalByteChannel : ByteChannel {
}

override fun write(src: ByteBuffer): Int {
if (readyToWrite.value.not()) {
runBlocking {
readyToWrite.takeWhile { !it }.collect {}
}
}
readyToRead.value = false
val availableBytes = min(buffer.remaining(), src.remaining())
buffer.put(src.array(), src.position(), availableBytes)
src.position(src.position() + availableBytes)
Expand All @@ -39,6 +45,13 @@ class BidirectionalByteChannel : ByteChannel {
if (!isOpen) {
return 0
}
readyToWrite.value = false
// just in case its flipped in the time it took us to get here
if (readyToRead.value.not()) {
readyToWrite.value = true
return 0
}

// flip the buffer to get it from write mode to read mode
buffer.flip()
val availableBytes = min(buffer.remaining(), dst.remaining())
Expand All @@ -49,6 +62,7 @@ class BidirectionalByteChannel : ByteChannel {
}
// compact to get us back into read mode
buffer.compact()
readyToWrite.value = true
return availableBytes
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ class TcpHandlingTest {
fun setup() {
tcpEchoServer.start()
packetDumper.start()
staticLogger.debug("Delaying to connect to wireshark")
Thread.sleep(5000)
// staticLogger.debug("Delaying to connect to wireshark")
// Thread.sleep(5000)
}

@JvmStatic
Expand Down

0 comments on commit 9cdd3af

Please sign in to comment.