diff --git a/docs/source/basic-tutorial.md b/docs/source/basic-tutorial.md index 2e88308a..2922beea 100644 --- a/docs/source/basic-tutorial.md +++ b/docs/source/basic-tutorial.md @@ -21,9 +21,10 @@ for later: >>> Note that this assumes you have a KRB5 realm set up, and some relevant -functions available in the `REALM` object (see gssapi-console.py, or +functions available in the `REALM` object (see gssapi-console.py in +[gssapi_console](https://pypi.python.org/pypi/gssapi_console)), or try `$ run-lit -e gssapi basic-tutorial.md` when you have both -gssapi-console and yalpt installed). Any actions performed using the +gssapi_console and yalpt installed). Any actions performed using the `REALM` object are not part of the GSSAPI library; the `REALM` object simply contians wrappers to krb5 commands generally run separately from the application using GSSAPI. @@ -205,9 +206,9 @@ since the context was set up to use encryption (the default): Traceback (most recent call last): File "", line 1, in File "", line 2, in decrypt - File "/home/directxman12/dev/gssapi/gssapi-console/.venv/lib/python3.4/site-packages/gssapi/_utils.py", line 167, in check_last_err + File "/usr/lib/python3.4/site-packages/gssapi/_utils.py", line 167, in check_last_err return func(self, *args, **kwargs) - File "/home/directxman12/dev/gssapi/gssapi-console/.venv/lib/python3.4/site-packages/gssapi/sec_contexts.py", line 295, in decrypt + File "/usr/lib/python3.4/site-packages/gssapi/sec_contexts.py", line 295, in decrypt unwrapped_message=res.message) gssapi.exceptions.EncryptionNotUsed: Confidentiality was requested, but not used: The context was established with encryption, but unwrapped message was not encrypted. >>> diff --git a/gssapi-console.py b/gssapi-console.py deleted file mode 100755 index d6bc4cc8..00000000 --- a/gssapi-console.py +++ /dev/null @@ -1,142 +0,0 @@ -#!/usr/bin/env python - -# interactive console with easy access to krb5 test harness stuff - -import code -import os -import sys -import copy - -from gssapi.tests import k5test - - -READLINE_SRC = """ -# python startup file -import readline -import rlcompleter -import atexit -import os - -completer = rlcompleter.Completer(globals()) -readline.set_completer(completer.complete) - -# tab completion -readline.parse_and_bind('Control-space: complete') -# history file -histfile = os.path.join(os.environ['HOME'], '.pythonhistory') -try: - readline.read_history_file(histfile) -except IOError: - pass - -atexit.register(readline.write_history_file, histfile) -del os, histfile, readline, rlcompleter -""" - -BANNER = """GSSAPI Interactive console -Python {ver} on {platform} -Type "help", "copyright", "credits" or "license" for more information about Python. - -mechansim: {mech}, realm: {realm}, user: {user}, host: {host} -(functions for controlling the realm available in `REALM`)""" - - -class GSSAPIConsole(code.InteractiveConsole): - def __init__(self, use_readline=True, realm_args={}, *args, **kwargs): - code.InteractiveConsole.__init__(self, *args, **kwargs) - - self.realm = self._create_realm(realm_args) - self.locals['REALM'] = self.realm - - self.runsource('import gssapi') - self.runsource('import gssapi.raw as gb') - - if use_readline: - self._add_readline() - - if os.environ.get('LD_LIBRARY_PATH'): - self.realm.env['LD_LIBRARY_PATH'] = os.environ['LD_LIBRARY_PATH'] - - def _create_realm(self): - return None - - def _add_readline(self): - res = self.runsource(READLINE_SRC, '', 'exec') - - -class Krb5Console(GSSAPIConsole): - def _create_realm(self, realm_args): - return k5test.K5Realm(**realm_args) - - -MECH_MAP = {'krb5': Krb5Console} - - -import argparse -parser = argparse.ArgumentParser(description='An interactive Python console with krb5 setup') -parser.add_argument('file', metavar='FILE', nargs='?', - help='a file to run', default=None) -parser.add_argument('-i', default=False, dest='force_interactive', - action='store_const', const=True, - help='Force interactive mode when running with a file') -parser.add_argument('--realm-args', default=None, - help='A comma-separated list of key=(true|false) values to ' - 'pass to the realm constructor') -parser.add_argument('--mech', default='krb5', - help='Which environment to setup up ' - '(supports krb5 [default])') - -PARSED_ARGS = parser.parse_args() - - -if PARSED_ARGS.mech not in MECH_MAP: - sys.exit('The %s environment is not supported by the ' - 'GSSAPI console' % PARSED_ARGS.mech) - - -realm_args = {} -if PARSED_ARGS.realm_args: - for arg in PARSED_ARGS.realm_args.split(','): - key, raw_val = arg.split('=') - realm_args[key] = (raw_val.lower() == 'true') - -console = MECH_MAP[PARSED_ARGS.mech](realm_args=realm_args) -SAVED_ENV = None - -try: - # create the banner - banner_text = BANNER.format(ver=sys.version, platform=sys.platform, - mech=PARSED_ARGS.mech, - realm=console.realm.realm, - user=console.realm.user_princ, - host=console.realm.host_princ) - - # import the env - SAVED_ENV = copy.deepcopy(os.environ) - for k,v in console.realm.env.items(): - os.environ[k] = v - - INTER = True - # run the interactive interpreter - if PARSED_ARGS.file is not None: - if not PARSED_ARGS.force_interactive: - INTER = False - - with open(PARSED_ARGS.file) as src: - console.runsource(src.read(), src.name, 'exec') - - if INTER: - console.interact(banner_text) - -except (KeyboardInterrupt, EOFError): - pass -finally: - # restore the env - if SAVED_ENV is not None: - for k in copy.deepcopy(os.environ): - if k in SAVED_ENV: - os.environ[k] = SAVED_ENV[k] - else: - del os.environ[k] - - console.realm.stop() diff --git a/gssapi/tests/_utils.py b/gssapi/tests/_utils.py deleted file mode 100644 index 603be560..00000000 --- a/gssapi/tests/_utils.py +++ /dev/null @@ -1,140 +0,0 @@ -import os -import subprocess - -from gssapi._utils import import_gssapi_extension - - -def get_output(*args, **kwargs): - res = subprocess.check_output(*args, shell=True, **kwargs) - decoded = res.decode('utf-8') - return decoded.strip() - - -def _extension_test(extension_name, extension_text): - def make_ext_test(func): - def ext_test(self, *args, **kwargs): - if import_gssapi_extension(extension_name) is None: - self.skipTest("The %s GSSAPI extension is not supported by " - "your GSSAPI implementation" % extension_text) - else: - func(self, *args, **kwargs) - - return ext_test - - return make_ext_test - - -_KRB_VERSION = None - - -def _minversion_test(target_version, problem): - global _KRB_VERSION - if _KRB_VERSION is None: - _KRB_VERSION = get_output("krb5-config --version") - _KRB_VERSION = _KRB_VERSION.split(' ')[-1].split('.') - - def make_ext_test(func): - def ext_test(self, *args, **kwargs): - if _KRB_VERSION < target_version.split('.'): - self.skipTest("Your GSSAPI (version %s) is known to have " - "problems with %s" % (_KRB_VERSION, problem)) - else: - func(self, *args, **kwargs) - return ext_test - - return make_ext_test - - -_PLUGIN_DIR = None - - -def _find_plugin_dir(): - global _PLUGIN_DIR - if _PLUGIN_DIR is not None: - return _PLUGIN_DIR - - # if we've set a LD_LIBRARY_PATH, use that first - ld_path_raw = os.environ.get('LD_LIBRARY_PATH') - if ld_path_raw is not None: - # first, try assuming it's just a normal install - - ld_paths = [path for path in ld_path_raw.split(':') if path] - - for ld_path in ld_paths: - if not os.path.exists(ld_path): - continue - - _PLUGIN_DIR = _decide_plugin_dir( - _find_plugin_dirs_installed(ld_path)) - if _PLUGIN_DIR is None: - _PLUGIN_DIR = _decide_plugin_dir( - _find_plugin_dirs_src(ld_path)) - - if _PLUGIN_DIR is not None: - break - - # if there was no LD_LIBRARY_PATH, or the above failed - if _PLUGIN_DIR is None: - # if we don't have a LD_LIBRARY_PATH, just search in - # $prefix/lib - - lib_dir = os.path.join(get_output('krb5-config --prefix'), 'lib') - _PLUGIN_DIR = _decide_plugin_dir(_find_plugin_dirs_installed(lib_dir)) - - if _PLUGIN_DIR is not None: - _PLUGIN_DIR = os.path.normpath(_PLUGIN_DIR) - return _PLUGIN_DIR - else: - return None - - -def _decide_plugin_dir(dirs): - if dirs is None: - return None - - # the shortest path is probably more correct - shortest_first = sorted(dirs, key=len) - - for path in shortest_first: - # check to see if it actually contains .so files - if get_output('find %s -name "*.so"' % path): - return path - - return None - - -def _find_plugin_dirs_installed(search_path): - options_raw = get_output('find %s/ -type d ' - '-path "*/krb5/plugins"' % search_path) - - if options_raw: - return options_raw.split('\n') - else: - return None - - -def _find_plugin_dirs_src(search_path): - options_raw = get_output('find %s/../ -type d -name plugins' % search_path) - - if options_raw: - return options_raw.split('\n') - else: - return None - - -def _requires_krb_plugin(plugin_type, plugin_name): - plugin_path = os.path.join(_find_plugin_dir(), - plugin_type, '%s.so' % plugin_name) - - def make_krb_plugin_test(func): - def krb_plugin_test(self, *args, **kwargs): - if not os.path.exists(plugin_path): - self.skipTest("You do not have the GSSAPI {type}" - "plugin {name} installed".format( - type=plugin_type, name=plugin_name)) - else: - func(self, *args, **kwargs) - - return krb_plugin_test - - return make_krb_plugin_test diff --git a/gssapi/tests/k5test.py b/gssapi/tests/k5test.py deleted file mode 100644 index 7edd6b08..00000000 --- a/gssapi/tests/k5test.py +++ /dev/null @@ -1,447 +0,0 @@ -# Copyright (C) 2014 by Solly Ross -# Copyright (C) 2010 by the Massachusetts Institute of Technology. -# All rights reserved. - -# Export of this software from the United States of America may -# require a specific license from the United States Government. -# It is the responsibility of any person or organization contemplating -# export to obtain such a license before exporting. -# -# WITHIN THAT CONSTRAINT, permission to use, copy, modify, and -# distribute this software and its documentation for any purpose and -# without fee is hereby granted, provided that the above copyright -# notice appear in all copies and that both that copyright notice and -# this permission notice appear in supporting documentation, and that -# the name of M.I.T. not be used in advertising or publicity pertaining -# to distribution of the software without specific, written prior -# permission. Furthermore if you modify this software you must label -# your software as modified software and not distribute it in such a -# fashion that it might be confused with the original M.I.T. software. -# M.I.T. makes no representations about the suitability of -# this software for any purpose. It is provided "as is" without express -# or implied warranty. - -# Changes from original: modified to work with Python's unittest -from __future__ import print_function - -import copy -import os -import shutil -import signal -import socket -import string -import sys -import subprocess -import tempfile -import unittest - -import six - -from gssapi.tests import _utils - - -def _cfg_merge(cfg1, cfg2): - if not cfg2: - return cfg1 - if not cfg1: - return cfg2 - result = copy.deepcopy(cfg1) - for key, value2 in cfg2.items(): - if value2 is None or key not in result: - result[key] = copy.deepcopy(value2) - else: - value1 = result[key] - if isinstance(value1, dict): - if not isinstance(value2, dict): - raise TypeError("value at key '{key}' not dict: " - "{type}".format(key=key, - type=type(value2))) - result[key] = _cfg_merge(value1, value2) - else: - result[key] = copy.deepcopy(value2) - return result - - -_default_krb5_conf = { - 'libdefaults': { - 'default_realm': '$realm', - 'dns_lookup_kdc': 'false'}, - 'realms': { - '$realm': { - 'kdc': '$hostname:$port0', - 'admin_server': '$hostname:$port1', - 'kpasswd_server': '$hostname:$port2'}}} - - -_default_kdc_conf = { - 'realms': { - '$realm': { - 'database_module': 'db', - 'iprop_port': '$port4', - 'key_stash_file': '$tmpdir/stash', - 'acl_file': '$tmpdir/acl', - 'dictfile': '$tmpdir/dictfile', - 'kadmind_port': '$port1', - 'kpasswd_port': '$port2', - 'kdc_ports': '$port0', - 'kdc_tcp_ports': '$port0', - 'database_name': '$tmpdir/db'}}, - 'dbmodules': { - 'db_module_dir': os.path.join(_utils._find_plugin_dir(), - 'kdb'), - 'db': {'db_library': 'db2', 'database_name': '$tmpdir/db'}}, - 'logging': { - 'admin_server': 'FILE:$tmpdir/kadmind5.log', - 'kdc': 'FILE:$tmpdir/kdc.log', - 'default': 'FILE:$tmpdir/others.log'}} - - -class K5Realm(object): - """An object representing a functional krb5 test realm.""" - def __init__(self, realm='KRBTEST.COM', portbase=61000, - krb5_conf=None, kdc_conf=None, create_kdb=True, - krbtgt_keysalt=None, create_user=True, get_creds=True, - create_host=True, start_kdc=True, start_kadmind=False, - **paths): - - self.tmpdir = tempfile.mkdtemp(suffix='-krbtest') - - self.realm = realm - self.portbase = portbase - self.user_princ = 'user@' + self.realm - self.admin_princ = 'user/admin@' + self.realm - self.host_princ = 'host/%s@%s' % (self.hostname, self.realm) - self.nfs_princ = 'nfs/%s@%s' % (self.hostname, self.realm) - self.krbtgt_princ = 'krbtgt/%s@%s' % (self.realm, self.realm) - self.keytab = os.path.join(self.tmpdir, 'keytab') - self.client_keytab = os.path.join(self.tmpdir, 'client_keytab') - self.ccache = os.path.join(self.tmpdir, 'ccache') - self.kadmin_ccache = os.path.join(self.tmpdir, 'kadmin_ccache') - self._krb5_conf = _cfg_merge(_default_krb5_conf, krb5_conf) - self._kdc_conf = _cfg_merge(_default_kdc_conf, kdc_conf) - self._kdc_proc = None - self._kadmind_proc = None - krb5_conf_path = os.path.join(self.tmpdir, 'krb5.conf') - kdc_conf_path = os.path.join(self.tmpdir, 'kdc.conf') - self.env = self._make_env(krb5_conf_path, kdc_conf_path) - - self._daemons = [] - - self._init_paths(**paths) - - self._devnull = open(os.devnull, 'r') - - self._create_conf(self._krb5_conf, krb5_conf_path) - self._create_conf(self._kdc_conf, kdc_conf_path) - self._create_acl() - self._create_dictfile() - - if create_kdb: - self.create_kdb() - if krbtgt_keysalt and create_kdb: - self.run_kadminl('cpw -randkey -e %s %s' % - (krbtgt_keysalt, self.krbtgt_princ)) - if create_user and create_kdb: - self.addprinc(self.user_princ, self.password('user')) - self.addprinc(self.admin_princ, self.password('admin')) - if create_host and create_kdb: - self.addprinc(self.host_princ) - self.extract_keytab(self.host_princ, self.keytab) - if start_kdc and create_kdb: - self.start_kdc() - if start_kadmind and create_kdb: - self.start_kadmind() - if get_creds and create_kdb and create_user and start_kdc: - self.kinit(self.user_princ, self.password('user')) - self.klist() - - def _discover_path(self, name, default, paths): - stderr_out = getattr(subprocess, 'DEVNULL', subprocess.PIPE) - try: - path = subprocess.check_output(['which', name], - stderr=stderr_out).strip() - path = path.decode(sys.getfilesystemencoding() or - sys.getdefaultencoding()) - print("Using discovered path for {name} ({path}".format( - name=name, path=path)) - return path - except subprocess.CalledProcessError as e: - path = paths.get(name, default) - print("Using default path for {name} ({path}): {err}".format( - name=name, path=path, err=e)) - return path - - def _init_paths(self, **paths): - self.kdb5_util = self._discover_path('kdb5_util', - '/usr/sbin/kdb5_util', paths) - self.krb5kdc = self._discover_path('krb5kdc', - '/usr/sbin/krb5kdc', paths) - self.kadmin_local = self._discover_path('kadmin_local', - '/usr/sbin/kadmin.local', - paths) - self.kprop = self._discover_path('kprop', '/usr/sbin/kprop', paths) - self.kadmind = self._discover_path('kadmind', - '/usr/sbin/kadmind', paths) - - self._kinit = self._discover_path('kinit', '/usr/bin/kinit', paths) - self._klist = self._discover_path('klist', '/usr/bin/klist', paths) - - def _create_conf(self, profile, filename): - with open(filename, 'w') as conf_file: - for section, contents in profile.items(): - conf_file.write('[%s]\n' % section) - self._write_cfg_section(conf_file, contents, 1) - - def _write_cfg_section(self, conf_file, contents, indent_level): - indent = '\t' * indent_level - for name, value in contents.items(): - name = self._subst_cfg_value(name) - if isinstance(value, dict): - # A dictionary value yields a list subsection. - conf_file.write('%s%s = {\n' % (indent, name)) - self._write_cfg_section(conf_file, value, indent_level + 1) - conf_file.write('%s}\n' % indent) - elif isinstance(value, list): - # A list value yields multiple values for the same name. - for item in value: - item = self._subst_cfg_value(item) - conf_file.write('%s%s = %s\n' % (indent, name, item)) - elif isinstance(value, six.string_types): - # A string value yields a straightforward variable setting. - value = self._subst_cfg_value(value) - conf_file.write('%s%s = %s\n' % (indent, name, value)) - elif value is not None: - raise TypeError("Unknown config type at key '{key}': " - "{type}".format(key=name, type=type(value))) - - @property - def hostname(self): - return socket.getfqdn() - - def _subst_cfg_value(self, value): - template = string.Template(value) - return template.substitute(realm=self.realm, - tmpdir=self.tmpdir, - hostname=self.hostname, - port0=self.portbase, - port1=self.portbase + 1, - port2=self.portbase + 2, - port3=self.portbase + 3, - port4=self.portbase + 4, - port5=self.portbase + 5, - port6=self.portbase + 6, - port7=self.portbase + 7, - port8=self.portbase + 8, - port9=self.portbase + 9) - - def _create_acl(self): - filename = os.path.join(self.tmpdir, 'acl') - with open(filename, 'w') as acl_file: - acl_file.write('%s *\n' % self.admin_princ) - acl_file.write('kiprop/%s@%s p\n' % (self.hostname, self.realm)) - - def _create_dictfile(self): - filename = os.path.join(self.tmpdir, 'dictfile') - with open(filename, 'w') as dict_file: - dict_file.write('weak_password\n') - - def _make_env(self, krb5_conf_path, kdc_conf_path): - env = {} - env['KRB5_CONFIG'] = krb5_conf_path - env['KRB5_KDC_PROFILE'] = kdc_conf_path or os.devnull - env['KRB5CCNAME'] = self.ccache - env['KRB5_KTNAME'] = self.keytab - env['KRB5_CLIENT_KTNAME'] = self.client_keytab - env['KRB5RCACHEDIR'] = self.tmpdir - env['KPROPD_PORT'] = six.text_type(self.kprop_port()) - env['KPROP_PORT'] = six.text_type(self.kprop_port()) - return env - - def run(self, args, env=None, input=None, expected_code=0): - if env is None: - env = self.env - - if input: - infile = subprocess.PIPE - else: - infile = self._devnull - - proc = subprocess.Popen(args, stdin=infile, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, env=env) - if input: - inbytes = input.encode() - else: - inbytes = None - (outdata, blank_errdata) = proc.communicate(inbytes) - code = proc.returncode - cmd = ' '.join(args) - outstr = outdata.decode() - print('[OUTPUT FROM `{args}`]\n{output}\n'.format(args=cmd, - output=outstr)) - if code != expected_code: - raise Exception("Unexpected return code " - "for command `{args}`: {code}".format(args=cmd, - code=code)) - - return outdata - - def __del__(self): - self._devnull.close() - - def kprop_port(self): - return self.portbase + 3 - - def server_port(self): - return self.portbase + 5 - - def create_kdb(self): - self.run([self.kdb5_util, 'create', '-W', '-s', '-P', 'master']) - - def _start_daemon(self, args, env, sentinel): - proc = subprocess.Popen(args, stdin=self._devnull, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, env=env) - cmd = ' '.join(args) - while True: - line = proc.stdout.readline().decode() - if line == "": - code = proc.wait() - raise Exception('`{args}` failed to start ' - 'with code {code}'.format(args=cmd, - code=code)) - else: - print('[OUTPUT FROM `{args}`]\n' - '{output}\n'.format(args=cmd, output=line)) - - if sentinel in line: - break - - self._daemons.append(proc) - - return proc - - def start_kdc(self, args=[], env=None): - if env is None: - env = self.env - assert(self._kdc_proc is None) - self._kdc_proc = self._start_daemon([self.krb5kdc, '-n'] + args, env, - 'starting...') - - def _stop_daemon(self, proc): - os.kill(proc.pid, signal.SIGTERM) - proc.wait() - self._daemons.remove(proc) - - def stop_kdc(self): - assert(self._kdc_proc is not None) - self._stop_daemon(self._kdc_proc) - self._kdc_proc = None - - def start_kadmind(self, env=None): - if env is None: - env = self.env - assert(self._kadmind_proc is None) - dump_path = os.path.join(self.tmpdir, 'dump') - self._kadmind_proc = self._start_daemon([self.kadmind, '-nofork', '-W', - '-p', self.kdb5_util, - '-K', self.kprop, - '-F', dump_path], env, - 'starting...') - - def stop_kadmind(self): - assert(self._kadmind_proc is not None) - self.stop_daemon(self._kadmind_proc) - self._kadmind_proc = None - - def stop(self): - if self._kdc_proc: - self.stop_kdc() - if self._kadmind_proc: - self.stop_kadmind() - - if self.tmpdir: - shutil.rmtree(self.tmpdir) - - def addprinc(self, princname, password=None): - if password: - self.run_kadminl('addprinc -pw %s %s' % (password, princname)) - else: - self.run_kadminl('addprinc -randkey %s' % princname) - - def extract_keytab(self, princname, keytab): - self.run_kadminl('ktadd -k %s -norandkey %s' % (keytab, princname)) - - def kinit(self, princname, password=None, flags=[], verbose=True, - **keywords): - if password: - input = password + "\n" - else: - input = None - - cmd = [self._kinit] - if verbose: - cmd.append('-V') - cmd.extend(flags) - cmd.append(princname) - return self.run(cmd, input=input, **keywords) - - def klist(self, ccache=None, **keywords): - if ccache is None: - ccache = self.ccache - ccachestr = ccache - if len(ccachestr) < 2 or ':' not in ccachestr[2:]: - ccachestr = 'FILE:' + ccachestr - return self.run([self._klist, ccache], **keywords) - - def klist_keytab(self, keytab=None, **keywords): - if keytab is None: - keytab = self.keytab - output = self.run([self._klist, '-k', keytab], **keywords) - return output - - def run_kadminl(self, query, env=None): - return self.run([self.kadmin_local, '-q', query], env=env) - - def password(self, name): - """Get a weakly random password from name, consistent across calls.""" - return name + six.text_type(os.getpid()) - - def prep_kadmin(self, princname=None, pw=None, flags=[]): - if princname is None: - princname = self.admin_princ - pw = self.password('admin') - return self.kinit(princname, pw, - flags=['-S', 'kadmin/admin', - '-c', self.kadmin_ccache] + flags) - - def run_kadmin(self, query, **keywords): - return self.run([self.kadmin, '-c', self.kadmin_ccache, '-q', query], - **keywords) - - def special_env(self, name, has_kdc_conf, krb5_conf=None, kdc_conf=None): - krb5_conf_path = os.path.join(self.tmpdir, 'krb5.conf.%s' % name) - krb5_conf = _cfg_merge(self._krb5_conf, krb5_conf) - self._create_conf(krb5_conf, krb5_conf_path) - if has_kdc_conf: - kdc_conf_path = os.path.join(self.tmpdir, 'kdc.conf.%s' % name) - kdc_conf = _cfg_merge(self._kdc_conf, kdc_conf) - self._create_conf(kdc_conf, kdc_conf_path) - else: - kdc_conf_path = None - return self._make_env(krb5_conf_path, kdc_conf_path) - - def kill_daemons(self): - # clean up daemons - for proc in self._daemons: - os.kill(proc.pid, signal.SIGTERM) - - -class KerberosTestCase(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.realm = K5Realm() - - @classmethod - def tearDownClass(cls): - cls.realm.stop() - del cls.realm diff --git a/gssapi/tests/test_high_level.py b/gssapi/tests/test_high_level.py index 404ed43e..0d645124 100644 --- a/gssapi/tests/test_high_level.py +++ b/gssapi/tests/test_high_level.py @@ -14,9 +14,8 @@ from gssapi import raw as gb from gssapi import _utils as gssutils from gssapi import exceptions as excs -from gssapi.tests._utils import _extension_test, _minversion_test -from gssapi.tests._utils import _requires_krb_plugin -from gssapi.tests import k5test as kt +import k5test.unit as ktu +import k5test as kt TARGET_SERVICE_NAME = b'host' @@ -151,7 +150,7 @@ def test_acquire_by_method(self, str_name, kwargs): del creds - @_extension_test('rfc5588', 'RFC 5588') + @ktu.gssapi_extension_test('rfc5588', 'RFC 5588') def test_store_acquire(self): # we need to acquire a forwardable ticket svc_princ = SERVICE_PRINCIPAL.decode("UTF-8") @@ -182,7 +181,7 @@ def test_store_acquire(self): usage='initiate') reacquired_creds.shouldnt_be_none() - @_extension_test('cred_store', 'credentials store') + @ktu.gssapi_extension_test('cred_store', 'credentials store') def test_store_into_acquire_from(self): CCACHE = 'FILE:{tmpdir}/other_ccache'.format(tmpdir=self.realm.tmpdir) KT = '{tmpdir}/other_keytab'.format(tmpdir=self.realm.tmpdir) @@ -273,7 +272,7 @@ def test_add(self): new_creds.shouldnt_be_none() new_creds.should_be_a(gsscreds.Credentials) - @_extension_test('cred_store', 'credentials store') + @ktu.gssapi_extension_test('cred_store', 'credentials store') def test_store_into_add_from(self): CCACHE = 'FILE:{tmpdir}/other_ccache'.format(tmpdir=self.realm.tmpdir) KT = '{tmpdir}/other_keytab'.format(tmpdir=self.realm.tmpdir) @@ -301,13 +300,13 @@ def test_store_into_add_from(self): retrieved_creds.shouldnt_be_none() retrieved_creds.should_be_a(gsscreds.Credentials) - @_extension_test('cred_imp_exp', 'credentials import-export') + @ktu.gssapi_extension_test('cred_imp_exp', 'credentials import-export') def test_export(self): creds = gsscreds.Credentials(name=self.name) token = creds.export() token.should_be_a(bytes) - @_extension_test('cred_imp_exp', 'credentials import-export') + @ktu.gssapi_extension_test('cred_imp_exp', 'credentials import-export') def test_import_by_init(self): creds = gsscreds.Credentials(name=self.name) token = creds.export() @@ -316,7 +315,7 @@ def test_import_by_init(self): imported_creds.lifetime.should_be(creds.lifetime) imported_creds.name.should_be(creds.name) - @_extension_test('cred_imp_exp', 'credentials import-export') + @ktu.gssapi_extension_test('cred_imp_exp', 'credentials import-export') def test_pickle_unpickle(self): creds = gsscreds.Credentials(name=self.name) pickled_creds = pickle.dumps(creds) @@ -327,7 +326,7 @@ def test_pickle_unpickle(self): @exist_perms(lifetime=30, mechs=[gb.MechType.kerberos], usage='initiate') - @_extension_test('s4u', 'S4U') + @ktu.gssapi_extension_test('s4u', 'S4U') def test_impersonate(self, str_name, kwargs): target_name = gssnames.Name(TARGET_SERVICE_NAME, gb.NameType.hostbased_service) @@ -347,7 +346,7 @@ def test_impersonate(self, str_name, kwargs): imp_creds.shouldnt_be_none() imp_creds.should_be_a(gsscreds.Credentials) - @_extension_test('s4u', 'S4U') + @ktu.gssapi_extension_test('s4u', 'S4U') def test_add_with_impersonate(self): target_name = gssnames.Name(TARGET_SERVICE_NAME, gb.NameType.hostbased_service) @@ -396,7 +395,7 @@ def test_create_from_token(self): name2.shouldnt_be_none() name2.name_type.should_be(gb.NameType.kerberos_principal) - @_extension_test('rfc6680', 'RFC 6680') + @ktu.gssapi_extension_test('rfc6680', 'RFC 6680') def test_display_as(self): name = gssnames.Name(TARGET_SERVICE_NAME, gb.NameType.hostbased_service) @@ -412,7 +411,7 @@ def test_display_as(self): krb_name.should_be_a(six.text_type) krb_name.should_be(princ_str) - @_extension_test('rfc6680', 'RFC 6680') + @ktu.gssapi_extension_test('rfc6680', 'RFC 6680') def test_create_from_composite_token_no_attrs(self): name1 = gssnames.Name(TARGET_SERVICE_NAME, gb.NameType.hostbased_service) @@ -422,8 +421,8 @@ def test_create_from_composite_token_no_attrs(self): name2.shouldnt_be_none() - @_extension_test('rfc6680', 'RFC 6680') - @_requires_krb_plugin('authdata', 'greet_client') + @ktu.gssapi_extension_test('rfc6680', 'RFC 6680') + @ktu.krb_plugin_test('authdata', 'greet_client') def test_create_from_composite_token_with_attrs(self): name1 = gssnames.Name(TARGET_SERVICE_NAME, gb.NameType.hostbased_service) @@ -514,7 +513,7 @@ def test_copy(self): # NB(directxman12): we don't test display_name_ext because the krb5 mech # doesn't actually implement it - @_extension_test('rfc6680', 'RFC 6680') + @ktu.gssapi_extension_test('rfc6680', 'RFC 6680') def test_is_mech_name(self): name = gssnames.Name(TARGET_SERVICE_NAME, gb.NameType.hostbased_service) @@ -527,7 +526,7 @@ def test_is_mech_name(self): canon_name.mech.should_be_a(gb.OID) canon_name.mech.should_be(gb.MechType.kerberos) - @_extension_test('rfc6680', 'RFC 6680') + @ktu.gssapi_extension_test('rfc6680', 'RFC 6680') def test_export_name_composite_no_attrs(self): name = gssnames.Name(TARGET_SERVICE_NAME, gb.NameType.hostbased_service) @@ -536,8 +535,8 @@ def test_export_name_composite_no_attrs(self): exported_name.should_be_a(bytes) - @_extension_test('rfc6680', 'RFC 6680') - @_requires_krb_plugin('authdata', 'greet_client') + @ktu.gssapi_extension_test('rfc6680', 'RFC 6680') + @ktu.krb_plugin_test('authdata', 'greet_client') def test_export_name_composite_with_attrs(self): name = gssnames.Name(TARGET_SERVICE_NAME, gb.NameType.hostbased_service) @@ -547,8 +546,8 @@ def test_export_name_composite_with_attrs(self): exported_name.should_be_a(bytes) - @_extension_test('rfc6680', 'RFC 6680') - @_requires_krb_plugin('authdata', 'greet_client') + @ktu.gssapi_extension_test('rfc6680', 'RFC 6680') + @ktu.krb_plugin_test('authdata', 'greet_client') def test_basic_get_set_del_name_attribute_no_auth(self): name = gssnames.Name(TARGET_SERVICE_NAME, gb.NameType.hostbased_service) @@ -785,7 +784,7 @@ def test_verify_signature_raise(self): server_ctx.verify_signature.should_raise(gb.GSSError, b'other message', mic_token) - @_minversion_test("1.11", "returning tokens") + @ktu.krb_minversion_test("1.11", "returning tokens") def test_defer_step_error_on_method(self): gssctx.SecurityContext.__DEFER_STEP_ERRORS__ = True bdgs = gb.ChannelBindings(application_data=b'abcxyz') @@ -801,7 +800,7 @@ def test_defer_step_error_on_method(self): server_ctx.step(client_token).should_be_a(bytes) server_ctx.encrypt.should_raise(gb.BadChannelBindingsError, b'test') - @_minversion_test("1.11", "returning tokens") + @ktu.krb_minversion_test("1.11", "returning tokens") def test_defer_step_error_on_complete_property_access(self): gssctx.SecurityContext.__DEFER_STEP_ERRORS__ = True bdgs = gb.ChannelBindings(application_data=b'abcxyz') diff --git a/gssapi/tests/test_raw.py b/gssapi/tests/test_raw.py index 37f6372e..275994e8 100644 --- a/gssapi/tests/test_raw.py +++ b/gssapi/tests/test_raw.py @@ -8,9 +8,8 @@ import gssapi.raw as gb import gssapi.raw.misc as gbmisc -from gssapi.tests._utils import _extension_test, _minversion_test -from gssapi.tests._utils import _requires_krb_plugin -from gssapi.tests import k5test as kt +import k5test.unit as ktu +import k5test as kt TARGET_SERVICE_NAME = b'host' @@ -116,7 +115,7 @@ def test_display_name(self): # NB(directxman12): we don't test display_name_ext because the krb5 mech # doesn't actually implement it - @_extension_test('rfc6680', 'RFC 6680') + @ktu.gssapi_extension_test('rfc6680', 'RFC 6680') def test_inquire_name_not_mech_name(self): base_name = gb.import_name(TARGET_SERVICE_NAME, gb.NameType.hostbased_service) @@ -127,7 +126,7 @@ def test_inquire_name_not_mech_name(self): inquire_res.is_mech_name.should_be_false() inquire_res.mech.should_be_none() - @_extension_test('rfc6680', 'RFC 6680') + @ktu.gssapi_extension_test('rfc6680', 'RFC 6680') def test_inquire_name_mech_name(self): base_name = gb.import_name(TARGET_SERVICE_NAME, gb.NameType.hostbased_service) @@ -140,8 +139,9 @@ def test_inquire_name_mech_name(self): inquire_res.mech.should_be_a(gb.OID) inquire_res.mech.should_be(gb.MechType.kerberos) - @_extension_test('rfc6680', 'RFC 6680') - @_extension_test('rfc6680_comp_oid', 'RFC 6680 (COMPOSITE_EXPORT OID)') + @ktu.gssapi_extension_test('rfc6680', 'RFC 6680') + @ktu.gssapi_extension_test('rfc6680_comp_oid', + 'RFC 6680 (COMPOSITE_EXPORT OID)') def test_import_export_name_composite_no_attrs(self): base_name = gb.import_name(TARGET_SERVICE_NAME, gb.NameType.hostbased_service) @@ -159,8 +159,8 @@ def test_import_export_name_composite_no_attrs(self): # NB(directxman12): the greet_client plugin only allows for one value - @_extension_test('rfc6680', 'RFC 6680') - @_requires_krb_plugin('authdata', 'greet_client') + @ktu.gssapi_extension_test('rfc6680', 'RFC 6680') + @ktu.krb_plugin_test('authdata', 'greet_client') def test_inquire_name_with_attrs(self): base_name = gb.import_name(TARGET_SERVICE_NAME, gb.NameType.hostbased_service) @@ -174,8 +174,8 @@ def test_inquire_name_with_attrs(self): inquire_res.attrs.should_be_a(list) inquire_res.attrs.should_be([b'urn:greet:greeting']) - @_extension_test('rfc6680', 'RFC 6680') - @_requires_krb_plugin('authdata', 'greet_client') + @ktu.gssapi_extension_test('rfc6680', 'RFC 6680') + @ktu.krb_plugin_test('authdata', 'greet_client') def test_basic_get_set_delete_name_attributes_no_auth(self): base_name = gb.import_name(TARGET_SERVICE_NAME, gb.NameType.hostbased_service) @@ -204,8 +204,8 @@ def test_basic_get_set_delete_name_attributes_no_auth(self): # gb.exceptions.OperationUnavailableError, canon_name, # 'urn:greet:greeting') - @_extension_test('rfc6680', 'RFC 6680') - @_requires_krb_plugin('authdata', 'greet_client') + @ktu.gssapi_extension_test('rfc6680', 'RFC 6680') + @ktu.krb_plugin_test('authdata', 'greet_client') def test_import_export_name_composite(self): base_name = gb.import_name(TARGET_SERVICE_NAME, gb.NameType.hostbased_service) @@ -283,7 +283,7 @@ def test_acquire_creds(self): gb.release_name(name) gb.release_cred(creds) - @_extension_test('cred_imp_exp', 'credentials import-export') + @ktu.gssapi_extension_test('cred_imp_exp', 'credentials import-export') def test_cred_import_export(self): creds = gb.acquire_cred(None).creds token = gb.export_cred(creds) @@ -367,7 +367,7 @@ def test_inquire_context(self): # NB(directxman12): We don't test `process_context_token` because # there is no clear non-deprecated way to test it - @_extension_test('s4u', 'S4U') + @ktu.gssapi_extension_test('s4u', 'S4U') def test_add_cred_impersonate_name(self): target_name = gb.import_name(TARGET_SERVICE_NAME, gb.NameType.hostbased_service) @@ -399,7 +399,7 @@ def test_add_cred_impersonate_name(self): new_creds.should_be_a(gb.Creds) - @_extension_test('s4u', 'S4U') + @ktu.gssapi_extension_test('s4u', 'S4U') def test_acquire_creds_impersonate_name(self): target_name = gb.import_name(TARGET_SERVICE_NAME, gb.NameType.hostbased_service) @@ -430,8 +430,9 @@ def test_acquire_creds_impersonate_name(self): # no need to explicitly release any more -- we can just rely on # __dealloc__ (b/c cython) - @_extension_test('s4u', 'S4U') - @_minversion_test('1.11', 'returning delegated S4U2Proxy credentials') + @ktu.gssapi_extension_test('s4u', 'S4U') + @ktu.krb_minversion_test('1.11', + 'returning delegated S4U2Proxy credentials') def test_always_get_delegated_creds(self): svc_princ = SERVICE_PRINCIPAL.decode("UTF-8") self.realm.kinit(svc_princ, flags=['-k', '-f']) @@ -451,7 +452,7 @@ def test_always_get_delegated_creds(self): server_ctx_resp.delegated_creds.shouldnt_be_none() server_ctx_resp.delegated_creds.should_be_a(gb.Creds) - @_extension_test('rfc5588', 'RFC 5588') + @ktu.gssapi_extension_test('rfc5588', 'RFC 5588') def test_store_cred_acquire_cred(self): # we need to acquire a forwardable ticket svc_princ = SERVICE_PRINCIPAL.decode("UTF-8") @@ -484,7 +485,7 @@ def test_store_cred_acquire_cred(self): acq_resp = gb.acquire_cred(deleg_name, usage='initiate') acq_resp.shouldnt_be_none() - @_extension_test('cred_store', 'credentials store') + @ktu.gssapi_extension_test('cred_store', 'credentials store') def test_store_cred_into_acquire_cred(self): CCACHE = 'FILE:{tmpdir}/other_ccache'.format(tmpdir=self.realm.tmpdir) KT = '{tmpdir}/other_keytab'.format(tmpdir=self.realm.tmpdir) @@ -599,7 +600,7 @@ def test_inquire_mechs_for_name(self): res.shouldnt_be_none() res.should_include(gb.MechType.kerberos) - @_extension_test('password', 'Password') + @ktu.gssapi_extension_test('password', 'Password') def test_acquire_cred_with_password(self): password = self.realm.password('user') self.realm.kinit(self.realm.user_princ, password=password) @@ -620,7 +621,7 @@ def test_acquire_cred_with_password(self): output_ttl.should_be_a(int) - @_extension_test('password_add', 'Password (add)') + @ktu.gssapi_extension_test('password_add', 'Password (add)') def test_add_cred_with_password(self): password = self.realm.password('user') self.realm.kinit(self.realm.user_princ, password=password) @@ -1041,7 +1042,7 @@ def test_basic_wrap_unwrap(self): unwrapped_message.shouldnt_be_empty() unwrapped_message.should_be(b'test message') - @_extension_test('dce', 'DCE (IOV/AEAD)') + @ktu.gssapi_extension_test('dce', 'DCE (IOV/AEAD)') def test_basic_iov_wrap_unwrap_prealloc(self): init_data = b'some encrypted data' init_other_data = b'some other encrypted data' @@ -1085,7 +1086,7 @@ def test_basic_iov_wrap_unwrap_prealloc(self): init_message[2].value.should_be(init_data) init_message[3].value.should_be(init_other_data) - @_extension_test('dce', 'DCE (IOV/AEAD)') + @ktu.gssapi_extension_test('dce', 'DCE (IOV/AEAD)') def test_basic_iov_wrap_unwrap_autoalloc(self): init_data = b'some encrypted data' init_other_data = b'some other encrypted data' @@ -1117,7 +1118,7 @@ def test_basic_iov_wrap_unwrap_autoalloc(self): init_message[2].value.should_be(init_data) init_message[3].value.should_be(init_other_data) - @_extension_test('dce', 'DCE (IOV/AEAD)') + @ktu.gssapi_extension_test('dce', 'DCE (IOV/AEAD)') def test_basic_aead_wrap_unwrap(self): assoc_data = b'some sig data' (wrapped_message, conf) = gb.wrap_aead(self.client_ctx, @@ -1143,7 +1144,7 @@ def test_basic_aead_wrap_unwrap(self): unwrapped_message.shouldnt_be_empty() unwrapped_message.should_be(b'test message') - @_extension_test('dce', 'DCE (IOV/AEAD)') + @ktu.gssapi_extension_test('dce', 'DCE (IOV/AEAD)') def test_basic_aead_wrap_unwrap_no_assoc(self): (wrapped_message, conf) = gb.wrap_aead(self.client_ctx, b'test message') @@ -1167,7 +1168,7 @@ def test_basic_aead_wrap_unwrap_no_assoc(self): unwrapped_message.shouldnt_be_empty() unwrapped_message.should_be(b'test message') - @_extension_test('dce', 'DCE (IOV/AEAD)') + @ktu.gssapi_extension_test('dce', 'DCE (IOV/AEAD)') def test_basic_aead_wrap_unwrap_bad_assoc_raises_error(self): assoc_data = b'some sig data' (wrapped_message, conf) = gb.wrap_aead(self.client_ctx, @@ -1183,7 +1184,7 @@ def test_basic_aead_wrap_unwrap_bad_assoc_raises_error(self): gb.unwrap_aead.should_raise(gb.BadMICError, self.server_ctx, wrapped_message, b'some other sig data') - @_extension_test('iov_mic', 'IOV MIC') + @ktu.gssapi_extension_test('iov_mic', 'IOV MIC') def test_get_mic_iov(self): init_message = gb.IOV(b'some data', (gb.IOVBufferType.sign_only, b'some sig data'), @@ -1194,7 +1195,7 @@ def test_get_mic_iov(self): init_message[2].type.should_be(gb.IOVBufferType.mic_token) init_message[2].value.shouldnt_be_empty() - @_extension_test('iov_mic', 'IOV MIC') + @ktu.gssapi_extension_test('iov_mic', 'IOV MIC') def test_basic_verify_mic_iov(self): init_message = gb.IOV(b'some data', (gb.IOVBufferType.sign_only, b'some sig data'), @@ -1209,7 +1210,7 @@ def test_basic_verify_mic_iov(self): qop_used.should_be_an_integer() - @_extension_test('iov_mic', 'IOV MIC') + @ktu.gssapi_extension_test('iov_mic', 'IOV MIC') def test_verify_mic_iov_bad_mic_raises_error(self): init_message = gb.IOV(b'some data', (gb.IOVBufferType.sign_only, b'some sig data'), @@ -1220,7 +1221,7 @@ def test_verify_mic_iov_bad_mic_raises_error(self): gb.verify_mic_iov.should_raise(gb.GSSError, self.server_ctx, init_message) - @_extension_test('iov_mic', 'IOV MIC') + @ktu.gssapi_extension_test('iov_mic', 'IOV MIC') def test_get_mic_iov_length(self): init_message = gb.IOV(b'some data', (gb.IOVBufferType.sign_only, b'some sig data'), diff --git a/test-requirements.txt b/test-requirements.txt index b037c748..49418afd 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -4,4 +4,4 @@ nose_parameterized shouldbe six Cython - +k5test diff --git a/tox.ini b/tox.ini index bd0d018b..1576b4ab 100644 --- a/tox.ini +++ b/tox.ini @@ -4,7 +4,7 @@ # and then run "tox" from this directory. [tox] -envlist = py27,py33,py34 +envlist = py27,py33,py34,py35 [testenv] # NB(sross): disabling E225,E226,E227,E901 make pep8 think Cython is ok