diff --git a/core/api/core.api b/core/api/core.api index b990b60ca..871b7bcaf 100644 --- a/core/api/core.api +++ b/core/api/core.api @@ -17,7 +17,7 @@ public final class com/juul/kable/AndroidPeripheral : com/juul/kable/Peripheral public fun disconnect (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun getMtu ()Lkotlinx/coroutines/flow/StateFlow; public fun getServices ()Ljava/util/List; - public fun getState ()Lkotlinx/coroutines/flow/Flow; + public fun getState ()Lkotlinx/coroutines/flow/StateFlow; public fun observe (Lcom/juul/kable/Characteristic;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public fun read (Lcom/juul/kable/Characteristic;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun read (Lcom/juul/kable/Descriptor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; @@ -58,7 +58,7 @@ public final class com/juul/kable/CharacteristicKt { public static final fun characteristicOf (Ljava/lang/String;Ljava/lang/String;)Lcom/juul/kable/Characteristic; } -public final class com/juul/kable/ConnectionLostException : java/io/IOException { +public final class com/juul/kable/ConnectionLostException : com/juul/kable/NotConnectedException { public fun ()V } @@ -169,7 +169,11 @@ public final class com/juul/kable/ManufacturerData { public final fun getData ()[B } -public final class com/juul/kable/NotReadyException : java/io/IOException { +public class com/juul/kable/NotConnectedException : java/io/IOException { + public fun ()V +} + +public final class com/juul/kable/NotReadyException : com/juul/kable/NotConnectedException { public fun ()V } @@ -183,7 +187,7 @@ public abstract interface class com/juul/kable/Peripheral { public abstract fun connect (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun disconnect (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun getServices ()Ljava/util/List; - public abstract fun getState ()Lkotlinx/coroutines/flow/Flow; + public abstract fun getState ()Lkotlinx/coroutines/flow/StateFlow; public abstract fun observe (Lcom/juul/kable/Characteristic;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public abstract fun read (Lcom/juul/kable/Characteristic;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun read (Lcom/juul/kable/Descriptor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/core/build.gradle.kts b/core/build.gradle.kts index aa8e83604..0e0d81db7 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -34,6 +34,8 @@ kotlin { dependencies { api(libs.kotlinx.coroutines.core) api(libs.uuid) + implementation(libs.tuulbox.collections) + implementation(libs.stately.collections) } } @@ -41,6 +43,8 @@ kotlin { dependencies { implementation(kotlin("test-common")) implementation(kotlin("test-annotations-common")) + implementation(libs.tuulbox.logging) + implementation(libs.kotlinx.coroutines.test) } } @@ -72,9 +76,6 @@ kotlin { val appleMain by creating { dependsOn(commonMain) - dependencies { - implementation(libs.stately) - } } val appleTest by creating diff --git a/core/src/androidMain/kotlin/Connection.kt b/core/src/androidMain/kotlin/Connection.kt index 50079a31a..f9e93640a 100644 --- a/core/src/androidMain/kotlin/Connection.kt +++ b/core/src/androidMain/kotlin/Connection.kt @@ -2,7 +2,7 @@ package com.juul.kable import android.bluetooth.BluetoothGatt import android.bluetooth.BluetoothGatt.GATT_SUCCESS -import com.juul.kable.AndroidObservationEvent.CharacteristicChange +import com.juul.kable.ObservationEvent.CharacteristicChange import com.juul.kable.gatt.Callback import com.juul.kable.gatt.GattStatus import kotlinx.coroutines.CoroutineDispatcher diff --git a/core/src/androidMain/kotlin/Observations.kt b/core/src/androidMain/kotlin/Observations.kt new file mode 100644 index 000000000..889313af3 --- /dev/null +++ b/core/src/androidMain/kotlin/Observations.kt @@ -0,0 +1,11 @@ +package com.juul.kable + +internal actual fun Peripheral.observationHandler(): Observation.Handler = object : Observation.Handler { + override suspend fun startObservation(characteristic: Characteristic) { + (this@observationHandler as AndroidPeripheral).startObservation(characteristic) + } + + override suspend fun stopObservation(characteristic: Characteristic) { + (this@observationHandler as AndroidPeripheral).stopObservation(characteristic) + } +} diff --git a/core/src/androidMain/kotlin/Observers.kt b/core/src/androidMain/kotlin/Observers.kt deleted file mode 100644 index fd663bbe2..000000000 --- a/core/src/androidMain/kotlin/Observers.kt +++ /dev/null @@ -1,153 +0,0 @@ -package com.juul.kable - -import com.juul.kable.logs.Logger -import com.juul.kable.logs.Logging -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.onCompletion -import kotlinx.coroutines.flow.onSubscription -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -import kotlin.coroutines.cancellation.CancellationException - -internal sealed class AndroidObservationEvent { - - abstract val characteristic: Characteristic - - data class CharacteristicChange( - override val characteristic: Characteristic, - val data: ByteArray, - ) : AndroidObservationEvent() - - data class Error( - override val characteristic: Characteristic, - val cause: Throwable, - ) : AndroidObservationEvent() -} - -/** - * Manages observations for the specified [peripheral]. - * - * The [characteristicChanges] property is expected to be fed with all characteristic changes associated with the - * [peripheral]. The changes are then fanned out to individual [Flow]s created via [acquire] (associated with a specific - * characteristic). - * - * For example, if you have a sequence of characteristic changes represented by characteristic A, B and C with their - * corresponding change uniquely identified by a change number postfix (in other words: characteristic A emitting 3 - * different changes would be represented as A1, A2 and A3): - * - * ``` - * .--- acquire(A) --> A1, A2, A3 - * .----------------------. / - * A1, B1, C1, A2, A3, B2 --> | characteristicChange | ----- acquire(B) --> B1, B2 - * '----------------------' \ - * '--- acquire(C) --> C1 - * ``` - * - * @param peripheral to perform notification actions against to enable/disable the observations. - */ -internal class Observers( - private val peripheral: AndroidPeripheral, - logging: Logging, -) { - - private val logger = Logger(logging, tag = "Kable/Observers", peripheral.bluetoothDevice.address) - - val characteristicChanges = MutableSharedFlow() - private val observations = Observations() - - fun acquire( - characteristic: Characteristic, - onSubscription: OnSubscriptionAction, - ): Flow = characteristicChanges - .onSubscription { - peripheral.suspendUntilAtLeast() - if (observations.add(characteristic, onSubscription) == 1) { - peripheral.startObservation(characteristic) - } - onSubscription() - } - .filter { - it.characteristic.characteristicUuid == characteristic.characteristicUuid && - it.characteristic.serviceUuid == characteristic.serviceUuid - } - .map { - when (it) { - is AndroidObservationEvent.Error -> throw it.cause - is AndroidObservationEvent.CharacteristicChange -> it.data - } - } - .onCompletion { - if (observations.remove(characteristic, onSubscription) == 0) { - try { - peripheral.stopObservation(characteristic) - } catch (e: NotReadyException) { - // Silently ignore as it is assumed that failure is due to connection drop, in which case Android - // will clear the notifications. - logger.debug { message = "Stop notification failure ignored." } - } - } - } - - suspend fun rewire() { - observations.forEach { characteristic, onSubscriptionActions -> - try { - peripheral.startObservation(characteristic) - onSubscriptionActions.forEach { it() } - } catch (cancellation: CancellationException) { - throw cancellation - } catch (t: Throwable) { - characteristicChanges.emit(AndroidObservationEvent.Error(characteristic, t)) - } - } - } -} - -private class Observations { - - private val lock = Mutex() - private val observations = mutableMapOf>() - - suspend inline fun forEach( - action: (Characteristic, List) -> Unit - ) = lock.withLock { - observations.forEach { (characteristic, onSubscriptionActions) -> - action(characteristic, onSubscriptionActions) - } - } - - suspend fun add( - characteristic: Characteristic, - onSubscription: OnSubscriptionAction, - ): Int = lock.withLock { - val actions = observations[characteristic] - if (actions == null) { - val newActions = mutableListOf(onSubscription) - observations[characteristic] = newActions - 1 - } else { - actions += onSubscription - actions.count() - } - } - - suspend fun remove( - characteristic: Characteristic, - onSubscription: OnSubscriptionAction, - ): Int = lock.withLock { - val actions = observations[characteristic] - when { - actions == null -> -1 // No previous observation existed for characteristic. - actions.count() == 1 -> { - observations -= characteristic - 0 - } - else -> { - actions -= onSubscription - actions.count() - } - } - } -} diff --git a/core/src/androidMain/kotlin/Peripheral.kt b/core/src/androidMain/kotlin/Peripheral.kt index b542da1f6..73a78d5e5 100644 --- a/core/src/androidMain/kotlin/Peripheral.kt +++ b/core/src/androidMain/kotlin/Peripheral.kt @@ -127,7 +127,7 @@ public enum class Priority { Low, Balanced, High } public class AndroidPeripheral internal constructor( parentCoroutineContext: CoroutineContext, - internal val bluetoothDevice: BluetoothDevice, + private val bluetoothDevice: BluetoothDevice, private val transport: Transport, private val phy: Phy, private val onServicesDiscovered: ServicesDiscoveredAction, @@ -136,8 +136,10 @@ public class AndroidPeripheral internal constructor( private val logger = Logger(logging, tag = "Kable/Peripheral", identifier = bluetoothDevice.address) + internal val platformIdentifier = bluetoothDevice.address + private val _state = MutableStateFlow(State.Disconnected()) - public override val state: Flow = _state.asStateFlow() + public override val state: StateFlow = _state.asStateFlow() private val receiver = registerBluetoothStateBroadcastReceiver { state -> if (state == STATE_OFF) { @@ -166,7 +168,7 @@ public class AndroidPeripheral internal constructor( */ public val mtu: StateFlow = _mtu.asStateFlow() - private val observers = Observers(this, logging) + private val observers = Observers(this, logging) @Volatile private var _platformServices: List? = null @@ -212,7 +214,7 @@ public class AndroidPeripheral internal constructor( onServicesDiscovered(ServicesDiscoveredPeripheral(this@AndroidPeripheral)) _state.value = State.Connecting.Observes logger.verbose { message = "Configuring characteristic observations" } - observers.rewire() + observers.onConnected() } catch (t: Throwable) { closeConnection() logger.error(t) { message = "Failed to connect" } @@ -226,6 +228,7 @@ public class AndroidPeripheral internal constructor( private fun closeConnection() { _connection?.close() _connection = null + // Avoid trampling existing `Disconnected` state (and its properties) by only updating if not already `Disconnected`. _state.update { previous -> previous as? State.Disconnected ?: State.Disconnected() } } @@ -381,19 +384,16 @@ public class AndroidPeripheral internal constructor( internal suspend fun stopObservation(characteristic: Characteristic) { val platformCharacteristic = platformServices.findCharacteristic(characteristic) + setConfigDescriptor(platformCharacteristic, enable = false) - try { - setConfigDescriptor(platformCharacteristic, enable = false) - } finally { - logger.debug { - message = "setCharacteristicNotification" - detail(characteristic) - detail("value", "false") - } - connection - .bluetoothGatt - .setCharacteristicNotification(platformCharacteristic, false) + logger.debug { + message = "setCharacteristicNotification" + detail(characteristic) + detail("value", "false") } + connection + .bluetoothGatt + .setCharacteristicNotification(platformCharacteristic, false) } private suspend fun setConfigDescriptor( @@ -506,3 +506,6 @@ private fun checkBluetoothAdapterState( throw BluetoothDisabledException("Bluetooth adapter state is $actualName ($actual), but $expectedName ($expected) was required.") } } + +internal actual val Peripheral.identifier: String + get() = (this as AndroidPeripheral).platformIdentifier diff --git a/core/src/appleMain/kotlin/Observations.kt b/core/src/appleMain/kotlin/Observations.kt new file mode 100644 index 000000000..9585dda2d --- /dev/null +++ b/core/src/appleMain/kotlin/Observations.kt @@ -0,0 +1,11 @@ +package com.juul.kable + +internal actual fun Peripheral.observationHandler(): Observation.Handler = object : Observation.Handler { + override suspend fun startObservation(characteristic: Characteristic) { + (this@observationHandler as ApplePeripheral).startNotifications(characteristic) + } + + override suspend fun stopObservation(characteristic: Characteristic) { + (this@observationHandler as ApplePeripheral).stopNotifications(characteristic) + } +} diff --git a/core/src/appleMain/kotlin/Observers.kt b/core/src/appleMain/kotlin/Observers.kt deleted file mode 100644 index 9dba77ce1..000000000 --- a/core/src/appleMain/kotlin/Observers.kt +++ /dev/null @@ -1,158 +0,0 @@ -package com.juul.kable - -import co.touchlab.stately.ensureNeverFrozen -import co.touchlab.stately.isolate.IsolateState -import com.juul.kable.logs.Logger -import com.juul.kable.logs.detail -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.onCompletion -import kotlinx.coroutines.flow.onSubscription -import platform.Foundation.NSData -import kotlin.coroutines.cancellation.CancellationException - -internal sealed class AppleObservationEvent { - - abstract val characteristic: Characteristic - - data class CharacteristicChange( - override val characteristic: Characteristic, - val data: NSData, - ) : AppleObservationEvent() - - data class Error( - override val characteristic: Characteristic, - val cause: Throwable, - ) : AppleObservationEvent() -} - -/** - * Manages observations for the specified [peripheral]. - * - * The [characteristicChanges] property is expected to be fed with all characteristic changes associated with the - * [peripheral]. The changes are then fanned-out to individual [Flow]s created via [acquire] (associated with a specific - * characteristic). - * - * For example, if you have a sequence of characteristic changes represented by characteristic A, B and C with their - * corresponding change uniquely identified by a sequence number postfix (in other words: characteristic A emitting 3 - * different changes would be represented as A1, A2 and A3): - * - * ``` - * .--- acquire(A) --> A1, A2, A3 - * .-----------------------. / - * A1, B1, C1, A2, A3, B2 --> | characteristicChanges | ----- acquire(B) --> B1, B2 - * '-----------------------' \ - * '--- acquire(C) --> C1 - * ``` - * - * @param peripheral to perform notification actions against to enable/disable the observations. - */ -internal class Observers( - private val peripheral: ApplePeripheral, - private val logger: Logger, -) { - - val characteristicChanges = MutableSharedFlow() - private val observations = Observations() - - fun acquire( - characteristic: Characteristic, - onSubscription: OnSubscriptionAction, - ): Flow { - return characteristicChanges - .onSubscription { - peripheral.suspendUntilAtLeast() - if (observations.add(characteristic, onSubscription) == 1) { - peripheral.startNotifications(characteristic) - } - onSubscription() - } - .filter { - it.characteristic.characteristicUuid == characteristic.characteristicUuid && - it.characteristic.serviceUuid == characteristic.serviceUuid - } - .map { - when (it) { - is AppleObservationEvent.Error -> throw it.cause - is AppleObservationEvent.CharacteristicChange -> it.data - } - } - .onCompletion { - if (observations.remove(characteristic, onSubscription) == 0) { - try { - peripheral.stopNotifications(characteristic) - } catch (e: NotReadyException) { - // Silently ignore as it is assumed that failure is due to connection drop, in which case the - // system will clear the notifications. - logger.warn(e) { - message = "Stop notification failure ignored." - detail(characteristic) - } - } - } - } - } - - suspend fun rewire() { - observations.entries.forEach { (characteristic, observationStartedActions) -> - try { - peripheral.startNotifications(characteristic) - observationStartedActions.forEach { it() } - } catch (cancellation: CancellationException) { - throw cancellation - } catch (t: Throwable) { - characteristicChanges.emit(AppleObservationEvent.Error(characteristic, t)) - } - } - } -} - -private class Observations : IsolateState>>( - producer = { mutableMapOf() } -) { - - val entries: Map> - get() = access { - mutableMapOf>().also { copy -> - it.forEach { (key, value) -> - copy[key] = value.toList() - } - }.toMap() - } - - fun add( - characteristic: Characteristic, - onSubscription: OnSubscriptionAction - ): Int = access { - val actions = it[characteristic] - if (actions == null) { - val newActions = mutableListOf(onSubscription) - newActions.ensureNeverFrozen() - it[characteristic] = newActions - 1 - } else { - actions += onSubscription - actions.count() - } - } - - fun remove( - characteristic: Characteristic, - onSubscription: OnSubscriptionAction - ): Int = access { - val actions = it[characteristic] - when { - actions == null -> -1 // No previous observation existed for characteristic. - actions.count() == 1 -> { - it -= characteristic - 0 - } - else -> { - actions -= onSubscription - actions.count() - } - } - } -} diff --git a/core/src/appleMain/kotlin/Peripheral.kt b/core/src/appleMain/kotlin/Peripheral.kt index bd29c3278..d55c1a715 100644 --- a/core/src/appleMain/kotlin/Peripheral.kt +++ b/core/src/appleMain/kotlin/Peripheral.kt @@ -39,6 +39,7 @@ import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.async import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first @@ -100,7 +101,11 @@ public class ApplePeripheral internal constructor( private val logger = Logger(logging, identifier = cbPeripheral.identifier.UUIDString) private val _state = MutableStateFlow(State.Disconnected()) - override val state: Flow = _state.asStateFlow() + override val state: StateFlow = _state.asStateFlow() + + private val observers = Observers(this, logging) + + internal val platformIdentifier = cbPeripheral.identifier.UUIDString init { centralManager.delegate @@ -117,8 +122,6 @@ public class ApplePeripheral internal constructor( .launchIn(scope) } - private val observers = Observers(this, logger) - private val _platformServices = atomic?>(null) private val platformServices: List get() = checkNotNull(_platformServices.value) { @@ -164,7 +167,7 @@ public class ApplePeripheral internal constructor( .takeWhile { it !== Closed } .mapNotNull { it as? Data } .map { - AppleObservationEvent.CharacteristicChange( + ObservationEvent.CharacteristicChange( characteristic = it.cbCharacteristic.toLazyCharacteristic(), data = it.data ) @@ -180,7 +183,7 @@ public class ApplePeripheral internal constructor( _state.value = State.Connecting.Observes logger.verbose { message = "Configuring characteristic observations" } - observers.rewire() + observers.onConnected() } catch (t: Throwable) { logger.error(t) { message = "Failed to connect" } withContext(NonCancellable) { @@ -408,3 +411,6 @@ private fun NSError.toStatus(): State.Disconnected.Status = when (code) { CBErrorEncryptionTimedOut -> EncryptionTimedOut else -> Unknown(code.toInt()) } + +internal actual val Peripheral.identifier: String + get() = (this as ApplePeripheral).platformIdentifier diff --git a/core/src/commonMain/kotlin/Exceptions.kt b/core/src/commonMain/kotlin/Exceptions.kt index 885ea5ffe..db4f277cc 100644 --- a/core/src/commonMain/kotlin/Exceptions.kt +++ b/core/src/commonMain/kotlin/Exceptions.kt @@ -23,6 +23,11 @@ public expect open class IOException internal constructor( cause: Throwable? = null, ) : Exception +public open class NotConnectedException internal constructor( + message: String? = null, + cause: Throwable? = null, +) : IOException(message, cause) + public class ConnectionRejectedException internal constructor( message: String? = null, cause: Throwable? = null, @@ -31,7 +36,7 @@ public class ConnectionRejectedException internal constructor( public class NotReadyException internal constructor( message: String? = null, cause: Throwable? = null, -) : IOException(message, cause) +) : NotConnectedException(message, cause) public class GattStatusException internal constructor( message: String? = null, @@ -41,4 +46,4 @@ public class GattStatusException internal constructor( public class ConnectionLostException internal constructor( message: String? = null, cause: Throwable? = null, -) : IOException(message, cause) +) : NotConnectedException(message, cause) diff --git a/core/src/commonMain/kotlin/Observation.kt b/core/src/commonMain/kotlin/Observation.kt new file mode 100644 index 000000000..cb7eef187 --- /dev/null +++ b/core/src/commonMain/kotlin/Observation.kt @@ -0,0 +1,95 @@ +package com.juul.kable + +import com.juul.kable.State.Connecting.Observes +import com.juul.kable.logs.Logger +import com.juul.kable.logs.Logging +import kotlinx.atomicfu.atomic +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock + +internal class Observation( + private val state: StateFlow, + private val handler: Handler, + private val characteristic: Characteristic, + logging: Logging, + identifier: String, + private val subscribers: MutableList = mutableListOf(), +) { + + interface Handler { + suspend fun startObservation(characteristic: Characteristic) + suspend fun stopObservation(characteristic: Characteristic) + } + + private val logger = Logger(logging, tag = "Kable/Observation", identifier) + + private val mutex = Mutex() + + private val _didStartObservation = atomic(false) + private var didStartObservation: Boolean + get() = _didStartObservation.value + set(value) { _didStartObservation.value = value } + + private val isConnected: Boolean + get() = state.value.isAtLeast() + + private val hasSubscribers: Boolean + get() = subscribers.isNotEmpty() + + suspend fun onSubscription(action: OnSubscriptionAction) = mutex.withLock { + subscribers += action + val shouldStartObservation = !didStartObservation && hasSubscribers && isConnected + if (shouldStartObservation) { + suppressConnectionExceptions { + startObservation() + action() + } + } + } + + suspend fun onCompletion(action: OnSubscriptionAction) = mutex.withLock { + subscribers -= action + val shouldStopObservation = didStartObservation && !hasSubscribers && isConnected + if (shouldStopObservation) stopObservation() + } + + suspend fun onConnected() = mutex.withLock { + if (hasSubscribers && isConnected) { + suppressConnectionExceptions { + startObservation() + subscribers.forEach { it() } + } + } + } + + private suspend fun startObservation() { + handler.startObservation(characteristic) + didStartObservation = true + } + + private suspend fun stopObservation() { + suppressConnectionExceptions { + handler.stopObservation(characteristic) + } + didStartObservation = false + } + + /** + * While spinning up or down an observation the connection may drop, resulting in an unnecessary connection related + * exception being thrown. + * + * Since it is assumed that observations are automatically cleared on disconnect, these exceptions can be ignored, + * as the corresponding [action] will be rendered unnecessary (clearing an observation is not needed if connection + * has been lost, or [action] will be re-attempted on [reconnect][onConnected]). + */ + private inline fun suppressConnectionExceptions(action: () -> Unit) { + try { + action.invoke() + } catch (e: NotConnectedException) { + logger.verbose { message = "Suppressed failure: $e" } + } catch (e: BluetoothException) { + logger.verbose { message = "Suppressed failure: $e" } + } + } +} diff --git a/core/src/commonMain/kotlin/ObservationEvent.kt b/core/src/commonMain/kotlin/ObservationEvent.kt new file mode 100644 index 000000000..c066ff22f --- /dev/null +++ b/core/src/commonMain/kotlin/ObservationEvent.kt @@ -0,0 +1,25 @@ +package com.juul.kable + +internal sealed class ObservationEvent { + + abstract val characteristic: Characteristic + + data class CharacteristicChange( + override val characteristic: Characteristic, + val data: T, + ) : ObservationEvent() + + data class Error( + override val characteristic: Characteristic, + val cause: Throwable, + ) : ObservationEvent() +} + +internal fun dematerialize(event: ObservationEvent): T = when (event) { + is ObservationEvent.Error -> throw event.cause + is ObservationEvent.CharacteristicChange -> event.data +} + +internal fun ObservationEvent.isAssociatedWith(characteristic: Characteristic): Boolean = + this.characteristic.characteristicUuid == characteristic.characteristicUuid && + this.characteristic.serviceUuid == characteristic.serviceUuid diff --git a/core/src/commonMain/kotlin/Observers.kt b/core/src/commonMain/kotlin/Observers.kt new file mode 100644 index 000000000..46ea3d7ed --- /dev/null +++ b/core/src/commonMain/kotlin/Observers.kt @@ -0,0 +1,101 @@ +package com.juul.kable + +import co.touchlab.stately.collections.IsoMutableList +import co.touchlab.stately.isolate.IsolateState +import com.juul.kable.logs.Logging +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.onSubscription +import kotlin.coroutines.cancellation.CancellationException + +internal expect fun Peripheral.observationHandler(): Observation.Handler + +/** + * Manages observations for the specified [peripheral]. + * + * The [characteristicChanges] property is expected to be fed with all characteristic changes associated with the + * [peripheral]. The changes are then fanned out to individual [Flow]s created via [acquire] (associated with a specific + * characteristic). + * + * For example, if you have a sequence of characteristic changes represented by characteristic A, B and C with their + * corresponding change uniquely identified by a change number postfix (in other words: characteristic A emitting 3 + * different changes would be represented as A1, A2 and A3): + * + * ``` + * .--- acquire(A) --> A1, A2, A3 + * .-----------------------. / + * A1, B1, C1, A2, A3, B2 --> | characteristicChanges | ----- acquire(B) --> B1, B2 + * '-----------------------' \ + * '--- acquire(C) --> C1 + * ``` + * + * @param peripheral to perform notification actions against to enable/disable the observations. + */ +internal class Observers( + private val peripheral: Peripheral, + private val logging: Logging, + extraBufferCapacity: Int = 0, +) { + + val characteristicChanges = MutableSharedFlow>(extraBufferCapacity = extraBufferCapacity) + private val observations = Observations() + + fun acquire( + characteristic: Characteristic, + onSubscription: OnSubscriptionAction, + ): Flow { + val state = peripheral.state + val handler = peripheral.observationHandler() + val identifier = peripheral.identifier + + // `IsoMutableList` created outside of `getOrPut`, because it would deadlock on Native if created in + // `Observation` constructor. + val subscribers = IsoMutableList() + + val observation = observations.getOrPut(characteristic) { + Observation(state, handler, characteristic, logging, identifier, subscribers) + } + + return characteristicChanges + .onSubscription { observation.onSubscription(onSubscription) } + .filter { event -> event.isAssociatedWith(characteristic) } + .map(::dematerialize) + .onCompletion { observation.onCompletion(onSubscription) } + } + + suspend fun onConnected() { + observations.entries.forEach { (characteristic, observation) -> + // Pipe failures to `characteristicChanges` while honoring in-flight connection cancellations. + try { + observation.onConnected() + } catch (cancellation: CancellationException) { + throw cancellation + } catch (e: Exception) { + characteristicChanges.emit(ObservationEvent.Error(characteristic, e)) + } + } + } +} + +private class Observations : IsolateState>( + producer = { mutableMapOf() } +) { + + val entries: List> + get() = access { observations -> + // `map` used as a means to copy entries, to prevent freeze exceptions on Native. + observations.entries.map { (characteristic, observation) -> + characteristic to observation + } + } + + fun getOrPut( + characteristic: Characteristic, + defaultValue: () -> Observation + ): Observation = access { observations -> + observations.getOrPut(characteristic, defaultValue) + } +} diff --git a/core/src/commonMain/kotlin/Peripheral.kt b/core/src/commonMain/kotlin/Peripheral.kt index e2c7167b4..19048a8c8 100644 --- a/core/src/commonMain/kotlin/Peripheral.kt +++ b/core/src/commonMain/kotlin/Peripheral.kt @@ -6,6 +6,7 @@ package com.juul.kable import com.juul.kable.WriteType.WithoutResponse import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.onEach import kotlin.coroutines.cancellation.CancellationException @@ -58,7 +59,7 @@ public interface Peripheral { * '---------------' '--------------' * ``` */ - public val state: Flow + public val state: StateFlow /** * Initiates a connection, suspending until connected, or failure occurs. Multiple concurrent invocations will all @@ -184,3 +185,5 @@ internal suspend inline fun Peripheral.suspendUntilOrThrow() .onEach { if (it is State.Disconnected) throw ConnectionLostException() } .first { it is T } } + +internal expect val Peripheral.identifier: String diff --git a/core/src/commonTest/kotlin/ObservationTest.kt b/core/src/commonTest/kotlin/ObservationTest.kt new file mode 100644 index 000000000..95e3eba4a --- /dev/null +++ b/core/src/commonTest/kotlin/ObservationTest.kt @@ -0,0 +1,452 @@ +package com.juul.kable + +import com.benasher44.uuid.uuid4 +import com.juul.kable.State.Connected +import com.juul.kable.State.Connecting +import com.juul.kable.State.Disconnected +import com.juul.kable.logs.LogEngine +import com.juul.kable.logs.Logging +import com.juul.kable.logs.Logging.Format.Compact +import com.juul.kable.logs.Logging.Level.Data +import com.juul.tuulbox.logging.ConsoleLogger +import com.juul.tuulbox.logging.Log +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.test.runTest +import kotlin.test.AfterTest +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertFalse + +private fun generateCharacteristic() = characteristicOf( + service = uuid4().toString(), + characteristic = uuid4().toString(), +) + +@OptIn(ExperimentalCoroutinesApi::class) +class ObservationTest { + + private val logging = Logging().apply { + level = Data + } + + @BeforeTest + fun setup() { + Log.dispatcher.install(ConsoleLogger) + } + + @AfterTest + fun tearDown() { + Log.dispatcher.clear() + } + + @Test + fun manySubscribers_startsObservationOnce() = runTest { + val state = MutableStateFlow(Connected) + val characteristic = generateCharacteristic() + val counter = ObservationCounter(characteristic) + val observation = Observation(state, counter, characteristic, logging, identifier = "test") + + repeat(10) { + observation.onSubscription { } + } + counter.assert( + startCount = 1, + stopCount = 0, + ) + } + + @Test + fun subscribersGoesToZero_stopsObservationOnce() = runTest { + val state = MutableStateFlow(Disconnected()) + val characteristic = generateCharacteristic() + val counter = ObservationCounter(characteristic) + val observation = Observation(state, counter, characteristic, logging, identifier = "test") + val onSubscriptionActions = List(10) { suspend { } } + + state.value = Connected + onSubscriptionActions.forEach { action -> + observation.onSubscription(action) + } + counter.assert( + startCount = 1, + stopCount = 0, + ) + + onSubscriptionActions.forEach { action -> + observation.onCompletion(action) + } + counter.assert( + startCount = 1, + stopCount = 1, + ) + } + + @Test + fun subscribersGoesToZero_whileDisconnected_doesNotStopObservation() = runTest { + val state = MutableStateFlow(Connected) + val characteristic = generateCharacteristic() + val counter = ObservationCounter(characteristic) + val observation = Observation(state, counter, characteristic, logging, identifier = "test") + val onSubscriptionActions = List(10) { suspend { } } + + onSubscriptionActions.forEach { action -> + observation.onSubscription(action) + } + onSubscriptionActions.take(5).forEach { action -> + observation.onCompletion(action) + } + + state.value = Disconnected() + + onSubscriptionActions.drop(5).forEach { action -> + observation.onCompletion(action) + } + counter.assert( + startCount = 1, + stopCount = 0, + ) + } + + @Test + fun hasSubscribers_reconnects_reObservesOnce() = runTest { + val state = MutableStateFlow(Connected) + val characteristic = generateCharacteristic() + val counter = ObservationCounter(characteristic) + val observation = Observation(state, counter, characteristic, logging, identifier = "test") + + repeat(10) { + observation.onSubscription { } + } + counter.assert( + startCount = 1, + stopCount = 0, + ) + + // Simulate reconnect. + state.value = Connecting.Observes + observation.onConnected() + counter.assert( + startCount = 2, + stopCount = 0, + ) + } + + @Test + fun addingSubscribersDuringConnect_startsObserveOnce() = runTest { + val state = MutableStateFlow(Disconnected()) + val characteristic = generateCharacteristic() + val counter = ObservationCounter(characteristic) + val observation = Observation(state, counter, characteristic, logging, identifier = "test") + + repeat(5) { + observation.onSubscription { } + } + counter.assert( + startCount = 0, + stopCount = 0, + ) + + state.value = Connecting.Observes + observation.onConnected() + counter.assert( + startCount = 1, + stopCount = 0, + ) + + // Simulate additional subscribers before state has been updated to `Connected`. + repeat(5) { + observation.onSubscription { } + } + state.value = Connected + repeat(5) { + observation.onSubscription { } + } + + counter.assert( + startCount = 1, + stopCount = 0, + ) + } + + @Test + fun noSubscribers_onConnected_doesNotStartObservation() = runTest { + val state = MutableStateFlow(Disconnected()) + val characteristic = generateCharacteristic() + val counter = ObservationCounter(characteristic) + val observation = Observation(state, counter, characteristic, logging, identifier = "test") + + state.value = Connecting.Observes + repeat(10) { + observation.onConnected() + } + + counter.assert( + startCount = 0, + stopCount = 0, + ) + } + + @Test + fun notConnected_attemptToStartObservation_actionIsNotExecuted() = runTest { + val state = MutableStateFlow(Disconnected()) + val handler = object : Observation.Handler { + override suspend fun startObservation(characteristic: Characteristic) = + throw NotConnectedException() + + override suspend fun stopObservation(characteristic: Characteristic) {} + } + val characteristic = generateCharacteristic() + val logEngine = RecordingLogEngine() + val logging = Logging().apply { + level = Data + format = Compact + engine = logEngine + } + val identifier = "test" + val observation = Observation(state, handler, characteristic, logging, identifier) + + state.value = Connecting.Observes + var didExecuteAction = false + observation.onSubscription { + didExecuteAction = true + } + assertFalse(didExecuteAction) + + assertEquals( + expected = listOf( + RecordingLogEngine.Record.Verbose( + throwable = null, + tag = "Kable/Observation", + message = "$identifier Suppressed failure: ${NotConnectedException()}", + ) + ), + actual = logEngine.records.toList(), + ) + } + + @Test + fun onConnectedWithSubscriber_multipleTimes_startsObservationMultipleTimes() = runTest { + val state = MutableStateFlow(Disconnected()) + val characteristic = generateCharacteristic() + val counter = ObservationCounter(characteristic) + val observation = Observation(state, counter, characteristic, logging, identifier = "test") + + observation.onSubscription { } + counter.assert( + startCount = 0, + stopCount = 0, + ) + + // Simulate numerous reconnects. + state.value = Connecting.Observes + repeat(10) { + observation.onConnected() + } + + counter.assert( + startCount = 10, + stopCount = 0, + ) + } + + @Test + fun connectionDropsWhileConnecting_doesNotThrow() = runTest { + val state = MutableStateFlow(Disconnected()) + val characteristic = generateCharacteristic() + val handler = object : Observation.Handler { + override suspend fun startObservation(characteristic: Characteristic) = + throw NotConnectedException() + + override suspend fun stopObservation(characteristic: Characteristic) = + throw NotConnectedException() + } + val logEngine = RecordingLogEngine() + val logging = Logging().apply { + level = Data + format = Compact + engine = logEngine + } + val identifier = "test" + val observation = Observation(state, handler, characteristic, logging, identifier) + + observation.onSubscription { } + state.value = Connecting.Observes + observation.onConnected() + + assertEquals( + expected = listOf( + RecordingLogEngine.Record.Verbose( + throwable = null, + tag = "Kable/Observation", + message = "$identifier Suppressed failure: ${NotConnectedException()}", + ) + ), + actual = logEngine.records.toList() + ) + } + + @Test + fun failureDuringStartObservation_propagates() = runTest { + val state = MutableStateFlow(Disconnected()) + val characteristic = generateCharacteristic() + val handler = object : Observation.Handler { + override suspend fun startObservation(characteristic: Characteristic) = error("start") + override suspend fun stopObservation(characteristic: Characteristic) {} + } + val observation = Observation(state, handler, characteristic, logging, identifier = "test") + + observation.onSubscription { } + state.value = Connecting.Observes + val failure = assertFailsWith { + observation.onConnected() + } + assertEquals( + expected = "start", + actual = failure.message, + ) + } + + @Test + fun failureDuringStopObservation_propagates() = runTest { + val state = MutableStateFlow(Connected) + val characteristic = generateCharacteristic() + val handler = object : Observation.Handler { + override suspend fun startObservation(characteristic: Characteristic) {} + override suspend fun stopObservation(characteristic: Characteristic) = error("stop") + } + val observation = Observation(state, handler, characteristic, logging, identifier = "test") + + val onSubscriptionAction = suspend { } + observation.onSubscription(onSubscriptionAction) + val failure = assertFailsWith { + observation.onCompletion(onSubscriptionAction) + } + assertEquals( + expected = "stop", + actual = failure.message, + ) + } + + @Test + fun failureInSubscriptionAction_propagates() = runTest { + val state = MutableStateFlow(Connected) + val characteristic = generateCharacteristic() + val counter = ObservationCounter(characteristic) + val observation = Observation(state, counter, characteristic, logging, identifier = "test") + + val onSubscriptionAction = suspend { error("action") } + val failure = assertFailsWith { + observation.onSubscription(onSubscriptionAction) + } + assertEquals( + expected = "action", + actual = failure.message, + ) + } +} + +private class ObservationCounter( + private val characteristic: Characteristic, +) : Observation.Handler { + + var startCount = 0 + var stopCount = 0 + + override suspend fun startObservation(characteristic: Characteristic) { + if (this.characteristic == characteristic) startCount++ + } + + override suspend fun stopObservation(characteristic: Characteristic) { + if (this.characteristic == characteristic) stopCount++ + } + + fun assert( + startCount: Int, + stopCount: Int, + ) { + assertEquals( + expected = startCount, + actual = this.startCount, + message = "Start observation count", + ) + assertEquals( + expected = stopCount, + actual = this.stopCount, + message = "Stop observation count", + ) + } +} + +private class RecordingLogEngine : LogEngine { + + val records = mutableListOf() + + sealed class Record { + abstract val throwable: Throwable? + abstract val tag: String + abstract val message: String + + data class Verbose( + override val throwable: Throwable?, + override val tag: String, + override val message: String, + ) : Record() + + data class Debug( + override val throwable: Throwable?, + override val tag: String, + override val message: String, + ) : Record() + + data class Info( + override val throwable: Throwable?, + override val tag: String, + override val message: String, + ) : Record() + + data class Warn( + override val throwable: Throwable?, + override val tag: String, + override val message: String, + ) : Record() + + data class Error( + override val throwable: Throwable?, + override val tag: String, + override val message: String, + ) : Record() + + data class Assert( + override val throwable: Throwable?, + override val tag: String, + override val message: String, + ) : Record() + } + + override fun verbose(throwable: Throwable?, tag: String, message: String) { + records += Record.Verbose(throwable, tag, message) + } + + override fun debug(throwable: Throwable?, tag: String, message: String) { + records += Record.Debug(throwable, tag, message) + } + + override fun info(throwable: Throwable?, tag: String, message: String) { + records += Record.Info(throwable, tag, message) + } + + override fun warn(throwable: Throwable?, tag: String, message: String) { + records += Record.Warn(throwable, tag, message) + } + + override fun error(throwable: Throwable?, tag: String, message: String) { + records += Record.Error(throwable, tag, message) + } + + override fun assert(throwable: Throwable?, tag: String, message: String) { + records += Record.Assert(throwable, tag, message) + } +} diff --git a/core/src/jsMain/kotlin/Observations.kt b/core/src/jsMain/kotlin/Observations.kt new file mode 100644 index 000000000..71e6d030f --- /dev/null +++ b/core/src/jsMain/kotlin/Observations.kt @@ -0,0 +1,11 @@ +package com.juul.kable + +internal actual fun Peripheral.observationHandler(): Observation.Handler = object : Observation.Handler { + override suspend fun startObservation(characteristic: Characteristic) { + (this@observationHandler as JsPeripheral).startObservation(characteristic) + } + + override suspend fun stopObservation(characteristic: Characteristic) { + (this@observationHandler as JsPeripheral).stopObservation(characteristic) + } +} diff --git a/core/src/jsMain/kotlin/Observers.kt b/core/src/jsMain/kotlin/Observers.kt deleted file mode 100644 index f4035279d..000000000 --- a/core/src/jsMain/kotlin/Observers.kt +++ /dev/null @@ -1,112 +0,0 @@ -package com.juul.kable - -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.onCompletion -import kotlinx.coroutines.flow.onSubscription -import org.khronos.webgl.DataView -import kotlin.coroutines.cancellation.CancellationException - -internal sealed class JsObservationEvent { - - abstract val characteristic: Characteristic - - data class CharacteristicChange( - override val characteristic: Characteristic, - val data: DataView, - ) : JsObservationEvent() - - data class Error( - override val characteristic: Characteristic, - val cause: Throwable, - ) : JsObservationEvent() -} - -internal class Observers( - private val peripheral: JsPeripheral, -) { - - val characteristicChanges = MutableSharedFlow(extraBufferCapacity = 64) - private val observations = Observations() - - fun acquire( - characteristic: Characteristic, - onSubscription: OnSubscriptionAction, - ): Flow = characteristicChanges - .onSubscription { - peripheral.suspendUntilAtLeast() - if (observations.add(characteristic, onSubscription) == 1) { - peripheral.startObservation(characteristic) - } - onSubscription() - } - .filter { - it.characteristic.characteristicUuid == characteristic.characteristicUuid && - it.characteristic.serviceUuid == characteristic.serviceUuid - } - .map { - when (it) { - is JsObservationEvent.Error -> throw it.cause - is JsObservationEvent.CharacteristicChange -> it.data - } - } - .onCompletion { - if (observations.remove(characteristic, onSubscription) == 0) { - peripheral.stopObservation(characteristic) - } - } - - suspend fun rewire() { - observations.entries.forEach { (characteristic, onSubscriptionActions) -> - try { - peripheral.startObservation(characteristic) - onSubscriptionActions.forEach { it() } - } catch (cancellation: CancellationException) { - throw cancellation - } catch (t: Throwable) { - characteristicChanges.emit(JsObservationEvent.Error(characteristic, t)) - } - } - } -} - -private class Observations { - - private val observations = mutableMapOf>() - val entries get() = observations.entries - - fun add( - characteristic: Characteristic, - onSubscription: OnSubscriptionAction, - ): Int { - val actions = observations[characteristic] - return if (actions == null) { - val newActions = mutableListOf(onSubscription) - observations[characteristic] = newActions - 1 - } else { - actions += onSubscription - actions.count() - } - } - - fun remove( - characteristic: Characteristic, - onSubscription: OnSubscriptionAction, - ): Int { - val actions = observations[characteristic] - return when { - actions == null -> -1 // No previous observation existed for characteristic. - actions.count() == 1 -> { - observations -= characteristic - 0 - } - else -> { - actions -= onSubscription - actions.count() - } - } - } -} diff --git a/core/src/jsMain/kotlin/Peripheral.kt b/core/src/jsMain/kotlin/Peripheral.kt index 624313781..12a1280bd 100644 --- a/core/src/jsMain/kotlin/Peripheral.kt +++ b/core/src/jsMain/kotlin/Peripheral.kt @@ -21,6 +21,7 @@ import kotlinx.coroutines.async import kotlinx.coroutines.await import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.update @@ -75,8 +76,10 @@ public class JsPeripheral internal constructor( private val ioLock = Mutex() + internal val platformIdentifier = bluetoothDevice.id + private val _state = MutableStateFlow(State.Disconnected()) - public override val state: Flow = _state.asStateFlow() + public override val state: StateFlow = _state.asStateFlow() private var _platformServices: List? = null private val platformServices: List @@ -138,7 +141,7 @@ public class JsPeripheral internal constructor( onServicesDiscovered(ServicesDiscoveredPeripheral(this@JsPeripheral)) _state.value = State.Connecting.Observes logger.verbose { message = "Configuring characteristic observations" } - observers.rewire() + observers.onConnected() } catch (t: Throwable) { logger.error(t) { message = "Failed to connect" } disconnectGatt() @@ -261,7 +264,7 @@ public class JsPeripheral internal constructor( .buffer .toByteArray() - private val observers = Observers(this) + private val observers = Observers(this, logging, extraBufferCapacity = 64) public fun observeDataView( characteristic: Characteristic, @@ -345,7 +348,7 @@ public class JsPeripheral internal constructor( detail(this@createListener) detail(data) } - val characteristicChange = JsObservationEvent.CharacteristicChange(this, data) + val characteristicChange = ObservationEvent.CharacteristicChange(this, data) if (!observers.characteristicChanges.tryEmit(characteristicChange)) console.error("Failed to emit $characteristicChange") @@ -372,3 +375,6 @@ public class JsPeripheral internal constructor( override fun toString(): String = "Peripheral(bluetoothDevice=${bluetoothDevice.string()})" } + +internal actual val Peripheral.identifier: String + get() = (this as JsPeripheral).platformIdentifier diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index d20d7e972..613445ed1 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -5,6 +5,7 @@ atomicfu = "0.17.0" coroutines = "1.6.0" extensions = "1.0.1-pre.290-kotlin-1.6.10" kotlin = "1.6.10" +tuulbox = "5.1.0" [libraries] androidx-startup = { module = "androidx.startup:startup-runtime", version = "1.1.0" } @@ -12,8 +13,10 @@ atomicfu = { module = "org.jetbrains.kotlinx:atomicfu-jvm", version.ref = "atomi kotlin-extensions = { module = "org.jetbrains.kotlin-wrappers:kotlin-extensions", version.ref = "extensions" } kotlinx-coroutines-android = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-android", version.ref = "coroutines" } kotlinx-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "coroutines" } -stately = { module = "co.touchlab:stately-isolate", version = "1.2.1" } -tuulbox-logging = { module = "com.juul.tuulbox:logging", version = "5.1.0" } +kotlinx-coroutines-test = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-test", version.ref = "coroutines" } +stately-collections = { module = "co.touchlab:stately-iso-collections", version = "1.2.1" } +tuulbox-logging = { module = "com.juul.tuulbox:logging", version.ref = "tuulbox" } +tuulbox-collections = { module = "com.juul.tuulbox:collections", version.ref = "tuulbox" } uuid = { module = "com.benasher44:uuid", version = "0.4.0" } [plugins]