Skip to content

Commit

Permalink
Merge pull request #572 from shreyb/issue-570
Browse files Browse the repository at this point in the history
If cigetcert fails for any reason, raise `PermissionError`
  • Loading branch information
shreyb committed May 23, 2024
2 parents 115727d + d77e5a6 commit 1f71d00
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 12 deletions.
13 changes: 9 additions & 4 deletions lib/fake_ifdh.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,21 +348,26 @@ def generate_proxy_command_verbose_args(cmd_str: str) -> Dict[str, Any]:

if force_proxy or invalid_proxy:
cigetcert_cmd_str = f"cigetcert -i 'Fermi National Accelerator Laboratory' -n --proxyhours 168 --minhours 167 -o {certfile}"
# pylint: disable=subprocess-run-check
# TODO: See if we can put check in here, catch the error, and raise our Exception as below # pylint: disable=fixme
cigetcert_cmd = subprocess.run(
shlex.split(cigetcert_cmd_str),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
check=False,
encoding="UTF-8",
env=os.environ,
)
if cigetcert_cmd.returncode != 0:

try:
cigetcert_cmd.check_returncode()
except subprocess.CalledProcessError:
msg = f"Cigetcert failed to get a proxy due to an unspecified issue. Please inspect the output below.\n{cigetcert_cmd.stdout}"
if "Kerberos initialization failed" in cigetcert_cmd.stdout:
raise Exception(
msg = (
"Cigetcert failed to get proxy due to kerberos issue. Please ensure "
"you have valid kerberos credentials."
)
raise PermissionError(msg)

voms_proxy_init_cmd_str = (
f"voms-proxy-init -dont-verify-ac -valid 167:00 -rfc -noregen"
f" -debug -cert {certfile} -key {certfile} -out {vomsfile} -vomslife 167:0"
Expand Down
74 changes: 66 additions & 8 deletions tests/test_fake_ifdh_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,26 @@ def fermilab_token(clear_token, set_group_fermilab):
return fake_ifdh.getToken("Analysis")


@pytest.fixture
def fake_proxy_path(tmp_path):
fake_path = tmp_path / "test_proxy"
if os.path.exists(fake_path):
try:
os.unlink(fake_path)
except:
pass
return fake_path


@pytest.fixture
def switch_to_invalid_kerb_cache(monkeypatch, tmp_path):
# Set the environment variable to an invalid path
fakefile = tmp_path / "invalid_kerb_cache"
fakefile.touch()
monkeypatch.setenv("KRB5CCNAME", f"FILE:{fakefile}")
yield


class TestGetTmp:
@pytest.mark.unit
def test_getTmp(self):
Expand Down Expand Up @@ -358,43 +378,81 @@ def test_bearer_token_file_not_exist(self, monkeypatch, clear_bearer_token_file)

class TestGetProxy:
@pytest.mark.unit
def test_getProxy_good(check_user_kerberos_creds, clear_token, set_group_fermilab):
def test_getProxy_good(
self, check_user_kerberos_creds, clear_token, set_group_fermilab
):
proxy = fake_ifdh.getProxy("Analysis")
assert os.path.exists(proxy)

@pytest.mark.unit
def test_getProxy_override(
self,
check_user_kerberos_creds,
clear_x509_user_proxy,
clear_token,
set_group_fermilab,
fake_proxy_path,
monkeypatch,
tmp_path,
):
fake_path = tmp_path / "test_proxy"
fake_path = fake_proxy_path
monkeypatch.setenv("X509_USER_PROXY", str(fake_path))
proxy = fake_ifdh.getProxy("Analysis")
assert proxy == str(fake_path)

@pytest.mark.unit
def test_getProxy_fail(
self,
check_user_kerberos_creds,
clear_x509_user_proxy,
clear_token,
fake_proxy_path,
monkeypatch,
tmp_path,
):
fake_path = tmp_path / "test_proxy"
if os.path.exists(fake_path):
try:
os.unlink(fake_path)
except:
pass
fake_path = fake_proxy_path
monkeypatch.setenv("X509_USER_PROXY", str(fake_path))
monkeypatch.setenv("GROUP", "bozo")
with pytest.raises(PermissionError):
fake_ifdh.getProxy("Analysis")

@pytest.mark.unit
def test_getProxy_fail_cigetcert_kerberos(
self,
switch_to_invalid_kerb_cache,
clear_x509_user_proxy,
clear_token,
fake_proxy_path,
monkeypatch,
tmp_path,
):
fake_path = fake_proxy_path
monkeypatch.setenv("X509_USER_PROXY", str(fake_path))
monkeypatch.setenv("GROUP", "bozo")

# Should fail because cigetcert fails for kerberos issue
with pytest.raises(Exception, match="kerberos issue"):
fake_ifdh.getProxy("Analysis")

@pytest.mark.unit
def test_getProxy_fail_cigetcert_other(
self,
clear_x509_user_proxy,
clear_token,
monkeypatch,
tmp_path,
):
# We're trying to force a permission-denied error here. So try to write to /dev/null/fake_file, which can never exist since /dev/null isn't a directory
monkeypatch.setenv("X509_USER_PROXY", "/dev/null/fake_file")
monkeypatch.setenv("GROUP", "bozo")

# Should fail because cigetcert fails for kerberos issue
with pytest.raises(
PermissionError,
match="Cigetcert failed to get a proxy due to an unspecified issue",
):
fake_ifdh.getProxy("Analysis")


@pytest.mark.unit
def test_cp():
Expand Down

0 comments on commit 1f71d00

Please sign in to comment.