diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..558e095 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,83 @@ +name: Test k5test +on: + push: + branches: + - main + paths-ignore: + - CODE_OF_CONDUCT.md + - K5TEST-LICENSE.txt + - LICENSE.txt + - README.md + + pull_request: + branches: + - main + paths-ignore: + - CODE_OF_CONDUCT.md + - K5TEST-LICENSE.txt + - LICENSE.txt + - README.md + + release: + types: + - published + + schedule: + - cron: 0 9 * * * + +jobs: + test: + name: test + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + + - uses: actions/setup-python@v2 + + - name: Test + run: | + echo "::group::Installing Python Requirements" + + python -m pip install --upgrade pip setuptools wheel + python -m pip install --requirement requirements-dev.txt + python -m pip install . + + echo "::endgroup::" + + echo "::group::Running Sanity Checks" + + python -m black . --check + python -m isort . --check-only + + echo "::endgroup::" + + publish: + name: publish + needs: + - test + + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - uses: actions/setup-python@v2 + + - name: Build sdist and wheel + run: | + python -m pip install --upgrade pip setuptools wheel + python setup.py bdist_wheel --universal + python setup.py sdist + + - name: Capture sdist and wheel + uses: actions/upload-artifact@v2 + with: + name: k5test + path: dist/* + + - name: Publish + if: startsWith(github.ref, 'refs/tags/v') + uses: pypa/gh-action-pypi-publish@release/v1 + with: + user: __token__ + password: ${{ secrets.PYPI_API_TOKEN }} diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..034549c --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,10 @@ +repos: +- repo: https://github.com/psf/black + rev: 21.9b0 + hooks: + - id: black + +- repo: https://github.com/PyCQA/isort + rev: 5.9.3 + hooks: + - id: isort diff --git a/k5test/_utils.py b/k5test/_utils.py index e289fac..906e978 100644 --- a/k5test/_utils.py +++ b/k5test/_utils.py @@ -1,12 +1,12 @@ import os -import sys import subprocess +import sys # general use def get_output(*args, **kwargs): res = subprocess.check_output(*args, shell=True, **kwargs) - decoded = res.decode('utf-8') + decoded = res.decode("utf-8") return decoded.strip() @@ -27,7 +27,7 @@ def import_gssapi_extension(name): """ try: - path = 'gssapi.raw.ext_{0}'.format(name) + path = "gssapi.raw.ext_{0}".format(name) __import__(path) return sys.modules[path] except ImportError: @@ -44,33 +44,31 @@ def find_plugin_dir(): return _PLUGIN_DIR # if we've set a LD_LIBRARY_PATH, use that first - ld_path_raw = os.environ.get('LD_LIBRARY_PATH') + ld_path_raw = os.environ.get("LD_LIBRARY_PATH") if ld_path_raw is not None: # first, try assuming it's just a normal install - ld_paths = [path for path in ld_path_raw.split(':') if path] + ld_paths = [path for path in ld_path_raw.split(":") if path] for ld_path in ld_paths: if not os.path.exists(ld_path): continue - _PLUGIN_DIR = _decide_plugin_dir( - _find_plugin_dirs_installed(ld_path)) + _PLUGIN_DIR = _decide_plugin_dir(_find_plugin_dirs_installed(ld_path)) if _PLUGIN_DIR is None: - _PLUGIN_DIR = _decide_plugin_dir( - _find_plugin_dirs_src(ld_path)) + _PLUGIN_DIR = _decide_plugin_dir(_find_plugin_dirs_src(ld_path)) if _PLUGIN_DIR is not None: break # if there was no LD_LIBRARY_PATH, or the above failed if _PLUGIN_DIR is None: - lib_dir = os.path.join(get_output('krb5-config --prefix'), 'lib64') + lib_dir = os.path.join(get_output("krb5-config --prefix"), "lib64") _PLUGIN_DIR = _decide_plugin_dir(_find_plugin_dirs_installed(lib_dir)) # /usr/lib64 seems only to be distinct on Fedora/RHEL/Centos family if _PLUGIN_DIR is None: - lib_dir = os.path.join(get_output('krb5-config --prefix'), 'lib') + lib_dir = os.path.join(get_output("krb5-config --prefix"), "lib") _PLUGIN_DIR = _decide_plugin_dir(_find_plugin_dirs_installed(lib_dir)) if _PLUGIN_DIR is not None: @@ -97,23 +95,25 @@ def _decide_plugin_dir(dirs): def _find_plugin_dirs_installed(search_path): try: - options_raw = get_output('find %s/ -type d \( ! -executable -o ! -readable \) ' - '-prune -o ' - '-type d -path "*/krb5/plugins" -print' % search_path, - stderr=subprocess.STDOUT) + options_raw = get_output( + "find %s/ -type d \( ! -executable -o ! -readable \) " + "-prune -o " + '-type d -path "*/krb5/plugins" -print' % search_path, + stderr=subprocess.STDOUT, + ) except subprocess.CalledProcessError: options_raw = None if options_raw: - return options_raw.split('\n') + return options_raw.split("\n") else: return None def _find_plugin_dirs_src(search_path): - options_raw = get_output('find %s/../ -type d -name plugins' % search_path) + options_raw = get_output("find %s/../ -type d -name plugins" % search_path) if options_raw: - return options_raw.split('\n') + return options_raw.split("\n") else: return None diff --git a/k5test/realm.py b/k5test/realm.py index 3763620..8b24141 100644 --- a/k5test/realm.py +++ b/k5test/realm.py @@ -28,20 +28,19 @@ # - removed some Python 2 specific code import abc import copy +import logging import os import shlex import shutil import signal import socket import string -import sys import subprocess +import sys import tempfile -import logging from k5test import _utils - _LOG = logging.getLogger(__name__) @@ -58,9 +57,10 @@ def _cfg_merge(cfg1, cfg2): value1 = result[key] if isinstance(value1, dict): if not isinstance(value2, dict): - raise TypeError("value at key '{key}' not dict: " - "{type}".format(key=key, - type=type(value2))) + raise TypeError( + "value at key '{key}' not dict: " + "{type}".format(key=key, type=type(value2)) + ) result[key] = _cfg_merge(value1, value2) else: result[key] = copy.deepcopy(value2) @@ -70,10 +70,8 @@ def _cfg_merge(cfg1, cfg2): def _discover_path(name, default, paths): stderr_out = subprocess.DEVNULL try: - path = subprocess.check_output(['which', name], - stderr=stderr_out).strip() - path = path.decode(sys.getfilesystemencoding() or - sys.getdefaultencoding()) + path = subprocess.check_output(["which", name], stderr=stderr_out).strip() + path = path.decode(sys.getfilesystemencoding() or sys.getdefaultencoding()) _LOG.debug(f"Using discovered path for {name} ({path})") return path except subprocess.CalledProcessError as e: @@ -89,58 +87,71 @@ def __new__(cls, *args, **kwargs): provider_cls = cls if provider_cls == K5Realm: - krb5_config = _discover_path('krb5-config', - '/usr/bin/krb5-config', kwargs) + krb5_config = _discover_path("krb5-config", "/usr/bin/krb5-config", kwargs) try: krb5_version = subprocess.check_output( - [krb5_config, '--version'], stderr=subprocess.STDOUT) + [krb5_config, "--version"], stderr=subprocess.STDOUT + ) krb5_version = krb5_version.decode( - sys.getfilesystemencoding() or sys.getdefaultencoding()) + sys.getfilesystemencoding() or sys.getdefaultencoding() + ) # macOS output doesn't contain Heimdal - if 'heimdal' in krb5_version.lower() or ( - sys.platform == 'darwin' and - krb5_config == '/usr/bin/krb5-config'): + if "heimdal" in krb5_version.lower() or ( + sys.platform == "darwin" and krb5_config == "/usr/bin/krb5-config" + ): provider_cls = HeimdalRealm else: provider_cls = MITRealm except Exception as e: - _LOG.debug(f"Failed to determine gssapi provider, defaulting " - f"to MIT: {e}") + _LOG.debug( + f"Failed to determine gssapi provider, defaulting " f"to MIT: {e}" + ) provider_cls = MITRealm return super(K5Realm, cls).__new__(provider_cls) - def __init__(self, realm='KRBTEST.COM', portbase=61000, - krb5_conf=None, kdc_conf=None, create_kdb=True, - krbtgt_keysalt=None, create_user=True, get_creds=True, - create_host=True, start_kdc=True, start_kadmind=False, - existing=None, **paths): + def __init__( + self, + realm="KRBTEST.COM", + portbase=61000, + krb5_conf=None, + kdc_conf=None, + create_kdb=True, + krbtgt_keysalt=None, + create_user=True, + get_creds=True, + create_host=True, + start_kdc=True, + start_kadmind=False, + existing=None, + **paths, + ): if existing is not None: self.tmpdir = existing self.is_existing = True else: - self.tmpdir = tempfile.mkdtemp(suffix='-krbtest') + self.tmpdir = tempfile.mkdtemp(suffix="-krbtest") self.is_existing = False self.realm = realm self.portbase = portbase - self.user_princ = 'user@' + self.realm - self.admin_princ = 'user/admin@' + self.realm - self.host_princ = 'host/%s@%s' % (self.hostname, self.realm) - self.nfs_princ = 'nfs/%s@%s' % (self.hostname, self.realm) - self.krbtgt_princ = 'krbtgt/%s@%s' % (self.realm, self.realm) - self.keytab = os.path.join(self.tmpdir, 'keytab') - self.client_keytab = os.path.join(self.tmpdir, 'client_keytab') - self.ccache = os.path.join(self.tmpdir, 'ccache') - self.kadmin_ccache = os.path.join(self.tmpdir, 'kadmin_ccache') + self.user_princ = "user@" + self.realm + self.admin_princ = "user/admin@" + self.realm + self.host_princ = "host/%s@%s" % (self.hostname, self.realm) + self.nfs_princ = "nfs/%s@%s" % (self.hostname, self.realm) + self.krbtgt_princ = "krbtgt/%s@%s" % (self.realm, self.realm) + self.keytab = os.path.join(self.tmpdir, "keytab") + self.client_keytab = os.path.join(self.tmpdir, "client_keytab") + self.ccache = os.path.join(self.tmpdir, "ccache") + self.kadmin_ccache = os.path.join(self.tmpdir, "kadmin_ccache") self._kdc_proc = None self._kadmind_proc = None - krb5_conf_path = os.path.join(self.tmpdir, 'krb5.conf') - kdc_conf_path = os.path.join(self.tmpdir, 'kdc.conf') + krb5_conf_path = os.path.join(self.tmpdir, "krb5.conf") + kdc_conf_path = os.path.join(self.tmpdir, "kdc.conf") self.env = self._make_env(krb5_conf_path, kdc_conf_path) self._daemons = [] @@ -148,11 +159,9 @@ def __init__(self, realm='KRBTEST.COM', portbase=61000, self._init_paths(**paths) if existing is None: - self._create_conf(_cfg_merge(self._krb5_conf, krb5_conf), - krb5_conf_path) + self._create_conf(_cfg_merge(self._krb5_conf, krb5_conf), krb5_conf_path) if self._kdc_conf or kdc_conf: - self._create_conf(_cfg_merge(self._kdc_conf, kdc_conf), - kdc_conf_path) + self._create_conf(_cfg_merge(self._kdc_conf, kdc_conf), kdc_conf_path) self._create_acl() self._create_dictfile() @@ -161,8 +170,8 @@ def __init__(self, realm='KRBTEST.COM', portbase=61000, if krbtgt_keysalt and create_kdb: self.change_password(self.krbtgt_princ, keysalt=krbtgt_keysalt) if create_user and create_kdb: - self.addprinc(self.user_princ, self.password('user')) - self.addprinc(self.admin_princ, self.password('admin')) + self.addprinc(self.user_princ, self.password("user")) + self.addprinc(self.admin_princ, self.password("admin")) if create_host and create_kdb: self.addprinc(self.host_princ) self.extract_keytab(self.host_princ, self.keytab) @@ -171,9 +180,10 @@ def __init__(self, realm='KRBTEST.COM', portbase=61000, if start_kadmind and create_kdb: self.start_kadmind() - if (get_creds and ((create_kdb and create_user and start_kdc) - or self.is_existing)): - self.kinit(self.user_princ, self.password('user')) + if get_creds and ( + (create_kdb and create_user and start_kdc) or self.is_existing + ): + self.kinit(self.user_princ, self.password("user")) self.klist() @abc.abstractproperty @@ -209,8 +219,7 @@ def extract_keytab(self, princname, keytab): pass @abc.abstractmethod - def kinit(self, princname, password=None, flags=None, verbose=True, - **keywords): + def kinit(self, princname, password=None, flags=None, verbose=True, **keywords): pass @abc.abstractmethod @@ -247,74 +256,78 @@ def _init_paths(self, **paths): setattr(self, attr, value) def _create_conf(self, profile, filename): - with open(filename, 'w') as conf_file: + with open(filename, "w") as conf_file: for section, contents in profile.items(): - conf_file.write('[%s]\n' % section) + conf_file.write("[%s]\n" % section) self._write_cfg_section(conf_file, contents, 1) def _write_cfg_section(self, conf_file, contents, indent_level): - indent = '\t' * indent_level + indent = "\t" * indent_level for name, value in contents.items(): name = self._subst_cfg_value(name) if isinstance(value, dict): # A dictionary value yields a list subsection. - conf_file.write('%s%s = {\n' % (indent, name)) + conf_file.write("%s%s = {\n" % (indent, name)) self._write_cfg_section(conf_file, value, indent_level + 1) - conf_file.write('%s}\n' % indent) + conf_file.write("%s}\n" % indent) elif isinstance(value, list): # A list value yields multiple values for the same name. for item in value: item = self._subst_cfg_value(item) - conf_file.write('%s%s = %s\n' % (indent, name, item)) + conf_file.write("%s%s = %s\n" % (indent, name, item)) elif isinstance(value, str): # A string value yields a straightforward variable setting. value = self._subst_cfg_value(value) - conf_file.write('%s%s = %s\n' % (indent, name, value)) + conf_file.write("%s%s = %s\n" % (indent, name, value)) elif value is not None: - raise TypeError("Unknown config type at key '{key}': " - "{type}".format(key=name, type=type(value))) + raise TypeError( + "Unknown config type at key '{key}': " + "{type}".format(key=name, type=type(value)) + ) @property def hostname(self): - return 'localhost' if sys.platform == 'darwin' else socket.getfqdn() + return "localhost" if sys.platform == "darwin" else socket.getfqdn() def _subst_cfg_value(self, value): template = string.Template(value) - return template.substitute(realm=self.realm, - tmpdir=self.tmpdir, - hostname=self.hostname, - port0=self.portbase, - port1=self.portbase + 1, - port2=self.portbase + 2, - port3=self.portbase + 3, - port4=self.portbase + 4, - port5=self.portbase + 5, - port6=self.portbase + 6, - port7=self.portbase + 7, - port8=self.portbase + 8, - port9=self.portbase + 9) + return template.substitute( + realm=self.realm, + tmpdir=self.tmpdir, + hostname=self.hostname, + port0=self.portbase, + port1=self.portbase + 1, + port2=self.portbase + 2, + port3=self.portbase + 3, + port4=self.portbase + 4, + port5=self.portbase + 5, + port6=self.portbase + 6, + port7=self.portbase + 7, + port8=self.portbase + 8, + port9=self.portbase + 9, + ) def _create_acl(self): - filename = os.path.join(self.tmpdir, 'acl') - with open(filename, 'w') as acl_file: - acl_file.write('%s *\n' % self.admin_princ) - acl_file.write('kiprop/%s@%s p\n' % (self.hostname, self.realm)) + filename = os.path.join(self.tmpdir, "acl") + with open(filename, "w") as acl_file: + acl_file.write("%s *\n" % self.admin_princ) + acl_file.write("kiprop/%s@%s p\n" % (self.hostname, self.realm)) def _create_dictfile(self): - filename = os.path.join(self.tmpdir, 'dictfile') - with open(filename, 'w') as dict_file: - dict_file.write('weak_password\n') + filename = os.path.join(self.tmpdir, "dictfile") + with open(filename, "w") as dict_file: + dict_file.write("weak_password\n") def _make_env(self, krb5_conf_path, kdc_conf_path): env = {} - env['KRB5_CONFIG'] = krb5_conf_path - env['KRB5_KDC_PROFILE'] = kdc_conf_path or os.devnull - env['KRB5CCNAME'] = self.ccache - env['KRB5_KTNAME'] = self.keytab - env['KRB5_CLIENT_KTNAME'] = self.client_keytab - env['KRB5RCACHEDIR'] = self.tmpdir - env['KPROPD_PORT'] = str(self.kprop_port()) - env['KPROP_PORT'] = str(self.kprop_port()) + env["KRB5_CONFIG"] = krb5_conf_path + env["KRB5_KDC_PROFILE"] = kdc_conf_path or os.devnull + env["KRB5CCNAME"] = self.ccache + env["KRB5_KTNAME"] = self.keytab + env["KRB5_CLIENT_KTNAME"] = self.client_keytab + env["KRB5RCACHEDIR"] = self.tmpdir + env["KPROPD_PORT"] = str(self.kprop_port()) + env["KPROP_PORT"] = str(self.kprop_port()) return env def run(self, args, env=None, input=None, expected_code=0): @@ -326,22 +339,27 @@ def run(self, args, env=None, input=None, expected_code=0): else: infile = subprocess.DEVNULL - proc = subprocess.Popen(args, stdin=infile, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, env=env) + proc = subprocess.Popen( + args, + stdin=infile, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=env, + ) if input: inbytes = input.encode() else: inbytes = None (outdata, blank_errdata) = proc.communicate(inbytes) code = proc.returncode - cmd = ' '.join(args) + cmd = " ".join(args) outstr = outdata.decode() - _LOG.debug('[OUTPUT FROM `{args}`]\n{output}\n'.format(args=cmd, - output=outstr)) + _LOG.debug("[OUTPUT FROM `{args}`]\n{output}\n".format(args=cmd, output=outstr)) if code != expected_code: - raise Exception("Unexpected return code " - "for command `{args}`: {code}".format(args=cmd, - code=code)) + raise Exception( + "Unexpected return code " + "for command `{args}`: {code}".format(args=cmd, code=code) + ) return outdata @@ -359,19 +377,27 @@ def _start_daemon(self, args, env=None, sentinel=None): env = self.env stdout = subprocess.PIPE if sentinel else subprocess.DEVNULL - proc = subprocess.Popen(args, stdin=subprocess.DEVNULL, stdout=stdout, - stderr=subprocess.STDOUT, env=env) - cmd = ' '.join(args) + proc = subprocess.Popen( + args, + stdin=subprocess.DEVNULL, + stdout=stdout, + stderr=subprocess.STDOUT, + env=env, + ) + cmd = " ".join(args) while sentinel: line = proc.stdout.readline().decode() if line == "": code = proc.wait() - raise Exception('`{args}` failed to start ' - 'with code {code}'.format(args=cmd, - code=code)) + raise Exception( + "`{args}` failed to start " + "with code {code}".format(args=cmd, code=code) + ) else: - _LOG.debug('[OUTPUT FROM `{args}`]\n' - '{output}\n'.format(args=cmd, output=line)) + _LOG.debug( + "[OUTPUT FROM `{args}`]\n" + "{output}\n".format(args=cmd, output=line) + ) if sentinel in line: break @@ -386,12 +412,12 @@ def _stop_daemon(self, proc): self._daemons.remove(proc) def stop_kdc(self): - assert(self._kdc_proc is not None) + assert self._kdc_proc is not None self._stop_daemon(self._kdc_proc) self._kdc_proc = None def stop_kadmind(self): - assert(self._kadmind_proc is not None) + assert self._kadmind_proc is not None self._stop_daemon(self._kadmind_proc) self._kadmind_proc = None @@ -409,11 +435,11 @@ def password(self, name): return name + str(os.path.basename(self.tmpdir)) def special_env(self, name, has_kdc_conf, krb5_conf=None, kdc_conf=None): - krb5_conf_path = os.path.join(self.tmpdir, f'krb5.conf.{name}') + krb5_conf_path = os.path.join(self.tmpdir, f"krb5.conf.{name}") krb5_conf = _cfg_merge(self._krb5_conf, krb5_conf) self._create_conf(krb5_conf, krb5_conf_path) if has_kdc_conf and self._kdc_conf: - kdc_conf_path = os.path.join(self.tmpdir, f'kdc.conf.{name}') + kdc_conf_path = os.path.join(self.tmpdir, f"kdc.conf.{name}") kdc_conf = _cfg_merge(self._kdc_conf, kdc_conf) self._create_conf(kdc_conf, kdc_conf_path) else: @@ -429,96 +455,101 @@ def kill_daemons(self): class MITRealm(K5Realm): @property def provider(self): - return 'mit' + return "mit" @property def _default_paths(self): return [ - ('kdb5_util', 'kdb5_util', '/usr/sbin/kdb5_util'), - ('krb5kdc', 'krb5kdc', '/usr/sbin/krb5kdc'), - ('kadmin', 'kadmin', '/usr/bin/kadmin'), - ('kadmin_local', 'kadmin.local', '/usr/sbin/kadmin.local'), - ('kadmind', 'kadmind', '/usr/sbin/kadmind'), - ('kprop', 'kprop', '/usr/sbin/kprop'), - ('_kinit', 'kinit', '/usr/bin/kinit'), - ('_klist', 'klist', '/usr/bin/klist'), + ("kdb5_util", "kdb5_util", "/usr/sbin/kdb5_util"), + ("krb5kdc", "krb5kdc", "/usr/sbin/krb5kdc"), + ("kadmin", "kadmin", "/usr/bin/kadmin"), + ("kadmin_local", "kadmin.local", "/usr/sbin/kadmin.local"), + ("kadmind", "kadmind", "/usr/sbin/kadmind"), + ("kprop", "kprop", "/usr/sbin/kprop"), + ("_kinit", "kinit", "/usr/bin/kinit"), + ("_klist", "klist", "/usr/bin/klist"), ] @property def _krb5_conf(self): return { - 'libdefaults': { - 'default_realm': '$realm', - 'dns_lookup_kdc': 'false'}, - 'realms': { - '$realm': { - 'kdc': '$hostname:$port0', - 'admin_server': '$hostname:$port1', - 'kpasswd_server': '$hostname:$port2'}}} + "libdefaults": {"default_realm": "$realm", "dns_lookup_kdc": "false"}, + "realms": { + "$realm": { + "kdc": "$hostname:$port0", + "admin_server": "$hostname:$port1", + "kpasswd_server": "$hostname:$port2", + } + }, + } @property def _kdc_conf(self): plugin_dir = _utils.find_plugin_dir() db_module_dir = None if plugin_dir: - db_module_dir = os.path.join(plugin_dir, 'kdc') + db_module_dir = os.path.join(plugin_dir, "kdc") return { - 'realms': { - '$realm': { - 'database_module': 'db', - 'iprop_port': '$port4', - 'key_stash_file': '$tmpdir/stash', - 'acl_file': '$tmpdir/acl', - 'dict_file': '$tmpdir/dictfile', - 'kadmind_port': '$port1', - 'kpasswd_port': '$port2', - 'kdc_ports': '$port0', - 'kdc_tcp_ports': '$port0', - 'database_name': '$tmpdir/db'}}, - 'dbmodules': { - 'db_module_dir': db_module_dir, - 'db': {'db_library': 'db2', 'database_name': '$tmpdir/db'}}, - 'logging': { - 'admin_server': 'FILE:$tmpdir/kadmind5.log', - 'kdc': 'FILE:$tmpdir/kdc.log', - 'default': 'FILE:$tmpdir/others.log'}} + "realms": { + "$realm": { + "database_module": "db", + "iprop_port": "$port4", + "key_stash_file": "$tmpdir/stash", + "acl_file": "$tmpdir/acl", + "dict_file": "$tmpdir/dictfile", + "kadmind_port": "$port1", + "kpasswd_port": "$port2", + "kdc_ports": "$port0", + "kdc_tcp_ports": "$port0", + "database_name": "$tmpdir/db", + } + }, + "dbmodules": { + "db_module_dir": db_module_dir, + "db": {"db_library": "db2", "database_name": "$tmpdir/db"}, + }, + "logging": { + "admin_server": "FILE:$tmpdir/kadmind5.log", + "kdc": "FILE:$tmpdir/kdc.log", + "default": "FILE:$tmpdir/others.log", + }, + } def create_kdb(self): - self.run([self.kdb5_util, 'create', '-W', '-s', '-P', 'master']) + self.run([self.kdb5_util, "create", "-W", "-s", "-P", "master"]) def addprinc(self, princname, password=None): - args = ['addprinc'] + args = ["addprinc"] if password: - args.extend(['-pw', password]) + args.extend(["-pw", password]) else: - args.append('-randkey') + args.append("-randkey") args.append(princname) self.run_kadminl(args) def change_password(self, principal, password=None, keysalt=None): - args = ['cpw'] + args = ["cpw"] if password: - args.extend(['-pw', password]) + args.extend(["-pw", password]) else: - args.append('-randkey') + args.append("-randkey") if keysalt: - args.extend('-e', keysalt) + args.extend("-e", keysalt) args.append(principal) self.run_kadminl(args) def extract_keytab(self, princname, keytab): - self.run_kadminl(f'ktadd -k {keytab} -norandkey {princname}') + self.run_kadminl(f"ktadd -k {keytab} -norandkey {princname}") - def kinit(self, princname, password=None, flags=None, verbose=True, - **keywords): + def kinit(self, princname, password=None, flags=None, verbose=True, **keywords): cmd = [self._kinit] if verbose: - cmd.append('-V') + cmd.append("-V") if flags: cmd.extend(flags) cmd.append(princname) @@ -530,141 +561,160 @@ def klist(self, ccache=None, **keywords): return self.run([self._klist, ccache or self.ccache], **keywords) def klist_keytab(self, keytab=None, **keywords): - return self.run([self._klist, '-k', keytab or self.keytab], - **keywords) + return self.run([self._klist, "-k", keytab or self.keytab], **keywords) def prep_kadmin(self, princname=None, pw=None, flags=None): if princname is None: princname = self.admin_princ - pw = self.password('admin') - return self.kinit(princname, pw, - flags=['-S', 'kadmin/admin', - '-c', self.kadmin_ccache] + (flags or [])) + pw = self.password("admin") + return self.kinit( + princname, + pw, + flags=["-S", "kadmin/admin", "-c", self.kadmin_ccache] + (flags or []), + ) def run_kadmin(self, query, **keywords): - return self.run([self.kadmin, '-c', self.kadmin_ccache, '-q', query], - **keywords) + return self.run( + [self.kadmin, "-c", self.kadmin_ccache, "-q", query], **keywords + ) def run_kadminl(self, query, **keywords): if isinstance(query, list): query = " ".join([shlex.quote(q) for q in query]) - return self.run([self.kadmin_local, '-q', query], **keywords) + return self.run([self.kadmin_local, "-q", query], **keywords) def start_kdc(self, args=None, env=None): if self._kdc_proc: raise Exception("KDC has already started") - start_args = [self.krb5kdc, '-n'] + start_args = [self.krb5kdc, "-n"] if args: start_args.extend(args) - self._kdc_proc = self._start_daemon(start_args, env, 'starting...') + self._kdc_proc = self._start_daemon(start_args, env, "starting...") def start_kadmind(self, env): if self._kadmind_proc: raise Exception("kadmind has already started") - dump_path = os.path.join(self.tmpdir, 'dump') - self._kadmind_proc = self._start_daemon([self.kadmind, '-nofork', '-W', - '-p', self.kdb5_util, - '-K', self.kprop, - '-F', dump_path], env, - 'starting...') + dump_path = os.path.join(self.tmpdir, "dump") + self._kadmind_proc = self._start_daemon( + [ + self.kadmind, + "-nofork", + "-W", + "-p", + self.kdb5_util, + "-K", + self.kprop, + "-F", + dump_path, + ], + env, + "starting...", + ) class HeimdalRealm(K5Realm): @property def provider(self): - return 'heimdal' + return "heimdal" @property def _default_paths(self): - base = '/System/Library/PrivateFrameworks/Heimdal.framework/Helpers' - if sys.platform != 'darwin': - base = '/usr/libexec' + base = "/System/Library/PrivateFrameworks/Heimdal.framework/Helpers" + if sys.platform != "darwin": + base = "/usr/libexec" return [ - ('krb5kdc', 'kdc', os.path.join(base, 'kdc')), - ('kadmin', 'kadmin', '/usr/bin/kadmin'), - ('kadmin_local', 'kadmin', '/usr/bin/kadmin'), - ('kadmind', 'kadmind', os.path.join(base, 'kadmind')), - ('_kinit', 'kinit', '/usr/bin/kinit'), - ('_klist', 'klist', '/usr/bin/klist'), - ('_ktutil', 'ktutil', '/usr/bin/ktutil'), + ("krb5kdc", "kdc", os.path.join(base, "kdc")), + ("kadmin", "kadmin", "/usr/bin/kadmin"), + ("kadmin_local", "kadmin", "/usr/bin/kadmin"), + ("kadmind", "kadmind", os.path.join(base, "kadmind")), + ("_kinit", "kinit", "/usr/bin/kinit"), + ("_klist", "klist", "/usr/bin/klist"), + ("_ktutil", "ktutil", "/usr/bin/ktutil"), ] @property def _krb5_conf(self): return { - 'libdefaults': { - 'default_realm': '$realm', - 'default_keytab_name': 'FILE:$tmpdir/keytab', - 'dns_lookup_kdc': 'false', - 'dns_lookup_realm': 'false'}, - 'realms': { - '$realm': { - 'kdc': '$hostname:$port0', - 'admin_server': '$hostname:$port1', - 'kpasswd_server': '$hostname:$port2'}}, - 'logging': { - 'kadmind': 'FILE:$tmpdir/kadmind.log', - 'kdc': 'FILE:$tmpdir/kdc.log', - 'kpasswdd': 'FILE:$tmpdir/kpasswdd.log', - 'krb5': 'FILE:$tmpdir/krb5.log', - 'default': 'FILE:$tmpdir/others.log'}, - 'kdc': { - 'database': { - 'dbname': '$tmpdir/db', - 'mkey_file': '$tmpdir/stash', - 'acl_file': '$tmpdir/acl', - 'log_file': '$tmpdir/db.log'}, - 'ports': '$port0'}} + "libdefaults": { + "default_realm": "$realm", + "default_keytab_name": "FILE:$tmpdir/keytab", + "dns_lookup_kdc": "false", + "dns_lookup_realm": "false", + }, + "realms": { + "$realm": { + "kdc": "$hostname:$port0", + "admin_server": "$hostname:$port1", + "kpasswd_server": "$hostname:$port2", + } + }, + "logging": { + "kadmind": "FILE:$tmpdir/kadmind.log", + "kdc": "FILE:$tmpdir/kdc.log", + "kpasswdd": "FILE:$tmpdir/kpasswdd.log", + "krb5": "FILE:$tmpdir/krb5.log", + "default": "FILE:$tmpdir/others.log", + }, + "kdc": { + "database": { + "dbname": "$tmpdir/db", + "mkey_file": "$tmpdir/stash", + "acl_file": "$tmpdir/acl", + "log_file": "$tmpdir/db.log", + }, + "ports": "$port0", + }, + } @property def _kdc_conf(self): return def create_kdb(self): - self.run_kadminl(['stash', f'--key-file={self.tmpdir}/stash' - '--random-password']) - self.run_kadminl(['init', self.realm], input="\n\n") + self.run_kadminl( + ["stash", f"--key-file={self.tmpdir}/stash" "--random-password"] + ) + self.run_kadminl(["init", self.realm], input="\n\n") def addprinc(self, princname, password=None): - args = ['add', '--use-defaults'] + args = ["add", "--use-defaults"] if password: - args.append(f'--password={password}') + args.append(f"--password={password}") else: - args.append('--random-key') + args.append("--random-key") args.append(princname) self.run_kadminl(args) def change_password(self, principal, password=None, keysalt=None): - args = ['change_password'] + args = ["change_password"] if password: - args.append(f'--password={password}') + args.append(f"--password={password}") else: - args.append('--random-key') + args.append("--random-key") if keysalt: - args.extend('-e', keysalt) + args.extend("-e", keysalt) args.append(principal) self.run_kadminl(args) def extract_keytab(self, princname, keytab): - self.run_kadminl(['ext', f'--keytab={keytab}', princname]) + self.run_kadminl(["ext", f"--keytab={keytab}", princname]) - def kinit(self, princname, password=None, flags=None, verbose=True, - **keywords): + def kinit(self, princname, password=None, flags=None, verbose=True, **keywords): cmd = [self._kinit] input = None if password: input = password + "\n" - cmd.append('--password-file=STDIN') + cmd.append("--password-file=STDIN") if flags: cmd.extend(flags) @@ -673,12 +723,10 @@ def kinit(self, princname, password=None, flags=None, verbose=True, return self.run(cmd, input=input, **keywords) def klist(self, ccache=None, **keywords): - return self.run([self._klist, '-c', ccache or self.ccache], - **keywords) + return self.run([self._klist, "-c", ccache or self.ccache], **keywords) def klist_keytab(self, keytab=None, **keywords): - return self.run([self._ktutil, '-k', keytab or self.keytab, 'list'], - **keywords) + return self.run([self._ktutil, "-k", keytab or self.keytab, "list"], **keywords) def prep_kadmin(self, princname=None, pw=None, flags=None): raise NotImplementedError() # Not needed right now @@ -690,10 +738,10 @@ def run_kadminl(self, query, **keywords): if not isinstance(query, list): query = [query] - args = [self.kadmin_local, '--local'] - krb5_config = self.env.get('KRB5_CONFIG', None) + args = [self.kadmin_local, "--local"] + krb5_config = self.env.get("KRB5_CONFIG", None) if krb5_config: - args.append(f'--config-file={krb5_config}') + args.append(f"--config-file={krb5_config}") return self.run(args + query, **keywords) @@ -703,20 +751,20 @@ def start_kdc(self, args=None, env=None): start_args = [self.krb5kdc] - if sys.platform == 'darwin': - start_args.append('--no-sandbox') + if sys.platform == "darwin": + start_args.append("--no-sandbox") - krb5_config = self.env.get('KRB5_CONFIG', None) + krb5_config = self.env.get("KRB5_CONFIG", None) if krb5_config: - start_args.append('--config-file=%s' % krb5_config) + start_args.append("--config-file=%s" % krb5_config) if args: start_args.extend(args) # The KDC won't display the output to stdout, so there's no sentinel # to check. Instead, read the log file for it. - kdc_log = os.path.join(self.tmpdir, 'kdc.log') - with open(kdc_log, mode='w+') as log_fd: + kdc_log = os.path.join(self.tmpdir, "kdc.log") + with open(kdc_log, mode="w+") as log_fd: self._kdc_proc = self._start_daemon(start_args, env) while True: @@ -728,7 +776,7 @@ def start_kadmind(self, env=None): if self._kadmind_proc: raise Exception("kadmind has already started") - config_file = f'--config-file={self._krb5_conf}' - port = '--ports=%s' % (self.portbase + 1) + config_file = f"--config-file={self._krb5_conf}" + port = "--ports=%s" % (self.portbase + 1) args = [self.kadmind, config_file, port] self._kadmind_proc = self._start_daemon(args) diff --git a/k5test/unit.py b/k5test/unit.py index 3c64b9e..caa8315 100644 --- a/k5test/unit.py +++ b/k5test/unit.py @@ -1,8 +1,7 @@ -import unittest import os +import unittest -from k5test import realm -from k5test import _utils +from k5test import _utils, realm # test case class @@ -22,8 +21,10 @@ def gssapi_extension_test(extension_name, extension_text): def make_ext_test(func): def ext_test(self, *args, **kwargs): if _utils.import_gssapi_extension(extension_name) is None: - self.skipTest("The %s GSSAPI extension is not supported by " - "your GSSAPI implementation" % extension_text) + self.skipTest( + "The %s GSSAPI extension is not supported by " + "your GSSAPI implementation" % extension_text + ) else: func(self, *args, **kwargs) @@ -39,16 +40,20 @@ def krb_minversion_test(target_version, problem, provider=None): global _KRB_VERSION if _KRB_VERSION is None: _KRB_VERSION = _utils.get_output("krb5-config --version") - _KRB_VERSION = _KRB_VERSION.split(' ')[-1].split('.') + _KRB_VERSION = _KRB_VERSION.split(" ")[-1].split(".") def make_ext_test(func): def ext_test(self, *args, **kwargs): - if _KRB_VERSION < target_version.split('.') and ( - not provider or self.realm.lower() == provider.lower()): - self.skipTest("Your GSSAPI (version %s) is known to have " - "problems with %s" % (_KRB_VERSION, problem)) + if _KRB_VERSION < target_version.split(".") and ( + not provider or self.realm.lower() == provider.lower() + ): + self.skipTest( + "Your GSSAPI (version %s) is known to have " + "problems with %s" % (_KRB_VERSION, problem) + ) else: func(self, *args, **kwargs) + return ext_test return make_ext_test @@ -60,15 +65,15 @@ def krb_plugin_test(plugin_type, plugin_name): krb5_plugin_path = _utils.find_plugin_dir() plugin_path = None if krb5_plugin_path: - plugin_path = os.path.join(krb5_plugin_path, plugin_type, - f"{plugin_name}.so") + plugin_path = os.path.join(krb5_plugin_path, plugin_type, f"{plugin_name}.so") def make_krb_plugin_test(func): def krb_plugin_test(self, *args, **kwargs): if not plugin_path or not os.path.exists(plugin_path): - self.skipTest("You do not have the GSSAPI {type}" - "plugin {name} installed".format( - type=plugin_type, name=plugin_name)) + self.skipTest( + "You do not have the GSSAPI {type}" + "plugin {name} installed".format(type=plugin_type, name=plugin_name) + ) else: func(self, *args, **kwargs) @@ -82,8 +87,10 @@ def make_krb_provider_test(func): def krb_provider_test(self, *args, **kwargs): provider_list = [p.lower() for p in providers] if self.realm.provider.lower() not in provider_list: - self.skipTest(f"Your GSSAPI (provider {self.realm.provider}) " - f"is known to have problems with {problem}") + self.skipTest( + f"Your GSSAPI (provider {self.realm.provider}) " + f"is known to have problems with {problem}" + ) else: func(self, *args, **kwargs) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..f2ba903 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,6 @@ +[build-system] +requires = ["setuptools >= 40.6.0", "wheel"] +build-backend = "setuptools.build_meta" + +[tool.isort] +profile = "black" diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..4e09bc1 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,3 @@ +black +isort +pre-commit \ No newline at end of file diff --git a/setup.py b/setup.py index 1d9f222..e5909f6 100755 --- a/setup.py +++ b/setup.py @@ -1,36 +1,34 @@ #!/usr/bin/env python from setuptools import setup - setup( - name='k5test', - version='0.10.0', - author='The Python GSSAPI Team', - author_email='sross@redhat.com', - packages=['k5test'], - description='A library for testing Python applications in ' - 'self-contained Kerberos 5 environments', - long_description=open('README.md').read(), - license='LICENSE.txt', + name="k5test", + version="0.10.1", + author="The Python GSSAPI Team", + author_email="sross@redhat.com", + packages=["k5test"], + description="A library for testing Python applications in " + "self-contained Kerberos 5 environments", + long_description=open("README.md").read(), + license="LICENSE.txt", url="https://github.com/pythongssapi/k5test", python_requires=">=3.6", classifiers=[ - 'Development Status :: 4 - Beta', - 'Programming Language :: Python', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: 3.8', - 'Programming Language :: Python :: 3.9', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: ISC License (ISCL)', - 'License :: OSI Approved :: MIT License', - 'Programming Language :: Python :: Implementation :: CPython', - 'Topic :: Security', + "Development Status :: 4 - Beta", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Intended Audience :: Developers", + "License :: OSI Approved :: ISC License (ISCL)", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: Implementation :: CPython", + "Topic :: Security", ], - keywords=['gssapi', 'security'], + keywords=["gssapi", "security"], install_requires=[], - extras_require={ - 'extension_test': ['gssapi'] - } + extras_require={"extension_test": ["gssapi"]}, )