Skip to content

Commit

Permalink
Brute force encryption key automatically (#52)
Browse files Browse the repository at this point in the history
* Brute force encryption key automatically

* Fix config flow error handling

* Add config flow tests

* Cleanup
  • Loading branch information
ofalvai authored Dec 5, 2021
1 parent 71e8ccf commit a835bee
Show file tree
Hide file tree
Showing 10 changed files with 407 additions and 68 deletions.
57 changes: 43 additions & 14 deletions custom_components/candy/client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import codecs
import json
import logging
from typing import Union
from json import JSONDecodeError
from typing import Union, Optional

import aiohttp
import backoff
from aiohttp import ClientSession

from .decryption import decrypt, Encryption, find_key
from .model import WashingMachineStatus, TumbleDryerStatus, DishwasherStatus, OvenStatus

_LOGGER = logging.getLogger(__name__)
Expand All @@ -22,16 +23,15 @@ def __init__(self, session: ClientSession, device_ip: str, encryption_key: str,

@backoff.on_exception(backoff.expo, aiohttp.ClientError, max_tries=10, logger=__name__)
@backoff.on_exception(backoff.expo, TimeoutError, max_tries=10, logger=__name__)
async def status_with_retry(self) -> Union[WashingMachineStatus, TumbleDryerStatus]:
async def status_with_retry(self) -> Union[WashingMachineStatus, TumbleDryerStatus, DishwasherStatus, OvenStatus]:
return await self.status()

async def status(self) -> Union[WashingMachineStatus, TumbleDryerStatus]:
url = f"http://{self.device_ip}/http-read.json?encrypted={1 if self.use_encryption else 0}"
async def status(self) -> Union[WashingMachineStatus, TumbleDryerStatus, DishwasherStatus, OvenStatus]:
url = _status_url(self.device_ip, self.use_encryption)
async with self.session.get(url) as resp:
if self.use_encryption:
resp_bytes = await resp.read()
resp_hex = codecs.decode(resp_bytes, encoding="hex")
decrypted_text = self.decrypt(resp_hex, self.encryption_key)
if self.encryption_key != "":
resp_hex = await resp.text() # Response is hex encoded encrypted data
decrypted_text = decrypt(self.encryption_key.encode(), bytes.fromhex(resp_hex))
resp_json = json.loads(decrypted_text)
else:
resp_json = await resp.json(content_type="text/html")
Expand All @@ -51,9 +51,38 @@ async def status(self) -> Union[WashingMachineStatus, TumbleDryerStatus]:

return status

def decrypt(self, cipher_text, key):
decrypted = ""
for i in range(len(cipher_text)):
decrypted += chr(cipher_text[i] ^ ord(key[i % len(key)]))

return decrypted
async def detect_encryption(session: aiohttp.ClientSession, device_ip: str) -> (Encryption, Optional[str]):
# noinspection PyBroadException
try:
_LOGGER.info("Trying to get a response without encryption (encrypted=0)...")
url = _status_url(device_ip, use_encryption=False)
async with session.get(url) as resp:
resp_json = await resp.json(content_type="text/html")
assert resp_json.get("response") != "BAD REQUEST"
_LOGGER.info("Received unencrypted JSON response, no need to use key for decryption")
return Encryption.NO_ENCRYPTION, None
except Exception as e:
_LOGGER.debug(e)
_LOGGER.info("Failed to get a valid response without encryption, let's try with encrypted=1...")
url = _status_url(device_ip, use_encryption=True)
async with session.get(url) as resp:
resp_hex = await resp.text() # Response is hex encoded encrypted data
try:
json.loads(bytes.fromhex(resp_hex))
_LOGGER.info("Response is not encrypted (despite encryption=1 in request), no need to brute force "
"the key")
return Encryption.ENCRYPTION_WITHOUT_KEY, None
except JSONDecodeError:
_LOGGER.info("Brute force decryption key from the encrypted response...")
_LOGGER.debug(f"Response: {resp_hex}")
key = find_key(bytes.fromhex(resp_hex))
if key is None:
raise ValueError("Couldn't brute force key")

_LOGGER.info("Using key with encrypted=1 for future requests")
return Encryption.ENCRYPTION, key


def _status_url(device_ip: str, use_encryption: bool) -> str:
return f"http://{device_ip}/http-read.json?encrypted={1 if use_encryption else 0}"
63 changes: 63 additions & 0 deletions custom_components/candy/client/decryption.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import itertools
import json
import logging
import math
import string
from enum import Enum

from typing import Optional, Iterable

# Adapted from https://www.online-python.com/pm93n5Sqg4

_LOGGER = logging.getLogger(__name__)

KEY_LEN = 16
KEY_CHARSET_CODEPOINTS: list[int] = [ord(c) for c in string.ascii_letters + string.digits]
PLAINTEXT_CHARSET_CODEPOINTS: list[int] = [ord(c) for c in string.printable]


class Encryption(Enum):
NO_ENCRYPTION = 1 # Use `encrypted=0` in request, response is plaintext JSON
ENCRYPTION = 2 # Use `encrypted=1` in request, response is encrypted bytes in hex encoding
ENCRYPTION_WITHOUT_KEY = 3 # Use `encrypted=1` in request, response is unencrypted hex bytes (https://github.com/ofalvai/home-assistant-candy/issues/35#issuecomment-965557116)


def find_key(encrypted_response: bytes) -> Optional[str]:
candidate_key_codepoints: list[list[int]] = [
list(_find_candidate_key_codepoints(encrypted_response, i)) for i in range(16)
]

number_of_keys = math.prod(len(l) for l in candidate_key_codepoints)
_LOGGER.info(f"{number_of_keys} keys to test")

for key in itertools.product(*candidate_key_codepoints):
decrypted = decrypt(key, encrypted_response)
if _is_valid_json(decrypted):
key_str = "".join(chr(point) for point in key)
_LOGGER.info(f"Potential key found: {key_str}")
return key_str

return None


def decrypt(key: bytes, encrypted_response: bytes) -> bytes:
key_len = len(key)
decrypted: list[int] = []
for (i, byte) in enumerate(encrypted_response):
decrypted.append(byte ^ key[i % key_len])
return bytes(decrypted)


def _find_candidate_key_codepoints(encrypted_response: bytes, key_offset: int) -> Iterable[int]:
bytes_to_check: bytes = encrypted_response[key_offset::KEY_LEN]
for point in KEY_CHARSET_CODEPOINTS:
if all(point ^ byte in PLAINTEXT_CHARSET_CODEPOINTS for byte in bytes_to_check):
yield point


def _is_valid_json(decrypted: bytes) -> bool:
try:
json.loads(decrypted)
except json.JSONDecodeError:
return False
return True
47 changes: 27 additions & 20 deletions custom_components/candy/config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,24 @@
import logging
from typing import Any

import async_timeout
import voluptuous as vol

from homeassistant import config_entries
from homeassistant.core import HomeAssistant, callback
from homeassistant.const import CONF_IP_ADDRESS, CONF_PASSWORD
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.aiohttp_client import async_get_clientsession

from .client import detect_encryption
from .client.decryption import Encryption
from .const import *

_LOGGER = logging.getLogger(__name__)

STEP_DATA_SCHEMA = vol.Schema({
vol.Required(CONF_IP_ADDRESS): str,
vol.Required(CONF_KEY_USE_ENCRYPTION, default=True): bool,
vol.Optional(CONF_PASSWORD): str
})


async def validate_input(hass: HomeAssistant, data: dict[str, Any]) -> bool:
"""Validate the user input allows us to connect."""
# Everything is validated in the schema
return True


class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Candy."""

Expand All @@ -40,19 +33,33 @@ async def async_step_user(
) -> FlowResult:
"""Handle the initial step."""
if user_input is None:
return self.async_show_form(
step_id="user", data_schema=STEP_DATA_SCHEMA
)
return self.async_show_form(step_id="user", data_schema=STEP_DATA_SCHEMA)

errors = {}
config_data = {
CONF_IP_ADDRESS: user_input[CONF_IP_ADDRESS]
}

errors = {}
try:
await validate_input(self.hass, user_input)
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
async with async_timeout.timeout(40):
encryption_type, key = await detect_encryption(
session=async_get_clientsession(self.hass),
device_ip=user_input[CONF_IP_ADDRESS]
)
except Exception as e: # pylint: disable=broad-except
_LOGGER.exception(e)
errors["base"] = "detect_encryption"
else:
return self.async_create_entry(title=CONF_INTEGRATION_TITLE, data=user_input)
if encryption_type == Encryption.ENCRYPTION:
config_data[CONF_KEY_USE_ENCRYPTION] = True
config_data[CONF_PASSWORD] = key
elif encryption_type == Encryption.NO_ENCRYPTION:
config_data[CONF_KEY_USE_ENCRYPTION] = False
elif encryption_type == Encryption.ENCRYPTION_WITHOUT_KEY:
config_data[CONF_KEY_USE_ENCRYPTION] = True
config_data[CONF_PASSWORD] = ""

return self.async_create_entry(title=CONF_INTEGRATION_TITLE, data=config_data)

return self.async_show_form(
step_id="user", data_schema=STEP_DATA_SCHEMA, errors=errors
Expand Down
3 changes: 2 additions & 1 deletion custom_components/candy/strings.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
}
},
"error": {
"unknown": "[%key:common::config_flow::error::unknown%]"
"unknown": "[%key:common::config_flow::error::unknown%]",
"detect_encryption": "Failed to detect encryption, check logs"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]"
Expand Down
3 changes: 2 additions & 1 deletion custom_components/candy/translations/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"already_configured": "Device is already configured"
},
"error": {
"unknown": "Unexpected error"
"unknown": "Unexpected error",
"detect_encryption": "Failed to detect encryption, check logs"
},
"step": {
"user": {
Expand Down
13 changes: 10 additions & 3 deletions tests/common.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import aresponses
from pytest_homeassistant_custom_component.common import load_fixture, MockConfigEntry
from homeassistant.const import CONF_IP_ADDRESS, CONF_PASSWORD
from homeassistant.core import HomeAssistant
from pytest_homeassistant_custom_component.common import load_fixture, MockConfigEntry

from custom_components.candy import DOMAIN, CONF_KEY_USE_ENCRYPTION

TEST_IP = "192.168.0.66"
TEST_ENCRYPTION_KEY = ""
TEST_ENCRYPTION_KEY_EMPTY = ""
TEST_ENCRYPTION_KEY = "fbfjlbmmfklfaikm"
TEST_ENCRYPTED_HEX_RESPONSE = """

"""
TEST_UNENCRYPTED_HEX_RESPONSE = """

"""


def status_response(filename):
Expand All @@ -23,7 +30,7 @@ async def init_integration(hass: HomeAssistant, aioclient_mock, status_response:
data={
CONF_IP_ADDRESS: "192.168.0.66",
CONF_KEY_USE_ENCRYPTION: False,
CONF_PASSWORD: "asdasdasd",
CONF_PASSWORD: "",
}
)

Expand Down
23 changes: 0 additions & 23 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,3 @@ def skip_notifications_fixture():
"homeassistant.components.persistent_notification.async_dismiss"
):
yield


# This fixture, when used, will result in calls to async_get_data to return None. To have the call
# return a value, we would add the `return_value=<VALUE_TO_RETURN>` parameter to the patch call.
@pytest.fixture(name="bypass_get_data")
def bypass_get_data_fixture():
"""Skip calls to get data from API."""
with patch(
"custom_components.integration_blueprint.IntegrationBlueprintApiClient.async_get_data"
):
yield


# In this fixture, we are forcing calls to async_get_data to raise an Exception. This is useful
# for exception handling.
@pytest.fixture(name="error_on_get_data")
def error_get_data_fixture():
"""Simulate error when retrieving data from API."""
with patch(
"custom_components.integration_blueprint.IntegrationBlueprintApiClient.async_get_data",
side_effect=Exception,
):
yield
Loading

0 comments on commit a835bee

Please sign in to comment.