From 842017b60766ef54fc5e47f306fa9313e9165f5f Mon Sep 17 00:00:00 2001 From: Robbie Harwood Date: Thu, 12 Nov 2020 18:03:03 -0500 Subject: [PATCH] Fix S4U tests These tests relied on rickety behavior and caching. Redo them to actually test impersonation. Fixes problems with krb5 >= 1.18.2. Resolves: #220 Signed-off-by: Robbie Harwood --- gssapi/tests/test_high_level.py | 56 +++++++++++++---------- gssapi/tests/test_raw.py | 80 ++++++++++++++++++--------------- 2 files changed, 76 insertions(+), 60 deletions(-) diff --git a/gssapi/tests/test_high_level.py b/gssapi/tests/test_high_level.py index e0633ed4..09cf05b0 100644 --- a/gssapi/tests/test_high_level.py +++ b/gssapi/tests/test_high_level.py @@ -312,39 +312,49 @@ def test_pickle_unpickle(self): usage='initiate') @ktu.gssapi_extension_test('s4u', 'S4U') def test_impersonate(self, str_name, kwargs): - target_name = gssnames.Name(TARGET_SERVICE_NAME, - gb.NameType.hostbased_service) - # TODO(directxman12): make this use the high-level SecurityContext - client_ctx_resp = gb.init_sec_context(target_name) - client_token = client_ctx_resp[3] - del client_ctx_resp # free everything but the token - - server_name = self.name - server_creds = gsscreds.Credentials(name=server_name, - usage='both') - server_ctx_resp = gb.accept_sec_context(client_token, - acceptor_creds=server_creds) - - imp_creds = server_creds.impersonate(server_ctx_resp[1], **kwargs) + server_name = gssnames.Name(SERVICE_PRINCIPAL, + gb.NameType.kerberos_principal) + + password = self.realm.password("user") + self.realm.kinit(self.realm.user_princ, password=password, + flags=["-f"]) + client_ctx = gssctx.SecurityContext( + name=server_name, flags=gb.RequirementFlag.delegate_to_peer) + client_token = client_ctx.step() + + self.realm.kinit(SERVICE_PRINCIPAL.decode("utf-8"), flags=["-k"]) + server_creds = gsscreds.Credentials(usage="both") + server_ctx = gssctx.SecurityContext(creds=server_creds) + server_ctx.step(client_token) + self.assertTrue(server_ctx.complete) + + imp_creds = server_ctx.delegated_creds.impersonate(server_name, + **kwargs) self.assertIsInstance(imp_creds, gsscreds.Credentials) @ktu.gssapi_extension_test('s4u', 'S4U') def test_add_with_impersonate(self): - target_name = gssnames.Name(TARGET_SERVICE_NAME, - gb.NameType.hostbased_service) - client_ctx = gssctx.SecurityContext(name=target_name) + server_name = gssnames.Name(SERVICE_PRINCIPAL, + gb.NameType.kerberos_principal) + + password = self.realm.password("user") + self.realm.kinit(self.realm.user_princ, password=password, + flags=["-f"]) + client_ctx = gssctx.SecurityContext( + name=server_name, flags=gb.RequirementFlag.delegate_to_peer) client_token = client_ctx.step() - server_creds = gsscreds.Credentials(usage='both') - server_ctx = gssctx.SecurityContext(creds=server_creds, usage='accept') + self.realm.kinit(SERVICE_PRINCIPAL.decode("utf-8"), flags=["-k"]) + server_creds = gsscreds.Credentials(usage="both") + server_ctx = gssctx.SecurityContext(creds=server_creds) server_ctx.step(client_token) + self.assertTrue(server_ctx.complete) # use empty creds to test here input_creds = gsscreds.Credentials(gb.Creds()) - new_creds = input_creds.add(server_ctx.initiator_name, - gb.MechType.kerberos, - impersonator=server_creds, - usage='initiate') + new_creds = input_creds.add( + server_name, gb.MechType.kerberos, + impersonator=server_ctx.delegated_creds, usage='initiate') self.assertIsInstance(new_creds, gsscreds.Credentials) diff --git a/gssapi/tests/test_raw.py b/gssapi/tests/test_raw.py index f3fd8869..3742b279 100644 --- a/gssapi/tests/test_raw.py +++ b/gssapi/tests/test_raw.py @@ -58,7 +58,6 @@ def setUp(self): def test_indicate_mechs(self): mechs = gb.indicate_mechs() self.assertIsInstance(mechs, set) - self.assertGreater(len(mechs), 0) self.assertIn(gb.MechType.kerberos, mechs) def test_import_name(self): @@ -320,56 +319,63 @@ def test_inquire_context(self): @ktu.gssapi_extension_test('s4u', 'S4U') def test_add_cred_impersonate_name(self): - target_name = gb.import_name(TARGET_SERVICE_NAME, - gb.NameType.hostbased_service) - client_ctx_resp = gb.init_sec_context(target_name) - client_token = client_ctx_resp[3] - del client_ctx_resp # free all the things (except the token)! - server_name = gb.import_name(SERVICE_PRINCIPAL, gb.NameType.kerberos_principal) - server_creds = gb.acquire_cred(server_name, usage='both')[0] - server_ctx_resp = gb.accept_sec_context(client_token, - acceptor_creds=server_creds) + + password = self.realm.password('user') + self.realm.kinit(self.realm.user_princ, password=password, + flags=["-f"]) + name = gb.import_name(b"user", gb.NameType.kerberos_principal) + client_creds = gb.acquire_cred(name, usage="initiate").creds + cctx_res = gb.init_sec_context( + server_name, creds=client_creds, + flags=gb.RequirementFlag.delegate_to_peer) + + self.realm.kinit(SERVICE_PRINCIPAL.decode("utf-8"), flags=["-k"]) + server_creds = gb.acquire_cred(server_name, usage="both").creds + sctx_res = gb.accept_sec_context(cctx_res.token, server_creds) + self.assertTrue(gb.inquire_context(sctx_res.context).complete) input_creds = gb.Creds() imp_resp = gb.add_cred_impersonate_name(input_creds, - server_creds, - server_ctx_resp[1], + sctx_res.delegated_creds, + server_name, gb.MechType.kerberos) self.assertIsNotNone(imp_resp) - - new_creds, actual_mechs, output_init_ttl, output_accept_ttl = imp_resp - self.assertIsInstance(new_creds, gb.Creds) - self.assertIn(gb.MechType.kerberos, actual_mechs) - self.assertIsInstance(output_init_ttl, int) - self.assertIsInstance(output_accept_ttl, int) + self.assertIsInstance(imp_resp, gb.AddCredResult) + self.assertIsInstance(imp_resp.creds, gb.Creds) + self.assertIn(gb.MechType.kerberos, imp_resp.mechs) + self.assertIsInstance(imp_resp.init_lifetime, int) + self.assertGreater(imp_resp.init_lifetime, 0) + self.assertIsInstance(imp_resp.accept_lifetime, int) + self.assertEqual(imp_resp.accept_lifetime, 0) @ktu.gssapi_extension_test('s4u', 'S4U') def test_acquire_creds_impersonate_name(self): - target_name = gb.import_name(TARGET_SERVICE_NAME, - gb.NameType.hostbased_service) - client_ctx_resp = gb.init_sec_context(target_name) - client_token = client_ctx_resp[3] - del client_ctx_resp # free all the things (except the token)! - server_name = gb.import_name(SERVICE_PRINCIPAL, gb.NameType.kerberos_principal) - server_creds = gb.acquire_cred(server_name, usage='both')[0] - server_ctx_resp = gb.accept_sec_context(client_token, - acceptor_creds=server_creds) - imp_resp = gb.acquire_cred_impersonate_name(server_creds, - server_ctx_resp[1]) - self.assertIsNotNone(imp_resp) - - imp_creds, actual_mechs, output_ttl = imp_resp - self.assertIsInstance(imp_creds, gb.Creds) - self.assertIn(gb.MechType.kerberos, actual_mechs) - self.assertIsInstance(output_ttl, int) + password = self.realm.password('user') + self.realm.kinit(self.realm.user_princ, password=password, + flags=["-f"]) + name = gb.import_name(b'user', gb.NameType.kerberos_principal) + client_creds = gb.acquire_cred(name, usage="initiate").creds + cctx_res = gb.init_sec_context( + server_name, creds=client_creds, + flags=gb.RequirementFlag.delegate_to_peer) - # no need to explicitly release any more -- we can just rely on - # __dealloc__ (b/c cython) + self.realm.kinit(SERVICE_PRINCIPAL.decode("utf-8"), flags=["-k"]) + server_creds = gb.acquire_cred(server_name, usage='both').creds + sctx_res = gb.accept_sec_context(cctx_res.token, server_creds) + self.assertTrue(gb.inquire_context(sctx_res.context).complete) + + imp_resp = gb.acquire_cred_impersonate_name(sctx_res.delegated_creds, + server_name) + self.assertIsInstance(imp_resp, gb.AcquireCredResult) + self.assertIsInstance(imp_resp.creds, gb.Creds) + self.assertIn(gb.MechType.kerberos, imp_resp.mechs) + self.assertIsInstance(imp_resp.lifetime, int) + self.assertGreater(imp_resp.lifetime, 0) @ktu.gssapi_extension_test('s4u', 'S4U') @ktu.krb_minversion_test('1.11',