Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent modification in event producer #1970

Merged
merged 2 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ open class EventProducer<THandler> : IEventNotifier<THandler> {
* @param callback The callback will be invoked for each subscribed handler, allowing you to call the handler.
*/
fun fire(callback: (THandler) -> Unit) {
synchronized(subscribers) {
for (s in subscribers) {
callback(s)
}
val localList = subscribers.toList()
jkasten2 marked this conversation as resolved.
Show resolved Hide resolved
for (s in localList) {
callback(s)
}
}

Expand All @@ -61,7 +60,8 @@ open class EventProducer<THandler> : IEventNotifier<THandler> {
*/
fun fireOnMain(callback: (THandler) -> Unit) {
suspendifyOnMain {
for (s in subscribers) {
val localList = subscribers.toList()
for (s in localList) {
callback(s)
}
}
Expand All @@ -74,7 +74,8 @@ open class EventProducer<THandler> : IEventNotifier<THandler> {
* @param callback The callback will be invoked for each subscribed handler, allowing you to call the handler.
*/
suspend fun suspendingFire(callback: suspend (THandler) -> Unit) {
for (s in subscribers) {
val localList = subscribers.toList()
for (s in localList) {
callback(s)
}
}
Expand All @@ -87,7 +88,8 @@ open class EventProducer<THandler> : IEventNotifier<THandler> {
*/
suspend fun suspendingFireOnMain(callback: suspend (THandler) -> Unit) {
withContext(Dispatchers.Main) {
for (s in subscribers) {
val localList = subscribers.toList()
for (s in localList) {
callback(s)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.onesignal.mocks.MockPreferencesService
import com.onesignal.user.internal.subscriptions.SubscriptionModel
import com.onesignal.user.internal.subscriptions.SubscriptionModelStore
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe
import io.kotest.runner.junit4.KotestTestRunner
import junit.framework.TestCase
import org.junit.runner.RunWith
Expand Down Expand Up @@ -70,13 +71,11 @@ class ModelingTests : FunSpec({
val t1 =
Thread {
// acquire "ModelStore.models", then trigger the onChanged event
System.out.println("1")
modelStore.add(newSubscriptionModel)
}

val t2 =
Thread {
System.out.println("2")
// acquire "model.data", then wait for "ModelStore.models"
newSubscriptionModel.toJSON()
}
Expand Down Expand Up @@ -116,4 +115,29 @@ class ModelingTests : FunSpec({
// verify if the thread has been successfully terminated
TestCase.assertEquals(Thread.State.TERMINATED, t2.state)
}

test("Unsubscribing handler in change event may cause the concurrent modification exception") {
// Given an arbitrary model
val modelStore = MockHelper.configModelStore()
val model = modelStore.model

// subscribe to a change handler
model.subscribe(
object : IModelChangedHandler {
override fun onChanged(
args: ModelChangedArgs,
tag: String,
) {
// remove from "subscribers" while "subscribers" is being accessed
model.unsubscribe(this)
}
},
)

// this will trigger EventProducer.fire and loop through the list "subscribers"
model.setOptAnyProperty("key1", "value1")

// ensure no concurrent modification exception is thrown and "subcribers" is clear
model.hasSubscribers shouldBe false
}
})
Loading