From e6dd33ea47ab7f1d290352f57e422caca9b1385c Mon Sep 17 00:00:00 2001 From: Travis Wyatt Date: Fri, 19 Nov 2021 14:28:09 -0800 Subject: [PATCH] Begin refactor of `observe` handling --- core/build.gradle.kts | 1 + core/src/androidMain/kotlin/Observers.kt | 118 +++++++-------------- core/src/androidMain/kotlin/Peripheral.kt | 29 ++--- core/src/commonMain/kotlin/Exceptions.kt | 9 +- core/src/commonMain/kotlin/Observations.kt | 100 +++++++++++++++++ 5 files changed, 163 insertions(+), 94 deletions(-) create mode 100644 core/src/commonMain/kotlin/Observations.kt diff --git a/core/build.gradle.kts b/core/build.gradle.kts index ead889f31..7062217c8 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -34,6 +34,7 @@ kotlin { dependencies { api(coroutines()) api(uuid()) + implementation(tuulbox("collections")) } } diff --git a/core/src/androidMain/kotlin/Observers.kt b/core/src/androidMain/kotlin/Observers.kt index fd663bbe2..f08120d75 100644 --- a/core/src/androidMain/kotlin/Observers.kt +++ b/core/src/androidMain/kotlin/Observers.kt @@ -1,15 +1,14 @@ package com.juul.kable -import com.juul.kable.logs.Logger import com.juul.kable.logs.Logging +import com.juul.tuulbox.collections.synchronizedMapOf import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onSubscription -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock import kotlin.coroutines.cancellation.CancellationException internal sealed class AndroidObservationEvent { @@ -50,104 +49,65 @@ internal sealed class AndroidObservationEvent { */ internal class Observers( private val peripheral: AndroidPeripheral, - logging: Logging, + private val state: StateFlow, + private val logging: Logging, ) { - private val logger = Logger(logging, tag = "Kable/Observers", peripheral.bluetoothDevice.address) - val characteristicChanges = MutableSharedFlow() - private val observations = Observations() + private val observations = synchronizedMapOf() fun acquire( characteristic: Characteristic, onSubscription: OnSubscriptionAction, - ): Flow = characteristicChanges - .onSubscription { - peripheral.suspendUntilAtLeast() - if (observations.add(characteristic, onSubscription) == 1) { - peripheral.startObservation(characteristic) - } - onSubscription() - } - .filter { - it.characteristic.characteristicUuid == characteristic.characteristicUuid && - it.characteristic.serviceUuid == characteristic.serviceUuid - } - .map { - when (it) { - is AndroidObservationEvent.Error -> throw it.cause - is AndroidObservationEvent.CharacteristicChange -> it.data + ): Flow { + val handler = peripheral.observationHandler() + val identifier = peripheral.bluetoothDevice.address + val observation = observations.synchronized { + getOrPut(characteristic) { + Observation(state, handler, characteristic, logging, identifier) } } - .onCompletion { - if (observations.remove(characteristic, onSubscription) == 0) { - try { - peripheral.stopObservation(characteristic) - } catch (e: NotReadyException) { - // Silently ignore as it is assumed that failure is due to connection drop, in which case Android - // will clear the notifications. - logger.debug { message = "Stop notification failure ignored." } - } + + return characteristicChanges + .onSubscription { observation.onSubscription(onSubscription) } + .filter { + it.characteristic.characteristicUuid == characteristic.characteristicUuid && + it.characteristic.serviceUuid == characteristic.serviceUuid } - } + .map(::dematerialize) + .onCompletion { observation.onCompletion(onSubscription) } + } - suspend fun rewire() { - observations.forEach { characteristic, onSubscriptionActions -> + suspend fun onConnected() { + observations.entries.forEach { (characteristic, observation) -> try { - peripheral.startObservation(characteristic) - onSubscriptionActions.forEach { it() } + observation.onConnected() } catch (cancellation: CancellationException) { throw cancellation - } catch (t: Throwable) { - characteristicChanges.emit(AndroidObservationEvent.Error(characteristic, t)) + } catch (e: Exception) { + characteristicChanges.emit(AndroidObservationEvent.Error(characteristic, e)) } } } -} - -private class Observations { - private val lock = Mutex() - private val observations = mutableMapOf>() - - suspend inline fun forEach( - action: (Characteristic, List) -> Unit - ) = lock.withLock { - observations.forEach { (characteristic, onSubscriptionActions) -> - action(characteristic, onSubscriptionActions) + fun onConnectionLost() { + observations.entries.forEach { (_, observation) -> + observation.onConnectionLost() } } +} - suspend fun add( - characteristic: Characteristic, - onSubscription: OnSubscriptionAction, - ): Int = lock.withLock { - val actions = observations[characteristic] - if (actions == null) { - val newActions = mutableListOf(onSubscription) - observations[characteristic] = newActions - 1 - } else { - actions += onSubscription - actions.count() - } +private fun dematerialize(event: AndroidObservationEvent): ByteArray = when (event) { + is AndroidObservationEvent.Error -> throw event.cause + is AndroidObservationEvent.CharacteristicChange -> event.data +} + +private fun AndroidPeripheral.observationHandler(): Observation.Handler = object : Observation.Handler { + override suspend fun startObservation(characteristic: Characteristic) { + this@observationHandler.startObservation(characteristic) } - suspend fun remove( - characteristic: Characteristic, - onSubscription: OnSubscriptionAction, - ): Int = lock.withLock { - val actions = observations[characteristic] - when { - actions == null -> -1 // No previous observation existed for characteristic. - actions.count() == 1 -> { - observations -= characteristic - 0 - } - else -> { - actions -= onSubscription - actions.count() - } - } + override suspend fun stopObservation(characteristic: Characteristic) { + this@observationHandler.stopObservation(characteristic) } } diff --git a/core/src/androidMain/kotlin/Peripheral.kt b/core/src/androidMain/kotlin/Peripheral.kt index 8c970d912..5710e60a4 100644 --- a/core/src/androidMain/kotlin/Peripheral.kt +++ b/core/src/androidMain/kotlin/Peripheral.kt @@ -41,6 +41,7 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.update import kotlinx.coroutines.withContext @@ -142,6 +143,7 @@ public class AndroidPeripheral internal constructor( private val receiver = registerBluetoothStateBroadcastReceiver { state -> if (state == STATE_OFF) { closeConnection() + observers.onConnectionLost() _state.value = State.Disconnected() } } @@ -163,7 +165,7 @@ public class AndroidPeripheral internal constructor( */ public val mtu: StateFlow = _mtu.asStateFlow() - private val observers = Observers(this, logging) + private val observers = Observers(this, _state, logging) @Volatile private var _platformServices: List? = null @@ -201,6 +203,7 @@ public class AndroidPeripheral internal constructor( connection .characteristicChanges .onEach(observers.characteristicChanges::emit) + .onCompletion { observers.onConnectionLost() } .launchIn(scope, start = UNDISPATCHED) suspendUntilOrThrow() @@ -208,7 +211,7 @@ public class AndroidPeripheral internal constructor( onServicesDiscovered(ServicesDiscoveredPeripheral(this@AndroidPeripheral)) _state.value = State.Connecting.Observes logger.verbose { message = "Configuring characteristic observations" } - observers.rewire() + observers.onConnected() } catch (t: Throwable) { closeConnection() logger.error(t) { message = "Failed to connect" } @@ -222,6 +225,9 @@ public class AndroidPeripheral internal constructor( private fun closeConnection() { _connection?.close() _connection = null + + observers.onConnectionLost() + // Avoid trampling existing `Disconnected` state (and its properties) by only updating if not already `Disconnected`. _state.update { previous -> previous as? State.Disconnected ?: State.Disconnected() } } @@ -377,19 +383,16 @@ public class AndroidPeripheral internal constructor( internal suspend fun stopObservation(characteristic: Characteristic) { val platformCharacteristic = platformServices.findCharacteristic(characteristic) + setConfigDescriptor(platformCharacteristic, enable = false) - try { - setConfigDescriptor(platformCharacteristic, enable = false) - } finally { - logger.debug { - message = "setCharacteristicNotification" - detail(characteristic) - detail("value", "false") - } - connection - .bluetoothGatt - .setCharacteristicNotification(platformCharacteristic, false) + logger.debug { + message = "setCharacteristicNotification" + detail(characteristic) + detail("value", "false") } + connection + .bluetoothGatt + .setCharacteristicNotification(platformCharacteristic, false) } private suspend fun setConfigDescriptor( diff --git a/core/src/commonMain/kotlin/Exceptions.kt b/core/src/commonMain/kotlin/Exceptions.kt index 885ea5ffe..db4f277cc 100644 --- a/core/src/commonMain/kotlin/Exceptions.kt +++ b/core/src/commonMain/kotlin/Exceptions.kt @@ -23,6 +23,11 @@ public expect open class IOException internal constructor( cause: Throwable? = null, ) : Exception +public open class NotConnectedException internal constructor( + message: String? = null, + cause: Throwable? = null, +) : IOException(message, cause) + public class ConnectionRejectedException internal constructor( message: String? = null, cause: Throwable? = null, @@ -31,7 +36,7 @@ public class ConnectionRejectedException internal constructor( public class NotReadyException internal constructor( message: String? = null, cause: Throwable? = null, -) : IOException(message, cause) +) : NotConnectedException(message, cause) public class GattStatusException internal constructor( message: String? = null, @@ -41,4 +46,4 @@ public class GattStatusException internal constructor( public class ConnectionLostException internal constructor( message: String? = null, cause: Throwable? = null, -) : IOException(message, cause) +) : NotConnectedException(message, cause) diff --git a/core/src/commonMain/kotlin/Observations.kt b/core/src/commonMain/kotlin/Observations.kt new file mode 100644 index 000000000..2fdb10a22 --- /dev/null +++ b/core/src/commonMain/kotlin/Observations.kt @@ -0,0 +1,100 @@ +package com.juul.kable + +import com.juul.kable.State.Connected +import com.juul.kable.logs.Logger +import com.juul.kable.logs.Logging +import kotlinx.atomicfu.atomic +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock + +internal class Observation( + private val state: StateFlow, + private val handler: Handler, + private val characteristic: Characteristic, + logging: Logging, + identifier: String, +) { + + interface Handler { + suspend fun startObservation(characteristic: Characteristic) + suspend fun stopObservation(characteristic: Characteristic) + } + + private val logger = Logger(logging, tag = "Kable/Observation", identifier) + + private val mutex = Mutex() + private val subscribers = mutableListOf() + private val isObservationEnabled = atomic(false) + + private val isConnected: Boolean + get() = state.value is Connected + + private val hasSubscribers: Boolean + get() = subscribers.isNotEmpty() + + suspend fun onSubscription(action: OnSubscriptionAction) = mutex.withLock { + subscribers += action + enableObservationIfNeeded() + if (isObservationEnabled.value) { + // Ignore `NotConnectedException` to guard against potential race-condition where disconnect occurs + // immediately after checking `isObservationEnabled`. + suppressNotConnectedException { + action() + } + } + } + + suspend fun onCompletion(action: OnSubscriptionAction) = mutex.withLock { + subscribers -= action + disableObservationIfNeeded() + } + + suspend fun onConnected() = mutex.withLock { + enableObservationIfNeeded() + if (isObservationEnabled.value) { + // Ignore `NotConnectedException` to guard against potential race-condition where disconnect occurs + // immediately after checking `isObservationEnabled`. + suppressNotConnectedException { + subscribers.forEach { it() } + } + } + } + + fun onConnectionLost() { + // We assume that remote peripheral and local BLE system implicitly clears notifications/indications. + isObservationEnabled.value = false + } + + private suspend fun enableObservationIfNeeded() { + if (!isObservationEnabled.value && isConnected && hasSubscribers) { + suppressNotConnectedException { + handler.startObservation(characteristic) + isObservationEnabled.value = true + } + } + } + + private suspend fun disableObservationIfNeeded() { + if (isObservationEnabled.value && isConnected && !hasSubscribers) { + suppressNotConnectedException { + handler.stopObservation(characteristic) + } + isObservationEnabled.value = false + } + } + + /** + * It is assumed that observations are automatically cleared on disconnect, therefore in some situations + * [NotConnectedException]s can be ignored, as the corresponding [action] will be rendered unnecessary + * (e.g. clearing an observation is not needed if connection has been lost), or [action] will be re-attempted on + * [reconnect][onConnected]. + */ + private inline fun suppressNotConnectedException(action: () -> Unit) { + try { + action.invoke() + } catch (e: NotConnectedException) { + logger.verbose { message = "Suppressed failure: ${e.message}" } + } + } +}