Skip to content

Commit

Permalink
Fix S4U tests
Browse files Browse the repository at this point in the history
These tests relied on rickety behavior and caching.  Redo them to
actually test impersonation.  Fixes problems with krb5 >= 1.18.2.

Resolves: #220

Signed-off-by: Robbie Harwood <rharwood@redhat.com>
  • Loading branch information
frozencemetery committed Nov 13, 2020
1 parent 743af8e commit 842017b
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 60 deletions.
56 changes: 33 additions & 23 deletions gssapi/tests/test_high_level.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,39 +312,49 @@ def test_pickle_unpickle(self):
usage='initiate')
@ktu.gssapi_extension_test('s4u', 'S4U')
def test_impersonate(self, str_name, kwargs):
target_name = gssnames.Name(TARGET_SERVICE_NAME,
gb.NameType.hostbased_service)
# TODO(directxman12): make this use the high-level SecurityContext
client_ctx_resp = gb.init_sec_context(target_name)
client_token = client_ctx_resp[3]
del client_ctx_resp # free everything but the token

server_name = self.name
server_creds = gsscreds.Credentials(name=server_name,
usage='both')
server_ctx_resp = gb.accept_sec_context(client_token,
acceptor_creds=server_creds)

imp_creds = server_creds.impersonate(server_ctx_resp[1], **kwargs)
server_name = gssnames.Name(SERVICE_PRINCIPAL,
gb.NameType.kerberos_principal)

password = self.realm.password("user")
self.realm.kinit(self.realm.user_princ, password=password,
flags=["-f"])
client_ctx = gssctx.SecurityContext(
name=server_name, flags=gb.RequirementFlag.delegate_to_peer)
client_token = client_ctx.step()

self.realm.kinit(SERVICE_PRINCIPAL.decode("utf-8"), flags=["-k"])
server_creds = gsscreds.Credentials(usage="both")
server_ctx = gssctx.SecurityContext(creds=server_creds)
server_ctx.step(client_token)
self.assertTrue(server_ctx.complete)

imp_creds = server_ctx.delegated_creds.impersonate(server_name,
**kwargs)
self.assertIsInstance(imp_creds, gsscreds.Credentials)

@ktu.gssapi_extension_test('s4u', 'S4U')
def test_add_with_impersonate(self):
target_name = gssnames.Name(TARGET_SERVICE_NAME,
gb.NameType.hostbased_service)
client_ctx = gssctx.SecurityContext(name=target_name)
server_name = gssnames.Name(SERVICE_PRINCIPAL,
gb.NameType.kerberos_principal)

password = self.realm.password("user")
self.realm.kinit(self.realm.user_princ, password=password,
flags=["-f"])
client_ctx = gssctx.SecurityContext(
name=server_name, flags=gb.RequirementFlag.delegate_to_peer)
client_token = client_ctx.step()

server_creds = gsscreds.Credentials(usage='both')
server_ctx = gssctx.SecurityContext(creds=server_creds, usage='accept')
self.realm.kinit(SERVICE_PRINCIPAL.decode("utf-8"), flags=["-k"])
server_creds = gsscreds.Credentials(usage="both")
server_ctx = gssctx.SecurityContext(creds=server_creds)
server_ctx.step(client_token)
self.assertTrue(server_ctx.complete)

# use empty creds to test here
input_creds = gsscreds.Credentials(gb.Creds())
new_creds = input_creds.add(server_ctx.initiator_name,
gb.MechType.kerberos,
impersonator=server_creds,
usage='initiate')
new_creds = input_creds.add(
server_name, gb.MechType.kerberos,
impersonator=server_ctx.delegated_creds, usage='initiate')
self.assertIsInstance(new_creds, gsscreds.Credentials)


Expand Down
80 changes: 43 additions & 37 deletions gssapi/tests/test_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ def setUp(self):
def test_indicate_mechs(self):
mechs = gb.indicate_mechs()
self.assertIsInstance(mechs, set)
self.assertGreater(len(mechs), 0)
self.assertIn(gb.MechType.kerberos, mechs)

def test_import_name(self):
Expand Down Expand Up @@ -320,56 +319,63 @@ def test_inquire_context(self):

@ktu.gssapi_extension_test('s4u', 'S4U')
def test_add_cred_impersonate_name(self):
target_name = gb.import_name(TARGET_SERVICE_NAME,
gb.NameType.hostbased_service)
client_ctx_resp = gb.init_sec_context(target_name)
client_token = client_ctx_resp[3]
del client_ctx_resp # free all the things (except the token)!

server_name = gb.import_name(SERVICE_PRINCIPAL,
gb.NameType.kerberos_principal)
server_creds = gb.acquire_cred(server_name, usage='both')[0]
server_ctx_resp = gb.accept_sec_context(client_token,
acceptor_creds=server_creds)

password = self.realm.password('user')
self.realm.kinit(self.realm.user_princ, password=password,
flags=["-f"])
name = gb.import_name(b"user", gb.NameType.kerberos_principal)
client_creds = gb.acquire_cred(name, usage="initiate").creds
cctx_res = gb.init_sec_context(
server_name, creds=client_creds,
flags=gb.RequirementFlag.delegate_to_peer)

self.realm.kinit(SERVICE_PRINCIPAL.decode("utf-8"), flags=["-k"])
server_creds = gb.acquire_cred(server_name, usage="both").creds
sctx_res = gb.accept_sec_context(cctx_res.token, server_creds)
self.assertTrue(gb.inquire_context(sctx_res.context).complete)

input_creds = gb.Creds()
imp_resp = gb.add_cred_impersonate_name(input_creds,
server_creds,
server_ctx_resp[1],
sctx_res.delegated_creds,
server_name,
gb.MechType.kerberos)
self.assertIsNotNone(imp_resp)

new_creds, actual_mechs, output_init_ttl, output_accept_ttl = imp_resp
self.assertIsInstance(new_creds, gb.Creds)
self.assertIn(gb.MechType.kerberos, actual_mechs)
self.assertIsInstance(output_init_ttl, int)
self.assertIsInstance(output_accept_ttl, int)
self.assertIsInstance(imp_resp, gb.AddCredResult)
self.assertIsInstance(imp_resp.creds, gb.Creds)
self.assertIn(gb.MechType.kerberos, imp_resp.mechs)
self.assertIsInstance(imp_resp.init_lifetime, int)
self.assertGreater(imp_resp.init_lifetime, 0)
self.assertIsInstance(imp_resp.accept_lifetime, int)
self.assertEqual(imp_resp.accept_lifetime, 0)

@ktu.gssapi_extension_test('s4u', 'S4U')
def test_acquire_creds_impersonate_name(self):
target_name = gb.import_name(TARGET_SERVICE_NAME,
gb.NameType.hostbased_service)
client_ctx_resp = gb.init_sec_context(target_name)
client_token = client_ctx_resp[3]
del client_ctx_resp # free all the things (except the token)!

server_name = gb.import_name(SERVICE_PRINCIPAL,
gb.NameType.kerberos_principal)
server_creds = gb.acquire_cred(server_name, usage='both')[0]
server_ctx_resp = gb.accept_sec_context(client_token,
acceptor_creds=server_creds)

imp_resp = gb.acquire_cred_impersonate_name(server_creds,
server_ctx_resp[1])
self.assertIsNotNone(imp_resp)

imp_creds, actual_mechs, output_ttl = imp_resp
self.assertIsInstance(imp_creds, gb.Creds)
self.assertIn(gb.MechType.kerberos, actual_mechs)
self.assertIsInstance(output_ttl, int)
password = self.realm.password('user')
self.realm.kinit(self.realm.user_princ, password=password,
flags=["-f"])
name = gb.import_name(b'user', gb.NameType.kerberos_principal)
client_creds = gb.acquire_cred(name, usage="initiate").creds
cctx_res = gb.init_sec_context(
server_name, creds=client_creds,
flags=gb.RequirementFlag.delegate_to_peer)

# no need to explicitly release any more -- we can just rely on
# __dealloc__ (b/c cython)
self.realm.kinit(SERVICE_PRINCIPAL.decode("utf-8"), flags=["-k"])
server_creds = gb.acquire_cred(server_name, usage='both').creds
sctx_res = gb.accept_sec_context(cctx_res.token, server_creds)
self.assertTrue(gb.inquire_context(sctx_res.context).complete)

imp_resp = gb.acquire_cred_impersonate_name(sctx_res.delegated_creds,
server_name)
self.assertIsInstance(imp_resp, gb.AcquireCredResult)
self.assertIsInstance(imp_resp.creds, gb.Creds)
self.assertIn(gb.MechType.kerberos, imp_resp.mechs)
self.assertIsInstance(imp_resp.lifetime, int)
self.assertGreater(imp_resp.lifetime, 0)

@ktu.gssapi_extension_test('s4u', 'S4U')
@ktu.krb_minversion_test('1.11',
Expand Down

0 comments on commit 842017b

Please sign in to comment.