Skip to content

Commit

Permalink
Decode packets in netty event loop group (#1500)
Browse files Browse the repository at this point in the history
* Decode packets in netty event loop group

* Update mirai-core/src/commonTest/kotlin/network/framework/AbstractNettyNHTest.kt

Co-authored-by: Him188 <Him188@mamoe.net>

Co-authored-by: Him188 <Him188@mamoe.net>
  • Loading branch information
Karlatemp and Him188 authored Sep 1, 2021
1 parent 4d76aa8 commit 88b66d7
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,18 @@ internal open class NettyNetworkHandler(
.addLast(RawIncomingPacketCollector(decodePipeline))
}

protected open fun createDummyDecodePipeline() = PacketDecodePipeline(this@NettyNetworkHandler.coroutineContext)

// can be overridden for tests
protected open suspend fun createConnection(decodePipeline: PacketDecodePipeline): NettyChannel {
protected open suspend fun createConnection(): NettyChannel {
packetLogger.debug { "Connecting to $address" }

val contextResult = CompletableDeferred<NettyChannel>()
val eventLoopGroup = NioEventLoopGroup()
val decodePipeline = PacketDecodePipeline(
this@NettyNetworkHandler.coroutineContext
.plus(eventLoopGroup.asCoroutineDispatcher())
)

val future = Bootstrap().group(eventLoopGroup)
.channel(NioSocketChannel::class.java)
Expand Down Expand Up @@ -159,8 +165,6 @@ internal open class NettyNetworkHandler(
return contextResult.await()
}

protected val decodePipeline = PacketDecodePipeline(this@NettyNetworkHandler.coroutineContext)

protected inner class PacketDecodePipeline(parentContext: CoroutineContext) :
CoroutineScope by parentContext.childScope() {
private val packetCodec: PacketCodec by lazy { context[PacketCodec] }
Expand Down Expand Up @@ -241,7 +245,7 @@ internal open class NettyNetworkHandler(
private val collectiveExceptions: ExceptionCollector,
) : NettyState(State.CONNECTING) {
private val connection = async {
createConnection(decodePipeline)
createConnection()
}

@Suppress("JoinDeclarationAndAssignment")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ internal abstract class TestNettyNH(
address: SocketAddress,
) : NettyNetworkHandler(context, address), ITestNetworkHandler {

abstract override suspend fun createConnection(decodePipeline: PacketDecodePipeline): Channel
protected abstract suspend fun createConnection(decodePipeline: PacketDecodePipeline): Channel
final override suspend fun createConnection(): Channel {
return createConnection(createDummyDecodePipeline())
}

override fun setStateClosed(exception: Throwable?): NetworkHandlerSupport.BaseStateImpl? {
return setState { StateClosed(exception) }
Expand Down

0 comments on commit 88b66d7

Please sign in to comment.