Skip to content

Commit

Permalink
Support for running tests against Heimdal in CI
Browse files Browse the repository at this point in the history
Signed-off-by: Jordan Borean <jborean93@gmail.com>
  • Loading branch information
jborean93 committed Aug 22, 2021
1 parent be33336 commit 1bb99bf
Show file tree
Hide file tree
Showing 5 changed files with 275 additions and 79 deletions.
4 changes: 2 additions & 2 deletions ci/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ if [ $BUILD_RES -ne 0 ]; then
fi

# Only call exit on failures so we can source this script
if [ x"$KRB5_VER" = "xheimdal" ] || [ "$OS_NAME" = "windows" ]; then
# heimdal/Windows can't run the tests yet, so just make sure it imports and exit
if [ "$OS_NAME" = "windows" ]; then
# Windows can't run the tests yet, so just make sure it imports and exit
python -c "import gssapi" || exit $?
else
python setup.py nosetests --verbosity=3 || exit $?
Expand Down
12 changes: 11 additions & 1 deletion ci/lib-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ setup::debian::install() {
apt-get update

if [ x"$KRB5_VER" = "xheimdal" ]; then
apt-get -y install heimdal-dev
apt-get -y install heimdal-{clients,dev,kdc}

export GSSAPI_KRB5_MAIN_LIB="/usr/lib/x86_64-linux-gnu/libkrb5.so.26"
export PATH="/usr/lib/heimdal-servers:${PATH}"
else
apt-get -y install krb5-{user,kdc,admin-server,multidev} libkrb5-dev \
gss-ntlmssp
Expand Down Expand Up @@ -62,6 +65,13 @@ setup::macos::install() {
python3 -m virtualenv -p $(which python3) .venv
source .venv/bin/activate
pip install --install-option='--no-cython-compile' cython

export GSSAPI_KRB5_MAIN_LIB="/System/Library/PrivateFrameworks/Heimdal.framework/Heimdal"

# macOS's Heimdal version is buggy, it will only use KRB5_KTNAME if the
# env var was set when GSSAPI creates the context. Setting it here to any
# value solves that problem for CI.
export KRB5_KTNAME=initial
}

setup::windows::install() {
Expand Down
2 changes: 1 addition & 1 deletion ci/run-on-linux.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# If we try to use a normal Github Actions container with
# github-pages-deploy-action, it will fail due to inability to find git.

docker run -h test.box \
docker run -h test.krbtest.com \
-v `pwd`:/tmp/build -w /tmp/build \
-e KRB5_VER=${KRB5_VER:-mit} \
-e FLAKE=${FLAKE:no} \
Expand Down
149 changes: 119 additions & 30 deletions gssapi/tests/test_high_level.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@


TARGET_SERVICE_NAME = b'host'
FQDN = socket.getfqdn().encode('utf-8')
FQDN = (
'localhost' if sys.platform == 'darwin' else socket.getfqdn()
).encode('utf-8')
SERVICE_PRINCIPAL = TARGET_SERVICE_NAME + b'/' + FQDN

# disable error deferring to catch errors immediately
Expand Down Expand Up @@ -124,7 +126,8 @@ def setUp(self):
usage='both')
def test_acquire_by_init(self, str_name, kwargs):
creds = gsscreds.Credentials(name=self.name, **kwargs)
self.assertIsInstance(creds.lifetime, int)
if sys.platform != 'darwin':
self.assertIsInstance(creds.lifetime, int)
del creds

@exist_perms(lifetime=30, mechs=[gb.MechType.kerberos],
Expand All @@ -137,7 +140,8 @@ def test_acquire_by_method(self, str_name, kwargs):
creds, actual_mechs, ttl = cred_resp
self.assertIsInstance(creds, gsscreds.Credentials)
self.assertIn(gb.MechType.kerberos, actual_mechs)
self.assertIsInstance(ttl, int)
if sys.platform != 'darwin':
self.assertIsInstance(ttl, int)

del creds

Expand Down Expand Up @@ -165,9 +169,12 @@ def test_store_acquire(self):
self.assertIsNotNone(deleg_creds)

store_res = deleg_creds.store(usage='initiate', set_default=True,
mech=gb.MechType.kerberos,
overwrite=True)
self.assertEqual(store_res.usage, "initiate")
self.assertIn(gb.MechType.kerberos, store_res.mechs)
# While Heimdal doesn't fail it doesn't set the return values as exp.
if self.realm.provider.lower() != 'heimdal':
self.assertEqual(store_res.usage, "initiate")
self.assertIn(gb.MechType.kerberos, store_res.mechs)

reacquired_creds = gsscreds.Credentials(name=deleg_creds.name,
usage='initiate')
Expand All @@ -187,10 +194,18 @@ def test_store_into_acquire_from(self):
initial_creds = gsscreds.Credentials(name=None,
usage='initiate')

store_res = initial_creds.store(store, overwrite=True)
acquire_kwargs = {}
expected_usage = 'initiate'
if self.realm.provider.lower() == 'heimdal':
acquire_kwargs['usage'] = 'initiate'
acquire_kwargs['mech'] = gb.MechType.kerberos
expected_usage = 'both'

store_res = initial_creds.store(store, overwrite=True,
**acquire_kwargs)
self.assertIsNotNone(store_res.mechs)
self.assertGreater(len(store_res.mechs), 0)
self.assertEqual(store_res.usage, "initiate")
self.assertEqual(store_res.usage, expected_usage)

name = gssnames.Name(princ_name)
retrieved_creds = gsscreds.Credentials(name=name, store=store)
Expand All @@ -212,13 +227,14 @@ def test_inquire(self, str_name, kwargs):
else:
self.assertIsNone(resp.name)

if kwargs['lifetime']:
if kwargs['lifetime'] and sys.platform != 'darwin':
self.assertIsInstance(resp.lifetime, int)
else:
self.assertIsNone(resp.lifetime)

if kwargs['usage']:
self.assertEqual(resp.usage, "both")
expected = "accept" if sys.platform == "darwin" else "both"
self.assertEqual(resp.usage, expected)
else:
self.assertIsNone(resp.usage)

Expand All @@ -242,17 +258,21 @@ def test_inquire_by_mech(self, str_name, kwargs):
else:
self.assertIsNone(resp.init_lifetime)

if kwargs['accept_lifetime']:
if kwargs['accept_lifetime'] and sys.platform != "darwin":
self.assertIsInstance(resp.accept_lifetime, int)
else:
self.assertIsNone(resp.accept_lifetime)

if kwargs['usage']:
self.assertEqual(resp.usage, "both")
expected = "accept" if sys.platform == "darwin" else "both"
self.assertEqual(resp.usage, expected)
else:
self.assertIsNone(resp.usage)

def test_add(self):
if sys.platform == 'darwin':
self.skipTest("macOS Heimdal broken")

input_creds = gsscreds.Credentials(gb.Creds())
name = gssnames.Name(SERVICE_PRINCIPAL)
new_creds = input_creds.add(name, gb.MechType.kerberos,
Expand All @@ -265,18 +285,25 @@ def test_store_into_add_from(self):
KT = '{tmpdir}/other_keytab'.format(tmpdir=self.realm.tmpdir)
store = {'ccache': CCACHE, 'keytab': KT}

princ_name = 'service/cs@' + self.realm.realm
princ_name = 'service_add_from/cs@' + self.realm.realm
self.realm.addprinc(princ_name)
self.realm.extract_keytab(princ_name, KT)
self.realm.kinit(princ_name, None, ['-k', '-t', KT])

initial_creds = gsscreds.Credentials(name=None,
usage='initiate')

store_res = initial_creds.store(store, overwrite=True)
store_kwargs = {}
expected_usage = 'initiate'
if self.realm.provider.lower() == 'heimdal':
store_kwargs['usage'] = 'initiate'
store_kwargs['mech'] = gb.MechType.kerberos
expected_usage = 'both'

store_res = initial_creds.store(store, overwrite=True, **store_kwargs)
self.assertIsNotNone(store_res.mechs)
self.assertGreater(len(store_res.mechs), 0)
self.assertEqual(store_res.usage, "initiate")
self.assertEqual(store_res.usage, expected_usage)

name = gssnames.Name(princ_name)
input_creds = gsscreds.Credentials(gb.Creds())
Expand All @@ -286,26 +313,34 @@ def test_store_into_add_from(self):

@ktu.gssapi_extension_test('cred_imp_exp', 'credentials import-export')
def test_export(self):
creds = gsscreds.Credentials(name=self.name)
creds = gsscreds.Credentials(name=self.name,
mechs=[gb.MechType.kerberos])
token = creds.export()
self.assertIsInstance(token, bytes)

@ktu.gssapi_extension_test('cred_imp_exp', 'credentials import-export')
def test_import_by_init(self):
creds = gsscreds.Credentials(name=self.name)
creds = gsscreds.Credentials(name=self.name,
mechs=[gb.MechType.kerberos])
token = creds.export()
imported_creds = gsscreds.Credentials(token=token)

self.assertEqual(imported_creds.lifetime, creds.lifetime)
# lifetime seems to be None in Heimdal
if self.realm.provider.lower() != 'heimdal':
self.assertEqual(imported_creds.lifetime, creds.lifetime)

self.assertEqual(imported_creds.name, creds.name)

@ktu.gssapi_extension_test('cred_imp_exp', 'credentials import-export')
def test_pickle_unpickle(self):
creds = gsscreds.Credentials(name=self.name)
creds = gsscreds.Credentials(name=self.name,
mechs=[gb.MechType.kerberos])
pickled_creds = pickle.dumps(creds)
unpickled_creds = pickle.loads(pickled_creds)

self.assertEqual(unpickled_creds.lifetime, creds.lifetime)
# lifetime seems to be None in Heimdal
if self.realm.provider.lower() != 'heimdal':
self.assertEqual(unpickled_creds.lifetime, creds.lifetime)
self.assertEqual(unpickled_creds.name, creds.name)

@exist_perms(lifetime=30, mechs=[gb.MechType.kerberos],
Expand Down Expand Up @@ -381,8 +416,15 @@ def test_sasl_properties(self):
if mech.description:
self.assertIsInstance(mech.description, str)

cmp_mech = gssmechs.Mechanism.from_sasl_name(mech.sasl_name)
self.assertEqual(str(cmp_mech), str(mech))
# Heimdal fails with Unknown mech-code on sanon
if not (self.realm.provider.lower() == "heimdal" and
s == '1.3.6.1.4.1.5322.26.1.110'):
cmp_mech = gssmechs.Mechanism.from_sasl_name(mech.sasl_name)

# For some reason macOS sometimes returns this for mechs
if not (sys.platform == 'darwin' and
str(cmp_mech) == '1.2.752.43.14.2'):
self.assertEqual(str(cmp_mech), str(mech))

@ktu.gssapi_extension_test('rfc5587', 'RFC 5587: Mech Inquiry')
def test_mech_inquiry(self):
Expand Down Expand Up @@ -441,6 +483,8 @@ def test_create_from_token(self):
self.assertEqual(name2.name_type, gb.NameType.kerberos_principal)

@ktu.gssapi_extension_test('rfc6680', 'RFC 6680')
@ktu.krb_provider_test(['mit'], 'gss_display_name_ext as it is not '
'implemented for krb5')
def test_display_as(self):
name = gssnames.Name(TARGET_SERVICE_NAME,
gb.NameType.hostbased_service)
Expand All @@ -457,6 +501,8 @@ def test_display_as(self):
self.assertEqual(krb_name, princ_str)

@ktu.gssapi_extension_test('rfc6680', 'RFC 6680')
@ktu.krb_provider_test(['mit'], 'gss_canonicalize_name as it is not '
'implemented for krb5')
def test_create_from_composite_token_no_attrs(self):
name1 = gssnames.Name(TARGET_SERVICE_NAME,
gb.NameType.hostbased_service)
Expand Down Expand Up @@ -539,7 +585,16 @@ def test_canonicalize(self):

canonicalized_name = name.canonicalize(gb.MechType.kerberos)
self.assertIsInstance(canonicalized_name, gssnames.Name)
self.assertEqual(bytes(canonicalized_name), SERVICE_PRINCIPAL + b"@")

expected = SERVICE_PRINCIPAL + b"@"
if sys.platform == 'darwin':
# No idea - just go with it
expected = b"host/wellknown:org.h5l.hostbased-service@" \
b"H5L.HOSTBASED-SERVICE"
elif self.realm.provider.lower() == 'heimdal':
expected += self.realm.realm.encode('utf-8')

self.assertEqual(bytes(canonicalized_name), expected)

def test_copy(self):
name1 = gssnames.Name(SERVICE_PRINCIPAL)
Expand All @@ -551,6 +606,7 @@ def test_copy(self):
# doesn't actually implement it

@ktu.gssapi_extension_test('rfc6680', 'RFC 6680')
@ktu.krb_provider_test(['mit'], 'Heimdal does not implemented for krb5')
def test_is_mech_name(self):
name = gssnames.Name(TARGET_SERVICE_NAME,
gb.NameType.hostbased_service)
Expand All @@ -562,6 +618,7 @@ def test_is_mech_name(self):
self.assertEqual(canon_name.mech, gb.MechType.kerberos)

@ktu.gssapi_extension_test('rfc6680', 'RFC 6680')
@ktu.krb_provider_test(['mit'], 'Heimdal does not implemented for krb5')
def test_export_name_composite_no_attrs(self):
name = gssnames.Name(TARGET_SERVICE_NAME,
gb.NameType.hostbased_service)
Expand Down Expand Up @@ -611,8 +668,13 @@ def setUp(self):
self.client_creds = gsscreds.Credentials(name=None,
usage='initiate')

self.target_name = gssnames.Name(TARGET_SERVICE_NAME,
gb.NameType.hostbased_service)
if sys.platform == "darwin":
spn = TARGET_SERVICE_NAME + b"@" + FQDN
self.target_name = gssnames.Name(spn,
gb.NameType.hostbased_service)
else:
self.target_name = gssnames.Name(TARGET_SERVICE_NAME,
gb.NameType.hostbased_service)

self.server_name = gssnames.Name(SERVICE_PRINCIPAL)
self.server_creds = gsscreds.Credentials(name=self.server_name,
Expand All @@ -628,7 +690,12 @@ def _create_client_ctx(self, **kwargs):
def test_create_from_other(self):
raw_client_ctx, raw_server_ctx = self._create_completed_contexts()
high_level_ctx = gssctx.SecurityContext(raw_client_ctx)
self.assertEqual(high_level_ctx.target_name, self.target_name)

expected = self.target_name
if self.realm.provider.lower() == "heimdal":
expected = gssnames.Name(self.realm.host_princ.encode('utf-8'),
name_type=gb.NameType.kerberos_principal)
self.assertEqual(high_level_ctx.target_name, expected)

@exist_perms(lifetime=30, flags=[],
mech=gb.MechType.kerberos,
Expand Down Expand Up @@ -688,7 +755,13 @@ def test_initiate_accept_steps(self):
self.assertTrue(server_ctx.complete)

self.assertLessEqual(client_ctx.lifetime, 400)
self.assertEqual(client_ctx.target_name, self.target_name)

expected = self.target_name
if self.realm.provider.lower() == "heimdal":
expected = gssnames.Name(self.realm.host_princ.encode('utf-8'),
name_type=gb.NameType.kerberos_principal)
self.assertEqual(client_ctx.target_name, expected)

self.assertIsInstance(client_ctx.mech, gb.OID)
self.assertIsInstance(client_ctx.actual_flags, gb.IntEnumFlagSet)
self.assertTrue(client_ctx.locally_initiated)
Expand All @@ -714,6 +787,9 @@ def test_channel_bindings(self):
client_ctx.step(server_token)

def test_bad_channel_bindings_raises_error(self):
if sys.platform == "darwin":
self.skipTest("macOS Heimdal doesn't fail as expected")

bdgs = gb.ChannelBindings(application_data=b'abcxyz',
initiator_address_type=gb.AddressType.ip,
initiator_address=b'127.0.0.1',
Expand All @@ -738,7 +814,13 @@ def test_export_create_from_token(self):

imported_ctx = gssctx.SecurityContext(token=token)
self.assertEqual(imported_ctx.usage, "initiate")
self.assertEqual(imported_ctx.target_name, self.target_name)

expected = self.target_name
if self.realm.provider.lower() == "heimdal":
expected = gssnames.Name(self.realm.host_princ.encode('utf-8'),
name_type=gb.NameType.kerberos_principal)

self.assertEqual(imported_ctx.target_name, expected)

def test_pickle_unpickle(self):
client_ctx, server_ctx = self._create_completed_contexts()
Expand All @@ -747,7 +829,12 @@ def test_pickle_unpickle(self):
unpickled_ctx = pickle.loads(pickled_ctx)
self.assertIsInstance(unpickled_ctx, gssctx.SecurityContext)
self.assertEqual(unpickled_ctx.usage, "initiate")
self.assertEqual(unpickled_ctx.target_name, self.target_name)

expected = self.target_name
if self.realm.provider.lower() == "heimdal":
expected = gssnames.Name(self.realm.host_princ.encode('utf-8'),
name_type=gb.NameType.kerberos_principal)
self.assertEqual(unpickled_ctx.target_name, expected)

def test_encrypt_decrypt(self):
client_ctx, server_ctx = self._create_completed_contexts()
Expand Down Expand Up @@ -810,7 +897,8 @@ def test_verify_signature_raise(self):
self.assertRaises(gb.GSSError, server_ctx.verify_signature,
b"other message", mic_token)

@ktu.krb_minversion_test("1.11", "returning tokens")
@ktu.krb_minversion_test("1.11", "returning tokens", provider="mit")
@ktu.krb_provider_test(["mit"], "returning tokens")
def test_defer_step_error_on_method(self):
gssctx.SecurityContext.__DEFER_STEP_ERRORS__ = True
bdgs = gb.ChannelBindings(application_data=b'abcxyz')
Expand All @@ -827,7 +915,8 @@ def test_defer_step_error_on_method(self):
self.assertRaises(gb.BadChannelBindingsError, server_ctx.encrypt,
b"test")

@ktu.krb_minversion_test("1.11", "returning tokens")
@ktu.krb_minversion_test("1.11", "returning tokens", provider="mit")
@ktu.krb_provider_test(["mit"], "returning tokens")
def test_defer_step_error_on_complete_property_access(self):
gssctx.SecurityContext.__DEFER_STEP_ERRORS__ = True
bdgs = gb.ChannelBindings(application_data=b'abcxyz')
Expand Down
Loading

0 comments on commit 1bb99bf

Please sign in to comment.