Skip to content

Commit

Permalink
Prevent concurrent I/O operations on JavaScript (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
twyatt authored May 25, 2021
1 parent c2fb43d commit 57edad1
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 35 deletions.
59 changes: 41 additions & 18 deletions core/src/jsMain/kotlin/Observers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import kotlinx.coroutines.await
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.sync.withLock
import org.khronos.webgl.DataView
import org.w3c.dom.events.Event

Expand Down Expand Up @@ -41,7 +42,9 @@ internal class Observers(
if (++observation.count == 1) {
bluetoothRemoteGATTCharacteristic.apply {
addEventListener(CHARACTERISTIC_VALUE_CHANGED, observation.listener)
startNotifications().await()
peripheral.ioLock.withLock {
startNotifications().await()
}
}
}

Expand All @@ -51,24 +54,42 @@ internal class Observers(
emit(it.data)
}
}
} finally {
if (--observation.count < 1) {
bluetoothRemoteGATTCharacteristic.apply {
/* Throws `DOMException` if connection is closed:
*
* DOMException: Failed to execute 'stopNotifications' on 'BluetoothRemoteGATTCharacteristic':
* Characteristic with UUID [...] is no longer valid. Remember to retrieve the characteristic
* again after reconnecting.
*
* Wrapped in `runCatching` to silently ignore failure, as notification will already be
* invalidated due to the connection being closed.
*/
runCatching { stopNotifications().await() }

removeEventListener(CHARACTERISTIC_VALUE_CHANGED, observation.listener)
} catch (t: Throwable) {
// Unnecessary `catch` block as workaround for KT-37279 (needed until we switch to IR compiler).
// https://youtrack.jetbrains.com/issue/KT-37279
// Once KT-37279 is fixed, this `catch` should be replaced with `finally`.
// See previous logic (before workaround) in:
// https://github.com/JuulLabs/kable/blob/151e54d255bf5595c67023927084d083e6180706/core/src/jsMain/kotlin/Observers.kt#L48-L72
observation.teardown(bluetoothRemoteGATTCharacteristic, characteristic)
return@flow
}
observation.teardown(bluetoothRemoteGATTCharacteristic, characteristic)
}

private suspend fun Observation.teardown(
bluetoothRemoteGATTCharacteristic: BluetoothRemoteGATTCharacteristic,
characteristic: Characteristic
) {
if (--count < 1) {
bluetoothRemoteGATTCharacteristic.apply {
/* Throws `DOMException` if connection is closed:
*
* DOMException: Failed to execute 'stopNotifications' on 'BluetoothRemoteGATTCharacteristic':
* Characteristic with UUID [...] is no longer valid. Remember to retrieve the characteristic
* again after reconnecting.
*
* Wrapped in `runCatching` to silently ignore failure, as notification will already be
* invalidated due to the connection being closed.
*/
runCatching {
peripheral.ioLock.withLock {
stopNotifications().await()
}
}
observers.remove(characteristic)

removeEventListener(CHARACTERISTIC_VALUE_CHANGED, listener)
}
observers.remove(characteristic)
}
}

Expand All @@ -89,7 +110,9 @@ internal class Observers(
platformCharacteristic
.bluetoothRemoteGATTCharacteristic
.apply {
startNotifications().await()
peripheral.ioLock.withLock {
startNotifications().await()
}
addEventListener(CHARACTERISTIC_VALUE_CHANGED, platformCharacteristic.createListener())
}
}
Expand Down
46 changes: 29 additions & 17 deletions core/src/jsMain/kotlin/Peripheral.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.job
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.khronos.webgl.DataView
import kotlin.coroutines.CoroutineContext
import org.w3c.dom.events.Event as JsEvent
Expand All @@ -47,6 +49,8 @@ public class JsPeripheral internal constructor(

private val scope = CoroutineScope(parentCoroutineContext + job)

internal val ioLock = Mutex()

private val _state = MutableStateFlow<State?>(null)
public override val state: Flow<State> = _state.filterNotNull()

Expand Down Expand Up @@ -136,9 +140,9 @@ public class JsPeripheral internal constructor(
}

private suspend fun discoverServices(): List<PlatformService> {
val services = gatt.getPrimaryServices()
.await()
.map { it.toPlatformService() }
val services = ioLock.withLock {
gatt.getPrimaryServices().await()
}.map { it.toPlatformService() }
_platformServices = services
return services
}
Expand All @@ -148,19 +152,23 @@ public class JsPeripheral internal constructor(
data: ByteArray,
writeType: WriteType,
) {
bluetoothRemoteGATTCharacteristicFrom(characteristic).run {
val jsCharacteristic = bluetoothRemoteGATTCharacteristicFrom(characteristic)
ioLock.withLock {
when (writeType) {
WithResponse -> writeValueWithResponse(data)
WithoutResponse -> writeValueWithoutResponse(data)
}
}.await()
WithResponse -> jsCharacteristic.writeValueWithResponse(data)
WithoutResponse -> jsCharacteristic.writeValueWithoutResponse(data)
}.await()
}
}

public suspend fun readAsDataView(
characteristic: Characteristic
): DataView = bluetoothRemoteGATTCharacteristicFrom(characteristic)
.readValue()
.await()
): DataView {
val jsCharacteristic = bluetoothRemoteGATTCharacteristicFrom(characteristic)
return ioLock.withLock {
jsCharacteristic.readValue().await()
}
}

public override suspend fun read(
characteristic: Characteristic
Expand All @@ -172,16 +180,20 @@ public class JsPeripheral internal constructor(
descriptor: Descriptor,
data: ByteArray
) {
bluetoothRemoteGATTDescriptorFrom(descriptor)
.writeValue(data)
.await()
val jsDescriptor = bluetoothRemoteGATTDescriptorFrom(descriptor)
ioLock.withLock {
jsDescriptor.writeValue(data).await()
}
}

public suspend fun readAsDataView(
descriptor: Descriptor
): DataView = bluetoothRemoteGATTDescriptorFrom(descriptor)
.readValue()
.await()
): DataView {
val jsDescriptor = bluetoothRemoteGATTDescriptorFrom(descriptor)
return ioLock.withLock {
jsDescriptor.readValue().await()
}
}

public override suspend fun read(
descriptor: Descriptor
Expand Down

0 comments on commit 57edad1

Please sign in to comment.