Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

If cigetcert fails for any reason, raise PermissionError #572

Merged
merged 2 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions lib/fake_ifdh.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,21 +348,26 @@ def generate_proxy_command_verbose_args(cmd_str: str) -> Dict[str, Any]:

if force_proxy or invalid_proxy:
cigetcert_cmd_str = f"cigetcert -i 'Fermi National Accelerator Laboratory' -n --proxyhours 168 --minhours 167 -o {certfile}"
# pylint: disable=subprocess-run-check
# TODO: See if we can put check in here, catch the error, and raise our Exception as below # pylint: disable=fixme
cigetcert_cmd = subprocess.run(
shlex.split(cigetcert_cmd_str),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
check=False,
encoding="UTF-8",
env=os.environ,
)
if cigetcert_cmd.returncode != 0:

try:
cigetcert_cmd.check_returncode()
except subprocess.CalledProcessError:
msg = f"Cigetcert failed to get a proxy due to an unspecified issue. Please inspect the output below.\n{cigetcert_cmd.stdout}"
if "Kerberos initialization failed" in cigetcert_cmd.stdout:
raise Exception(
msg = (
"Cigetcert failed to get proxy due to kerberos issue. Please ensure "
"you have valid kerberos credentials."
)
raise PermissionError(msg)

voms_proxy_init_cmd_str = (
f"voms-proxy-init -dont-verify-ac -valid 167:00 -rfc -noregen"
f" -debug -cert {certfile} -key {certfile} -out {vomsfile} -vomslife 167:0"
Expand Down
74 changes: 66 additions & 8 deletions tests/test_fake_ifdh_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,26 @@ def fermilab_token(clear_token, set_group_fermilab):
return fake_ifdh.getToken("Analysis")


@pytest.fixture
def fake_proxy_path(tmp_path):
fake_path = tmp_path / "test_proxy"
if os.path.exists(fake_path):
try:
os.unlink(fake_path)
except:
pass
return fake_path


@pytest.fixture
def switch_to_invalid_kerb_cache(monkeypatch, tmp_path):
# Set the environment variable to an invalid path
fakefile = tmp_path / "invalid_kerb_cache"
fakefile.touch()
monkeypatch.setenv("KRB5CCNAME", f"FILE:{fakefile}")
yield


class TestGetTmp:
@pytest.mark.unit
def test_getTmp(self):
Expand Down Expand Up @@ -358,43 +378,81 @@ def test_bearer_token_file_not_exist(self, monkeypatch, clear_bearer_token_file)

class TestGetProxy:
@pytest.mark.unit
def test_getProxy_good(check_user_kerberos_creds, clear_token, set_group_fermilab):
def test_getProxy_good(
self, check_user_kerberos_creds, clear_token, set_group_fermilab
):
proxy = fake_ifdh.getProxy("Analysis")
assert os.path.exists(proxy)

@pytest.mark.unit
def test_getProxy_override(
self,
check_user_kerberos_creds,
clear_x509_user_proxy,
clear_token,
set_group_fermilab,
fake_proxy_path,
monkeypatch,
tmp_path,
):
fake_path = tmp_path / "test_proxy"
fake_path = fake_proxy_path
monkeypatch.setenv("X509_USER_PROXY", str(fake_path))
proxy = fake_ifdh.getProxy("Analysis")
assert proxy == str(fake_path)

@pytest.mark.unit
def test_getProxy_fail(
self,
check_user_kerberos_creds,
clear_x509_user_proxy,
clear_token,
fake_proxy_path,
monkeypatch,
tmp_path,
):
fake_path = tmp_path / "test_proxy"
if os.path.exists(fake_path):
try:
os.unlink(fake_path)
except:
pass
fake_path = fake_proxy_path
monkeypatch.setenv("X509_USER_PROXY", str(fake_path))
monkeypatch.setenv("GROUP", "bozo")
with pytest.raises(PermissionError):
fake_ifdh.getProxy("Analysis")

@pytest.mark.unit
def test_getProxy_fail_cigetcert_kerberos(
self,
switch_to_invalid_kerb_cache,
clear_x509_user_proxy,
clear_token,
fake_proxy_path,
monkeypatch,
tmp_path,
):
fake_path = fake_proxy_path
monkeypatch.setenv("X509_USER_PROXY", str(fake_path))
monkeypatch.setenv("GROUP", "bozo")

# Should fail because cigetcert fails for kerberos issue
with pytest.raises(Exception, match="kerberos issue"):
fake_ifdh.getProxy("Analysis")

@pytest.mark.unit
def test_getProxy_fail_cigetcert_other(
self,
clear_x509_user_proxy,
clear_token,
monkeypatch,
tmp_path,
):
# We're trying to force a permission-denied error here. So try to write to /dev/null/fake_file, which can never exist since /dev/null isn't a directory
monkeypatch.setenv("X509_USER_PROXY", "/dev/null/fake_file")
monkeypatch.setenv("GROUP", "bozo")

# Should fail because cigetcert fails for kerberos issue
with pytest.raises(
PermissionError,
match="Cigetcert failed to get a proxy due to an unspecified issue",
):
fake_ifdh.getProxy("Analysis")


@pytest.mark.unit
def test_cp():
Expand Down
Loading