Skip to content

Commit

Permalink
Begin refactor of observe handling
Browse files Browse the repository at this point in the history
  • Loading branch information
twyatt committed Nov 22, 2021
1 parent ed0e4f5 commit e6dd33e
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 94 deletions.
1 change: 1 addition & 0 deletions core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ kotlin {
dependencies {
api(coroutines())
api(uuid())
implementation(tuulbox("collections"))
}
}

Expand Down
118 changes: 39 additions & 79 deletions core/src/androidMain/kotlin/Observers.kt
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package com.juul.kable

import com.juul.kable.logs.Logger
import com.juul.kable.logs.Logging
import com.juul.tuulbox.collections.synchronizedMapOf
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.coroutines.cancellation.CancellationException

internal sealed class AndroidObservationEvent {
Expand Down Expand Up @@ -50,104 +49,65 @@ internal sealed class AndroidObservationEvent {
*/
internal class Observers(
private val peripheral: AndroidPeripheral,
logging: Logging,
private val state: StateFlow<State>,
private val logging: Logging,
) {

private val logger = Logger(logging, tag = "Kable/Observers", peripheral.bluetoothDevice.address)

val characteristicChanges = MutableSharedFlow<AndroidObservationEvent>()
private val observations = Observations()
private val observations = synchronizedMapOf<Characteristic, Observation>()

fun acquire(
characteristic: Characteristic,
onSubscription: OnSubscriptionAction,
): Flow<ByteArray> = characteristicChanges
.onSubscription {
peripheral.suspendUntilAtLeast<State.Connecting.Observes>()
if (observations.add(characteristic, onSubscription) == 1) {
peripheral.startObservation(characteristic)
}
onSubscription()
}
.filter {
it.characteristic.characteristicUuid == characteristic.characteristicUuid &&
it.characteristic.serviceUuid == characteristic.serviceUuid
}
.map {
when (it) {
is AndroidObservationEvent.Error -> throw it.cause
is AndroidObservationEvent.CharacteristicChange -> it.data
): Flow<ByteArray> {
val handler = peripheral.observationHandler()
val identifier = peripheral.bluetoothDevice.address
val observation = observations.synchronized {
getOrPut(characteristic) {
Observation(state, handler, characteristic, logging, identifier)
}
}
.onCompletion {
if (observations.remove(characteristic, onSubscription) == 0) {
try {
peripheral.stopObservation(characteristic)
} catch (e: NotReadyException) {
// Silently ignore as it is assumed that failure is due to connection drop, in which case Android
// will clear the notifications.
logger.debug { message = "Stop notification failure ignored." }
}

return characteristicChanges
.onSubscription { observation.onSubscription(onSubscription) }
.filter {
it.characteristic.characteristicUuid == characteristic.characteristicUuid &&
it.characteristic.serviceUuid == characteristic.serviceUuid
}
}
.map(::dematerialize)
.onCompletion { observation.onCompletion(onSubscription) }
}

suspend fun rewire() {
observations.forEach { characteristic, onSubscriptionActions ->
suspend fun onConnected() {
observations.entries.forEach { (characteristic, observation) ->
try {
peripheral.startObservation(characteristic)
onSubscriptionActions.forEach { it() }
observation.onConnected()
} catch (cancellation: CancellationException) {
throw cancellation
} catch (t: Throwable) {
characteristicChanges.emit(AndroidObservationEvent.Error(characteristic, t))
} catch (e: Exception) {
characteristicChanges.emit(AndroidObservationEvent.Error(characteristic, e))
}
}
}
}

private class Observations {

private val lock = Mutex()
private val observations = mutableMapOf<Characteristic, MutableList<OnSubscriptionAction>>()

suspend inline fun forEach(
action: (Characteristic, List<OnSubscriptionAction>) -> Unit
) = lock.withLock {
observations.forEach { (characteristic, onSubscriptionActions) ->
action(characteristic, onSubscriptionActions)
fun onConnectionLost() {
observations.entries.forEach { (_, observation) ->
observation.onConnectionLost()
}
}
}

suspend fun add(
characteristic: Characteristic,
onSubscription: OnSubscriptionAction,
): Int = lock.withLock {
val actions = observations[characteristic]
if (actions == null) {
val newActions = mutableListOf(onSubscription)
observations[characteristic] = newActions
1
} else {
actions += onSubscription
actions.count()
}
private fun dematerialize(event: AndroidObservationEvent): ByteArray = when (event) {
is AndroidObservationEvent.Error -> throw event.cause
is AndroidObservationEvent.CharacteristicChange -> event.data
}

private fun AndroidPeripheral.observationHandler(): Observation.Handler = object : Observation.Handler {
override suspend fun startObservation(characteristic: Characteristic) {
this@observationHandler.startObservation(characteristic)
}

suspend fun remove(
characteristic: Characteristic,
onSubscription: OnSubscriptionAction,
): Int = lock.withLock {
val actions = observations[characteristic]
when {
actions == null -> -1 // No previous observation existed for characteristic.
actions.count() == 1 -> {
observations -= characteristic
0
}
else -> {
actions -= onSubscription
actions.count()
}
}
override suspend fun stopObservation(characteristic: Characteristic) {
this@observationHandler.stopObservation(characteristic)
}
}
29 changes: 16 additions & 13 deletions core/src/androidMain/kotlin/Peripheral.kt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.withContext
Expand Down Expand Up @@ -142,6 +143,7 @@ public class AndroidPeripheral internal constructor(
private val receiver = registerBluetoothStateBroadcastReceiver { state ->
if (state == STATE_OFF) {
closeConnection()
observers.onConnectionLost()
_state.value = State.Disconnected()
}
}
Expand All @@ -163,7 +165,7 @@ public class AndroidPeripheral internal constructor(
*/
public val mtu: StateFlow<Int?> = _mtu.asStateFlow()

private val observers = Observers(this, logging)
private val observers = Observers(this, _state, logging)

@Volatile
private var _platformServices: List<PlatformService>? = null
Expand Down Expand Up @@ -201,14 +203,15 @@ public class AndroidPeripheral internal constructor(
connection
.characteristicChanges
.onEach(observers.characteristicChanges::emit)
.onCompletion { observers.onConnectionLost() }
.launchIn(scope, start = UNDISPATCHED)

suspendUntilOrThrow<State.Connecting.Services>()
discoverServices()
onServicesDiscovered(ServicesDiscoveredPeripheral(this@AndroidPeripheral))
_state.value = State.Connecting.Observes
logger.verbose { message = "Configuring characteristic observations" }
observers.rewire()
observers.onConnected()
} catch (t: Throwable) {
closeConnection()
logger.error(t) { message = "Failed to connect" }
Expand All @@ -222,6 +225,9 @@ public class AndroidPeripheral internal constructor(
private fun closeConnection() {
_connection?.close()
_connection = null

observers.onConnectionLost()

// Avoid trampling existing `Disconnected` state (and its properties) by only updating if not already `Disconnected`.
_state.update { previous -> previous as? State.Disconnected ?: State.Disconnected() }
}
Expand Down Expand Up @@ -377,19 +383,16 @@ public class AndroidPeripheral internal constructor(

internal suspend fun stopObservation(characteristic: Characteristic) {
val platformCharacteristic = platformServices.findCharacteristic(characteristic)
setConfigDescriptor(platformCharacteristic, enable = false)

try {
setConfigDescriptor(platformCharacteristic, enable = false)
} finally {
logger.debug {
message = "setCharacteristicNotification"
detail(characteristic)
detail("value", "false")
}
connection
.bluetoothGatt
.setCharacteristicNotification(platformCharacteristic, false)
logger.debug {
message = "setCharacteristicNotification"
detail(characteristic)
detail("value", "false")
}
connection
.bluetoothGatt
.setCharacteristicNotification(platformCharacteristic, false)
}

private suspend fun setConfigDescriptor(
Expand Down
9 changes: 7 additions & 2 deletions core/src/commonMain/kotlin/Exceptions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ public expect open class IOException internal constructor(
cause: Throwable? = null,
) : Exception

public open class NotConnectedException internal constructor(
message: String? = null,
cause: Throwable? = null,
) : IOException(message, cause)

public class ConnectionRejectedException internal constructor(
message: String? = null,
cause: Throwable? = null,
Expand All @@ -31,7 +36,7 @@ public class ConnectionRejectedException internal constructor(
public class NotReadyException internal constructor(
message: String? = null,
cause: Throwable? = null,
) : IOException(message, cause)
) : NotConnectedException(message, cause)

public class GattStatusException internal constructor(
message: String? = null,
Expand All @@ -41,4 +46,4 @@ public class GattStatusException internal constructor(
public class ConnectionLostException internal constructor(
message: String? = null,
cause: Throwable? = null,
) : IOException(message, cause)
) : NotConnectedException(message, cause)
100 changes: 100 additions & 0 deletions core/src/commonMain/kotlin/Observations.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package com.juul.kable

import com.juul.kable.State.Connected
import com.juul.kable.logs.Logger
import com.juul.kable.logs.Logging
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

internal class Observation(
private val state: StateFlow<State>,
private val handler: Handler,
private val characteristic: Characteristic,
logging: Logging,
identifier: String,
) {

interface Handler {
suspend fun startObservation(characteristic: Characteristic)
suspend fun stopObservation(characteristic: Characteristic)
}

private val logger = Logger(logging, tag = "Kable/Observation", identifier)

private val mutex = Mutex()
private val subscribers = mutableListOf<OnSubscriptionAction>()
private val isObservationEnabled = atomic(false)

private val isConnected: Boolean
get() = state.value is Connected

private val hasSubscribers: Boolean
get() = subscribers.isNotEmpty()

suspend fun onSubscription(action: OnSubscriptionAction) = mutex.withLock {
subscribers += action
enableObservationIfNeeded()
if (isObservationEnabled.value) {
// Ignore `NotConnectedException` to guard against potential race-condition where disconnect occurs
// immediately after checking `isObservationEnabled`.
suppressNotConnectedException {
action()
}
}
}

suspend fun onCompletion(action: OnSubscriptionAction) = mutex.withLock {
subscribers -= action
disableObservationIfNeeded()
}

suspend fun onConnected() = mutex.withLock {
enableObservationIfNeeded()
if (isObservationEnabled.value) {
// Ignore `NotConnectedException` to guard against potential race-condition where disconnect occurs
// immediately after checking `isObservationEnabled`.
suppressNotConnectedException {
subscribers.forEach { it() }
}
}
}

fun onConnectionLost() {
// We assume that remote peripheral and local BLE system implicitly clears notifications/indications.
isObservationEnabled.value = false
}

private suspend fun enableObservationIfNeeded() {
if (!isObservationEnabled.value && isConnected && hasSubscribers) {
suppressNotConnectedException {
handler.startObservation(characteristic)
isObservationEnabled.value = true
}
}
}

private suspend fun disableObservationIfNeeded() {
if (isObservationEnabled.value && isConnected && !hasSubscribers) {
suppressNotConnectedException {
handler.stopObservation(characteristic)
}
isObservationEnabled.value = false
}
}

/**
* It is assumed that observations are automatically cleared on disconnect, therefore in some situations
* [NotConnectedException]s can be ignored, as the corresponding [action] will be rendered unnecessary
* (e.g. clearing an observation is not needed if connection has been lost), or [action] will be re-attempted on
* [reconnect][onConnected].
*/
private inline fun suppressNotConnectedException(action: () -> Unit) {
try {
action.invoke()
} catch (e: NotConnectedException) {
logger.verbose { message = "Suppressed failure: ${e.message}" }
}
}
}

0 comments on commit e6dd33e

Please sign in to comment.