Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed bidirectional getting stuck #53

Merged
merged 1 commit into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,75 +1,69 @@
package com.jasonernst.kanonproxy

import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.runBlocking
import org.slf4j.LoggerFactory
import java.nio.ByteBuffer
import java.nio.channels.ByteChannel
import kotlin.math.min

class BidirectionalByteChannel : ByteChannel {
private val logger = LoggerFactory.getLogger(javaClass)
private val buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE)
private var isOpen = true
private val readyToWrite = MutableStateFlow(true)
private val readyToRead = MutableStateFlow(false)
// private val readyToRead = MutableStateFlow(false)

override fun isOpen(): Boolean = isOpen

override fun close() {
this.isOpen = false
readyToRead.value = true
// readyToRead.value = true
}

override fun write(src: ByteBuffer): Int {
if (readyToWrite.value.not()) {
runBlocking {
readyToWrite.takeWhile { !it }.collect {}
}
logger.debug("Waiting to write: ${src.limit()} bytes")
synchronized(buffer) {
val availableBytes = min(buffer.remaining(), src.remaining())
buffer.put(src.array(), src.position(), availableBytes)
src.position(src.position() + availableBytes)
// readyToRead.value = true
logger.debug("Wrote $availableBytes bytes")
return availableBytes
}
readyToRead.value = false
val availableBytes = min(buffer.remaining(), src.remaining())
buffer.put(src.array(), src.position(), availableBytes)
src.position(src.position() + availableBytes)
readyToRead.value = true
return availableBytes
}

override fun read(dst: ByteBuffer): Int {
// when this function is called, we expect the buffer is pointing to the end of what was written to it
// if its at zero, there is nothing to read
if (buffer.position() == 0) {
runBlocking {
readyToRead.takeWhile { !it }.collect {}
}
}
// if (buffer.position() == 0) {
// runBlocking {
// readyToRead.takeWhile { !it }.collect {}
// }
// }
if (!isOpen) {
return 0
}
readyToWrite.value = false
// just in case its flipped in the time it took us to get here
if (readyToRead.value.not()) {
readyToWrite.value = true
return 0
}

// flip the buffer to get it from write mode to read mode
buffer.flip()
val availableBytes = min(buffer.remaining(), dst.remaining())
dst.put(buffer.array(), buffer.position(), availableBytes)
buffer.position(buffer.position() + availableBytes)
if (!buffer.hasRemaining()) {
readyToRead.value = false
synchronized(buffer) {
// flip the buffer to get it from write mode to read mode
buffer.flip()
val availableBytes = min(buffer.remaining(), dst.remaining())
dst.put(buffer.array(), buffer.position(), availableBytes)
buffer.position(buffer.position() + availableBytes)
// if (!buffer.hasRemaining()) {
// readyToRead.value = false
// }
// compact to get us back into read mode
buffer.compact()
return availableBytes
}
// compact to get us back into read mode
buffer.compact()
readyToWrite.value = true
return availableBytes
}

fun available(): Int =
if (readyToRead.value.not()) {
0
} else {
buffer.position() // because we haven't flipped yet, this will be how many bytes there are to read
// if (readyToRead.value.not()) {
// 0
// } else {
// buffer.position() // because we haven't flipped yet, this will be how many bytes there are to read
// }
synchronized(buffer) {
return buffer.position()
}
}
11 changes: 8 additions & 3 deletions core/src/test/kotlin/com/jasonernst/kanonproxy/tcp/TcpClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,14 @@ class TcpClient(
fun recv(buffer: ByteBuffer) {
logger.debug("Waiting for up to ${buffer.remaining()} bytes")
while (buffer.hasRemaining()) {
val byteRead = channel.read(buffer)
logger.debug("READ: $byteRead")
if (isPsh.get()) {
val bytesRead = channel.read(buffer)
if (bytesRead > 0) {
logger.debug("READ: $bytesRead")
}
if (isPsh.get() && buffer.position() == 0) {
logger.warn("PSH received but haven't received any data yet, still waiting")
}
if (isPsh.get() && buffer.position() > 0) {
isPsh.set(false)
logger.debug("PSH received, returning from read before buffer full")
break
Expand Down
Loading