Skip to content
This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Commit

Permalink
[#227] implement jku checking feature
Browse files Browse the repository at this point in the history
  • Loading branch information
gimppa committed Jun 22, 2023
1 parent a7d28ae commit 2551e10
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 1 deletion.
3 changes: 2 additions & 1 deletion beacon_api/conf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,14 @@ def parse_oauth2_config_file(path: str) -> Any:
"""Parse configuration file."""
config = ConfigParser()
config.read(path)
config_vars: Dict[str, Union[str, bool, None]] = {
config_vars: Dict[str, Union[str, bool, None, List[str]]] = {
"server": config.get("oauth2", "server"),
"issuers": config.get("oauth2", "issuers"),
"userinfo": config.get("oauth2", "userinfo"),
"audience": config.get("oauth2", "audience") or None,
"verify_aud": bool(strtobool(config.get("oauth2", "verify_aud"))),
"bona_fide_value": config.get("oauth2", "bona_fide_value"),
"trusted_jkus": config.get("oauth2", "trusted_jkus", fallback="").split(","),
}
return convert(config_vars)

Expand Down
5 changes: 5 additions & 0 deletions beacon_api/conf/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,8 @@ audience=
# If your service is not part of any network or AAI, but you still want to use tokens
# produced by other AAI parties, set this value to False to skip the audience validation step
verify_aud=False

# Comma separated list of trusted JKUs for checking passports
# Passport with an untrusted JKU will be denied access
# Leave empty to disable JKU checking
trusted_jkus=
7 changes: 7 additions & 0 deletions beacon_api/permissions/ga4gh.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ async def get_ga4gh_permissions(token: str) -> Tuple[set, bool]:
for encoded_passport in encoded_passports:
# Decode passport
header, payload = await decode_passport(encoded_passport)
# If trusted_jkus variable is set, only allow passports with a trusted JKU
if not OAUTH2_CONFIG.trusted_jkus == [""]:
# Skip passports with untrusted JKUs
passport_jku = header.get("jku")
if passport_jku not in OAUTH2_CONFIG.trusted_jkus:
LOG.debug("Untrusted JKU.")
continue
# Sort passports that carry dataset permissions
pass_type = payload.get("ga4gh_visa_v1", {}).get("type")
if pass_type == "ControlledAccessGrants": # nosec
Expand Down
5 changes: 5 additions & 0 deletions tests/test.ini
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,8 @@ audience=
# If your service is not part of any network or AAI, but you still want to use tokens
# produced by other AAI parties, set this value to False to skip the audience validation step
verify_aud=False

# Comma separated list of trusted JKUs for checking passports
# Passport with an untrusted JKU will be denied access
# Leave empty to disable JKU checking
trusted_jkus=http://test.csc.fi/jwk
51 changes: 51 additions & 0 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .test_app import PARAMS, generate_token
from testfixtures import TempDirectory
from test.support import EnvironmentVarGuard
from beacon_api.conf import OAUTH2_CONFIG


def mock_token(bona_fide, permissions, auth):
Expand Down Expand Up @@ -70,6 +71,11 @@ async def load_datafile(self, vcf, datafile, datasetId, n=1000, min_ac=1):
return ["datasetId", "variants"]


async def mock_get_ga4gh_controlled(input):
"""Mock retrieve dataset permissions."""
return input


class TestBasicFunctions(unittest.IsolatedAsyncioTestCase):
"""Test supporting functions."""

Expand Down Expand Up @@ -482,5 +488,50 @@ async def test_get_ga4gh_permissions(self, m_userinfo, m_decode, m_controlled, m
self.assertEqual(bona_fide_status, True)


class TestCaseCheckJku(unittest.IsolatedAsyncioTestCase):
"""Test case."""

@unittest.mock.patch("beacon_api.permissions.ga4gh.get_ga4gh_bona_fide")
@unittest.mock.patch("beacon_api.permissions.ga4gh.get_ga4gh_controlled", side_effect=mock_get_ga4gh_controlled)
@unittest.mock.patch("beacon_api.permissions.ga4gh.decode_passport")
@unittest.mock.patch("beacon_api.permissions.ga4gh.retrieve_user_data")
async def test_jku_check(self, m_userinfo, m_decode, m_controller, m_bonafide):
"""Test trusted and untrusted jku."""
# Test: trusted jku
m_userinfo.return_value = [""]
header = {"jku": "http://test.csc.fi/jwk"}
payload = {"ga4gh_visa_v1": {"type": "ControlledAccessGrants"}}
m_decode.return_value = header, payload
m_bonafide.return_value = False
dataset_permissions, bona_fide_status = await get_ga4gh_permissions({})
self.assertEqual(dataset_permissions, [("", header)])
self.assertEqual(bona_fide_status, False)
# Test: untrusted jku
m_userinfo.return_value = [""]
header = {"jku": "untrusted_jku"}
payload = {"ga4gh_visa_v1": {"type": "ControlledAccessGrants"}}
m_decode.return_value = header, payload
m_bonafide.return_value = False
dataset_permissions, bona_fide_status = await get_ga4gh_permissions({})
self.assertEqual(dataset_permissions, [])
self.assertEqual(bona_fide_status, False)

@unittest.mock.patch("beacon_api.permissions.ga4gh.OAUTH2_CONFIG", new=OAUTH2_CONFIG._replace(trusted_jkus=[""]))
@unittest.mock.patch("beacon_api.permissions.ga4gh.get_ga4gh_bona_fide")
@unittest.mock.patch("beacon_api.permissions.ga4gh.get_ga4gh_controlled", side_effect=mock_get_ga4gh_controlled)
@unittest.mock.patch("beacon_api.permissions.ga4gh.decode_passport")
@unittest.mock.patch("beacon_api.permissions.ga4gh.retrieve_user_data")
async def test_jku_check_not_active(self, m_userinfo, m_decode, m_controller, m_bonafide):
"""Test if jku check is skipped when trusted_jkus config var is not set."""
m_userinfo.return_value = [""]
header = {"jku": "untrusted_jku"}
payload = {"ga4gh_visa_v1": {"type": "ControlledAccessGrants"}}
m_decode.return_value = header, payload
m_bonafide.return_value = False
dataset_permissions, bona_fide_status = await get_ga4gh_permissions({})
self.assertEqual(dataset_permissions, [("", header)])
self.assertEqual(bona_fide_status, False)


if __name__ == "__main__":
unittest.main()

0 comments on commit 2551e10

Please sign in to comment.