diff --git a/README.txt b/README.txt index fc44bf27..14219cc6 100644 --- a/README.txt +++ b/README.txt @@ -165,6 +165,8 @@ In addition to RFC 2743/2744, Python-GSSAPI also has support for: * GGF Extensions +* Kerberos specific extensions + The Team ======== diff --git a/ci/lib-setup.sh b/ci/lib-setup.sh index 2fa93292..fdb0e9dd 100755 --- a/ci/lib-setup.sh +++ b/ci/lib-setup.sh @@ -12,6 +12,8 @@ setup::debian::install() { else apt-get -y install krb5-{user,kdc,admin-server,multidev} libkrb5-dev \ gss-ntlmssp + + export GSSAPI_KRB5_MAIN_LIB="/usr/lib/x86_64-linux-gnu/libkrb5.so" fi apt-get -y install gcc virtualenv python3-{virtualenv,dev} cython3 @@ -46,6 +48,7 @@ setup::fedora::install() { setup::rh::install() { setup::rh::yuminst krb5-{devel,libs,server,workstation} \ which gcc findutils gssntlmssp + export GSSAPI_KRB5_MAIN_LIB="/usr/lib64/libkrb5.so" if [ -f /etc/fedora-release ]; then setup::fedora::install diff --git a/docs/source/gssapi.raw.rst b/docs/source/gssapi.raw.rst index 69078d20..970955a9 100644 --- a/docs/source/gssapi.raw.rst +++ b/docs/source/gssapi.raw.rst @@ -173,6 +173,13 @@ Acquiring Credentials With a Password Extensions :members: :undoc-members: +Kerberos Specific Extensions +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. automodule:: gssapi.raw.ext_krb5 + :members: + :undoc-members: + Other Extensions ~~~~~~~~~~~~~~~~ diff --git a/gssapi/raw/__init__.py b/gssapi/raw/__init__.py index b2320991..b18a74b2 100644 --- a/gssapi/raw/__init__.py +++ b/gssapi/raw/__init__.py @@ -125,6 +125,12 @@ except ImportError: pass +# optional KRB5 specific extension support +try: + from gssapi.raw.ext_krb5 import * # noqa +except ImportError: + pass + # optional RFC 6680 support try: from gssapi.raw.ext_rfc6680 import * # noqa diff --git a/gssapi/raw/ext_krb5.pxd b/gssapi/raw/ext_krb5.pxd new file mode 100644 index 00000000..ae796c7a --- /dev/null +++ b/gssapi/raw/ext_krb5.pxd @@ -0,0 +1,2 @@ +cdef class Krb5LucidContext: + cdef void *raw_ctx diff --git a/gssapi/raw/ext_krb5.pyx b/gssapi/raw/ext_krb5.pyx new file mode 100644 index 00000000..da989ae5 --- /dev/null +++ b/gssapi/raw/ext_krb5.pyx @@ -0,0 +1,578 @@ +GSSAPI="BASE" # This ensures that a full module is generated by Cython + +import typing + +from libc.stdint cimport int32_t, int64_t, uint64_t, uintptr_t, UINT32_MAX +from libc.stdlib cimport calloc, free +from libc.time cimport time_t + +from gssapi.raw.creds cimport Creds +from gssapi.raw.cython_converters cimport c_make_oid +from gssapi.raw.cython_types cimport * +from gssapi.raw.sec_contexts cimport SecurityContext + +from gssapi.raw import types as gsstypes +from gssapi.raw.named_tuples import CfxKeyData, Rfc1964KeyData + +from gssapi.raw.misc import GSSError + + +cdef extern from "python_gssapi_krb5.h": + # Heimdal on macOS hides these 3 functions behind a private symbol + """ + #ifdef OSX_HAS_GSS_FRAMEWORK + #define gsskrb5_extract_authtime_from_sec_context \ + __ApplePrivate_gsskrb5_extract_authtime_from_sec_context + + #define gss_krb5_import_cred __ApplePrivate_gss_krb5_import_cred + + #define gss_krb5_get_tkt_flags __ApplePrivate_gss_krb5_get_tkt_flags + #endif + """ + + cdef struct gss_krb5_lucid_key: + OM_uint32 type + OM_uint32 length + void *data + ctypedef gss_krb5_lucid_key gss_krb5_lucid_key_t + + cdef struct gss_krb5_rfc1964_keydata: + OM_uint32 sign_alg + OM_uint32 seal_alg + gss_krb5_lucid_key_t ctx_key + ctypedef gss_krb5_rfc1964_keydata gss_krb5_rfc1964_keydata_t + + cdef struct gss_krb5_cfx_keydata: + OM_uint32 have_acceptor_subkey + gss_krb5_lucid_key_t ctx_key + gss_krb5_lucid_key_t acceptor_subkey + ctypedef gss_krb5_cfx_keydata gss_krb5_cfx_keydata_t + + cdef struct gss_krb5_lucid_context_v1: + OM_uint32 version + OM_uint32 initiate + OM_uint32 endtime + uint64_t send_seq + uint64_t recv_seq + OM_uint32 protocol + gss_krb5_rfc1964_keydata_t rfc1964_kd + gss_krb5_cfx_keydata_t cfx_kd + ctypedef gss_krb5_lucid_context_v1 gss_krb5_lucid_context_v1_t + + gss_OID GSS_KRB5_NT_PRINCIPAL_NAME + int32_t _PY_GSSAPI_KRB5_TIMESTAMP + + # The krb5 specific types are defined generically as the type names differ + # across GSSAPI implementations. + + OM_uint32 gss_krb5_ccache_name(OM_uint32 *minor_status, const char *name, + const char **out_name) nogil + + OM_uint32 gss_krb5_export_lucid_sec_context(OM_uint32 *minor_status, + gss_ctx_id_t *context_handle, + OM_uint32 version, + void **kctx) nogil + + # The actual authtime size differs across implementations. See individual + # methods for more information. + OM_uint32 gsskrb5_extract_authtime_from_sec_context( + OM_uint32 *minor_status, gss_ctx_id_t context_handle, + void *authtime) nogil + + OM_uint32 gsskrb5_extract_authz_data_from_sec_context( + OM_uint32 *minor_status, const gss_ctx_id_t context_handle, + int ad_type, gss_buffer_t ad_data) nogil + + OM_uint32 gss_krb5_free_lucid_sec_context(OM_uint32 *minor_status, + void *kctx) nogil + + OM_uint32 gss_krb5_import_cred(OM_uint32 *minor_status, + void *id, # krb5_ccache + void *keytab_principal, # krb5_principal + void *keytab, # krb5_keytab + gss_cred_id_t *cred) nogil + + # MIT uses a int32_t whereas Heimdal uses uint32_t. Use void * to satisfy + # the compiler. + OM_uint32 gss_krb5_get_tkt_flags(OM_uint32 *minor_status, + gss_ctx_id_t context_handle, + void *ticket_flags) nogil + + OM_uint32 gss_krb5_set_allowable_enctypes(OM_uint32 *minor_status, + gss_cred_id_t cred, + OM_uint32 num_ktypes, + int32_t *ktypes) nogil + + +cdef class Krb5LucidContext: + """ + The base container returned by :meth:`krb5_export_lucid_sec_context` when + an unknown version was requested. + """ + # defined in pxd + # cdef void *raw_ctx + + def __cinit__(Krb5LucidContext self): + self.raw_ctx = NULL + + def __dealloc__(Krb5LucidContext self): + cdef OM_uint32 min_stat = 0 + + if self.raw_ctx: + gss_krb5_free_lucid_sec_context(&min_stat, self.raw_ctx) + self.raw_ctx = NULL + + +cdef class Krb5LucidContextV1(Krb5LucidContext): + """ + Kerberos context data returned by :meth:`krb5_export_lucid_sec_context` + when version 1 was requested. + """ + + @property + def version(self) -> typing.Optional[int]: + """The structure version number + + Returns: + Optional[int]: the structure version number + """ + cdef gss_krb5_lucid_context_v1_t *ctx = NULL + + if self.raw_ctx: + ctx = self.raw_ctx + return ctx.version + + @property + def is_initiator(self) -> typing.Optional[bool]: + """Whether the context was the initiator + + Returns: + Optional[bool]: ``True`` when the exported context was the + initiator + """ + cdef gss_krb5_lucid_context_v1_t *ctx = NULL + + if self.raw_ctx: + ctx = self.raw_ctx + return ctx.initiate != 0 + + @property + def endtime(self) -> typing.Optional[int]: + """Expiration time of the context + + Returns: + Optional[int]: the expiration time of the context + """ + cdef gss_krb5_lucid_context_v1_t *ctx = NULL + + if self.raw_ctx: + ctx = self.raw_ctx + return ctx.endtime + + @property + def send_seq(self) -> typing.Optional[int]: + """Sender sequence number + + Returns: + Optional[int]: the sender sequence number + """ + cdef gss_krb5_lucid_context_v1_t *ctx = NULL + + if self.raw_ctx: + ctx = self.raw_ctx + return ctx.send_seq + + @property + def recv_seq(self) -> typing.Optional[int]: + """Receiver sequence number + + Returns: + Optional[int]: the receiver sequence number + """ + cdef gss_krb5_lucid_context_v1_t *ctx = NULL + + if self.raw_ctx: + ctx = self.raw_ctx + return ctx.recv_seq + + @property + def protocol(self) -> typing.Optional[int]: + """The protocol number + + If the protocol number is 0 then :attr:`rfc1964_kd` is set and + :attr:`cfx_kd` is `None`. If the protocol number is 1 then the opposite + is true. + + Protocol 0 refers to RFC1964 and 1 refers to RFC4121. + + Returns: + Optional[int]: the protocol number + """ + cdef gss_krb5_lucid_context_v1_t *ctx = NULL + + if self.raw_ctx: + ctx = self.raw_ctx + return ctx.protocol + + @property + def rfc1964_kd(self) -> typing.Optional[Rfc1964KeyData]: + """Keydata for protocol 0 (RFC1964) + + This will be set when :attr:`protocol` is ``0``. + + Returns: + Optional[Rfc1964KeyData]: the RFC1964 key data + """ + cdef gss_krb5_lucid_context_v1_t *ctx = NULL + + if self.raw_ctx != NULL and self.protocol == 0: + ctx = self.raw_ctx + kd = ctx.rfc1964_kd + key = (kd.ctx_key.data)[:kd.ctx_key.length] + + return Rfc1964KeyData(kd.sign_alg, kd.seal_alg, kd.ctx_key.type, + key) + + @property + def cfx_kd(self) -> typing.Optional[CfxKeyData]: + """Key data for protocol 1 (RFC4121) + + This will be set when :attr:`protocol` is ``1``. + + Returns: + Optional[CfxKeyData]: the RFC4121 key data + """ + cdef gss_krb5_lucid_context_v1_t *ctx = NULL + + if self.raw_ctx != NULL and self.protocol == 1: + ctx = self.raw_ctx + kd = ctx.cfx_kd + ctx_type = ctx_key = acceptor_type = acceptor_key = None + + ctx_type = kd.ctx_key.type + ctx_key = (kd.ctx_key.data)[:kd.ctx_key.length] + + if kd.have_acceptor_subkey != 0: + acceptor_type = kd.acceptor_subkey.type + key = kd.acceptor_subkey + acceptor_key = (key.data)[:key.length] + + return CfxKeyData(ctx_type, ctx_key, acceptor_type, + acceptor_key) + + +# Unfortunately MIT defines it as const - use the cast to silence warnings +gsstypes.NameType.krb5_nt_principal_name = c_make_oid( + GSS_KRB5_NT_PRINCIPAL_NAME) + + +def krb5_ccache_name(const unsigned char[:] name): + """ + krb5_ccache_name(name) + Set the default Kerberos Protocol credentials cache name. + + This method sets the default credentials cache name for use by he Kerberos + mechanism. The default credentials cache is used by + :meth:`~gssapi.raw.creds.acquire_cred` to create a GSS-API credential. It + is also used by :meth:`~gssapi.raw.sec_contexts.init_sec_context` when + `GSS_C_NO_CREDENTIAL` is specified. + + Note: + Heimdal does not return the old name when called. It also does not + reset the ccache lookup behaviour when setting to ``None``. + + Note: + The return value may not be thread safe. + + Args: + name (Optional[bytes]): the name to set as the new thread specific + ccache name. Set to ``None`` to revert back to getting the ccache + from the config/environment settings. + + Returns: + bytes: the old name that was previously set + + Raises: + ~gssapi.exceptions.GSSError + """ + cdef const char *name_ptr = NULL + if name is not None and len(name): + name_ptr = &name[0] + + cdef const char *old_name_ptr = NULL + cdef OM_uint32 maj_stat, min_stat + with nogil: + maj_stat = gss_krb5_ccache_name(&min_stat, name_ptr, &old_name_ptr) + + if maj_stat == GSS_S_COMPLETE: + out_name = None + if old_name_ptr: + out_name = old_name_ptr + + return out_name + + else: + raise GSSError(maj_stat, min_stat) + + +def krb5_export_lucid_sec_context(SecurityContext context not None, + OM_uint32 version): + """ + krb5_export_lucid_sec_context(context, version) + Retuns a non-opaque version of the internal context info. + + Gets information about the Kerberos security context passed in. Currently + only version 1 is known and supported by this library. + + Note: + The context handle must not be used again by the caller after this + call. + + Args: + context ((~gssapi.raw.sec_contexts.SecurityContext): the current + security context + version (int): the output structure version to export. Currently + only 1 is supported. + + Returns: + Krb5LucidContext: the non-opaque version context info + + Raises: + ~gssapi.exceptions.GSSError + """ + info = { + 1: Krb5LucidContextV1, + }.get(version, Krb5LucidContext)() + cdef void **raw_ctx = &(info).raw_ctx + + cdef OM_uint32 maj_stat, min_stat + with nogil: + maj_stat = gss_krb5_export_lucid_sec_context(&min_stat, + &context.raw_ctx, + version, raw_ctx) + + if maj_stat != GSS_S_COMPLETE: + raise GSSError(maj_stat, min_stat) + + return info + + +def krb5_extract_authtime_from_sec_context(SecurityContext context not None): + """ + krb5_extract_authtime_from_sec_context(context) + Get the auth time for the security context. + + Gets the auth time for the established security context. + + Note: + Heimdal can only get the authtime on the acceptor security context. + MIT is able to get the authtime on both initiators and acceptors. + + Args: + context ((~gssapi.raw.sec_contexts.SecurityContext): the current + security context + + Returns: + int: the authtime + + Raises: + ~gssapi.exceptions.GSSError + """ + # In Heimdal, authtime is time_t which is either a 4 or 8 byte int. By + # passing in a uint64_t reference, there should be enough space for GSSAPI + # to store the data in either situation. Coming back to Python it will be + # handled as a normal int without loosing data. + cdef uint64_t time = 0 + + cdef OM_uint32 maj_stat, min_stat + with nogil: + maj_stat = gsskrb5_extract_authtime_from_sec_context(&min_stat, + context.raw_ctx, + &time) + + if maj_stat != GSS_S_COMPLETE: + raise GSSError(maj_stat, min_stat) + + return time + + +def krb5_extract_authz_data_from_sec_context(SecurityContext context not None, + ad_type): + """ + krb5_extract_authz_data_from_sec_context(context, ad_type) + Extracts Kerberos authorization data. + + Extracts authorization data that may be stored within the context. + + Note: + Only operates on acceptor contexts. + + Args: + context ((~gssapi.raw.sec_contexts.SecurityContext): the current + security context + ad_type (int): the type of data to extract + + Returns: + bytes: the raw authz data from the sec context + + Raises: + ~gssapi.exceptions.GSSError + """ + # GSS_C_EMPTY_BUFFER + cdef gss_buffer_desc ad_data = gss_buffer_desc(0, NULL) + cdef int ad_type_val = ad_type + + cdef OM_uint32 maj_stat, min_stat + with nogil: + maj_stat = gsskrb5_extract_authz_data_from_sec_context(&min_stat, + context.raw_ctx, + ad_type_val, + &ad_data) + + if maj_stat != GSS_S_COMPLETE: + raise GSSError(maj_stat, min_stat) + + try: + return (ad_data.value)[:ad_data.length] + + finally: + gss_release_buffer(&min_stat, &ad_data) + + +def krb5_import_cred(Creds cred_handle not None, cache=None, + keytab_principal=None, keytab=None): + """ + krb5_import_cred(cred_handle, cache=None, keytab_principal=None, \ + keytab=None) + Import Krb5 credentials into GSSAPI credential. + + Imports the krb5 credentials (either or both of the keytab and cache) into + the GSSAPI credential so it can be used within GSSAPI. The ccache is + copied by reference and thus shared, so if the credential is destroyed, + all users of cred_handle will fail. + + Args: + cred_handle (Creds): the credential handle to import into + cache (int): the krb5_ccache address pointer, as an int, to import + from + keytab_principal (int): the krb5_principal address pointer, as an int, + of the credential to import + keytab (int): the krb5_keytab address pointer, as an int, of the + keytab to import + + Returns: + None + + Raises: + ~gssapi.exceptions.GSSError + """ + cdef void *cache_ptr = NULL + if cache is not None and cache: + cache_ptr = (cache) + + cdef void *keytab_princ = NULL + if keytab_principal is not None and keytab_principal: + keytab_princ = (keytab_principal) + + cdef void *kt = NULL + if keytab is not None and keytab: + kt = (keytab) + + if cache_ptr == NULL and kt == NULL: + raise ValueError("Either cache or keytab must be set") + + cdef OM_uint32 maj_stat, min_stat + with nogil: + maj_stat = gss_krb5_import_cred(&min_stat, cache_ptr, keytab_princ, + kt, &cred_handle.raw_creds) + + if maj_stat != GSS_S_COMPLETE: + raise GSSError(maj_stat, min_stat) + + +def krb5_get_tkt_flags(SecurityContext context not None): + """ + krb5_get_tkt_flags(context) + Return ticket flags for the kerberos ticket. + + Return the ticket flags for the kerberos ticket received when + authenticating the initiator. + + Note: + Heimdal can only get the tkt flags on the acceptor security context. + MIT is able to get the tkt flags on initators and acceptors. + + Args: + context (~gssapi.raw.sec_contexts.SecurityContext): the security + context + + Returns: + int: the ticket flags for the received kerberos ticket + + Raises: + ~gssapi.exceptions.GSSError + """ + cdef OM_uint32 maj_stat, min_stat + cdef uint32_t ticket_flags = 0 + + with nogil: + maj_stat = gss_krb5_get_tkt_flags(&min_stat, context.raw_ctx, + &ticket_flags) + + if maj_stat != GSS_S_COMPLETE: + raise GSSError(maj_stat, min_stat) + + return ticket_flags + + +def krb5_set_allowable_enctypes(Creds cred_handle not None, + ktypes): + """ + krb5_set_allowable_enctypes(cred_handle, ktypes) + Limits the keys that can be exported. + + Called by a context initiator after acquiring the creds but before calling + :meth:`~gssapi.raw.sec_contexts.init_sec_context` to restrict the set of + enctypes which will be negotiated during context establisment to those in + the provided list. + + Warning: + The cred_handle should not be ``GSS_C_NO_CREDENTIAL``. + + Args: + cred_hande (Creds): the credential handle + ktypes (List[int]): list of enctypes allowed + + Returns: + None + + Raises: + ~gssapi.exceptions.GSSError + """ + cdef OM_uint32 maj_stat, min_stat + + # This shouldn't ever happen but it's here to satisfy compiler warnings + cdef size_t ktypes_count = len(ktypes) + if ktypes_count > UINT32_MAX: + raise ValueError("ktypes list size too large") + + cdef uint32_t count = ktypes_count + cdef int32_t *enc_types = calloc(count, sizeof(int32_t)) + if not enc_types: + raise MemoryError() + + try: + for i, val in enumerate(ktypes): + enc_types[i] = val + + with nogil: + maj_stat = gss_krb5_set_allowable_enctypes(&min_stat, + cred_handle.raw_creds, + count, + enc_types) + + finally: + free(enc_types) + + if maj_stat != GSS_S_COMPLETE: + raise GSSError(maj_stat, min_stat) diff --git a/gssapi/raw/named_tuples.py b/gssapi/raw/named_tuples.py index 3a0e798b..03501316 100644 --- a/gssapi/raw/named_tuples.py +++ b/gssapi/raw/named_tuples.py @@ -136,3 +136,19 @@ class InquireSASLNameResult(NamedTuple): sasl_mech_name: bytes #: The SASL name mech_name: bytes #: The mechanism name mech_description: bytes #: The mechanism description + + +class Rfc1964KeyData(NamedTuple): + """Security context key data based on RFC1964.""" + sign_alg: int #: Signing algorithm identifier + seal_alg: int #: Sealing algorithm identifier + key_type: int #: Key encryption type identifier + key: bytes #: Encryption key data + + +class CfxKeyData(NamedTuple): + """Securty context key data.""" + ctx_key_type: int #: Context key encryption type identifier + ctx_key: bytes #: Context key data - session or sub-session key + acceptor_subkey_type: Optional[int] #: Acceptor key enc type identifier + acceptor_subkey: Optional[bytes] #: Acceptor key data diff --git a/gssapi/raw/python_gssapi_krb5.h b/gssapi/raw/python_gssapi_krb5.h index c67af3b9..1ca45e01 100644 --- a/gssapi/raw/python_gssapi_krb5.h +++ b/gssapi/raw/python_gssapi_krb5.h @@ -1,5 +1,23 @@ #ifdef OSX_HAS_GSS_FRAMEWORK #include + +/* These functions are "private" in macOS GSS. They need to be redeclared so + * Cython can see them. */ +OM_uint32 +__ApplePrivate_gsskrb5_extract_authtime_from_sec_context(OM_uint32 *minor, + gss_ctx_id_t context, + void *authtime); + +OM_uint32 __ApplePrivate_gss_krb5_import_cred(OM_uint32 *minor_status, + void *id, + void *keytab_principal, + void *keytab, + gss_cred_id_t *cred); + +OM_uint32 __ApplePrivate_gss_krb5_get_tkt_flags(OM_uint32 *minor_status, + gss_ctx_id_t context_handle, + void *tkt_flags); + #elif defined(__MINGW32__) && defined(__MSYS__) #include #else diff --git a/gssapi/tests/test_raw.py b/gssapi/tests/test_raw.py index 737ad602..c821800e 100644 --- a/gssapi/tests/test_raw.py +++ b/gssapi/tests/test_raw.py @@ -1,4 +1,6 @@ import copy +import ctypes +import ctypes.util import os import socket import unittest @@ -28,6 +30,7 @@ def setUpClass(cls): cls.USER_PRINC = cls.realm.user_princ.split('@')[0].encode("UTF-8") cls.ADMIN_PRINC = cls.realm.admin_princ.split('@')[0].encode("UTF-8") + cls.KRB5_LIB_PATH = os.environ.get("GSSAPI_KRB5_MAIN_LIB", None) @classmethod def _init_env(cls): @@ -906,6 +909,316 @@ def test_set_cred_option_should_raise_error(self): self.assertRaises(gb.GSSError, gb.set_cred_option, invalid_oid, orig_cred, b"\x00") + @ktu.gssapi_extension_test('krb5', 'Kerberos Extensions') + def test_krb5_ccache_name(self): + new_ccache = os.path.join(self.realm.tmpdir, 'ccache-new') + new_env = self.realm.env.copy() + new_env['KRB5CCNAME'] = new_ccache + self.realm.kinit(self.realm.user_princ, + password=self.realm.password('user'), + env=new_env) + + old_ccache = gb.krb5_ccache_name(new_ccache.encode('utf-8')) + try: + self.assertEqual(old_ccache.decode('utf-8'), self.realm.ccache) + + cred_resp = gb.acquire_cred().creds + + princ_name = gb.inquire_cred(cred_resp, name=True).name + name = gb.display_name(princ_name, name_type=False).name + self.assertEqual(name, self.realm.user_princ.encode('utf-8')) + + changed_ccache = gb.krb5_ccache_name(old_ccache) + self.assertEqual(changed_ccache.decode('utf-8'), new_ccache) + + finally: + # Ensure original behaviour is back for other tests + gb.krb5_ccache_name(None) + + target_name = gb.import_name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + client_resp = gb.init_sec_context(target_name, creds=cred_resp) + client_ctx = client_resp[0] + client_token = client_resp[3] + + server_name = gb.import_name(SERVICE_PRINCIPAL, + gb.NameType.kerberos_principal) + server_creds = gb.acquire_cred(server_name)[0] + server_resp = gb.accept_sec_context(client_token, + acceptor_creds=server_creds) + server_ctx = server_resp[0] + server_token = server_resp[3] + + gb.init_sec_context(target_name, context=client_ctx, + input_token=server_token) + initiator = gb.inquire_context(server_ctx, + initiator_name=True).initiator_name + initiator_name = gb.display_name(initiator, name_type=False).name + + self.assertEqual(initiator_name, self.realm.user_princ.encode('utf-8')) + + @ktu.gssapi_extension_test('krb5', 'Kerberos Extensions') + def test_krb5_export_lucid_sec_context(self): + target_name = gb.import_name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + ctx_resp = gb.init_sec_context(target_name) + + client_token1 = ctx_resp[3] + client_ctx = ctx_resp[0] + server_name = gb.import_name(SERVICE_PRINCIPAL, + gb.NameType.kerberos_principal) + server_creds = gb.acquire_cred(server_name)[0] + server_resp = gb.accept_sec_context(client_token1, + acceptor_creds=server_creds) + server_ctx = server_resp[0] + server_tok = server_resp[3] + + client_resp2 = gb.init_sec_context(target_name, + context=client_ctx, + input_token=server_tok) + ctx = client_resp2[0] + + self.assertRaises(gb.GSSError, gb.krb5_export_lucid_sec_context, + ctx, 0) + + initiator_info = gb.krb5_export_lucid_sec_context(ctx, 1) + self.assertTrue(isinstance(initiator_info, gb.Krb5LucidContextV1)) + self.assertEqual(initiator_info.version, 1) + self.assertTrue(initiator_info.is_initiator) + self.assertTrue(isinstance(initiator_info.endtime, int)) + self.assertTrue(isinstance(initiator_info.send_seq, int)) + self.assertTrue(isinstance(initiator_info.recv_seq, int)) + self.assertEqual(initiator_info.protocol, 1) + self.assertEqual(initiator_info.rfc1964_kd, None) + self.assertTrue(isinstance(initiator_info.cfx_kd, gb.CfxKeyData)) + self.assertTrue(isinstance(initiator_info.cfx_kd.ctx_key_type, int)) + self.assertTrue(isinstance(initiator_info.cfx_kd.ctx_key, bytes)) + self.assertTrue(isinstance(initiator_info.cfx_kd.acceptor_subkey_type, + int)) + self.assertTrue(isinstance(initiator_info.cfx_kd.acceptor_subkey, + bytes)) + + acceptor_info = gb.krb5_export_lucid_sec_context(server_ctx, 1) + self.assertTrue(isinstance(acceptor_info, gb.Krb5LucidContextV1)) + self.assertEqual(acceptor_info.version, 1) + self.assertFalse(acceptor_info.is_initiator) + self.assertTrue(isinstance(acceptor_info.endtime, int)) + self.assertTrue(isinstance(acceptor_info.send_seq, int)) + self.assertTrue(isinstance(acceptor_info.recv_seq, int)) + self.assertEqual(acceptor_info.protocol, 1) + self.assertEqual(acceptor_info.rfc1964_kd, None) + self.assertTrue(isinstance(acceptor_info.cfx_kd, gb.CfxKeyData)) + self.assertTrue(isinstance(acceptor_info.cfx_kd.ctx_key_type, int)) + self.assertTrue(isinstance(acceptor_info.cfx_kd.ctx_key, bytes)) + self.assertTrue(isinstance(acceptor_info.cfx_kd.acceptor_subkey_type, + int)) + self.assertTrue(isinstance(acceptor_info.cfx_kd.acceptor_subkey, + bytes)) + + self.assertEqual(initiator_info.endtime, acceptor_info.endtime) + self.assertEqual(initiator_info.send_seq, acceptor_info.recv_seq) + self.assertEqual(initiator_info.recv_seq, acceptor_info.send_seq) + self.assertEqual(initiator_info.cfx_kd.ctx_key_type, + acceptor_info.cfx_kd.ctx_key_type) + self.assertEqual(initiator_info.cfx_kd.ctx_key, + acceptor_info.cfx_kd.ctx_key) + self.assertEqual(initiator_info.cfx_kd.acceptor_subkey_type, + acceptor_info.cfx_kd.acceptor_subkey_type) + self.assertEqual(initiator_info.cfx_kd.acceptor_subkey, + acceptor_info.cfx_kd.acceptor_subkey) + + @ktu.gssapi_extension_test('krb5', 'Kerberos Extensions') + def test_krb5_extract_authtime_from_sec_context(self): + target_name = gb.import_name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + ctx_resp = gb.init_sec_context(target_name) + + client_token1 = ctx_resp[3] + client_ctx = ctx_resp[0] + server_name = gb.import_name(SERVICE_PRINCIPAL, + gb.NameType.kerberos_principal) + server_creds = gb.acquire_cred(server_name)[0] + server_resp = gb.accept_sec_context(client_token1, + acceptor_creds=server_creds) + server_ctx = server_resp[0] + server_tok = server_resp[3] + + client_resp2 = gb.init_sec_context(target_name, + context=client_ctx, + input_token=server_tok) + ctx = client_resp2[0] + + client_authtime = gb.krb5_extract_authtime_from_sec_context(ctx) + server_authtime = gb.krb5_extract_authtime_from_sec_context(server_ctx) + + self.assertTrue(isinstance(client_authtime, int)) + self.assertTrue(isinstance(server_authtime, int)) + self.assertEqual(client_authtime, server_authtime) + + @ktu.gssapi_extension_test('krb5', 'Kerberos Extensions') + def test_krb5_extract_authz_data_from_sec_context(self): + target_name = gb.import_name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + client_token = gb.init_sec_context(target_name)[3] + + server_name = gb.import_name(SERVICE_PRINCIPAL, + gb.NameType.kerberos_principal) + server_creds = gb.acquire_cred(server_name)[0] + server_ctx = gb.accept_sec_context(client_token, + acceptor_creds=server_creds)[0] + + # KRB5_AUTHDATA_IF_RELEVANT = 1 + authz_data = gb.krb5_extract_authz_data_from_sec_context(server_ctx, 1) + self.assertTrue(isinstance(authz_data, bytes)) + + @ktu.gssapi_extension_test('krb5', 'Kerberos Extensions') + def test_krb5_import_cred(self): + # Ensuring we match the krb5 library to the GSSAPI library is a thorny + # problem. Avoid it by requiring test suite users to explicitly + # enable this test. + if not self.KRB5_LIB_PATH: + self.skipTest("Env var GSSAPI_KRB5_MAIN_LIB not defined") + + creds = gb.Creds() + + # Should fail if only creds are specified + self.assertRaises(ValueError, gb.krb5_import_cred, creds) + + new_ccache = os.path.join(self.realm.tmpdir, 'ccache-new') + new_env = self.realm.env.copy() + new_env['KRB5CCNAME'] = new_ccache + self.realm.kinit(self.realm.user_princ, + password=self.realm.password('user'), + env=new_env) + + krb5 = ctypes.CDLL(self.KRB5_LIB_PATH) + krb5_ctx = ctypes.c_void_p() + krb5.krb5_init_context(ctypes.byref(krb5_ctx)) + try: + ccache_ptr = ctypes.c_void_p() + err = krb5.krb5_cc_resolve(krb5_ctx, new_ccache.encode('utf-8'), + ctypes.byref(ccache_ptr)) + self.assertEqual(err, 0) + + try: + gb.krb5_import_cred(creds, cache=ccache_ptr.value) + + # Creds will be invalid once the cc is closed so do this now + target_name = gb.import_name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + client_resp = gb.init_sec_context(target_name, creds=creds) + + finally: + krb5.krb5_cc_close(krb5_ctx, ccache_ptr) + finally: + krb5.krb5_free_context(krb5_ctx) + + client_ctx = client_resp[0] + client_token = client_resp[3] + + server_name = gb.import_name(SERVICE_PRINCIPAL, + gb.NameType.kerberos_principal) + server_creds = gb.acquire_cred(server_name)[0] + server_resp = gb.accept_sec_context(client_token, + acceptor_creds=server_creds) + server_ctx = server_resp[0] + server_token = server_resp[3] + + gb.init_sec_context(target_name, context=client_ctx, + input_token=server_token) + initiator = gb.inquire_context(server_ctx, + initiator_name=True).initiator_name + initiator_name = gb.display_name(initiator, name_type=False).name + + self.assertEqual(initiator_name, self.realm.user_princ.encode('utf-8')) + + @ktu.gssapi_extension_test('krb5', 'Kerberos Extensions') + def test_krb5_get_tkt_flags(self): + target_name = gb.import_name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + ctx_resp = gb.init_sec_context(target_name) + + client_token1 = ctx_resp[3] + client_ctx = ctx_resp[0] + server_name = gb.import_name(SERVICE_PRINCIPAL, + gb.NameType.kerberos_principal) + server_creds = gb.acquire_cred(server_name)[0] + server_resp = gb.accept_sec_context(client_token1, + acceptor_creds=server_creds) + server_ctx = server_resp[0] + server_tok = server_resp[3] + + client_resp2 = gb.init_sec_context(target_name, + context=client_ctx, + input_token=server_tok) + client_ctx = client_resp2[0] + + client_flags = gb.krb5_get_tkt_flags(client_ctx) + server_flags = gb.krb5_get_tkt_flags(server_ctx) + self.assertTrue(isinstance(client_flags, int)) + self.assertTrue(isinstance(server_flags, int)) + self.assertEqual(client_flags, server_flags) + + @ktu.gssapi_extension_test('krb5', 'Kerberos Extensions') + def test_krb5_set_allowable_enctypes(self): + krb5_mech = gb.OID.from_int_seq("1.2.840.113554.1.2.2") + AES_128 = 0x11 + AES_256 = 0x12 + + new_ccache = os.path.join(self.realm.tmpdir, 'ccache-new') + new_env = self.realm.env.copy() + new_env['KRB5CCNAME'] = new_ccache + self.realm.kinit(self.realm.user_princ, + password=self.realm.password('user'), + env=new_env) + + gb.krb5_ccache_name(new_ccache.encode('utf-8')) + try: + creds = gb.acquire_cred(usage='initiate', + mechs=[krb5_mech]).creds + finally: + gb.krb5_ccache_name(None) + + gb.krb5_set_allowable_enctypes(creds, [AES_128]) + + target_name = gb.import_name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + server_name = gb.import_name(SERVICE_PRINCIPAL, + gb.NameType.kerberos_principal) + server_creds = gb.acquire_cred(server_name, usage='accept', + mechs=[krb5_mech])[0] + + # Will fail because the client only offers AES128 + ctx_resp = gb.init_sec_context(target_name, creds=creds) + client_token1 = ctx_resp[3] + client_ctx = ctx_resp[0] + gb.krb5_set_allowable_enctypes(server_creds, [AES_256]) + self.assertRaises(gb.GSSError, gb.accept_sec_context, client_token1, + acceptor_creds=server_creds) + + gb.krb5_set_allowable_enctypes(server_creds, [AES_128, AES_256]) + ctx_resp = gb.init_sec_context(target_name, creds=creds) + client_token1 = ctx_resp[3] + client_ctx = ctx_resp[0] + + server_resp = gb.accept_sec_context(client_token1, + acceptor_creds=server_creds) + server_ctx = server_resp[0] + server_tok = server_resp[3] + + client_resp2 = gb.init_sec_context(target_name, + context=client_ctx, + input_token=server_tok) + ctx = client_resp2[0] + + initiator_info = gb.krb5_export_lucid_sec_context(ctx, 1) + acceptor_info = gb.krb5_export_lucid_sec_context(server_ctx, 1) + self.assertEqual(AES_128, initiator_info.cfx_kd.ctx_key_type) + self.assertEqual(initiator_info.cfx_kd.ctx_key_type, + initiator_info.cfx_kd.acceptor_subkey_type) + self.assertEqual(acceptor_info.cfx_kd.ctx_key_type, + acceptor_info.cfx_kd.acceptor_subkey_type) + class TestIntEnumFlagSet(unittest.TestCase): def test_create_from_int(self): diff --git a/setup.py b/setup.py index 1d7486a0..b36b9ca4 100755 --- a/setup.py +++ b/setup.py @@ -397,6 +397,8 @@ def gssapi_modules(lst): # see ext_password{,_add}.pyx for more information on this split extension_file('password', 'gss_acquire_cred_with_password'), extension_file('password_add', 'gss_add_cred_with_password'), + + extension_file('krb5', 'gss_krb5_ccache_name'), ]), keywords=['gssapi', 'security'], install_requires=install_requires