Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize observation handling #193

Merged
merged 28 commits into from
Jan 18, 2022
Merged

Optimize observation handling #193

merged 28 commits into from
Jan 18, 2022

Conversation

twyatt
Copy link
Member

@twyatt twyatt commented Nov 22, 2021

Observation handling has been reworked/optimized and moved to common code.

@twyatt twyatt linked an issue Nov 22, 2021 that may be closed by this pull request
Base automatically changed from twyatt/apple/hmpp to main November 22, 2021 23:20
Copy link
Contributor

@davertay-j davertay-j left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome refactor, great work.

core/src/commonMain/kotlin/Observers.kt Show resolved Hide resolved
@twyatt twyatt requested a review from Phoenix7351 as a code owner December 8, 2021 19:03
) {

val characteristicChanges = MutableSharedFlow<ObservationEvent<T>>(extraBufferCapacity = extraBufferCapacity)
private val observations = synchronizedMapOf<Characteristic, Observation>()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just checking that this works on Native. SynchronizedMap ensures that there isn't concurrent access, but unlike stately-isolate this doesn't enforce that all reads/writes take place on the same thread. Is Kable already using the new native memory model?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not using the new memory model, I think I tested MacOS w/ the change, but I'm not totally confident.
Thanks for the callout, I'll test before merging.

This comment was marked as outdated.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested via SensorTag sample using this branch.

It worked on macOS!

Doh, I spoke too soon. 😢

Crash
Uncaught Kotlin exception: kotlin.native.concurrent.InvalidMutabilityException: mutation attempt of frozen kotlin.Array@ed80c308
    at 0   sensortag.kexe                      0x000000010b8901c1 kfun:kotlin.Throwable#<init>(kotlin.String?){} + 97 (/Users/teamcity1/teamcity_work/6326934d18cfe24e/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/Throwable.kt:24:56)
    at 1   sensortag.kexe                      0x000000010b889e2d kfun:kotlin.Exception#<init>(kotlin.String?){} + 93 (/Users/teamcity1/teamcity_work/6326934d18cfe24e/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/Exceptions.kt:23:58)
    at 2   sensortag.kexe                      0x000000010b889fcd kfun:kotlin.RuntimeException#<init>(kotlin.String?){} + 93 (/Users/teamcity1/teamcity_work/6326934d18cfe24e/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/Exceptions.kt:34:58)
    at 3   sensortag.kexe                      0x000000010b8bac5d kfun:kotlin.native.concurrent.InvalidMutabilityException#<init>(kotlin.String){} + 93 (/Users/teamcity1/teamcity_work/6326934d18cfe24e/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/native/concurrent/Freezing.kt:24:85)
    at 4   sensortag.kexe                      0x000000010b8bc2bf ThrowInvalidMutabilityException + 431 (/Users/teamcity1/teamcity_work/6326934d18cfe24e/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/native/concurrent/Internal.kt:109:11)
    at 5   sensortag.kexe                      0x000000010ba06a69 Kotlin_Array_copyImpl + 153
    at 6   sensortag.kexe                      0x000000010b882135 kfun:kotlin.collections#copyInto__at__kotlin.Array<out|0:0>(kotlin.Array<0:0>;kotlin.Int;kotlin.Int;kotlin.Int){0§<kotlin.Any?>}kotlin.Array<0:0> + 277 (/Users/teamcity1/teamcity_work/6326934d18cfe24e/kotlin/kotlin-native/runtime/src/main/kotlin/generated/_ArraysNative.kt:1023:2)
    at 7   sensortag.kexe                      0x000000010b89d608 kfun:kotlin.collections.ArrayList.insertAtInternal#internal + 536 (/Users/teamcity1/teamcity_work/6326934d18cfe24e/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/collections/ArrayList.kt:224:15)
    at 8   sensortag.kexe                      0x000000010b89d934 kfun:kotlin.collections.ArrayList.addAtInternal#internal + 644 (/Users/teamcity1/teamcity_work/6326934d18cfe24e/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/collections/ArrayList.kt:234:13)
    at 9   sensortag.kexe                      0x000000010b89aa18 kfun:kotlin.collections.ArrayList#add(1:0){}kotlin.Boolean + 152 (/Users/teamcity1/teamcity_work/6326934d18cfe24e/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/collections/ArrayList.kt:81:16)
    at 10  sensortag.kexe                      0x000000010bb62c35 kfun:com.juul.kable.Observation.$onSubscriptionCOROUTINE$58#invokeSuspend(kotlin.Result<kotlin.Any?>){}kotlin.Any? + 2261 (/Users/travis/Projects/kable/core/src/commonMain/kotlin/Observation.kt:37:20)
    at 11  sensortag.kexe                      0x000000010bb63d61 kfun:com.juul.kable.Observation#onSubscription(kotlin.coroutines.SuspendFunction0<kotlin.Unit>){} + 353 (/Users/travis/Projects/kable/core/src/commonMain/kotlin/Observation.kt:36:13)
    at 12  sensortag.kexe                      0x000000010bb6c3ba kfun:com.juul.kable.Observers.$acquire$lambda-0COROUTINE$68.invokeSuspend#internal + 554 (/Users/travis/Projects/kable/core/src/commonMain/kotlin/Observers.kt:57:43)
    at 13  sensortag.kexe                      0x000000010bb6c8d6 kfun:com.juul.kable.Observers.$acquire$lambda-0COROUTINE$68.invoke#internal + 310 (/Users/travis/Projects/kable/core/src/commonMain/kotlin/Observers.kt:57:29)
    at 14  sensortag.kexe                      0x000000010baa0521 kfun:kotlinx.coroutines.flow.SubscribedFlowCollector.$onSubscriptionCOROUTINE$151#invokeSuspend(kotlin.Result<kotlin.Any?>){}kotlin.Any? + 1329 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/flow/operators/Share.kt:419:27)
    at 15  sensortag.kexe                      0x000000010baa0ae4 kfun:kotlinx.coroutines.flow.SubscribedFlowCollector#onSubscription(){} + 276 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/flow/operators/Share.kt:416:13)
    at 16  sensortag.kexe                      0x000000010ba7fb60 kfun:kotlinx.coroutines.flow.SharedFlowImpl.$collectCOROUTINE$74#invokeSuspend(kotlin.Result<kotlin.Any?>){}kotlin.Any? + 3760 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt:361:65)
    at 17  sensortag.kexe                      0x000000010ba7fea1 kfun:kotlinx.coroutines.flow.SharedFlowImpl#collect(kotlinx.coroutines.flow.FlowCollector<1:0>){}kotlin.Nothing + 353 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt:358:22)
    at 18  sensortag.kexe                      0x000000010ba9f7f1 kfun:kotlinx.coroutines.flow.SubscribedSharedFlow.$collectCOROUTINE$150.invokeSuspend#internal + 785 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/flow/operators/Share.kt:409:20)
    at 19  sensortag.kexe                      0x000000010ba9fb51 kfun:kotlinx.coroutines.flow.SubscribedSharedFlow.collect#internal + 353 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/flow/operators/Share.kt:408:22)
    at 20  sensortag.kexe                      0x000000010bb6d939 kfun:com.juul.kable.Observers.<no name provided>_1.$collect_2COROUTINE$63.invokeSuspend#internal + 809 (/Users/travis/Projects/kable/core/src/commonMain/kotlin/Observers.kt:76:819)
    at 21  sensortag.kexe                      0x000000010bb6dc31 kfun:com.juul.kable.Observers.<no name provided>_1.collect_2#internal + 353 (/Users/travis/Projects/kable/core/src/commonMain/kotlin/Observers.kt:76:819)
    at 22  sensortag.kexe                      0x000000010bb6eba7 kfun:com.juul.kable.Observers.<no name provided>_1.$collect_2COROUTINE$65.invokeSuspend#internal + 791 (/Users/travis/Projects/kable/core/src/commonMain/kotlin/Observers.kt:76:819)
    at 23  sensortag.kexe                      0x000000010bb6ee91 kfun:com.juul.kable.Observers.<no name provided>_1.collect_2#internal.64 + 353 (/Users/travis/Projects/kable/core/src/commonMain/kotlin/Observers.kt:76:819)
    at 24  sensortag.kexe                      0x000000010ba9b468 kfun:kotlinx.coroutines.flow.<no name provided>_1.$collect_2COROUTINE$115.invokeSuspend#internal + 1064 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt:114:49)
    at 25  sensortag.kexe                      0x000000010ba9bda1 kfun:kotlinx.coroutines.flow.<no name provided>_1.collect_2#internal.128 + 353 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt:114:49)
    at 26  sensortag.kexe                      0x000000010bb34d47 kfun:com.juul.kable.ApplePeripheral.<no name provided>_1.$collect_2COROUTINE$21.invokeSuspend#internal + 791 (/Users/travis/Projects/kable/core/src/appleMain/kotlin/Peripheral.kt:82:36)
    at 27  sensortag.kexe                      0x000000010bb35031 kfun:com.juul.kable.ApplePeripheral.<no name provided>_1.collect_2#internal.35 + 353 (/Users/travis/Projects/kable/core/src/appleMain/kotlin/Peripheral.kt:82:36)
    at 28  sensortag.kexe                      0x000000010b86cd57 kfun:com.juul.sensortag.SensorTag.<no name provided>_1.$collect_2COROUTINE$0.invokeSuspend#internal + 791 (/Users/travis/Projects/sensortag/app/src/commonMain/kotlin/SensorTag.kt:82:1198)

I'll investigate and fix before merging.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed crash by reverting back to using Stately for the subscription actions list:

private val subscribers = IsoMutableList<OnSubscriptionAction>()

I'm still not sure if synchronizedMapOf is safe for observations collection. I'm not sure how to test it considering you really only get a single thread in the Native Coroutines world, and that single thread is what I'm testing the Flow subscription from (which in turn accesses this map).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted back to using Stately for observation collection handling; as it was a safer approach.
Will revisit and clean up later when we use the new memory model (#237).

@twyatt twyatt added this to the 0.11.0 milestone Jan 6, 2022
@twyatt

This comment has been minimized.

@twyatt twyatt requested a review from davertay-j January 12, 2022 09:52
@twyatt twyatt modified the milestones: 0.11.0, 0.12.0 Jan 12, 2022
core/src/commonMain/kotlin/Observation.kt Outdated Show resolved Hide resolved
core/src/commonTest/kotlin/ObservationTest.kt Show resolved Hide resolved
@twyatt

This comment has been minimized.

@twyatt twyatt merged commit 3597853 into main Jan 18, 2022
@twyatt twyatt deleted the twyatt/observe branch January 18, 2022 18:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Should not stop notifications/indications if not previously started
3 participants