-
Notifications
You must be signed in to change notification settings - Fork 88
/
Copy pathObservation.kt
96 lines (82 loc) · 3.24 KB
/
Observation.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package com.juul.kable
import co.touchlab.stately.collections.IsoMutableList
import com.juul.kable.State.Connecting.Observes
import com.juul.kable.logs.Logger
import com.juul.kable.logs.Logging
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
/**
 * Tracks the subscribers of a single [characteristic] observation and asks [handler] to start or stop the
 * observation as subscribers are added/removed and when a connection is (re)established.
 *
 * [onSubscription], [onCompletion] and [onConnected] are serialized through a shared [Mutex].
 */
internal class Observation(
    private val state: StateFlow<State>,
    private val handler: Handler,
    private val characteristic: Characteristic,
    logging: Logging,
    identifier: String,
) {

    /** Performs the actual start/stop of an observation on behalf of [Observation]. */
    interface Handler {
        suspend fun startObservation(characteristic: Characteristic)
        suspend fun stopObservation(characteristic: Characteristic)
    }

    private val logger = Logger(logging, tag = "Kable/Observation", identifier)

    private val mutex = Mutex()
    private val subscribers = IsoMutableList<OnSubscriptionAction>()

    // `true` once `handler.startObservation` has returned successfully; reset when the observation is stopped, or
    // when a new connection is established while there are no subscribers (see `onConnected`).
    private val _didStartObservation = atomic(false)
    private var didStartObservation: Boolean
        get() = _didStartObservation.value
        set(value) { _didStartObservation.value = value }

    /** `true` when the current [state] is at least [Observes]. */
    private val isConnected: Boolean
        get() = state.value.isAtLeast<Observes>()

    private val hasSubscribers: Boolean
        get() = subscribers.isNotEmpty()

    /**
     * Registers [action] as a subscriber. If the observation has not been started yet and a connection is
     * available, the observation is started and [action] is invoked.
     *
     * Connection related failures are suppressed (see [suppressConnectionExceptions]).
     */
    suspend fun onSubscription(action: OnSubscriptionAction) = mutex.withLock {
        subscribers += action
        val shouldStartObservation = !didStartObservation && hasSubscribers && isConnected
        if (shouldStartObservation) {
            suppressConnectionExceptions {
                startObservation()
                action()
            }
        }
    }

    /**
     * Unregisters [action]. If it was the last subscriber of a started observation and a connection is available,
     * the observation is stopped.
     */
    suspend fun onCompletion(action: OnSubscriptionAction) = mutex.withLock {
        subscribers -= action
        val shouldStopObservation = didStartObservation && !hasSubscribers && isConnected
        if (shouldStopObservation) stopObservation()
    }

    /**
     * To be called when a connection has been (re)established: restarts the observation and re-invokes every
     * subscriber's action if there are subscribers.
     *
     * If there are no subscribers, the started flag is reset instead. Without this, a flag left `true` by a
     * subscriber that completed while disconnected (in which case [onCompletion] does not stop the observation)
     * would prevent [onSubscription] from ever starting the observation on the new connection.
     */
    suspend fun onConnected() = mutex.withLock {
        if (isConnected) {
            if (hasSubscribers) {
                suppressConnectionExceptions {
                    startObservation()
                    subscribers.forEach { it() }
                }
            } else {
                // Relies on the same assumption documented on `suppressConnectionExceptions`: observations are
                // cleared on disconnect, so nothing from the previous connection is still started.
                didStartObservation = false
            }
        }
    }

    /** Starts the observation via [handler]; the started flag is only set if [handler] did not throw. */
    private suspend fun startObservation() {
        handler.startObservation(characteristic)
        didStartObservation = true
    }

    /** Stops the observation via [handler]; the started flag is cleared even if a connection failure was suppressed. */
    private suspend fun stopObservation() {
        suppressConnectionExceptions {
            handler.stopObservation(characteristic)
        }
        didStartObservation = false
    }

    /**
     * While spinning up or down an observation the connection may drop, resulting in an unnecessary connection related
     * exception being thrown.
     *
     * Since it is assumed that observations are automatically cleared on disconnect, these exceptions can be ignored,
     * as the corresponding [action] will be rendered unnecessary (clearing an observation is not needed if connection
     * has been lost, or [action] will be re-attempted on [reconnect][onConnected]).
     */
    private inline fun suppressConnectionExceptions(action: () -> Unit) {
        try {
            action.invoke()
        } catch (e: NotConnectedException) {
            logger.verbose { message = "Suppressed failure: $e" }
        } catch (e: BluetoothException) {
            logger.verbose { message = "Suppressed failure: $e" }
        }
    }
}