From ad820beebae89c886f1ba4f0d2ddac4ca36857b7 Mon Sep 17 00:00:00 2001 From: Jakub Hrozek Date: Tue, 13 Dec 2016 17:17:16 +0100 Subject: [PATCH 28/36] TESTS: Add integration tests for the KCM responder MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reviewed-by: Michal Židek Reviewed-by: Simo Sorce Reviewed-by: Lukáš Slebodník --- contrib/ci/configure.sh | 7 + contrib/ci/deps.sh | 6 + src/tests/intg/Makefile.am | 4 + src/tests/intg/kdc.py | 175 +++++++++++++++++++++ src/tests/intg/krb5utils.py | 156 +++++++++++++++++++ src/tests/intg/test_kcm.py | 361 ++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 709 insertions(+) create mode 100644 src/tests/intg/kdc.py create mode 100644 src/tests/intg/krb5utils.py create mode 100644 src/tests/intg/test_kcm.py diff --git a/contrib/ci/configure.sh b/contrib/ci/configure.sh index 8e779cfe634a7555e0e8e3b52f42c07e62980fbc..7590743c2aa5fe31bcdf1a3e92a3f482dbec699b 100644 --- a/contrib/ci/configure.sh +++ b/contrib/ci/configure.sh @@ -38,6 +38,13 @@ if [[ "$DISTRO_BRANCH" == -redhat-redhatenterprise*-6.*- || "--disable-cifs-idmap-plugin" "--with-syslog=syslog" "--without-python3-bindings" + "--without-kcm" + ) +fi + +if [[ "$DISTRO_BRANCH" == -redhat-fedora-2[0-2]* ]]; then + CONFIGURE_ARG_LIST+=( + "--without-kcm" ) fi diff --git a/contrib/ci/deps.sh b/contrib/ci/deps.sh index c525e62e8c1d5b9fa042dee4ad03790dbceba242..4467e117c3a896a7f01ef7cb9e94fe28c2ea2838 100644 --- a/contrib/ci/deps.sh +++ b/contrib/ci/deps.sh @@ -47,6 +47,8 @@ if [[ "$DISTRO_BRANCH" == -redhat-* ]]; then uid_wrapper python-requests curl-devel + krb5-server + krb5-workstation ) _DEPS_LIST_SPEC=` sed -e 's/@PACKAGE_VERSION@/0/g' \ @@ -122,6 +124,10 @@ if [[ "$DISTRO_BRANCH" == -debian-* ]]; then libhttp-parser-dev libjansson-dev libcurl4-openssl-dev + krb5-kdc + krb5-admin-server + krb5-user + uuid-dev ) DEPS_INTGCHECK_SATISFIED=true fi diff --git a/src/tests/intg/Makefile.am b/src/tests/intg/Makefile.am index 1d36fa0d2d50307fbc871f5b2a6f0cb1cc95db81..8526beace09b15c99aa27ac98d5038d1980f6a71 100644 --- a/src/tests/intg/Makefile.am +++ b/src/tests/intg/Makefile.am @@ -26,6 +26,9 @@ dist_noinst_DATA = \ files_ops.py \ test_files_ops.py \ test_files_provider.py \ + kdc.py \ + krb5utils.py \ + test_kcm.py \ $(NULL) config.py: config.py.m4 @@ -80,5 +83,6 @@ intgcheck-installed: config.py passwd group NSS_WRAPPER_MODULE_FN_PREFIX="sss" \ UID_WRAPPER=1 \ UID_WRAPPER_ROOT=1 \ + NON_WRAPPED_UID=$$(echo $$UID) \ fakeroot $(PYTHON2) $(PYTEST) -v --tb=native $(INTGCHECK_PYTEST_ARGS) . rm -f $(DESTDIR)$(logpath)/* diff --git a/src/tests/intg/kdc.py b/src/tests/intg/kdc.py new file mode 100644 index 0000000000000000000000000000000000000000..dec33a979916c0979561afb22dc39d6eb8894ff3 --- /dev/null +++ b/src/tests/intg/kdc.py @@ -0,0 +1,175 @@ +# +# MIT Kerberos server class +# +# Copyright (c) 2016 Red Hat, Inc. +# +# This is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +import os +import signal +import shutil +import subprocess + +from util import * + + +class KDC(object): + """ + MIT Kerberos KDC instance + """ + + def __init__(self, basedir, realm, + includedir=None, + kdc_port=10088, + kadmin_port=10749, + master_key='master'): + self.basedir = basedir + self.realm = realm + self.kdc_port = kdc_port + self.kadmin_port = kadmin_port + self.master_key = master_key + + self.kdc_basedir = self.basedir + "/var/krb5kdc" + self.includedir = includedir or (self.kdc_basedir + "/include") + self.kdc_logdir = self.kdc_basedir + "/log" + self.kdc_conf_path = self.kdc_basedir + "/kdc.conf" + self.krb5_conf_path = self.kdc_basedir + "/krb5.conf" + + self.kdc_pid_file = self.kdc_basedir + "/kdc.pid" + + self.acl_file = self.kdc_basedir + "/kadm5.acl" + + self.admin_princ = "admin/admin@" + self.realm + + def start_kdc(self, extra_args=[]): + args = ["krb5kdc", '-P', self.kdc_pid_file] + extra_args + return self._run_in_env(args, self.get_krb5_env()) + + def stop_kdc(self): + try: + with open(self.kdc_pid_file, "r") as pid_file: + os.kill(int(pid_file.read()), signal.SIGTERM) + except IOError as ioex: + if ioex.errno == 2: + pass + else: + raise ioex + + def teardown(self): + self.stop_kdc() + shutil.rmtree(self.kdc_basedir) + + def set_up(self): + self._create_config() + self._create_acl() + self._create_kdb() + + def get_krb5_env(self): + my_env = os.environ + my_env['KRB5_CONFIG'] = self.krb5_conf_path + my_env['KRB5_KDC_PROFILE'] = self.kdc_conf_path + return my_env + + def add_config(self, include_files): + for name, contents in include_files.items(): + include_fpath = os.path.join(self.includedir, name) + with open(include_fpath, 'w') as include_file: + include_file.write(contents) + + def add_principal(self, princ, password=None): + args = ["kadmin.local", "-q"] + if password is None: + args += ["addprinc -randkey %s" % (princ)] + else: + args += ["addprinc -pw %s %s" % (password, princ)] + return self._run_in_env(args, self.get_krb5_env()) + + def _run_in_env(self, args, env): + cmd = subprocess.Popen(args, env=env) + out, err = cmd.communicate() + return cmd.returncode, out, err + + def _create_config(self): + try: + os.makedirs(self.kdc_basedir) + os.makedirs(self.kdc_logdir) + os.makedirs(self.includedir) + except OSError as osex: + if osex.errno == 17: + pass + + kdc_conf = self._format_kdc_conf() + with open(self.kdc_conf_path, 'w') as kdc_conf_file: + kdc_conf_file.write(kdc_conf) + + krb5_conf = self._format_krb5_conf() + with open(self.krb5_conf_path, 'w') as krb5_conf_file: + krb5_conf_file.write(krb5_conf) + + def _create_acl(self): + with open(self.acl_file, 'w') as acl_fobject: + acl_fobject.write(self.admin_princ) + + def _create_kdb(self): + self._run_in_env( + ['kdb5_util', 'create', '-W', '-s', '-P', self.master_key], + self.get_krb5_env() + ) + + def _format_kdc_conf(self): + database_path = self.kdc_basedir + "/principal" + key_stash = self.kdc_basedir + "/stash." + self.realm + + kdc_logfile = "FILE:" + self.kdc_logdir + "/krb5kdc.log" + kadmin_logfile = "FILE:" + self.kdc_logdir + "/kadmin.log" + libkrb5_logfile = "FILE:" + self.kdc_logdir + "/libkrb5.log" + + kdc_conf = unindent(""" + [kdcdefaults] + kdc_ports = {self.kdc_port} + kdc_tcp_ports = {self.kdc_port} + + [realms] + {self.realm} = {{ + kadmind_port = {self.kadmin_port} + database_name = {database_path} + key_stash_file = {key_stash} + acl_file = {self.acl_file} + }} + + [logging] + kdc = {kdc_logfile} + admin_server = {kadmin_logfile} + default = {libkrb5_logfile} + """).format(**locals()) + return kdc_conf + + def _format_krb5_conf(self): + kdc_uri = "localhost:%d" % self.kdc_port + kadmin_uri = "localhost:%d" % self.kadmin_port + + krb5_conf = unindent(""" + includedir {self.includedir} + + [libdefaults] + default_realm = {self.realm} + dns_lookup_kdc = false + dns_lookup_realm = false + + [realms] + {self.realm} = {{ + kdc = {kdc_uri} + admin_server = {kadmin_uri} + }} + """).format(**locals()) + return krb5_conf diff --git a/src/tests/intg/krb5utils.py b/src/tests/intg/krb5utils.py new file mode 100644 index 0000000000000000000000000000000000000000..775cffd0bbfa011f2d8ffc1169dccfef96d78fab --- /dev/null +++ b/src/tests/intg/krb5utils.py @@ -0,0 +1,156 @@ +# +# MIT Kerberos server class +# +# Copyright (c) 2016 Red Hat, Inc. +# +# This is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +import os +import subprocess + + +class NoPrincipals(Exception): + def __init__(self): + Exception.__init__(self, 'No principals in the collection') + + +class PrincNotFound(Exception): + def __init__(self, principal): + Exception.__init__(self, 'Principal %s not found' % principal) + + +class Krb5Utils(object): + """ + Helper class to test Kerberos command line utilities + """ + def __init__(self, krb5_conf_path): + self.krb5_conf_path = krb5_conf_path + + def _run_in_env(self, args, stdin=None, extra_env=None): + my_env = os.environ + my_env['KRB5_CONFIG'] = self.krb5_conf_path + + if 'KRB5CCNAME' in my_env: + del my_env['KRB5CCNAME'] + if extra_env is not None: + my_env.update(extra_env) + + cmd = subprocess.Popen(args, + env=my_env, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + out, err = cmd.communicate(stdin) + return cmd.returncode, out.decode('utf-8'), err.decode('utf-8') + + def kinit(self, principal, password, env=None): + args = ["kinit", principal] + return self._run_in_env(args, password.encode('utf-8'), env) + + def kvno(self, principal, env=None): + args = ["kvno", principal] + return self._run_in_env(args, env) + + def kdestroy(self, all_ccaches=False, env=None): + args = ["kdestroy"] + if all_ccaches is True: + args += ["-A"] + retval, _, _ = self._run_in_env(args, env) + return retval + + def kswitch(self, principal, env=None): + args = ["kswitch", '-p', principal] + retval, _, _ = self._run_in_env(args, env) + return retval + + def _check_klist_l(self, line, exp_principal, exp_cache): + try: + princ, cache = line.split() + except ValueError: + return False + + if exp_cache is not None and cache != exp_cache: + return False + + if exp_principal != princ: + return False + + return True + + def num_princs(self, env=None): + args = ["klist", "-l"] + retval, out, err = self._run_in_env(args, extra_env=env) + if retval != 0: + return 0 + + outlines = [l for l in out.split('\n') if len(l) > 1] + return len(outlines) - 2 + + def list_princs(self, env=None): + args = ["klist", "-l"] + retval, out, err = self._run_in_env(args, extra_env=env) + if retval == 1: + raise NoPrincipals + elif retval != 0: + raise Exception("klist failed: %d: %s\n", retval, err) + + outlines = out.split('\n') + if len(outlines) < 2: + raise Exception("Not enough output from klist -l") + + return [l for l in outlines[2:] if len(l) > 0] + + def has_principal(self, exp_principal, exp_cache=None, env=None): + try: + princlist = self.list_princs(env) + except NoPrincipals: + return False + + for line in princlist: + matches = self._check_klist_l(line, exp_principal, exp_cache) + if matches is True: + return True + + return False + + def default_principal(self, env=None): + principals = self.list_princs(env) + return principals[0].split()[0] + + def _parse_klist_a(self, out): + dflprinc = None + thisrealm = None + ccache_dict = dict() + + for line in [l for l in out.split('\n') if len(l) > 0]: + if line.startswith("Default principal"): + dflprinc = line.split()[2] + thisrealm = '@' + dflprinc.split('@')[1] + elif thisrealm is not None and line.endswith(thisrealm): + svc = line.split()[-1] + if dflprinc in ccache_dict: + ccache_dict[dflprinc].append(svc) + else: + ccache_dict[dflprinc] = [svc] + + return ccache_dict + + def list_all_princs(self, env=None): + args = ["klist", "-A"] + retval, out, err = self._run_in_env(args, extra_env=env) + if retval == 1: + raise NoPrincipals + elif retval != 0: + raise Exception("klist -A failed: %d: %s\n", retval, err) + + return self._parse_klist_a(out) diff --git a/src/tests/intg/test_kcm.py b/src/tests/intg/test_kcm.py new file mode 100644 index 0000000000000000000000000000000000000000..ad1e4923bfe339cb040464757431d2ef3bf57ce1 --- /dev/null +++ b/src/tests/intg/test_kcm.py @@ -0,0 +1,361 @@ +# +# KCM responder integration tests +# +# Copyright (c) 2016 Red Hat, Inc. +# +# This is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +import os +import os.path +import stat +import subprocess +import pytest +import socket +import time +import signal + +import kdc +import krb5utils +import config +from util import unindent, run_shell + +class KcmTestEnv(object): + def __init__(self, k5kdc, k5util): + self.k5kdc = k5kdc + self.k5util = k5util + self.counter = 0 + + def my_uid(self): + s_myuid = os.environ['NON_WRAPPED_UID'] + return int(s_myuid) + + def ccname(self, my_uid=None): + if my_uid is None: + my_uid = self.my_uid() + + return "KCM:%d" % my_uid + + +@pytest.fixture(scope="module") +def kdc_instance(request): + """Kerberos server instance fixture""" + kdc_instance = kdc.KDC(config.PREFIX, "KCMTEST") + try: + kdc_instance.set_up() + kdc_instance.start_kdc() + except: + kdc_instance.teardown() + raise + request.addfinalizer(kdc_instance.teardown) + return kdc_instance + + +def create_conf_fixture(request, contents): + """Generate sssd.conf and add teardown for removing it""" + with open(config.CONF_PATH, "w") as conf: + conf.write(contents) + os.chmod(config.CONF_PATH, stat.S_IRUSR | stat.S_IWUSR) + request.addfinalizer(lambda: os.unlink(config.CONF_PATH)) + + +def create_sssd_kcm_fixture(sock_path, request): + if subprocess.call(['sssd', "--genconf"]) != 0: + raise Exception("failed to regenerate confdb") + + resp_path = os.path.join(config.LIBEXEC_PATH, "sssd", "sssd_kcm") + if not os.access(resp_path, os.X_OK): + # It would be cleaner to use pytest.mark.skipif on the package level + # but upstream insists on supporting RHEL-6.. + pytest.skip("No KCM responder, skipping") + + kcm_pid = os.fork() + assert kcm_pid >= 0 + + if kcm_pid == 0: + if subprocess.call([resp_path, "--uid=0", "--gid=0"]) != 0: + print("sssd_kcm failed to start") + sys.exit(99) + else: + abs_sock_path = os.path.join(config.RUNSTATEDIR, sock_path) + sck = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + for _ in range(1, 10): + try: + sck.connect(abs_sock_path) + except: + time.sleep(0.1) + else: + break + sck.close() + assert os.path.exists(abs_sock_path) + + def kcm_teardown(): + if kcm_pid == 0: + return + os.kill(kcm_pid, signal.SIGTERM) + + request.addfinalizer(kcm_teardown) + return kcm_pid + + +@pytest.fixture +def setup_for_kcm(request, kdc_instance): + """ + Just set up the local provider for tests and enable the KCM + responder + """ + kcm_path = os.path.join(config.RUNSTATEDIR, "kcm.socket") + + sssd_conf = unindent("""\ + [sssd] + domains = local + services = nss + + [domain/local] + id_provider = local + + [kcm] + socket_path = {kcm_path} + """).format(**locals()) + + kcm_socket_include = unindent(""" + [libdefaults] + default_ccache_name = KCM: + kcm_socket = {kcm_path} + """).format(**locals()) + kdc_instance.add_config({'kcm_socket': kcm_socket_include}) + + create_conf_fixture(request, sssd_conf) + create_sssd_kcm_fixture(kcm_path, request) + + k5util = krb5utils.Krb5Utils(kdc_instance.krb5_conf_path) + + return KcmTestEnv(kdc_instance, k5util) + + +def test_kcm_init_list_destroy(setup_for_kcm): + """ + Test that kinit, kdestroy and klist work with KCM + """ + testenv = setup_for_kcm + testenv.k5kdc.add_principal("kcmtest", "Secret123") + + ok = testenv.k5util.has_principal("kcmtest@KCMTEST") + assert ok is False + nprincs = testenv.k5util.num_princs() + assert nprincs == 0 + + out, _, _ = testenv.k5util.kinit("kcmtest", "Secret123") + assert out == 0 + nprincs = testenv.k5util.num_princs() + assert nprincs == 1 + + exp_ccname = testenv.ccname() + ok = testenv.k5util.has_principal("kcmtest@KCMTEST", exp_ccname) + assert ok is True + + out = testenv.k5util.kdestroy() + assert out == 0 + + ok = testenv.k5util.has_principal("kcmtest@KCMTEST") + assert ok is False + nprincs = testenv.k5util.num_princs() + assert nprincs == 0 + + +def test_kcm_overwrite(setup_for_kcm): + """ + That that reusing a ccache reinitializes the cache and doesn't + add the same principal twice + """ + testenv = setup_for_kcm + testenv.k5kdc.add_principal("kcmtest", "Secret123") + exp_ccache = {'kcmtest@KCMTEST': ['krbtgt/KCMTEST@KCMTEST']} + + assert testenv.k5util.num_princs() == 0 + + out, _, _ = testenv.k5util.kinit("kcmtest", "Secret123") + assert out == 0 + assert exp_ccache == testenv.k5util.list_all_princs() + + out, _, _ = testenv.k5util.kinit("kcmtest", "Secret123") + assert out == 0 + assert exp_ccache == testenv.k5util.list_all_princs() + + +def test_collection_init_list_destroy(setup_for_kcm): + """ + Test that multiple principals and service tickets can be stored + in a collection. + """ + testenv = setup_for_kcm + testenv.k5kdc.add_principal("alice", "alicepw") + testenv.k5kdc.add_principal("bob", "bobpw") + testenv.k5kdc.add_principal("carol", "carolpw") + testenv.k5kdc.add_principal("host/somehostname") + + assert testenv.k5util.num_princs() == 0 + + out, _, _ = testenv.k5util.kinit("alice", "alicepw") + assert out == 0 + assert testenv.k5util.default_principal() == 'alice@KCMTEST' + cc_coll = testenv.k5util.list_all_princs() + assert len(cc_coll) == 1 + assert cc_coll['alice@KCMTEST'] == ['krbtgt/KCMTEST@KCMTEST'] + assert 'bob@KCMTEST' not in cc_coll + assert 'carol@KCMTEST' not in cc_coll + + out, _, _ = testenv.k5util.kinit("bob", "bobpw") + assert out == 0 + assert testenv.k5util.default_principal() == 'bob@KCMTEST' + cc_coll = testenv.k5util.list_all_princs() + assert len(cc_coll) == 2 + assert cc_coll['alice@KCMTEST'] == ['krbtgt/KCMTEST@KCMTEST'] + assert cc_coll['bob@KCMTEST'] == ['krbtgt/KCMTEST@KCMTEST'] + assert 'carol@KCMTEST' not in cc_coll + + out, _, _ = testenv.k5util.kinit("carol", "carolpw") + assert out == 0 + assert testenv.k5util.default_principal() == 'carol@KCMTEST' + cc_coll = testenv.k5util.list_all_princs() + assert len(cc_coll) == 3 + assert cc_coll['alice@KCMTEST'] == ['krbtgt/KCMTEST@KCMTEST'] + assert cc_coll['bob@KCMTEST'] == ['krbtgt/KCMTEST@KCMTEST'] + assert cc_coll['carol@KCMTEST'] == ['krbtgt/KCMTEST@KCMTEST'] + + out, _, _ = testenv.k5util.kvno('host/somehostname') + assert out == 0 + cc_coll = testenv.k5util.list_all_princs() + assert len(cc_coll) == 3 + assert set(cc_coll['carol@KCMTEST']) == set(['krbtgt/KCMTEST@KCMTEST', + 'host/somehostname@KCMTEST']) + + out = testenv.k5util.kdestroy() + assert out == 0 + assert testenv.k5util.default_principal() == 'bob@KCMTEST' + cc_coll = testenv.k5util.list_all_princs() + assert len(cc_coll) == 2 + assert cc_coll['alice@KCMTEST'] == ['krbtgt/KCMTEST@KCMTEST'] + assert cc_coll['bob@KCMTEST'] == ['krbtgt/KCMTEST@KCMTEST'] + assert 'carol@KCMTEST' not in cc_coll + + # FIXME - a bug in libkrb5? + #out = testenv.k5util.kdestroy(all_ccaches=True) + #assert out == 0 + #cc_coll = testenv.k5util.list_all_princs() + #assert len(cc_coll) == 0 + + +def test_kswitch(setup_for_kcm): + """ + Test switching between principals + """ + testenv = setup_for_kcm + testenv.k5kdc.add_principal("alice", "alicepw") + testenv.k5kdc.add_principal("bob", "bobpw") + testenv.k5kdc.add_principal("host/somehostname") + testenv.k5kdc.add_principal("host/differenthostname") + + out, _, _ = testenv.k5util.kinit("alice", "alicepw") + assert out == 0 + assert testenv.k5util.default_principal() == 'alice@KCMTEST' + + out, _, _ = testenv.k5util.kinit("bob", "bobpw") + assert out == 0 + assert testenv.k5util.default_principal() == 'bob@KCMTEST' + + cc_coll = testenv.k5util.list_all_princs() + assert len(cc_coll) == 2 + assert cc_coll['alice@KCMTEST'] == ['krbtgt/KCMTEST@KCMTEST'] + assert cc_coll['bob@KCMTEST'] == ['krbtgt/KCMTEST@KCMTEST'] + + out = testenv.k5util.kswitch("alice@KCMTEST") + assert testenv.k5util.default_principal() == 'alice@KCMTEST' + out, _, _ = testenv.k5util.kvno('host/somehostname') + assert out == 0 + cc_coll = testenv.k5util.list_all_princs() + assert len(cc_coll) == 2 + assert set(cc_coll['alice@KCMTEST']) == set(['krbtgt/KCMTEST@KCMTEST', + 'host/somehostname@KCMTEST']) + assert cc_coll['bob@KCMTEST'] == ['krbtgt/KCMTEST@KCMTEST'] + + out = testenv.k5util.kswitch("bob@KCMTEST") + assert testenv.k5util.default_principal() == 'bob@KCMTEST' + out, _, _ = testenv.k5util.kvno('host/differenthostname') + assert out == 0 + cc_coll = testenv.k5util.list_all_princs() + assert len(cc_coll) == 2 + assert set(cc_coll['alice@KCMTEST']) == set(['krbtgt/KCMTEST@KCMTEST', + 'host/somehostname@KCMTEST']) + assert set(cc_coll['bob@KCMTEST']) == set([ + 'krbtgt/KCMTEST@KCMTEST', + 'host/differenthostname@KCMTEST']) + + +def test_subsidiaries(setup_for_kcm): + """ + Test that subsidiary caches are usable and KCM: without specifying UID + can be used to identify the collection + """ + testenv = setup_for_kcm + testenv.k5kdc.add_principal("alice", "alicepw") + testenv.k5kdc.add_principal("bob", "bobpw") + testenv.k5kdc.add_principal("host/somehostname") + testenv.k5kdc.add_principal("host/differenthostname") + + out, _, _ = testenv.k5util.kinit("alice", "alicepw") + assert out == 0 + out, _, _ = testenv.k5util.kvno('host/somehostname') + + out, _, _ = testenv.k5util.kinit("bob", "bobpw") + assert out == 0 + out, _, _ = testenv.k5util.kvno('host/differenthostname') + + exp_cc_coll = dict() + exp_cc_coll['alice@KCMTEST'] = 'host/somehostname@KCMTEST' + exp_cc_coll['bob@KCMTEST'] = 'host/differenthostname@KCMTEST' + + klist_l = testenv.k5util.list_princs() + princ_ccache = dict() + for line in klist_l: + princ, subsidiary = line.split() + princ_ccache[princ] = subsidiary + + for princ, subsidiary in princ_ccache.items(): + env = {'KRB5CCNAME': subsidiary} + cc_coll = testenv.k5util.list_all_princs(env=env) + assert len(cc_coll) == 1 + assert princ in cc_coll + assert exp_cc_coll[princ] in cc_coll[princ] + + cc_coll = testenv.k5util.list_all_princs(env={'KRB5CCNAME': 'KCM:'}) + assert len(cc_coll) == 2 + assert set(cc_coll['alice@KCMTEST']) == set(['krbtgt/KCMTEST@KCMTEST', + 'host/somehostname@KCMTEST']) + assert set(cc_coll['bob@KCMTEST']) == set([ + 'krbtgt/KCMTEST@KCMTEST', + 'host/differenthostname@KCMTEST']) + + +def test_kdestroy_nocache(setup_for_kcm): + """ + Destroying a non-existing ccache should not throw an error + """ + testenv = setup_for_kcm + testenv.k5kdc.add_principal("alice", "alicepw") + out, _, _ = testenv.k5util.kinit("alice", "alicepw") + assert out == 0 + + testenv.k5util.kdestroy() + assert out == 0 + out = testenv.k5util.kdestroy() + assert out == 0 -- 2.9.3