Skip to content

Commit

Permalink
Fix S4U tests
Browse files Browse the repository at this point in the history
These tests relied on rickety behavior and caching.  Redo them to
actually test impersonation.  Fixes problems with krb5 >= 1.18.2.

Resolves: pythongssapi#220

Signed-off-by: Robbie Harwood <rharwood@redhat.com>
  • Loading branch information
frozencemetery committed Nov 13, 2020
1 parent 7d2ab77 commit 40420db
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 59 deletions.
56 changes: 33 additions & 23 deletions gssapi/tests/test_high_level.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,39 +312,49 @@ def test_pickle_unpickle(self):
usage='initiate')
@ktu.gssapi_extension_test('s4u', 'S4U')
def test_impersonate(self, str_name, kwargs):
target_name = gssnames.Name(TARGET_SERVICE_NAME,
gb.NameType.hostbased_service)
# TODO(directxman12): make this use the high-level SecurityContext
client_ctx_resp = gb.init_sec_context(target_name)
client_token = client_ctx_resp[3]
del client_ctx_resp # free everything but the token

server_name = self.name
server_creds = gsscreds.Credentials(name=server_name,
usage='both')
server_ctx_resp = gb.accept_sec_context(client_token,
acceptor_creds=server_creds)

imp_creds = server_creds.impersonate(server_ctx_resp[1], **kwargs)
server_name = gssnames.Name(SERVICE_PRINCIPAL,
gb.NameType.kerberos_principal)

password = self.realm.password("user")
self.realm.kinit(self.realm.user_princ, password=password,
flags=["-f"])
client_ctx = gssctx.SecurityContext(
name=server_name, flags=gb.RequirementFlag.delegate_to_peer)
client_token = client_ctx.step()

self.realm.kinit(SERVICE_PRINCIPAL.decode("utf-8"), flags=["-k"])
server_creds = gsscreds.Credentials(usage="both")
server_ctx = gssctx.SecurityContext(creds=server_creds)
server_ctx.step(client_token)
self.assertTrue(server_ctx.complete)

imp_creds = server_ctx.delegated_creds.impersonate(server_name,
**kwargs)
self.assertIsInstance(imp_creds, gsscreds.Credentials)

@ktu.gssapi_extension_test('s4u', 'S4U')
def test_add_with_impersonate(self):
target_name = gssnames.Name(TARGET_SERVICE_NAME,
gb.NameType.hostbased_service)
client_ctx = gssctx.SecurityContext(name=target_name)
server_name = gssnames.Name(SERVICE_PRINCIPAL,
gb.NameType.kerberos_principal)

password = self.realm.password("user")
self.realm.kinit(self.realm.user_princ, password=password,
flags=["-f"])
client_ctx = gssctx.SecurityContext(
name=server_name, flags=gb.RequirementFlag.delegate_to_peer)
client_token = client_ctx.step()

server_creds = gsscreds.Credentials(usage='both')
server_ctx = gssctx.SecurityContext(creds=server_creds, usage='accept')
self.realm.kinit(SERVICE_PRINCIPAL.decode("utf-8"), flags=["-k"])
server_creds = gsscreds.Credentials(usage="both")
server_ctx = gssctx.SecurityContext(creds=server_creds)
server_ctx.step(client_token)
self.assertTrue(server_ctx.complete)

# use empty creds to test here
input_creds = gsscreds.Credentials(gb.Creds())
new_creds = input_creds.add(server_ctx.initiator_name,
gb.MechType.kerberos,
impersonator=server_creds,
usage='initiate')
new_creds = input_creds.add(
server_name, gb.MechType.kerberos,
impersonator=server_ctx.delegated_creds, usage='initiate')
self.assertIsInstance(new_creds, gsscreds.Credentials)


Expand Down
80 changes: 44 additions & 36 deletions gssapi/tests/test_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,56 +321,64 @@ def test_inquire_context(self):

@ktu.gssapi_extension_test('s4u', 'S4U')
def test_add_cred_impersonate_name(self):
target_name = gb.import_name(TARGET_SERVICE_NAME,
gb.NameType.hostbased_service)
client_ctx_resp = gb.init_sec_context(target_name)
client_token = client_ctx_resp[3]
del client_ctx_resp # free all the things (except the token)!

server_name = gb.import_name(SERVICE_PRINCIPAL,
gb.NameType.kerberos_principal)
server_creds = gb.acquire_cred(server_name, usage='both')[0]
server_ctx_resp = gb.accept_sec_context(client_token,
acceptor_creds=server_creds)

password = self.realm.password('user')
self.realm.kinit(self.realm.user_princ, password=password,
flags=["-f"])
name = gb.import_name(b"user", gb.NameType.kerberos_principal)
client_creds = gb.acquire_cred(name, usage="initiate").creds
cctx_res = gb.init_sec_context(
server_name, creds=client_creds,
flags=gb.RequirementFlag.delegate_to_peer)

self.realm.kinit(SERVICE_PRINCIPAL.decode("utf-8"), flags=["-k"])
server_creds = gb.acquire_cred(server_name, usage="both").creds
sctx_res = gb.accept_sec_context(cctx_res.token, server_creds)
self.assertTrue(gb.inquire_context(sctx_res.context).complete)

input_creds = gb.Creds()
imp_resp = gb.add_cred_impersonate_name(input_creds,
server_creds,
server_ctx_resp[1],
sctx_res.delegated_creds,
server_name,
gb.MechType.kerberos)
self.assertIsNotNone(imp_resp)

new_creds, actual_mechs, output_init_ttl, output_accept_ttl = imp_resp
self.assertIsInstance(new_creds, gb.Creds)
self.assertIn(gb.MechType.kerberos, actual_mechs)
self.assertIsInstance(output_init_ttl, int)
self.assertIsInstance(output_accept_ttl, int)
self.assertIsInstance(imp_resp, gb.AddCredResult)
self.assertIsInstance(imp_resp.creds, gb.Creds)
self.assertIn(gb.MechType.kerberos, imp_resp.mechs)
self.assertIsInstance(imp_resp.init_lifetime, int)
self.assertGreater(imp_resp.init_lifetime, 0)
self.assertIsInstance(imp_resp.accept_lifetime, int)
self.assertEqual(imp_resp.accept_lifetime, 0)

@ktu.gssapi_extension_test('s4u', 'S4U')
def test_acquire_creds_impersonate_name(self):
target_name = gb.import_name(TARGET_SERVICE_NAME,
gb.NameType.hostbased_service)
client_ctx_resp = gb.init_sec_context(target_name)
client_token = client_ctx_resp[3]
del client_ctx_resp # free all the things (except the token)!

server_name = gb.import_name(SERVICE_PRINCIPAL,
gb.NameType.kerberos_principal)
server_creds = gb.acquire_cred(server_name, usage='both')[0]
server_ctx_resp = gb.accept_sec_context(client_token,
acceptor_creds=server_creds)

imp_resp = gb.acquire_cred_impersonate_name(server_creds,
server_ctx_resp[1])
self.assertIsNotNone(imp_resp)

imp_creds, actual_mechs, output_ttl = imp_resp
self.assertIsInstance(imp_creds, gb.Creds)
self.assertIn(gb.MechType.kerberos, actual_mechs)
self.assertIsInstance(output_ttl, int)
password = self.realm.password('user')
self.realm.kinit(self.realm.user_princ, password=password,
flags=["-f"])
name = gb.import_name(b'user', gb.NameType.kerberos_principal)
client_creds = gb.acquire_cred(name, usage="initiate").creds
cctx_res = gb.init_sec_context(
server_name, creds=client_creds,
flags=gb.RequirementFlag.delegate_to_peer)

# no need to explicitly release any more -- we can just rely on
# __dealloc__ (b/c cython)
self.realm.kinit(SERVICE_PRINCIPAL.decode("utf-8"), flags=["-k"])
server_creds = gb.acquire_cred(server_name, usage='both').creds
sctx_res = gb.accept_sec_context(cctx_res.token, server_creds)
self.assertTrue(gb.inquire_context(sctx_res.context).complete)

imp_resp = gb.acquire_cred_impersonate_name(sctx_res.delegated_creds,
server_name)
self.assertIsInstance(imp_resp, gb.AcquireCredResult)
self.assertIsInstance(imp_resp.creds, gb.Creds)
self.assertGreater(len(imp_resp.mechs), 0)
self.assertIn(gb.MechType.kerberos, imp_resp.mechs)
self.assertIsInstance(imp_resp.lifetime, int)
self.assertGreater(imp_resp.lifetime, 0)

@ktu.gssapi_extension_test('s4u', 'S4U')
@ktu.krb_minversion_test('1.11',
Expand Down

0 comments on commit 40420db

Please sign in to comment.