Skip to content

Commit

Permalink
KTOR-3455 Fix reading request body backpressure in Netty (#3361)
Browse files Browse the repository at this point in the history
  • Loading branch information
rsinukov authored Jan 16, 2023
1 parent b3c696e commit 77fbd34
Showing 1 changed file with 6 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.netty.buffer.*
import io.netty.channel.*
import io.netty.handler.codec.http.*
import io.netty.util.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.lang.Integer.*
Expand All @@ -18,6 +19,7 @@ internal class RequestBodyHandler(
val context: ChannelHandlerContext
) : ChannelInboundHandlerAdapter(), CoroutineScope {
private val handlerJob = CompletableDeferred<Nothing>()
private val buffersInProcessingCount = atomic(0)

private val queue = Channel<Any>(Channel.UNLIMITED)

Expand Down Expand Up @@ -137,7 +139,9 @@ internal class RequestBodyHandler(
}

private fun requestMoreEvents() {
context.read()
if (buffersInProcessingCount.decrementAndGet() == 0) {
context.read()
}
}

@OptIn(ExperimentalCoroutinesApi::class)
Expand Down Expand Up @@ -169,6 +173,7 @@ internal class RequestBodyHandler(
}

private fun handleBytesRead(content: ReferenceCounted) {
buffersInProcessingCount.incrementAndGet()
if (!queue.trySend(content).isSuccess) {
content.release()
throw IllegalStateException("Unable to process received buffer: queue offer failed")
Expand Down

0 comments on commit 77fbd34

Please sign in to comment.