From 5bb3f21213665de4de2c599cde7b2153e108bc27 Mon Sep 17 00:00:00 2001 From: Chris Wagner Date: Thu, 25 Jan 2024 14:48:27 -0800 Subject: [PATCH 1/2] Improve CSRF Added testing to /tf-setup - there wasn't any CSRF issue - all working. CSRF handling is complex and there are few unit tests. - Added @pytest.mark.csrf to make it easier to turn on and test CSRF w/o lots of boilerplate - Added tests and improved many templates to show CSRF errors - mostly for developers - but otherwise CSRF errors tent do just disappear and are difficult to debug - Found issue with WTFforms with the new form-level errors - it uses a `None` key - which, if there are multiple errors, isn't sortable by Flasks default JSON serializer. Filed issue and now change if from `None` to "" - Fixed issue in webauthn with CSRF errors causing exceptions - added tests. - In the case of CSRFprotect() (the app configuring CSRF for the entire app) a CSRF error would raise an exception which would always return an HTML response - added code to return a JSON response if desired. closes #905 --- .git-blame-ignore-revs | 2 + CHANGES.rst | 10 +- docs/patterns.rst | 3 +- flask_security/decorators.py | 42 +++++++-- .../templates/security/change_password.html | 4 +- .../templates/security/forgot_password.html | 4 +- .../templates/security/reset_password.html | 4 +- .../templates/security/two_factor_setup.html | 4 +- .../templates/security/wan_signin.html | 4 +- flask_security/utils.py | 50 +++++----- flask_security/views.py | 1 + flask_security/webauthn.py | 10 +- pytest.ini | 1 + tests/conftest.py | 17 +++- tests/test_changeable.py | 48 ++++++++++ tests/test_common.py | 64 +++++++++++-- tests/test_csrf.py | 93 +++++++------------ tests/test_misc.py | 8 +- tests/test_oauthglue.py | 2 +- tests/test_recoverable.py | 31 +++++++ tests/test_registerable.py | 2 +- tests/test_two_factor.py | 36 +++++++ tests/test_unified_signin.py | 12 +-- tests/test_utils.py | 10 +- tests/test_webauthn.py | 61 +++++++++++- 25 files changed, 399 insertions(+), 124 deletions(-) diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs index 361578b7..aa3ca69c 100644 --- a/.git-blame-ignore-revs +++ b/.git-blame-ignore-revs @@ -1,2 +1,4 @@ # Black 721d31e9cb02af22e3ad9d579b7b82123527fafe +# Black 2024 +af6e7176eb54e7e9de77dd6e2cba03630c13f14e diff --git a/CHANGES.rst b/CHANGES.rst index c998a3da..f333413e 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -32,8 +32,10 @@ Fixes - (:issue:`884`) Oauth re-used POST_LOGIN_VIEW which caused confusion. See below for the new configuration and implications. - (:pr:`899`) Improve (and simplify) Two-Factor setup. See below for backwards compatability issues and new functionality. - (:pr:`901`) Work with py_webauthn 2.0 -- (:pr:`xxx`) Remove undocumented and untested looking in session for possible 'next' +- (:pr:`906`) Remove undocumented and untested looking in session for possible 'next' redirect location. +- (:pr:`xxx`) Improve CSRF documentation and testing. Fix bug where a CSRF failure could + return an HTML page even if the request was JSON. Notes ++++++ @@ -101,6 +103,12 @@ Backwards Compatibility Concerns This implementation is independent of Werkzeug (and relative Location headers are again the default). The entire regex option has been removed. Instead, any user-supplied path used as a redirect is parsed and quoted. +- JSON error response has changed due to issue with WTForms form-level errors. When WTForms + introduced form-level errors they added it to the form.errors response using `None` as a key. + When serializing it, it would turn into "null". However, if there is more than one error + the default settings for JSON serialization in Flask attempt to sort the keys - which fails + with the `None` key. An issue has been filed with WTForms - and maybe it will be changed. + Flask-Security now changes any `None` key to `""`. Version 5.3.3 ------------- diff --git a/docs/patterns.rst b/docs/patterns.rst index 38bc3821..e0c5c521 100644 --- a/docs/patterns.rst +++ b/docs/patterns.rst @@ -212,6 +212,7 @@ Flask-Security strives to support various options for both its endpoints (e.g. ` and the application endpoints (protected with Flask-Security decorators such as :func:`.auth_required`). If your application just uses forms that are derived from ``Flask-WTF::Flaskform`` - you are done. +Note that all of Flask-Security's endpoints are form based (regardless of how the request was made). CSRF: Single-Page-Applications and AJAX/XHR @@ -387,7 +388,7 @@ CSRF: Pro-Tips (or clients must use CSRF/session cookie for logging in then once they have an authentication token, no further need for cookie). - #) If you enable CSRFProtect(app) and you want to support non-form based JSON requests, + #) If you enable CSRFProtect(app) and you want to send request data as JSON, then you must include the CSRF token in the header (e.g. X-CSRF-Token) #) You must enable CSRFProtect(app) if you want to accept the CSRF token in the request diff --git a/flask_security/decorators.py b/flask_security/decorators.py index 31064802..54c65529 100644 --- a/flask_security/decorators.py +++ b/flask_security/decorators.py @@ -5,10 +5,12 @@ Flask-Security decorators module :copyright: (c) 2012-2019 by Matt Wright. - :copyright: (c) 2019-2023 by J. Christopher Wagner (jwag). + :copyright: (c) 2019-2024 by J. Christopher Wagner (jwag). :license: MIT, see LICENSE for more details. """ +from __future__ import annotations + from collections import namedtuple import datetime from functools import wraps @@ -39,6 +41,9 @@ url_for_security, ) +if t.TYPE_CHECKING: # pragma: no cover + from flask.typing import ResponseValue + # Convenient references _csrf = LocalProxy(lambda: current_app.extensions["csrf"]) @@ -203,14 +208,14 @@ def _check_http_auth(): return False -def handle_csrf(method: t.Optional[str]) -> None: +def handle_csrf(method: str, json_response: bool = False) -> ResponseValue | None: """Invoke CSRF protection based on authentication method. Usually this is called as part of a decorator, but if that isn't appropriate, endpoint code can call this directly. If CSRF protection is appropriate, this will call flask_wtf::protect() which - will raise a ValidationError on CSRF failure. + will raise a CSRFError(BadRequest) on CSRF failure. This routine does nothing if any of these are true: @@ -220,10 +225,18 @@ def handle_csrf(method: t.Optional[str]) -> None: #) csrfProtect already checked and accepted the token + This means in the default config - CSRF is done as part of form validation + not here. Only if the application calls CSRFProtect(app) will this method + do anything. Furthermore - since this is called PRIOR to form instantiation + if the request is JSON - it MUST send the csrf_token as a header. + If the passed in method is not in *SECURITY_CSRF_PROTECT_MECHANISMS* then not only will no CSRF code be run, but a flag in the current context ``fs_ignore_csrf`` will be set so that downstream code knows to ignore any CSRF checks. + Returns None if all ok, returns a Response with JSON error if request + wanted JSON - else re-raises the CSRFError exception. + .. versionadded:: 3.3.0 """ if ( @@ -231,13 +244,20 @@ def handle_csrf(method: t.Optional[str]) -> None: or not current_app.extensions.get("csrf", None) or g.get("csrf_valid", False) ): - return + return None if config_value("CSRF_PROTECT_MECHANISMS"): if method in config_value("CSRF_PROTECT_MECHANISMS"): - _csrf.protect() # type: ignore + try: + _csrf.protect() # type: ignore + except CSRFError as e: + if json_response: + payload = json_error_response(errors=e.description) + return _security._render_json(payload, 400, None, None) + raise else: set_request_attr("fs_ignore_csrf", True) + return None def http_auth_required(realm: t.Any) -> DecoratedView: @@ -255,7 +275,9 @@ def decorator(fn): @wraps(fn) def wrapper(*args, **kwargs): if _check_http_auth(): - handle_csrf("basic") + eresponse = handle_csrf("basic", _security._want_json(request)) + if eresponse: + return eresponse set_request_attr("fs_authn_via", "basic") return current_app.ensure_sync(fn)(*args, **kwargs) r = _security.default_http_auth_realm if callable(realm) else realm @@ -282,7 +304,9 @@ def auth_token_required(fn: DecoratedView) -> DecoratedView: @wraps(fn) def decorated(*args, **kwargs): if _check_token(): - handle_csrf("token") + eresponse = handle_csrf("token", _security._want_json(request)) + if eresponse: + return eresponse set_request_attr("fs_authn_via", "token") return current_app.ensure_sync(fn)(*args, **kwargs) return _security._unauthn_handler(["token"]) @@ -407,7 +431,9 @@ def decorated_view( # in a session cookie... if not check_and_update_authn_fresh(within, grace, method): return _security._reauthn_handler(within, grace) - handle_csrf(method) + eresponse = handle_csrf(method, _security._want_json(request)) + if eresponse: + return eresponse set_request_attr("fs_authn_via", method) return current_app.ensure_sync(fn)(*args, **kwargs) return _security._unauthn_handler(ams, headers=h) diff --git a/flask_security/templates/security/change_password.html b/flask_security/templates/security/change_password.html index a34632cb..405de1e6 100644 --- a/flask_security/templates/security/change_password.html +++ b/flask_security/templates/security/change_password.html @@ -1,11 +1,12 @@ {% extends "security/base.html" %} -{% from "security/_macros.html" import render_field_with_errors, render_field %} +{% from "security/_macros.html" import render_field_with_errors, render_field, render_field_errors, render_form_errors %} {% block content %} {% include "security/_messages.html" %}

{{ _fsdomain('Change password') }}

{{ change_password_form.hidden_tag() }} + {{ render_form_errors(change_password_form) }} {% if active_password %} {{ render_field_with_errors(change_password_form.password) }} {% else %} @@ -13,6 +14,7 @@

{{ _fsdomain('You do not currently have a password - this will add one.') }} {% endif %} {{ render_field_with_errors(change_password_form.new_password) }} {{ render_field_with_errors(change_password_form.new_password_confirm) }} + {{ render_field_errors(change_password_form.csrf_token) }} {{ render_field(change_password_form.submit) }} {% endblock content %} diff --git a/flask_security/templates/security/forgot_password.html b/flask_security/templates/security/forgot_password.html index 1b07a09d..f5fe04db 100644 --- a/flask_security/templates/security/forgot_password.html +++ b/flask_security/templates/security/forgot_password.html @@ -1,12 +1,14 @@ {% extends "security/base.html" %} -{% from "security/_macros.html" import render_field_with_errors, render_field %} +{% from "security/_macros.html" import render_field_with_errors, render_field, render_field_errors, render_form_errors %} {% block content %} {% include "security/_messages.html" %}

{{ _fsdomain('Send password reset instructions') }}

{{ forgot_password_form.hidden_tag() }} + {{ render_form_errors(forgot_password_form) }} {{ render_field_with_errors(forgot_password_form.email) }} + {{ render_field_errors(forgot_password_form.csrf_token) }} {{ render_field(forgot_password_form.submit) }}
{% include "security/_menu.html" %} diff --git a/flask_security/templates/security/reset_password.html b/flask_security/templates/security/reset_password.html index 1a9b593e..7cd7ef9c 100644 --- a/flask_security/templates/security/reset_password.html +++ b/flask_security/templates/security/reset_password.html @@ -1,13 +1,15 @@ {% extends "security/base.html" %} -{% from "security/_macros.html" import render_field_with_errors, render_field %} +{% from "security/_macros.html" import render_field_with_errors, render_field, render_field_errors, render_form_errors %} {% block content %} {% include "security/_messages.html" %}

{{ _fsdomain('Reset password') }}

{{ reset_password_form.hidden_tag() }} + {{ render_form_errors(reset_password_form) }} {{ render_field_with_errors(reset_password_form.password) }} {{ render_field_with_errors(reset_password_form.password_confirm) }} + {{ render_field_errors(reset_password_form.csrf_token) }} {{ render_field(reset_password_form.submit) }}
{% include "security/_menu.html" %} diff --git a/flask_security/templates/security/two_factor_setup.html b/flask_security/templates/security/two_factor_setup.html index 5cf3d01c..8388e74d 100644 --- a/flask_security/templates/security/two_factor_setup.html +++ b/flask_security/templates/security/two_factor_setup.html @@ -17,7 +17,7 @@ #} {% extends "security/base.html" %} -{% from "security/_macros.html" import render_field_with_errors, render_field, render_field_no_label, render_field_errors %} +{% from "security/_macros.html" import render_field_with_errors, render_field, render_field_no_label, render_field_errors, render_form_errors %} {% block content %} {% include "security/_messages.html" %} @@ -25,6 +25,7 @@

{{ _fsdomain("Two-factor authentication adds an extra layer of security to y

{{ _fsdomain("In addition to your username and password, you'll need to use a code.") }}

{{ two_factor_setup_form.hidden_tag() }} + {{ render_form_errors(two_factor_setup_form) }}
{{ _fsdomain("Currently setup two-factor method: %(method)s", method=primary_method) }}

{% for subfield in two_factor_setup_form.setup %} @@ -37,6 +38,7 @@

{{ _fsdomain("In addition to your username and password, you'll need to use
{{ render_field_errors(two_factor_setup_form.setup) }} + {{ render_field_errors(two_factor_setup_form.csrf_token) }} {{ render_field(two_factor_setup_form.submit) }}
{% if chosen_method=="authenticator" %} diff --git a/flask_security/templates/security/wan_signin.html b/flask_security/templates/security/wan_signin.html index 05b486d5..5ee9102f 100644 --- a/flask_security/templates/security/wan_signin.html +++ b/flask_security/templates/security/wan_signin.html @@ -3,7 +3,7 @@ #} {% extends "security/base.html" %} -{% from "security/_macros.html" import render_field_with_errors, render_field, render_field_errors, prop_next %} +{% from "security/_macros.html" import render_field_with_errors, render_field, render_field_errors, render_form_errors, prop_next %} {% block head_scripts %} {{ super() }} @@ -21,11 +21,13 @@

{{ _fsdomain("Use Your WebAuthn Security Key as a Second Factor") }}

{% if not credential_options %} {{ wan_signin_form.hidden_tag() }} + {{ render_form_errors(wan_signin_form) }} {% if not is_secondary %} {{ render_field_with_errors(wan_signin_form.identity) }} {{ render_field_with_errors(wan_signin_form.remember) }} {% endif %} {{ render_field_errors(wan_signin_form.credential) }} + {{ render_field_errors(wan_signin_form.csrf_token) }} {{ render_field(wan_signin_form.submit) }}
{% else %} diff --git a/flask_security/utils.py b/flask_security/utils.py index 49c0a1ad..24e348ca 100644 --- a/flask_security/utils.py +++ b/flask_security/utils.py @@ -9,6 +9,8 @@ :license: MIT, see LICENSE for more details. """ +from __future__ import annotations + import abc import base64 from datetime import datetime, timedelta, timezone @@ -52,9 +54,6 @@ from flask.typing import ResponseValue from .datastore import User -SB = t.Union[str, bytes] - - localize_callback = LocalProxy(lambda: _security.i18n_domain.gettext) FsPermNeed = partial(Need, "fsperm") @@ -139,7 +138,7 @@ def find_csrf_field_name(): return None -def is_user_authenticated(user: "User") -> bool: +def is_user_authenticated(user: User) -> bool: """ return True is user is authenticated. @@ -159,9 +158,9 @@ def is_user_authenticated(user: "User") -> bool: def login_user( - user: "User", - remember: t.Optional[bool] = None, - authn_via: t.Optional[t.List[str]] = None, + user: User, + remember: bool | None = None, + authn_via: list[str] | None = None, ) -> bool: """Perform the login routine. @@ -324,7 +323,7 @@ def check_and_update_authn_fresh( return False -def get_hmac(password: SB) -> bytes: +def get_hmac(password: str | bytes) -> bytes: """Returns a Base64 encoded HMAC+SHA512 of the password signed with the salt specified by *SECURITY_PASSWORD_SALT*. @@ -343,7 +342,7 @@ def get_hmac(password: SB) -> bytes: return base64.b64encode(h.digest()) -def verify_password(password: SB, password_hash: SB) -> bool: +def verify_password(password: str | bytes, password_hash: str | bytes) -> bool: """Returns ``True`` if the password matches the supplied hash. :param password: A plaintext password to verify @@ -359,7 +358,7 @@ def verify_password(password: SB, password_hash: SB) -> bool: return _pwd_context.verify(password, password_hash) -def verify_and_update_password(password: SB, user: "User") -> bool: +def verify_and_update_password(password: str | bytes, user: User) -> bool: """Returns ``True`` if the password is valid for the specified user. Additionally, the hashed password in the database is updated if the @@ -390,7 +389,7 @@ def verify_and_update_password(password: SB, user: "User") -> bool: return verified -def hash_password(password: SB) -> t.Any: +def hash_password(password: str | bytes) -> str: """Hash the specified plaintext password. Unless the hash algorithm (as specified by `SECURITY_PASSWORD_HASH`) is listed in @@ -683,7 +682,7 @@ def simplify_url(base_url: str, redirect_url: str) -> str: return redirect_url -def get_config(app: "Flask") -> t.Dict[str, t.Any]: +def get_config(app: Flask) -> t.Dict[str, t.Any]: """Conveniently get the security configuration for the specified application without the annoying 'SECURITY_' prefix. @@ -857,7 +856,7 @@ def check_and_get_token_status( return expired, invalid, data -def get_identity_attributes(app: t.Optional["Flask"] = None) -> t.List[str]: +def get_identity_attributes(app: t.Optional[Flask] = None) -> t.List[str]: # Return list of keys of identity attributes # Is it possible to not have any? app = app or current_app @@ -868,7 +867,7 @@ def get_identity_attributes(app: t.Optional["Flask"] = None) -> t.List[str]: def get_identity_attribute( - attr: str, app: t.Optional["Flask"] = None + attr: str, app: t.Optional[Flask] = None ) -> t.Dict[str, t.Any]: """Given an user_identity_attribute, return the defining dict. A bit annoying since USER_IDENTITY_ATTRIBUTES is a list of dict @@ -957,7 +956,7 @@ def use_double_hash(password_hash=None): return not (single_hash is True or scheme in single_hash) -def csrf_cookie_handler(response: "Response") -> "Response": +def csrf_cookie_handler(response: Response) -> Response: """Called at end of every request. Uses session to track state (set/clear) @@ -1026,12 +1025,12 @@ def csrf_cookie_handler(response: "Response") -> "Response": def base_render_json( - form: "FlaskForm", + form: FlaskForm, include_user: bool = True, include_auth_token: bool = False, additional: t.Optional[t.Dict[str, t.Any]] = None, error_status_code: int = 400, -) -> "ResponseValue": +) -> ResponseValue: """ This method is called by all views that return JSON responses. This fills in the response and then calls :meth:`.Security.render_json` @@ -1079,7 +1078,7 @@ def base_render_json( def simple_render_json( additional: t.Optional[t.Dict[str, t.Any]] = None, -) -> "ResponseValue": +) -> ResponseValue: payload = dict(csrf_token=csrf.generate_csrf()) if additional: payload.update(additional) @@ -1107,7 +1106,7 @@ def default_want_json(req): def json_error_response( errors: t.Optional[t.Union[str, list]] = None, - field_errors: t.Optional[t.Dict[str, list]] = None, + field_errors: t.Optional[t.Dict[str | None, list]] = None, ) -> t.Dict[str, t.Any]: """Helper to create an error response. @@ -1117,7 +1116,9 @@ def json_error_response( The "field_errors" key which is exactly what is returned from WTForms - namely a dict of field-name: msg. For form-level errors (WTForms 3.0) the 'field-name' is - 'None' + None - which alas means it isn't sortable and Flask's default JSONProvider + sorts keys - so we change that to '__all__' which is what django uses + apparently and was suggested as part of WTForms 3.0. """ response_json: t.Dict[str, t.Union[list, t.Dict[str, list]]] = dict() plain_errors = [] @@ -1133,7 +1134,14 @@ def json_error_response( # we return that, as well as create a simple list of errors. for e in field_errors.values(): plain_errors.extend(e) - response_json["field_errors"] = field_errors + if None in field_errors.keys(): + # Ugh - wtforms decided to use None as a key - which json + # a) can't sort + # b) converts to "null" + # Issue filed - maybe they will change it + field_errors[""] = field_errors[None] + del field_errors[None] + response_json["field_errors"] = field_errors # type: ignore response_json["errors"] = plain_errors return response_json diff --git a/flask_security/views.py b/flask_security/views.py index a83b18f8..f48923c0 100644 --- a/flask_security/views.py +++ b/flask_security/views.py @@ -767,6 +767,7 @@ def two_factor_setup(): else: # Caller is changing their TFA profile. This requires a 'fresh' authentication + # N.B unauth_csrf has done the CSRF check already. if not check_and_update_authn_fresh( cv("FRESHNESS"), cv("FRESHNESS_GRACE_PERIOD"), diff --git a/flask_security/webauthn.py b/flask_security/webauthn.py index 5ea08b6e..103d9cb5 100644 --- a/flask_security/webauthn.py +++ b/flask_security/webauthn.py @@ -560,8 +560,9 @@ def webauthn_register_response(token: str) -> "ResponseValue": if _security._want_json(request): return base_render_json(form) - if len(form.errors) > 0: - do_flash(form.errors["credential"][0], "error") + if form.errors: + for v in form.errors.values(): + do_flash(v[0], "error") return redirect(url_for_security("wan_register")) @@ -742,8 +743,9 @@ def webauthn_signin_response(token: str) -> "ResponseValue": # Since the response is auto submitted - we go back to # signin form - for now use flash. - if form.credential.errors: - do_flash(form.credential.errors[0], "error") + if form.errors: + for v in form.errors.values(): + do_flash(v[0], "error") return redirect(url_for_security("wan_signin")) diff --git a/pytest.ini b/pytest.ini index 8c6a5b42..14552c89 100644 --- a/pytest.ini +++ b/pytest.ini @@ -15,6 +15,7 @@ markers = unified_signin webauthn flask_async + csrf filterwarnings = error diff --git a/tests/conftest.py b/tests/conftest.py index 72610c67..83c5d31d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -22,6 +22,7 @@ from flask import Flask, Response, jsonify, render_template from flask import request as flask_request from flask_mailman import Mail +from flask_wtf import CSRFProtect from flask_security import ( MongoEngineUserDatastore, @@ -175,6 +176,20 @@ def app(request: pytest.FixtureRequest) -> "SecurityFixture": raise pytest.skip("Requires Babel") Babel(app) + csrf = marker_getter("csrf") + if csrf is not None: + # without any keys/arguments - this is the default config + # Note that WTF_CSRF_CHECK_DEFAULT = True means Flask_wtf will + # run a CSRF check as part of @before_request - before we see it. + app.config["WTF_CSRF_ENABLED"] = True + if "ignore_unauth" in csrf.kwargs.keys(): + app.config["WTF_CSRF_CHECK_DEFAULT"] = False + app.config["SECURITY_CSRF_IGNORE_UNAUTH_ENDPOINTS"] = True + if "csrfprotect" in csrf.kwargs.keys(): + # This is needed when passing CSRF in header or non-form input + app.config["WTF_CSRF_CHECK_DEFAULT"] = False + CSRFProtect(app) + @app.route("/") def index(): return render_template("index.html", content="Home Page") @@ -193,7 +208,7 @@ def profile(): def post_login(): return render_template("index.html", content="Post Login") - @app.route("/http") + @app.route("/http", methods=["GET", "POST"]) @http_auth_required def http(): return "HTTP Authentication" diff --git a/tests/test_changeable.py b/tests/test_changeable.py index 25042a49..64e87b16 100644 --- a/tests/test_changeable.py +++ b/tests/test_changeable.py @@ -21,7 +21,9 @@ from flask_security.utils import localize_callback from tests.test_utils import ( authenticate, + check_location, check_xlation, + get_form_input, get_session, hash_password, init_app_with_options, @@ -666,3 +668,49 @@ def test_pwd_no_normalize(app, client): headers={"Content-Type": "application/json"}, ) assert response.status_code == 200 + + +@pytest.mark.csrf(ignore_unauth=True) +@pytest.mark.settings(post_change_view="/post_change_view") +def test_csrf(app, client): + # enable CSRF, make sure template shows CSRF errors. + authenticate(client) + data = { + "password": "password", + "new_password": "new strong password", + "new_password_confirm": "new strong password", + } + response = client.post("/change", data=data) + assert b"The CSRF token is missing" in response.data + # Note that we get a CSRF token EVEN for errors - this seems odd + # but can't find anything that says its a security issue + csrf_token = get_form_input(response, "csrf_token") + + data["csrf_token"] = csrf_token + response = client.post("/change", data=data) + assert check_location(app, response.location, "/post_change_view") + + +@pytest.mark.csrf(ignore_unauth=True, csrfprotect=True) +def test_csrf_json(app, client): + # This tests the handle_csrf code path - especially the JSON code path + # that should return a JSON response! + authenticate(client) + data = { + "password": "password", + "new_password": "new strong password", + "new_password_confirm": "new strong password", + } + response = client.post("/change", json=data) + assert response.status_code == 400 + assert response.json["response"]["errors"][0] == "The CSRF token is missing." + + # check form path also + response = client.post("/change", data=data) + assert response.status_code == 400 + assert b"The CSRF token is missing." in response.data + + response = client.get("/change", content_type="application/json") + csrf_token = response.json["response"]["csrf_token"] + response = client.post("/change", json=data, headers={"X-CSRF-Token": csrf_token}) + assert response.status_code == 200 diff --git a/tests/test_common.py b/tests/test_common.py index 2b51027e..68d4da10 100644 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -25,6 +25,7 @@ check_location, get_auth_token_version_3x, get_form_action, + get_form_input, get_num_queries, hash_password, json_authenticate, @@ -258,9 +259,9 @@ def test_generic_response(app, client, get_message): response = client.post( "/login", json=dict(email="mattwho@lp.com", password="forgot") ) - # make sure just 'null' key in errors. - assert list(response.json["response"]["field_errors"].keys()) == ["null"] - assert len(response.json["response"]["field_errors"]["null"]) == 1 + # make sure no field error key. + assert list(response.json["response"]["field_errors"].keys()) == [""] + assert len(response.json["response"]["field_errors"][""]) == 1 assert response.json["response"]["errors"][0].encode("utf-8") == get_message( "GENERIC_AUTHN_FAILED" ) @@ -299,9 +300,9 @@ def test_generic_response_username(app, client, get_message): assert get_message("GENERIC_AUTHN_FAILED") in response.data response = client.post("/login", json=dict(username="dude2", password="forgot")) - # make sure just 'null' key in errors. - assert list(response.json["response"]["field_errors"].keys()) == ["null"] - assert len(response.json["response"]["field_errors"]["null"]) == 1 + # make sure no field error key. + assert list(response.json["response"]["field_errors"].keys()) == [""] + assert len(response.json["response"]["field_errors"][""]) == 1 assert response.json["response"]["errors"][0].encode("utf-8") == get_message( "GENERIC_AUTHN_FAILED" ) @@ -595,6 +596,29 @@ def test_token_auth_invalid_for_session_auth(client): assert response.status_code == 401 +@pytest.mark.csrf(ignore_unauth=True, csrfprotect=True) +def test_token_auth_csrf(client): + response = json_authenticate(client) + token = response.json["response"]["user"]["authentication_token"] + csrf_token = response.json["response"]["csrf_token"] + headers = {"Authentication-Token": token} + response = client.post("/token", headers=headers) + assert b"The CSRF token is missing" in response.data + + # test JSON version + response = client.post("/token", headers=headers, content_type="application/json") + assert response.status_code == 400 + assert response.json["response"]["errors"][0] == "The CSRF token is missing." + + # now do it right + headers["X-CSRF-Token"] = csrf_token + response = client.post( + "/token", + headers=headers, + ) + assert b"Token Authentication" in response.data + + def test_http_auth(client, get_message): # browsers expect 401 response with WWW-Authenticate header - which will prompt # them to pop up a login form. @@ -730,6 +754,34 @@ def test_custom_http_auth_realm(client, get_message): assert 'Basic realm="My Realm"' == response.headers["WWW-Authenticate"] +@pytest.mark.csrf(csrfprotect=True) +def test_http_auth_csrf(client, get_message): + headers = { + "Authorization": "Basic %s" + % base64.b64encode(b"joe@lp.com:password").decode("utf-8") + } + response = client.post( + "/http", + headers=headers, + ) + assert b"The CSRF token is missing" in response.data + + # test JSON version + response = client.post("/http", headers=headers, content_type="application/json") + assert response.status_code == 400 + assert response.json["response"]["errors"][0] == "The CSRF token is missing." + + # grab a csrf_token + response = client.get("/login") + csrf_token = get_form_input(response, "csrf_token") + headers["X-CSRF-Token"] = csrf_token + response = client.post( + "/http", + headers=headers, + ) + assert b"HTTP Authentication" in response.data + + def test_multi_auth_basic(client): response = client.get( "/multi_auth", diff --git a/tests/test_csrf.py b/tests/test_csrf.py index 778980af..f944d50d 100644 --- a/tests/test_csrf.py +++ b/tests/test_csrf.py @@ -4,7 +4,7 @@ CSRF tests - :copyright: (c) 2019-2023 by J. Christopher Wagner (jwag). + :copyright: (c) 2019-2024 by J. Christopher Wagner (jwag). :license: MIT, see LICENSE for more details. """ @@ -17,7 +17,7 @@ from flask_wtf import CSRFProtect from flask_security import Security, hash_password -from tests.test_utils import get_session, logout +from tests.test_utils import get_form_input, get_session, logout REAL_VALIDATE_CSRF = None @@ -79,10 +79,7 @@ def json_login( use_header=False, remember=None, ): - """Return tuple (auth_token, csrf_token) - Note that since this is sent as JSON rather than form that csrfProtect - won't find token value (since it looks in request.form). - """ + # Return tuple (auth_token, csrf_token) csrf_token = _get_csrf_token(client) data = dict(email=email, password=password, remember=remember) @@ -110,16 +107,15 @@ def json_logout(client): return response +@pytest.mark.csrf() def test_login_csrf(app, client): - app.config["WTF_CSRF_ENABLED"] = True - # This shouldn't log in - but return login form with csrf token. data = dict(email="matt@lp.com", password="password", remember="y") response = client.post("/login", data=data) assert response.status_code == 200 - assert b"csrf_token" in response.data + assert b"The CSRF token is missing." in response.data - data["csrf_token"] = _get_csrf_token(client) + data["csrf_token"] = get_form_input(response, "csrf_token") response = client.post("/login", data=data, follow_redirects=True) assert response.status_code == 200 assert b"Welcome matt" in response.data @@ -152,9 +148,8 @@ def test_login_csrf_double(app, client): assert b"Welcome matt" in response.data +@pytest.mark.csrf() def test_login_csrf_json(app, client): - app.config["WTF_CSRF_ENABLED"] = True - with mp_validate_csrf() as mp: auth_token, csrf_token = json_login(client) assert auth_token @@ -167,19 +162,13 @@ def test_login_csrf_json(app, client): assert "csrf_token" not in session -def test_login_csrf_json_header(app, sqlalchemy_datastore): - app.config["WTF_CSRF_ENABLED"] = True - CSRFProtect(app) - app.security = Security(app=app, datastore=sqlalchemy_datastore) - create_user(app) - - client = app.test_client() - +@pytest.mark.csrf(csrfprotect=True) +def test_login_csrf_json_header(app, client): with mp_validate_csrf() as mp: auth_token, csrf_token = json_login(client, use_header=True) assert auth_token assert csrf_token - assert mp.success == 2 and mp.failure == 0 + assert mp.success == 1 and mp.failure == 0 json_logout(client) @@ -216,21 +205,27 @@ def test_login_csrf_unauth_double(app, client, get_message): ) +@pytest.mark.csrf() @pytest.mark.recoverable() def test_reset(app, client): """Test that form-based CSRF works for /reset""" - app.config["WTF_CSRF_ENABLED"] = True + response = client.get("/reset", content_type="application/json") + csrf_token = response.json["response"]["csrf_token"] with mp_validate_csrf() as mp: data = dict(email="matt@lp.com") # should fail - no CSRF token response = client.post("/reset", content_type="application/json", json=data) assert response.status_code == 400 + assert response.json["response"]["errors"][0] == "The CSRF token is missing." + # test template also has error + response = client.post("/reset", data=data) + assert b"The CSRF token is missing" in response.data - data["csrf_token"] = _get_csrf_token(client) + data["csrf_token"] = csrf_token response = client.post("/reset", content_type="application/json", json=data) assert response.status_code == 200 - assert mp.success == 1 and mp.failure == 1 + assert mp.success == 1 and mp.failure == 2 @pytest.mark.recoverable() @@ -247,6 +242,7 @@ def test_cp_reset(app, client): # should fail - no CSRF token response = client.post("/reset", content_type="application/json", json=data) assert response.status_code == 400 + assert response.json["response"]["errors"][0] == "The CSRF token is missing." csrf_token = _get_csrf_token(client) response = client.post( @@ -262,15 +258,11 @@ def test_cp_reset(app, client): @pytest.mark.changeable() -def test_cp_with_token(app, sqlalchemy_datastore): +@pytest.mark.csrf(csrfprotect=True) +def test_cp_with_token(app, client): # Make sure can use returned CSRF-Token in Header. - app.config["WTF_CSRF_ENABLED"] = True - CSRFProtect(app) - app.security = Security(app=app, datastore=sqlalchemy_datastore) - create_user(app) - - client = app.test_client() - + # Since the csrf token isn't in the form - must enable app-wide CSRF + # using CSRFProtect() - as the above mark does. auth_token, csrf_token = json_login(client, use_header=True) # make sure returned csrf_token works in header. @@ -443,17 +435,11 @@ def test_csrf_cookie(app, sqlalchemy_datastore): assert not client.get_cookie("XSRF-Token") +@pytest.mark.csrf(csrfprotect=True) @pytest.mark.settings(CSRF_COOKIE={"key": "XSRF-Token"}) @pytest.mark.changeable() -def test_cp_with_token_cookie(app, sqlalchemy_datastore): - # Make sure can use returned CSRF-Token cookie in Header when changing password. - app.config["WTF_CSRF_ENABLED"] = True - CSRFProtect(app) - app.security = Security(app=app, datastore=sqlalchemy_datastore) - create_user(app) - - client = app.test_client() - +def test_cp_with_token_cookie(app, client): + # Make sure can use returned CSRF-Token cookie in Header when changing password json_login(client, use_header=True) # make sure returned csrf_token works in header. @@ -518,19 +504,13 @@ def test_cp_with_token_cookie_expire(app, sqlalchemy_datastore): assert not client.get_cookie("XSRF-Token") +@pytest.mark.csrf(csrfprotect=True) @pytest.mark.settings( CSRF_COOKIE_NAME="XSRF-Token", CSRF_COOKIE_REFRESH_EACH_REQUEST=True ) @pytest.mark.changeable() -def test_cp_with_token_cookie_refresh(app, sqlalchemy_datastore): +def test_cp_with_token_cookie_refresh(app, client): # Test CSRF_COOKIE_REFRESH_EACH_REQUEST - app.config["WTF_CSRF_ENABLED"] = True - CSRFProtect(app) - app.security = Security(app=app, datastore=sqlalchemy_datastore) - create_user(app) - - client = app.test_client() - json_login(client, use_header=True) # make sure returned csrf_token works in header. @@ -565,25 +545,14 @@ def test_cp_with_token_cookie_refresh(app, sqlalchemy_datastore): assert not client.get_cookie("XSRF-Token") +@pytest.mark.csrf(csrfprotect=True) @pytest.mark.settings(CSRF_COOKIE_NAME="XSRF-Token") @pytest.mark.changeable() -def test_remember_login_csrf_cookie(app, sqlalchemy_datastore): +def test_remember_login_csrf_cookie(app, client): # Test csrf cookie upon resuming a remember session - app.config["WTF_CSRF_ENABLED"] = True - CSRFProtect(app) - app.security = Security(app=app, datastore=sqlalchemy_datastore) - create_user(app) - - client = app.test_client() - # Login with remember_token generation json_login(client, use_header=True, remember=True) - # csrf_cookie = [c for c in client.cookie_jar if c.name == "XSRF-Token"][0] - # session_cookie = [c for c in client.cookie_jar if c.name == "session"][0] - # Delete session and csrf cookie - we should always get new ones - # client.delete_cookie(csrf_cookie.domain, csrf_cookie.name) - # client.delete_cookie(session_cookie.domain, session_cookie.name) client.delete_cookie("XSRF-Token") client.delete_cookie("session") diff --git a/tests/test_misc.py b/tests/test_misc.py index ef67a170..0525615a 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -886,10 +886,12 @@ def test_json_form_errors(app, client): """Test wtforms form level errors are correctly sent via json""" with app.test_request_context(): form = ChangePasswordForm() + form.validate() form.form_errors.append("I am an error") response = base_render_json(form) - assert len(response.json["response"]["errors"]) == 1 - assert response.json["response"]["errors"][0] == "I am an error" + error_list = response.json["response"]["errors"] + assert len(error_list) == 3 + assert "I am an error" in error_list def test_method_view(app, client): @@ -1313,7 +1315,7 @@ def test_open_redirect(app, client, get_message): (r"/\github.com", "/%5Cgithub.com"), (r"\/github.com", "%5C/github.com"), ("//github.com", ""), - ("\t//github.com", ""), + ("\t//github.com", "%09//github.com"), ] for i, o in test_urls: data = dict(email="matt@lp.com", password="password", next=i) diff --git a/tests/test_oauthglue.py b/tests/test_oauthglue.py index 8b40b920..c151e4bc 100644 --- a/tests/test_oauthglue.py +++ b/tests/test_oauthglue.py @@ -87,7 +87,7 @@ def test_github(app, sqlalchemy_datastore, get_message): # make sure required CSRF response = client.post(github_url, follow_redirects=False) - assert "The CSRF token is missing" + assert b"The CSRF token is missing" in response.data response = client.post( github_url, data=dict(csrf_token=csrf_token), follow_redirects=False diff --git a/tests/test_recoverable.py b/tests/test_recoverable.py index b0288a83..6cf0d174 100644 --- a/tests/test_recoverable.py +++ b/tests/test_recoverable.py @@ -3,6 +3,9 @@ ~~~~~~~~~~~~~~~~ Recoverable functionality tests + + :copyright: (c) 2019-2024 by J. Christopher Wagner (jwag). + :license: MIT, see LICENSE for more details. """ import re @@ -17,6 +20,8 @@ authenticate, capture_flashes, capture_reset_password_requests, + check_location, + get_form_input, logout, populate_data, ) @@ -761,3 +766,29 @@ async def on_instructions_sent(myapp, **kwargs): ) assert not response.json["response"] assert len(recorded_resets) == 1 + + +@pytest.mark.csrf() +@pytest.mark.settings(post_reset_view="/post_reset_view") +def test_csrf(app, client, get_message): + response = client.get("/reset") + csrf_token = get_form_input(response, "csrf_token") + with capture_reset_password_requests() as requests: + client.post( + "/reset", + data=dict(email="joe@lp.com", csrf_token=csrf_token), + follow_redirects=True, + ) + token = requests[0]["token"] + + # use the token - no CSRF so shouldn't work + data = {"password": "mypassword", "password_confirm": "mypassword"} + response = client.post( + "/reset/" + token, + data=data, + ) + assert b"The CSRF token is missing" in response.data + + data["csrf_token"] = csrf_token + response = client.post(f"/reset/{token}", data=data) + assert check_location(app, response.location, "/post_reset_view") diff --git a/tests/test_registerable.py b/tests/test_registerable.py index 42dd48c1..15ec3c15 100644 --- a/tests/test_registerable.py +++ b/tests/test_registerable.py @@ -516,7 +516,7 @@ def test_username(app, client, get_message): assert response.status_code == 400 assert ( get_message("USER_DOES_NOT_EXIST") - == response.json["response"]["field_errors"]["null"][0].encode() + == response.json["response"]["field_errors"][""][0].encode() ) # login using us-signin diff --git a/tests/test_two_factor.py b/tests/test_two_factor.py index 4d06ab5a..7525b2a0 100644 --- a/tests/test_two_factor.py +++ b/tests/test_two_factor.py @@ -30,6 +30,7 @@ check_location, check_xlation, get_form_action, + get_form_input, get_session, is_authenticated, logout, @@ -1469,3 +1470,38 @@ def test_xlation(app, client, get_message_local): with app.test_request_context(): existing = "Méthode à deux facteurs actuellement configurée : authentificateur" assert markupsafe.escape(existing).encode() in response.data + + +@pytest.mark.csrf(ignore_unauth=True) +@pytest.mark.settings(two_factor_post_setup_view="/post_setup_view") +def test_setup_csrf(app, client): + # Verify /tf-setup properly handles CSRF and template relays CSRF errors + tf_authenticate(app, client) + response = client.get("tf-setup") + assert b"Disable" in response.data + csrf_token = get_form_input(response, "csrf_token") + + response = client.post("tf-setup", data=dict(setup="disable")) + assert b"The CSRF token is missing" in response.data + + response = client.post( + "tf-setup", data=dict(setup="disable", csrf_token=csrf_token) + ) + assert check_location(app, response.location, "/post_setup_view") + + +@pytest.mark.csrf(ignore_unauth=True, csrfprotect=True) +def test_setup_csrf_header(app, client): + # Test that can setup using csrf token in header + tf_authenticate(app, client) + response = client.get("tf-setup", json=dict()) + csrf_token = response.json["response"]["csrf_token"] + + response = client.post("tf-setup", json=dict(setup="disable")) + assert response.status_code == 400 + assert response.json["response"]["errors"][0] == "The CSRF token is missing." + + response = client.post( + "tf-setup", json=dict(setup="disable"), headers={"X-CSRF-Token": csrf_token} + ) + assert response.status_code == 200 diff --git a/tests/test_unified_signin.py b/tests/test_unified_signin.py index b0ba6e00..b9435179 100644 --- a/tests/test_unified_signin.py +++ b/tests/test_unified_signin.py @@ -2030,9 +2030,9 @@ def test_generic_response(app, client, get_message): data = dict(identity="matt@lp.com", code="12345") response = client.post("/us-signin", json=data) assert response.status_code == 400 - assert list(response.json["response"]["field_errors"].keys()) == ["null"] - assert len(response.json["response"]["field_errors"]["null"]) == 1 - assert response.json["response"]["field_errors"]["null"][0].encode( + assert list(response.json["response"]["field_errors"].keys()) == [""] + assert len(response.json["response"]["field_errors"][""]) == 1 + assert response.json["response"]["field_errors"][""][0].encode( "utf-8" ) == get_message("GENERIC_AUTHN_FAILED") assert response.json["response"]["errors"][0].encode("utf-8") == get_message( @@ -2047,9 +2047,9 @@ def test_generic_response(app, client, get_message): data = dict(identity="matt2@lp.com", code="12345") response = client.post("/us-signin", json=data) assert response.status_code == 400 - assert list(response.json["response"]["field_errors"].keys()) == ["null"] - assert len(response.json["response"]["field_errors"]["null"]) == 1 - assert response.json["response"]["field_errors"]["null"][0].encode( + assert list(response.json["response"]["field_errors"].keys()) == [""] + assert len(response.json["response"]["field_errors"][""]) == 1 + assert response.json["response"]["field_errors"][""][0].encode( "utf-8" ) == get_message("GENERIC_AUTHN_FAILED") assert response.json["response"]["errors"][0].encode("utf-8") == get_message( diff --git a/tests/test_utils.py b/tests/test_utils.py index b28cca8e..51583992 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -36,9 +36,17 @@ def authenticate( - client, email="matt@lp.com", password="password", endpoint=None, **kwargs + client, + email="matt@lp.com", + password="password", + endpoint=None, + csrf=False, + **kwargs, ): data = dict(email=email, password=password, remember="y") + if csrf: + response = client.get(endpoint or "/login") + data["csrf_token"] = get_form_input(response, "csrf_token") return client.post(endpoint or "/login", data=data, **kwargs) diff --git a/tests/test_webauthn.py b/tests/test_webauthn.py index 00fee8b6..d2460f4a 100644 --- a/tests/test_webauthn.py +++ b/tests/test_webauthn.py @@ -23,6 +23,7 @@ FakeSerializer, authenticate, capture_flashes, + check_location, get_existing_session, get_form_action, get_form_input, @@ -185,8 +186,12 @@ def origin(self): return "http://localhost:5001" -def _register_start(client, name="testr1", usage="secondary", endpoint="wan-register"): - response = client.post(endpoint, data=dict(name=name, usage=usage)) +def _register_start( + client, name="testr1", usage="secondary", endpoint="wan-register", csrf_token=None +): + response = client.post( + endpoint, data=dict(name=name, usage=usage, csrf_token=csrf_token) + ) matcher = re.match( r".*handleRegister\(\'(.*)\'\).*", response.data.decode("utf-8"), @@ -230,8 +235,11 @@ def _signin_start( client, identity=None, endpoint="wan-signin", + csrf_token=None, ): - response = client.post(endpoint, data=dict(identity=identity)) + response = client.post( + endpoint, data=dict(identity=identity, csrf_token=csrf_token) + ) matcher = re.match( r".*handleSignin\(\'(.*)\'\).*", response.data.decode("utf-8"), @@ -1713,3 +1721,50 @@ async def wan_delete(sender, user, name, **extra_args): response = client.post( "/wan-delete", data=dict(name="testr1"), follow_redirects=True ) + + +@pytest.mark.csrf() +@pytest.mark.settings( + webauthn_util_cls=HackWebauthnUtil, + wan_post_register_view="/done-registration", + post_login_view="/post-login", +) +def test_csrf(app, client, get_message): + response = client.get("/login") + csrf_token = get_form_input(response, "csrf_token") + authenticate(client, csrf=True) + + register_options, response_url = _register_start( + client, usage="first", csrf_token=csrf_token + ) + data = dict(credential=json.dumps(REG_DATA1)) + response = client.post(response_url, data=data, follow_redirects=True) + assert ( + b"The CSRF token is missing" in response.data + ) # this should have been flashed + + data["csrf_token"] = csrf_token + response = client.post(response_url, data=data) + assert check_location(app, response.location, "/done-registration") + logout(client) + + # use old csrf_token - should fail and we should get the error in the template + response = client.post( + "wan-signin", data=dict(identity="matt@lp.com", csrf_token=csrf_token) + ) + assert b"The CSRF tokens do not match." in response.data + + response = client.get("/wan-signin") + csrf_token = get_form_input(response, "csrf_token") + signin_options, response_url = _signin_start( + client, "matt@lp.com", csrf_token=csrf_token + ) + data = dict(credential=json.dumps(SIGNIN_DATA1)) + response = client.post(response_url, data=data, follow_redirects=True) + assert ( + b"The CSRF token is missing" in response.data + ) # this should have been flashed + + data["csrf_token"] = csrf_token + response = client.post(response_url, data=data) + assert check_location(app, response.location, "/post-login") From 0363354e8d4aa1218bb917f4e2df65b50b8f0f99 Mon Sep 17 00:00:00 2001 From: Chris Wagner Date: Fri, 26 Jan 2024 09:58:34 -0800 Subject: [PATCH 2/2] Improve CSRF Added testing to /tf-setup - there wasn't any CSRF issue - all working. CSRF handling is complex and there are few unit tests. - Added @pytest.mark.csrf to make it easier to turn on and test CSRF w/o lots of boilerplate - Added tests and improved many templates to show CSRF errors - mostly for developers - but otherwise CSRF errors tent do just disappear and are difficult to debug - Found issue with WTFforms with the new form-level errors - it uses a `None` key - which, if there are multiple errors, isn't sortable by Flasks default JSON serializer. Filed issue and now change if from `None` to "" - Fixed issue in webauthn with CSRF errors causing exceptions - added tests. - In the case of CSRFprotect() (the app configuring CSRF for the entire app) a CSRF error would raise an exception which would always return an HTML response - added code to return a JSON response if desired. - Add more documentation... closes #905 --- docs/patterns.rst | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/patterns.rst b/docs/patterns.rst index e0c5c521..0fcbb447 100644 --- a/docs/patterns.rst +++ b/docs/patterns.rst @@ -289,6 +289,13 @@ Be aware that if you enable this it will ONLY work if you send the session cooki .. note:: It is IMPORTANT that you initialize/call ``CSRFProtect`` PRIOR to initializing Flask_Security. +.. note:: + Calling CSRFProtect(app) will setup a @before_request handler to verify CSRF - this occurs BEFORE any Flask-Security decorators + or other view/form logic. One side effect is that CSRFProtect, on error, will raise a BadRequest error which returns a small + piece of HTML by default - your application will need to add a Flask ErrorHandler to change that. Alternatively, and recommended + is to set `WTF_CSRF_CHECK_DEFAULT` to `False` - which will disable the @before_request and let Flask-Security handle CSRF protection + including properly returning a JSON response if the caller asks for it. + Using a Cookie --------------