-
Notifications
You must be signed in to change notification settings - Fork 305
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Check the bluetoothctl version with asyncio.create_subprocess_exec
- Loading branch information
Showing
8 changed files
with
189 additions
and
46 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,28 +1 @@ | ||
import re | ||
import subprocess | ||
|
||
from ...exc import BleakError | ||
|
||
|
||
def check_bluez_version(major: int, minor: int) -> bool: | ||
""" | ||
Checks the BlueZ version. | ||
Returns: | ||
``True`` if the BlueZ major version is equal to *major* and the minor | ||
version is greater than or equal to *minor*, otherwise ``False``. | ||
""" | ||
# lazy-get the version and store it so we only have to run subprocess once | ||
if not hasattr(check_bluez_version, "version"): | ||
p = subprocess.Popen(["bluetoothctl", "--version"], stdout=subprocess.PIPE) | ||
out, _ = p.communicate() | ||
s = re.search(b"(\\d+).(\\d+)", out.strip(b"'")) | ||
|
||
if not s: | ||
raise BleakError(f"Could not determine BlueZ version: {out.decode()}") | ||
|
||
setattr(check_bluez_version, "version", tuple(map(int, s.groups()))) | ||
|
||
bluez_major, bluez_minor = getattr(check_bluez_version, "version") | ||
|
||
return bluez_major == major and bluez_minor >= minor | ||
"""BlueZ backend.""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
import asyncio | ||
import contextlib | ||
import logging | ||
import re | ||
from typing import Optional | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
async def _get_bluetoothctl_version(): | ||
"""Get the version of bluetoothctl.""" | ||
with contextlib.suppress(Exception): | ||
proc = await asyncio.create_subprocess_exec( | ||
"bluetoothctl", "--version", stdout=asyncio.subprocess.PIPE | ||
) | ||
out = await proc.stdout.read() | ||
version = re.search(b"(\\d+).(\\d+)", out.strip(b"'")) | ||
await proc.wait() | ||
return version | ||
return None | ||
|
||
|
||
class BlueZFeatures: | ||
"""Check which features are supported by the BlueZ backend.""" | ||
|
||
checked_bluez_version = False | ||
supported_version = True | ||
can_write_without_response = True | ||
write_without_response_workaround_needed = False | ||
hides_battery_characteristic = True | ||
hides_device_name_characteristic = True | ||
_check_bluez_event: Optional[asyncio.Event] = None | ||
|
||
@classmethod | ||
async def check_bluez_version(cls) -> None: | ||
"""Check the bluez version.""" | ||
if cls._check_bluez_event: | ||
# If there is already a check in progress | ||
# it wins, wait for it instead | ||
await cls._check_bluez_event.wait() | ||
return | ||
cls._check_bluez_event = asyncio.Event() | ||
version_output = await _get_bluetoothctl_version() | ||
if version_output: | ||
major, minor = tuple(map(int, version_output.groups())) | ||
cls.supported_version = major == 5 and minor >= 34 | ||
cls.can_write_without_response = major == 5 and minor >= 46 | ||
cls.write_without_response_workaround_needed = not ( | ||
major == 5 and minor >= 51 | ||
) | ||
cls.hides_battery_characteristic = major == 5 and minor >= 48 | ||
cls.hides_device_name_characteristic = major == 5 and minor >= 48 | ||
else: | ||
# Its possible they may be running inside a container where | ||
# bluetoothctl is not available and they only have access to the | ||
# BlueZ D-Bus API. | ||
logging.warning( | ||
"Could not determine BlueZ version, bluetoothctl not available, assuming 5.51+" | ||
) | ||
|
||
cls._check_bluez_event.set() | ||
cls.checked_bluez_version = True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
#!/usr/bin/env python | ||
|
||
"""Tests for `bleak.backends.bluezdbus.version` package.""" | ||
|
||
import sys | ||
from unittest.mock import Mock, patch | ||
|
||
import pytest | ||
|
||
if sys.version_info[:2] < (3, 8): | ||
from asynctest.mock import CoroutineMock as AsyncMock | ||
else: | ||
from unittest.mock import AsyncMock | ||
|
||
from bleak.backends.bluezdbus.version import BlueZFeatures | ||
|
||
|
||
@pytest.mark.asyncio | ||
@pytest.mark.parametrize( | ||
"version,can_write_without_response,write_without_response_workaround_needed,hides_battery_characteristic,hides_device_name_characteristic", | ||
[ | ||
(b"bluetoothctl: 5.34", False, False, False, False), | ||
(b"bluetoothctl: 5.46", True, False, False, False), | ||
(b"bluetoothctl: 5.48", True, False, True, True), | ||
(b"bluetoothctl: 5.51", True, True, True, True), | ||
(b"bluetoothctl: 5.63", True, True, True, True), | ||
(b"", True, True, True, True), | ||
], | ||
) | ||
async def test_bluez_version( | ||
version, | ||
can_write_without_response, | ||
write_without_response_workaround_needed, | ||
hides_battery_characteristic, | ||
hides_device_name_characteristic, | ||
): | ||
"""Test we can determine supported feature from bluetoothctl.""" | ||
mock_proc = Mock( | ||
wait=AsyncMock(), stdout=Mock(read=AsyncMock(return_value=version)) | ||
) | ||
with patch( | ||
"bleak.backends.bluezdbus.version.asyncio.create_subprocess_exec", | ||
AsyncMock(return_value=mock_proc), | ||
): | ||
BlueZFeatures._check_bluez_event = None | ||
await BlueZFeatures.check_bluez_version() | ||
assert BlueZFeatures.checked_bluez_version is True | ||
assert BlueZFeatures.can_write_without_response == can_write_without_response | ||
assert ( | ||
not BlueZFeatures.write_without_response_workaround_needed | ||
== write_without_response_workaround_needed | ||
) | ||
assert BlueZFeatures.hides_battery_characteristic == hides_battery_characteristic | ||
assert ( | ||
BlueZFeatures.hides_device_name_characteristic | ||
== hides_device_name_characteristic | ||
) | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_bluez_version_only_happens_once(): | ||
"""Test we can determine supported feature from bluetoothctl.""" | ||
BlueZFeatures.checked_bluez_version = False | ||
BlueZFeatures._check_bluez_event = None | ||
mock_proc = Mock( | ||
wait=AsyncMock(), | ||
stdout=Mock(read=AsyncMock(return_value=b"bluetoothctl: 5.46")), | ||
) | ||
with patch( | ||
"bleak.backends.bluezdbus.version.asyncio.create_subprocess_exec", | ||
AsyncMock(return_value=mock_proc), | ||
): | ||
await BlueZFeatures.check_bluez_version() | ||
|
||
assert BlueZFeatures.checked_bluez_version is True | ||
|
||
with patch( | ||
"bleak.backends.bluezdbus.version.asyncio.create_subprocess_exec", | ||
side_effect=Exception, | ||
): | ||
await BlueZFeatures.check_bluez_version() | ||
|
||
assert BlueZFeatures.checked_bluez_version is True | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_exception_checking_bluez_features_does_not_block_forever(): | ||
"""Test an exception while checking BlueZ features does not stall a second check.""" | ||
BlueZFeatures.checked_bluez_version = False | ||
BlueZFeatures._check_bluez_event = None | ||
with patch( | ||
"bleak.backends.bluezdbus.version.asyncio.create_subprocess_exec", | ||
side_effect=OSError, | ||
): | ||
await BlueZFeatures.check_bluez_version() | ||
|
||
assert BlueZFeatures.checked_bluez_version is True | ||
|
||
with patch( | ||
"bleak.backends.bluezdbus.version.asyncio.create_subprocess_exec", | ||
side_effect=OSError, | ||
): | ||
await BlueZFeatures.check_bluez_version() | ||
|
||
assert BlueZFeatures.checked_bluez_version is True |