-
-
Notifications
You must be signed in to change notification settings - Fork 31.1k
/
auth.py
324 lines (267 loc) · 10.4 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
"""Authentication for HTTP component."""
from __future__ import annotations
from collections.abc import Awaitable, Callable
from datetime import timedelta
from http import HTTPStatus
from ipaddress import ip_address
import logging
import os
import secrets
import time
from typing import Any, Final
from aiohttp import hdrs
from aiohttp.web import Application, Request, Response, StreamResponse, middleware
from aiohttp.web_exceptions import HTTPBadRequest
from aiohttp_session import session_middleware
import jwt
from jwt import api_jws
from yarl import URL
from homeassistant.auth import jwt_wrapper
from homeassistant.auth.const import GROUP_ID_READ_ONLY
from homeassistant.auth.models import User
from homeassistant.components import websocket_api
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.http import current_request
from homeassistant.helpers.json import json_bytes
from homeassistant.helpers.network import is_cloud_connection
from homeassistant.helpers.storage import Store
from homeassistant.util.network import is_local
from .const import (
KEY_AUTHENTICATED,
KEY_HASS_REFRESH_TOKEN_ID,
KEY_HASS_USER,
StrictConnectionMode,
)
from .session import HomeAssistantCookieStorage
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# NOTE(review): not referenced anywhere in the visible part of this file;
# presumably kept for the legacy api_password support - confirm before removing.
DATA_API_PASSWORD: Final = "api_password"
# hass.data key under which the secret used to sign paths is kept.
DATA_SIGN_SECRET: Final = "http.auth.sign_secret"
# Name of the query parameter that carries the signature of a signed path.
SIGN_QUERY_PARAM: Final = "authSig"
# Query parameters that are left out of the signed claims and skipped again
# when a signed request is validated.
SAFE_QUERY_PARAMS: Final = ["height", "width"]
# Version and key of the Store opened by async_setup_auth. STORAGE_KEY is also
# the hass.data key that holds the content user's refresh token id.
STORAGE_VERSION = 1
STORAGE_KEY = "http.auth"
# Name given to the system user created by async_setup_auth.
CONTENT_USER_NAME = "Home Assistant Content"
# Requests whose path starts with this prefix skip the strict connection check.
STRICT_CONNECTION_EXCLUDED_PATH = "/api/webhook/"
# HTML file located next to this module; it is read during setup when the
# strict connection mode is STATIC_PAGE.
STRICT_CONNECTION_STATIC_PAGE = os.path.join(
os.path.dirname(__file__), "strict_connection_static_page.html"
)
@callback
def async_sign_path(
    hass: HomeAssistant,
    path: str,
    expiration: timedelta,
    *,
    refresh_token_id: str | None = None,
    use_content_user: bool = False,
) -> str:
    """Return ``path`` with a signature that grants temporary access.

    The signature is an HS256 JWS over the issuing refresh token id, the URL
    path, the query parameters that are not listed in ``SAFE_QUERY_PARAMS``,
    the issue time and the expiry time. The signing secret is created on
    first use and kept in ``hass.data[DATA_SIGN_SECRET]``.

    When ``refresh_token_id`` is not given it is taken, in this order, from
    the current websocket connection, then from the current HTTP request, and
    finally from the content user stored in ``hass.data[STORAGE_KEY]``.
    Passing ``use_content_user=True`` goes straight to the content user.

    The returned string is the path followed by a query string made of the
    signed parameters plus the ``SIGN_QUERY_PARAM`` signature.
    """
    signing_secret = hass.data.get(DATA_SIGN_SECRET)
    if signing_secret is None:
        # First signed path since startup: mint the secret and remember it.
        signing_secret = secrets.token_hex()
        hass.data[DATA_SIGN_SECRET] = signing_secret

    if refresh_token_id is None:
        if not use_content_user and (
            connection := websocket_api.current_connection.get()
        ):
            refresh_token_id = connection.refresh_token_id
        elif (
            not use_content_user
            and (request := current_request.get())
            and KEY_HASS_REFRESH_TOKEN_ID in request
        ):
            refresh_token_id = request[KEY_HASS_REFRESH_TOKEN_ID]
        else:
            # Either explicitly requested or nothing else is available.
            refresh_token_id = hass.data[STORAGE_KEY]

    url = URL(path)
    issued_at = int(time.time())
    expires_at = issued_at + int(expiration.total_seconds())
    signed_params = [
        (key, value)
        for key, value in url.query.items()
        if key not in SAFE_QUERY_PARAMS
    ]

    payload = json_bytes(
        {
            "iss": refresh_token_id,
            "path": url.path,
            "params": signed_params,
            "iat": issued_at,
            "exp": expires_at,
        }
    )
    signature = api_jws.encode(payload, signing_secret, "HS256")

    signed_url = url.with_query([*signed_params, (SIGN_QUERY_PARAM, signature)])
    return f"{signed_url.path}?{signed_url.query_string}"
@callback
def async_user_not_allowed_do_auth(
    hass: HomeAssistant, user: User, request: Request | None = None
) -> str | None:
    """Return the reason a user may not authenticate, or None if they may.

    Inactive users are always rejected. Users that are not marked local-only
    are always accepted. A local-only user is accepted only when a request is
    available, the connection is not a cloud connection, and the request's
    remote address parses as a local IP address. When ``request`` is omitted
    the value of ``current_request`` is used instead.
    """
    if not user.is_active:
        return "User is not active"

    if not user.local_only:
        return None

    # From here on the user is local-only, so the origin of the request matters.
    active_request = request if request is not None else current_request.get()
    if not active_request:
        return "No request available to validate local access"

    if is_cloud_connection(hass):
        return "User is local only"

    try:
        remote_ip = ip_address(active_request.remote)  # type: ignore[arg-type]
    except ValueError:
        return "Invalid remote IP"

    return None if is_local(remote_ip) else "User cannot authenticate remotely"
async def async_setup_auth(
    hass: HomeAssistant,
    app: Application,
    strict_connection_mode_non_cloud: StrictConnectionMode,
) -> None:
    """Install the session and authentication middlewares on ``app``.

    Setup first makes sure a content system user with a refresh token exists
    (its user id is persisted in the store, the refresh token id is published
    in ``hass.data[STORAGE_KEY]``). When the strict connection mode is
    ``STATIC_PAGE`` the static page is read in the executor. Finally the cookie
    session middleware and the auth middleware are appended to
    ``app.middlewares``, in that order.
    """
    store = Store[dict[str, Any]](hass, STORAGE_VERSION, STORAGE_KEY)
    stored = await store.async_load()
    data = {} if stored is None else stored

    # Reuse the refresh token of the stored content user when it still exists.
    content_token = None
    if "content_user" in data:
        content_user = await hass.auth.async_get_user(data["content_user"])
        if content_user and content_user.refresh_tokens:
            content_token = next(iter(content_user.refresh_tokens.values()))

    if content_token is None:
        # No usable content user: create one and persist its id.
        content_user = await hass.auth.async_create_system_user(
            CONTENT_USER_NAME, group_ids=[GROUP_ID_READ_ONLY]
        )
        content_token = await hass.auth.async_create_refresh_token(content_user)
        data["content_user"] = content_user.id
        await store.async_save(data)

    hass.data[STORAGE_KEY] = content_token.id

    static_page_html = None
    if strict_connection_mode_non_cloud is StrictConnectionMode.STATIC_PAGE:

        def _load_static_page() -> str:
            """Read the strict connection static page from disk."""
            with open(STRICT_CONNECTION_STATIC_PAGE, encoding="utf-8") as page:
                return page.read()

        # File I/O is handed to the executor instead of running inline.
        static_page_html = await hass.async_add_executor_job(_load_static_page)

    @callback
    def async_validate_auth_header(request: Request) -> bool:
        """Check the Authorization header for a valid bearer access token.

        On success the user and the refresh token id are stored on the
        request. Only the ``Bearer`` scheme is accepted.
        """
        header = request.headers.get(hdrs.AUTHORIZATION, "")
        scheme, separator, access_token = header.partition(" ")
        # An empty separator means the header contained no space at all.
        if not separator or scheme != "Bearer":
            return False

        token = hass.auth.async_validate_access_token(access_token)
        if token is None:
            return False

        if async_user_not_allowed_do_auth(hass, token.user, request):
            return False

        request[KEY_HASS_USER] = token.user
        request[KEY_HASS_REFRESH_TOKEN_ID] = token.id
        return True

    @callback
    def async_validate_signed_request(request: Request) -> bool:
        """Check the signature carried in the query string of a request.

        The signature must verify against the signing secret, and its claimed
        path and parameters must match the request. On success the user and
        the refresh token id of the issuer are stored on the request.
        """
        signing_secret = hass.data.get(DATA_SIGN_SECRET)
        if signing_secret is None:
            # Nothing has been signed yet, so no signature can be valid.
            return False

        signature = request.query.get(SIGN_QUERY_PARAM)
        if signature is None:
            return False

        try:
            claims = jwt_wrapper.verify_and_decode(
                signature,
                signing_secret,
                algorithms=["HS256"],
                options={"verify_iss": False},
            )
        except jwt.InvalidTokenError:
            return False

        if claims["path"] != request.path:
            return False

        # The claims hold each parameter as a two-item list, so build the
        # request side in the same shape before comparing.
        request_params = [
            [key, value]
            for key, value in request.query.items()
            if key not in SAFE_QUERY_PARAMS and key != SIGN_QUERY_PARAM
        ]
        if claims["params"] != request_params:
            return False

        token = hass.auth.async_get_refresh_token(claims["iss"])
        if token is None:
            return False

        request[KEY_HASS_USER] = token.user
        request[KEY_HASS_REFRESH_TOKEN_ID] = token.id
        return True

    @middleware
    async def auth_middleware(
        request: Request, handler: Callable[[Request], Awaitable[StreamResponse]]
    ) -> StreamResponse:
        """Authenticate the request, then pass it on to the handler.

        Unauthenticated requests are still forwarded (flagged through
        ``KEY_AUTHENTICATED``) unless the strict connection check answers or
        aborts them first.
        """
        auth_type: str | None = None
        if hdrs.AUTHORIZATION in request.headers and async_validate_auth_header(
            request
        ):
            auth_type = "bearer token"
        elif (
            # Plain substring test on the raw query string first, so query
            # parameters are not parsed for every request.
            request.method == "GET"
            and SIGN_QUERY_PARAM in request.query_string
            and async_validate_signed_request(request)
        ):
            auth_type = "signed request"
        authenticated = auth_type is not None

        if (
            not authenticated
            and strict_connection_mode_non_cloud is not StrictConnectionMode.DISABLED
            and not request.path.startswith(STRICT_CONNECTION_EXCLUDED_PATH)
        ):
            session_valid = await hass.auth.session.async_validate_request_for_strict_connection_session(
                request
            )
            if not session_valid:
                strict_response = _async_perform_action_on_non_local(
                    request, static_page_html
                )
                if strict_response is not None:
                    return strict_response

        if authenticated and _LOGGER.isEnabledFor(logging.DEBUG):
            _LOGGER.debug(
                "Authenticated %s for %s using %s",
                request.remote,
                request.path,
                auth_type,
            )

        request[KEY_AUTHENTICATED] = authenticated
        return await handler(request)

    app.middlewares.append(session_middleware(HomeAssistantCookieStorage(hass)))
    app.middlewares.append(auth_middleware)
@callback
def _async_perform_action_on_non_local(
    request: Request,
    strict_connection_static_file_content: str | None,
) -> StreamResponse | None:
    """Apply the strict connection action unless the request is local.

    Steps:
    - Parse the remote address of the request; an address that cannot be
      parsed is treated as not local.
    - Return None for a local address so the request can continue.
    - If static page content is available, return it as an HTML response
      with status 418 (I'm a teapot).
    - Otherwise close the transport, if any, and raise HTTPBadRequest.
    """
    remote_ip = None
    try:
        remote_ip = ip_address(request.remote)  # type: ignore[arg-type]
    except ValueError:
        _LOGGER.debug("Invalid IP address: %s", request.remote)

    if remote_ip and is_local(remote_ip):
        return None

    _LOGGER.debug("Perform strict connection action for %s", remote_ip)

    if not strict_connection_static_file_content:
        transport = request.transport
        if transport:
            # it should never happen that we don't have a transport
            transport.close()
        # Raising is what stops the request from being processed further.
        raise HTTPBadRequest

    return Response(
        text=strict_connection_static_file_content,
        content_type="text/html",
        status=HTTPStatus.IM_A_TEAPOT,
    )