From 6115f9a8a8a74c3bd19612dc5b5417e61f48574b Mon Sep 17 00:00:00 2001 From: Robbie Harwood Date: Thu, 12 Nov 2020 18:03:03 -0500 Subject: [PATCH] Fix S4U tests These tests relied on rickety behavior and caching. Redo them to actually test impersonation. Fixes problems with krb5 >= 1.18.2. Resolves: #220 Signed-off-by: Robbie Harwood --- gssapi/tests/test_high_level.py | 60 ++++++++++++++--------- gssapi/tests/test_raw.py | 84 ++++++++++++++++++--------------- 2 files changed, 82 insertions(+), 62 deletions(-) diff --git a/gssapi/tests/test_high_level.py b/gssapi/tests/test_high_level.py index 13092d56..10726938 100644 --- a/gssapi/tests/test_high_level.py +++ b/gssapi/tests/test_high_level.py @@ -314,39 +314,53 @@ def test_pickle_unpickle(self): usage='initiate') @ktu.gssapi_extension_test('s4u', 'S4U') def test_impersonate(self, str_name, kwargs): - target_name = gssnames.Name(TARGET_SERVICE_NAME, - gb.NameType.hostbased_service) - # TODO(directxman12): make this use the high-level SecurityContext - client_ctx_resp = gb.init_sec_context(target_name) - client_token = client_ctx_resp[3] - del client_ctx_resp # free everything but the token - - server_name = self.name - server_creds = gsscreds.Credentials(name=server_name, - usage='both') - server_ctx_resp = gb.accept_sec_context(client_token, - acceptor_creds=server_creds) - - imp_creds = server_creds.impersonate(server_ctx_resp[1], **kwargs) + server_name = gssnames.Name(SERVICE_PRINCIPAL, + gb.NameType.kerberos_principal) + + password = self.realm.password("user") + self.realm.kinit(self.realm.user_princ, password=password, + flags=["-f"]) + name = gssnames.Name(b"user", gb.NameType.kerberos_principal) + client_creds = gsscreds.Credentials(name=name, usage="initiate") + client_ctx = gssctx.SecurityContext( + name=server_name, flags=gb.RequirementFlag.delegate_to_peer) + client_token = client_ctx.step() + + self.realm.kinit(SERVICE_PRINCIPAL.decode("utf-8"), flags=["-k"]) + server_creds = gsscreds.Credentials(usage="both") + server_ctx = gssctx.SecurityContext(creds=server_creds) + server_ctx.step(client_token) + self.assertTrue(server_ctx.complete) + + imp_creds = server_ctx.delegated_creds.impersonate(server_name, + **kwargs) self.assertIsInstance(imp_creds, gsscreds.Credentials) @ktu.gssapi_extension_test('s4u', 'S4U') def test_add_with_impersonate(self): - target_name = gssnames.Name(TARGET_SERVICE_NAME, - gb.NameType.hostbased_service) - client_ctx = gssctx.SecurityContext(name=target_name) + server_name = gssnames.Name(SERVICE_PRINCIPAL, + gb.NameType.kerberos_principal) + + password = self.realm.password("user") + self.realm.kinit(self.realm.user_princ, password=password, + flags=["-f"]) + name = gssnames.Name(b"user", gb.NameType.kerberos_principal) + client_creds = gsscreds.Credentials(name=name, usage="initiate") + client_ctx = gssctx.SecurityContext( + name=server_name, flags=gb.RequirementFlag.delegate_to_peer) client_token = client_ctx.step() - server_creds = gsscreds.Credentials(usage='both') - server_ctx = gssctx.SecurityContext(creds=server_creds, usage='accept') + self.realm.kinit(SERVICE_PRINCIPAL.decode("utf-8"), flags=["-k"]) + server_creds = gsscreds.Credentials(usage="both") + server_ctx = gssctx.SecurityContext(creds=server_creds) server_ctx.step(client_token) + self.assertTrue(server_ctx.complete) # use empty creds to test here input_creds = gsscreds.Credentials(gb.Creds()) - new_creds = input_creds.add(server_ctx.initiator_name, - gb.MechType.kerberos, - impersonator=server_creds, - usage='initiate') + new_creds = input_creds.add( + server_name, gb.MechType.kerberos, + impersonator=server_ctx.delegated_creds, usage='initiate') self.assertIsInstance(new_creds, gsscreds.Credentials) diff --git a/gssapi/tests/test_raw.py b/gssapi/tests/test_raw.py index 3bd2807d..b0838877 100644 --- a/gssapi/tests/test_raw.py +++ b/gssapi/tests/test_raw.py @@ -322,58 +322,64 @@ def test_inquire_context(self): @ktu.gssapi_extension_test('s4u', 'S4U') def test_add_cred_impersonate_name(self): - target_name = gb.import_name(TARGET_SERVICE_NAME, - gb.NameType.hostbased_service) - client_ctx_resp = gb.init_sec_context(target_name) - client_token = client_ctx_resp[3] - del client_ctx_resp # free all the things (except the token)! - server_name = gb.import_name(SERVICE_PRINCIPAL, gb.NameType.kerberos_principal) - server_creds = gb.acquire_cred(server_name, usage='both')[0] - server_ctx_resp = gb.accept_sec_context(client_token, - acceptor_creds=server_creds) + + password = self.realm.password('user') + self.realm.kinit(self.realm.user_princ, password=password, + flags=["-f"]) + name = gb.import_name(b"user", gb.NameType.kerberos_principal) + client_creds = gb.acquire_cred(name, usage="initiate").creds + cctx_res = gb.init_sec_context( + server_name, creds=client_creds, + flags=gb.RequirementFlag.delegate_to_peer) + + self.realm.kinit(SERVICE_PRINCIPAL.decode("utf-8"), flags=["-k"]) + server_creds = gb.acquire_cred(server_name, usage="both").creds + sctx_res = gb.accept_sec_context(cctx_res.token, server_creds) + self.assertTrue(gb.inquire_context(sctx_res.context).complete) input_creds = gb.Creds() imp_resp = gb.add_cred_impersonate_name(input_creds, - server_creds, - server_ctx_resp[1], + sctx_res.delegated_creds, + server_name, gb.MechType.kerberos) - self.assertIsNotNone(imp_resp) - - new_creds, actual_mechs, output_init_ttl, output_accept_ttl = imp_resp - self.assertIsInstance(new_creds, gb.Creds) - self.assertGreater(len(actual_mechs), 0) - self.assertIn(gb.MechType.kerberos, actual_mechs) - self.assertIsInstance(output_init_ttl, int) - self.assertIsInstance(output_accept_ttl, int) + self.assertIsInstance(imp_resp, gb.AddCredResult) + self.assertIsInstance(imp_resp.creds, gb.Creds) + self.assertGreater(len(imp_resp.mechs), 0) + self.assertIn(gb.MechType.kerberos, imp_resp.mechs) + self.assertIsInstance(imp_resp.init_lifetime, int) + self.assertGreater(imp_resp.init_lifetime, 0) + self.assertIsInstance(imp_resp.accept_lifetime, int) + self.assertEqual(imp_resp.accept_lifetime, 0) @ktu.gssapi_extension_test('s4u', 'S4U') def test_acquire_creds_impersonate_name(self): - target_name = gb.import_name(TARGET_SERVICE_NAME, - gb.NameType.hostbased_service) - client_ctx_resp = gb.init_sec_context(target_name) - client_token = client_ctx_resp[3] - del client_ctx_resp # free all the things (except the token)! - server_name = gb.import_name(SERVICE_PRINCIPAL, gb.NameType.kerberos_principal) - server_creds = gb.acquire_cred(server_name, usage='both')[0] - server_ctx_resp = gb.accept_sec_context(client_token, - acceptor_creds=server_creds) - imp_resp = gb.acquire_cred_impersonate_name(server_creds, - server_ctx_resp[1]) - self.assertIsNotNone(imp_resp) - - imp_creds, actual_mechs, output_ttl = imp_resp - self.assertIsInstance(imp_creds, gb.Creds) - self.assertGreater(len(actual_mechs), 0) - self.assertIn(gb.MechType.kerberos, actual_mechs) - self.assertIsInstance(output_ttl, int) + password = self.realm.password('user') + self.realm.kinit(self.realm.user_princ, password=password, + flags=["-f"]) + name = gb.import_name(b'user', gb.NameType.kerberos_principal) + client_creds = gb.acquire_cred(name, usage="initiate").creds + cctx_res = gb.init_sec_context( + server_name, creds=client_creds, + flags=gb.RequirementFlag.delegate_to_peer) - # no need to explicitly release any more -- we can just rely on - # __dealloc__ (b/c cython) + self.realm.kinit(SERVICE_PRINCIPAL.decode("utf-8"), flags=["-k"]) + server_creds = gb.acquire_cred(server_name, usage='both').creds + sctx_res = gb.accept_sec_context(cctx_res.token, server_creds) + self.assertTrue(gb.inquire_context(sctx_res.context).complete) + + imp_resp = gb.acquire_cred_impersonate_name(sctx_res.delegated_creds, + server_name) + self.assertIsInstance(imp_resp, gb.AcquireCredResult) + self.assertIsInstance(imp_resp.creds, gb.Creds) + self.assertGreater(len(imp_resp.mechs), 0) + self.assertIn(gb.MechType.kerberos, imp_resp.mechs) + self.assertIsInstance(imp_resp.lifetime, int) + self.assertGreater(imp_resp.lifetime, 0) @ktu.gssapi_extension_test('s4u', 'S4U') @ktu.krb_minversion_test('1.11',