diff --git a/dev-requirements.txt b/dev-requirements.txt index 4b601892..e969ac1f 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -5,3 +5,5 @@ requests==2.31.0 requests-toolbelt==0.9 urllib3<2.0.0 myst-parser +PyJWT +jwcrypto diff --git a/environment.yml b/environment.yml index d46121c8..90e61fa8 100644 --- a/environment.yml +++ b/environment.yml @@ -36,3 +36,5 @@ dependencies: - urllib3<2.0.0 - myst-parser==1.0.0 - jsonschema==4.17.3 + - jwcrypto==1.5.0 + - PyJWT==2.8.0 diff --git a/run-tests.sh b/run-tests.sh index c08e8128..de8eadb1 100755 --- a/run-tests.sh +++ b/run-tests.sh @@ -8,8 +8,9 @@ if [ ! -f "venv/bin/activate" ]; then echo "Setting up Python virtual environment." python3 -m venv "venv" . ./venv/bin/activate + pip install -q -U pip setuptools wheel pip install -q -r dev-requirements.txt - pip install -q -e . + pip install -q -e .[oidc] else . ./venv/bin/activate fi diff --git a/tests/test_cli.py b/tests/test_cli.py index 7e9db7ad..58839070 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,10 +1,15 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. import os +import json import threading import pytest +import jwt +import jwcrypto +from flask import Flask, jsonify from werkzeug.serving import make_server from scitt_emulator import cli, server +from scitt_emulator.oidc import OIDCAuthMiddleware issuer = "did:web:example.com" content_type = "application/json" @@ -149,3 +154,127 @@ def test_client_cli(use_lro: bool, tmp_path): with open(receipt_path_2, "rb") as f: receipt_2 = f.read() assert receipt == receipt_2 + + +def create_flask_app_oidc_server(config): + app = Flask("oidc_server") + + app.config.update(dict(DEBUG=True)) + app.config.update(config) + + @app.route("/.well-known/openid-configuration", methods=["GET"]) + def openid_configuration(): + return jsonify( + { + "issuer": app.url, + "jwks_uri": f"{app.url}/.well-known/jwks", + "response_types_supported": ["id_token"], + "claims_supported": ["sub", "aud", "exp", "iat", "iss"], + "id_token_signing_alg_values_supported": app.config["algorithms"], + "scopes_supported": ["openid"], + } + ) + + @app.route("/.well-known/jwks", methods=["GET"]) + def jwks(): + return jsonify( + { + "keys": [ + { + **app.config["key"].export_public(as_dict=True), + "use": "sig", + "kid": app.config["key"].thumbprint(), + } + ] + } + ) + + return app + + +def test_client_cli_token(tmp_path): + workspace_path = tmp_path / "workspace" + + claim_path = tmp_path / "claim.cose" + receipt_path = tmp_path / "claim.receipt.cbor" + entry_id_path = tmp_path / "claim.entry_id.txt" + retrieved_claim_path = tmp_path / "claim.retrieved.cose" + + key = jwcrypto.jwk.JWK.generate(kty="RSA", size=2048) + algorithm = "RS256" + audience = "scitt.example.org" + + with Service( + {"key": key, "algorithms": [algorithm]}, + create_flask_app=create_flask_app_oidc_server, + ) as oidc_service: + os.environ["no_proxy"] = ",".join( + os.environ.get("no_proxy", "").split(",") + [oidc_service.host] + ) + middleware_config_path = tmp_path / "oidc-middleware-config.json" + middleware_config_path.write_text( + json.dumps({"issuer": oidc_service.url, "audience": audience}) + ) + with Service( + { + "middleware": OIDCAuthMiddleware, + "middleware_config_path": middleware_config_path, + "tree_alg": "CCF", + "workspace": workspace_path, + "error_rate": 0.1, + "use_lro": False, + } + ) as service: + # create claim + command = [ + "client", + "create-claim", + "--out", + claim_path, + "--issuer", + issuer, + "--content-type", + content_type, + "--payload", + payload, + ] + execute_cli(command) + assert os.path.exists(claim_path) + + # submit claim without token + command = [ + "client", + "submit-claim", + "--claim", + claim_path, + "--out", + receipt_path, + "--out-entry-id", + entry_id_path, + "--url", + service.url, + ] + check_error = None + try: + execute_cli(command) + except Exception as error: + check_error = error + assert check_error + assert not os.path.exists(receipt_path) + assert not os.path.exists(entry_id_path) + + # create token + token = jwt.encode( + {"iss": oidc_service.url, "aud": audience}, + key.export_to_pem(private_key=True, password=None), + algorithm=algorithm, + headers={"kid": key.thumbprint()}, + ) + # submit claim with token + command += [ + "--token", + token, + ] + execute_cli(command) + assert os.path.exists(receipt_path) + assert os.path.exists(entry_id_path)