Skip to content
This repository has been archived by the owner on Nov 22, 2024. It is now read-only.

Commit

Permalink
Tests for OIDC based auth middleware
Browse files Browse the repository at this point in the history
Signed-off-by: John Andersen <johnandersenpdx@gmail.com>
  • Loading branch information
pdxjohnny committed Sep 12, 2023
1 parent a54712a commit 2027b17
Showing 1 changed file with 179 additions and 0 deletions.
179 changes: 179 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
import os
import threading
import pytest
import jwt
import jwcrypto.jwk
from flask import Request
from werkzeug.wrappers import Request
from werkzeug.serving import make_server
from scitt_emulator import cli, server

Expand Down Expand Up @@ -142,3 +146,178 @@ def test_client_cli(use_lro: bool, tmp_path):
with open(receipt_path_2, "rb") as f:
receipt_2 = f.read()
assert receipt == receipt_2


class AuthError(Exception):
def __init__(self, error, status_code):
self.error = error
self.status_code = status_code


class JWTAuthMiddleware:
def __init__(self, app, config):
self.app = app
self.config = config

def __call__(self, environ, start_response):
# TODO not Flask request - from werkzeug.wrappers import Request
request = Request(environ)
print(
"path: %s, url: %s, headers: %r"
% (request.path, request.url, request.headers)
)
self.validate_token(
self.config, request.headers["Authorization"].replace("Bearer ", "")
)
return self.app(environ, start_response)

@staticmethod
def validate_token(config, token):

import base64
import jwt
import requests

# Part 1: setup
# get the OIDC config and JWKs to use

# in OIDC, you must know your client_id (this is the OAuth 2.0 client_id)
client_id = config["audience"]

# example of fetching data from your OIDC server
# see: https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderConfig
# oidc_server = config["oidc_server"]
oidc_server = "token.actions.githubusercontent.com"
oidc_config = requests.get(
f"https://{oidc_server}/.well-known/openid-configuration"
).json()
signing_algos = oidc_config["id_token_signing_alg_values_supported"]

# setup a PyJWKClient to get the appropriate signing key
jwks_client = jwt.PyJWKClient(oidc_config["jwks_uri"])

# Part 2: login / authorization
# when a user completes an OIDC login flow, there will be a well-formed
# response object to parse/handle

# data from the login flow
# see: https://openid.net/specs/openid-connect-core-1_0.html#TokenResponse
import yaml

print()
print()
print()
print(yaml.dump(oidc_config))
print()
print()
# token_response = token
# id_token = token_response["id_token"]
# access_token = token_response["access_token"]

# Part 3: decode and validate at_hash
# after the login is complete, the id_token needs to be decoded
# this is the stage at which an OIDC client must verify the at_hash

# get signing_key from id_token
# signing_key = jwks_client.get_signing_key_from_jwt(id_token)

# now, decode_complete to get payload + header
"""
data = jwt.decode_complete(
id_token,
key=signing_key.key,
algorithms=signing_algos,
audience=client_id,
)
"""
claims = jwt.decode(
token,
key=config.get("key", None).export_to_pem(),
algorithms=config.get("algorithms", None),
audience=config.get("audience", None),
issuer=config.get("issuer", None),
options={"strict_aud": config.get("strict_aud", True),},
leeway=config.get("leeway", True),
)
return
payload, header = data["payload"], data["header"]

# get the pyjwt algorithm object
alg_obj = jwt.get_algorithm_by_name(header["alg"])

# compute at_hash, then validate / assert
digest = alg_obj.compute_hash_digest(access_token)
at_hash = base64.urlsafe_b64encode(digest[: (len(digest) // 2)]).rstrip("=")
assert at_hash == payload["at_hash"]


def create_jwt(payload, algorithm):
key = jwcrypto.jwk.JWK.generate(kty="RSA", size=2048)

token = jwt.encode(
payload,
key.export_to_pem(private_key=True, password=None),
algorithm=algorithm,
)

return key, token


def test_client_cli_auth_via_oidc(tmp_path):
workspace_path = tmp_path / "workspace"

claim_path = tmp_path / "claim.cose"
receipt_path = tmp_path / "claim.receipt.cbor"
entry_id_path = tmp_path / "claim.entry_id.txt"
retrieved_claim_path = tmp_path / "claim.retrieved.cose"

audience = "urn:scitt"
algorithm = "RS256"
payload = {"sub": "feedface", "aud": audience}
key, token = create_jwt(payload, algorithm)

with Service(
{
"middleware": lambda app: JWTAuthMiddleware(
app, {"key": key, "audience": audience, "algorithms": [algorithm]}
),
"tree_alg": "CCF",
"workspace": workspace_path,
"error_rate": 0.1,
"use_lro": False,
}
) as service:
# create claim
command = [
"client",
"create-claim",
"--out",
claim_path,
"--issuer",
issuer,
"--content-type",
content_type,
"--payload",
payload,
]
execute_cli(command)
assert os.path.exists(claim_path)

# submit claim
command = [
"client",
"submit-claim",
"--token",
token,
"--claim",
claim_path,
"--out",
receipt_path,
"--out-entry-id",
entry_id_path,
"--url",
service.url,
]
execute_cli(command)
assert os.path.exists(receipt_path)
assert os.path.exists(entry_id_path)

0 comments on commit 2027b17

Please sign in to comment.