Skip to content

Commit

Permalink
fix: ConcurrentModificationException when disconnecting from session …
Browse files Browse the repository at this point in the history
…while in a screen-command context
  • Loading branch information
asforest committed Jan 7, 2022
1 parent 577eea9 commit 032149b
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 14 deletions.
75 changes: 66 additions & 9 deletions src/main/kotlin/event/Event.kt
Original file line number Diff line number Diff line change
@@ -1,39 +1,95 @@
package com.github.asforest.mshell.event

import com.github.asforest.mshell.exception.EventLoopReachCountLimitException
import com.github.asforest.mshell.exception.ListenerAlreadyAddedException
import com.github.asforest.mshell.exception.ListenerNotFoundException
import kotlin.reflect.jvm.reflect

class Event<TContext, TCallback>(val context: TContext)
: Iterable<Event.Listener<TCallback>> where TCallback : Function<Unit>
{
val listeners = mutableListOf<Listener<TCallback>>()

fun always(cb: TCallback)
fun always(label: String? = null, cb: TCallback)
{
this += Listener(cb, ListenerType.ALWAYS)
addListener(cb, label, ListenerType.ALWAYS)
}

fun once(cb: TCallback)
fun once(label: String? = null, cb: TCallback)
{
this += Listener(cb, ListenerType.ONCE)
addListener(cb, label, ListenerType.ONCE)
}

fun invokeSuspend(calling: (arg: TCallback) -> Unit)
private fun addListener(callback: TCallback, label: String? = null, type: ListenerType)
{
listeners.forEach { calling(it.callback) }
listeners.removeIf { it.type == ListenerType.ONCE }
val _label = label ?: (callback.reflect()?.name ?: callback.hashCode().toString())
println(_label)
this += Listener(callback, _label, type)
}

operator fun plusAssign(listener: Listener<TCallback>)
{
if(listener in this)
throw ListenerAlreadyAddedException(listener.label)

listeners += listener
}

operator fun minusAssign(listener: Listener<TCallback>)
{
if(listener !in this)
throw ListenerNotFoundException(listener.label)

listeners -= listener
}

operator fun minusAssign(label: String)
{
if(label !in this)
throw ListenerNotFoundException(label)

listeners.removeIf { it.label == label }
}

operator fun invoke(calling: (arg: TCallback) -> Unit)
{
invokeSuspend(calling)
var limit = 100000

while (true)
{
val listenersToTrigger = listeners.filter { it.type != ListenerType.NEVER }

for (listener in listenersToTrigger)
{
calling(listener.callback)

if(listener.type == ListenerType.ONCE)
listener.type = ListenerType.NEVER
}

if(listenersToTrigger.isEmpty())
break

if(limit-- <= 0)
throw EventLoopReachCountLimitException(100000)
}

listeners.removeIf { it.type == ListenerType.NEVER }
}

operator fun contains(listener: Listener<TCallback>): Boolean
{
return listeners.any { it.label == listener.label }
}

operator fun contains(label: String): Boolean
{
return listeners.any { it.label == label }
}

operator fun get(label: String): Listener<TCallback>?
{
return listeners.firstOrNull { it.label == label }
}

override fun iterator(): MutableIterator<Listener<TCallback>>
Expand All @@ -43,7 +99,8 @@ class Event<TContext, TCallback>(val context: TContext)

data class Listener<Callback>(
val callback: Callback,
val type: ListenerType
val label: String,
var type: ListenerType,
)

enum class ListenerType
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.github.asforest.mshell.exception

class EventLoopReachCountLimitException(maxCount: Int)
: BaseException("Event loop reached the max count($maxCount) of Event class")
4 changes: 4 additions & 0 deletions src/main/kotlin/exception/ListenerAlreadyAddedException.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.github.asforest.mshell.exception

class ListenerAlreadyAddedException(label: String)
: BaseException("The Listener($label) has already been added to the event object")
6 changes: 6 additions & 0 deletions src/main/kotlin/exception/ListenerNotFoundException.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.github.asforest.mshell.exception

import com.github.asforest.mshell.event.Event

class ListenerNotFoundException(label: String)
: BaseException("The Listener(label) not found in event object")
18 changes: 13 additions & 5 deletions src/main/kotlin/stream/BatchingWriter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package com.github.asforest.mshell.stream

import com.github.asforest.mshell.configuration.MShellConfig
import com.github.asforest.mshell.event.Event
import com.github.asforest.mshell.exception.ListenerAlreadyAddedException
import com.github.asforest.mshell.util.AnsiEscapeUtil
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.*
import java.io.Writer
import java.lang.Exception
import java.util.concurrent.LinkedBlockingQueue
Expand All @@ -31,7 +29,15 @@ class BatchingWriter(
{
// 如果队里里没有消息,则将协程暂时挂起,进行等待
if(buffer.isEmpty())
suspendCoroutine<Unit> { onNewDataArrive.once { it.resume(Unit) } }
{
suspendCoroutine<Unit> {
// 更新Continuation对象
if("onNewDataArriveListenser" in onNewDataArrive)
onNewDataArrive -= "onNewDataArriveListenser"

onNewDataArrive.once("onNewDataArriveListenser") { it.resume(Unit) }
}
}

// 进行消息合批
val buffered = StringBuffer()
Expand Down Expand Up @@ -77,6 +83,7 @@ class BatchingWriter(
override fun flush()
{
buffer += MessageBuffered()

onNewDataArrive { it() }
}

Expand All @@ -87,6 +94,7 @@ class BatchingWriter(
} catch (e: IllegalStateException) {
throw BatchingBwriterBufferOverflowException("The buffer of BatchingWriter overflows")
}

onNewDataArrive { it() }
}

Expand Down

0 comments on commit 032149b

Please sign in to comment.