Skip to content

Commit

Permalink
Merge pull request #1970 from OneSignal/ConcurrentModificationInEvent…
Browse files Browse the repository at this point in the history
…Producer

Concurrent modification in event producer
  • Loading branch information
jinliu9508 committed Jan 29, 2024
2 parents 709b441 + a74f95d commit 3f4651f
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ open class EventProducer<THandler> : IEventNotifier<THandler> {
* @param callback The callback will be invoked for each subscribed handler, allowing you to call the handler.
*/
fun fire(callback: (THandler) -> Unit) {
synchronized(subscribers) {
for (s in subscribers) {
callback(s)
}
val localList = subscribers.toList()
for (s in localList) {
callback(s)
}
}

Expand All @@ -61,7 +60,8 @@ open class EventProducer<THandler> : IEventNotifier<THandler> {
*/
fun fireOnMain(callback: (THandler) -> Unit) {
suspendifyOnMain {
for (s in subscribers) {
val localList = subscribers.toList()
for (s in localList) {
callback(s)
}
}
Expand All @@ -74,7 +74,8 @@ open class EventProducer<THandler> : IEventNotifier<THandler> {
* @param callback The callback will be invoked for each subscribed handler, allowing you to call the handler.
*/
suspend fun suspendingFire(callback: suspend (THandler) -> Unit) {
for (s in subscribers) {
val localList = subscribers.toList()
for (s in localList) {
callback(s)
}
}
Expand All @@ -87,7 +88,8 @@ open class EventProducer<THandler> : IEventNotifier<THandler> {
*/
suspend fun suspendingFireOnMain(callback: suspend (THandler) -> Unit) {
withContext(Dispatchers.Main) {
for (s in subscribers) {
val localList = subscribers.toList()
for (s in localList) {
callback(s)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.onesignal.mocks.MockPreferencesService
import com.onesignal.user.internal.subscriptions.SubscriptionModel
import com.onesignal.user.internal.subscriptions.SubscriptionModelStore
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe
import io.kotest.runner.junit4.KotestTestRunner
import junit.framework.TestCase
import org.junit.runner.RunWith
Expand Down Expand Up @@ -70,13 +71,11 @@ class ModelingTests : FunSpec({
val t1 =
Thread {
// acquire "ModelStore.models", then trigger the onChanged event
System.out.println("1")
modelStore.add(newSubscriptionModel)
}

val t2 =
Thread {
System.out.println("2")
// acquire "model.data", then wait for "ModelStore.models"
newSubscriptionModel.toJSON()
}
Expand Down Expand Up @@ -116,4 +115,29 @@ class ModelingTests : FunSpec({
// verify if the thread has been successfully terminated
TestCase.assertEquals(Thread.State.TERMINATED, t2.state)
}

test("Unsubscribing handler in change event may cause the concurrent modification exception") {
// Given an arbitrary model
val modelStore = MockHelper.configModelStore()
val model = modelStore.model

// subscribe to a change handler
model.subscribe(
object : IModelChangedHandler {
override fun onChanged(
args: ModelChangedArgs,
tag: String,
) {
// remove from "subscribers" while "subscribers" is being accessed
model.unsubscribe(this)
}
},
)

// this will trigger EventProducer.fire and loop through the list "subscribers"
model.setOptAnyProperty("key1", "value1")

// ensure no concurrent modification exception is thrown and "subcribers" is clear
model.hasSubscribers shouldBe false
}
})

0 comments on commit 3f4651f

Please sign in to comment.