sailesh1993 / rpms / cloud-init

Forked from rpms/cloud-init 2 years ago
Blob Blame History Raw
From 3b68aff3b7b1dc567ef6721a269c2d4e054b729f Mon Sep 17 00:00:00 2001
From: Emanuele Giuseppe Esposito <>
Date: Mon, 9 Aug 2021 23:41:44 +0200
Subject: [PATCH] Stop copying ssh system keys and check folder permissions

RH-Author: Emanuele Giuseppe Esposito <>
RH-MergeRequest: 28: Stop copying ssh system keys and check folder permissions (#956)
RH-Commit: [1/1] 7cada613be82f2f525ee56b86ef9f71edf40d2ef (eesposit/cloud-init)
RH-Bugzilla: 1862967
RH-Acked-by: Miroslav Rezanina <>
RH-Acked-by: Eduardo Otubo <>

TESTED: By me and QA
BREW: 38818284

This is a continuation of previous MR 25 and upstream PR #937.
There were still issues when using non-standard file paths like
/etc/ssh/userkeys/%u or /etc/ssh/authorized_keys, and the choice
of storing the keys of all authorized_keys files into a single
one was not ideal. This fix modifies cloudinit to support
all different cases of authorized_keys file locations, and
picks a user-specific file where to copy the new keys that
complies with ssh permissions.

commit 00dbaf1e9ab0e59d81662f0f3561897bef499a3f
Author: Emanuele Giuseppe Esposito <>
Date:   Mon Aug 9 16:49:56 2021 +0200

    Stop copying ssh system keys and check folder permissions (#956)

    In /etc/ssh/sshd_config, it is possible to define a custom
    authorized_keys file that will contain the keys allowed to access the
    machine via the AuthorizedKeysFile option. Cloudinit is able to add
    user-specific keys to the existing ones, but we need to be careful on
    which of the authorized_keys files listed to pick.
    Chosing a file that is shared by all user will cause security
    issues, because the owner of that key can then access also other users.

    We therefore pick an authorized_keys file only if it satisfies the
    following conditions:
    1. it is not a "global" file, ie it must be defined in
       AuthorizedKeysFile with %u, %h or be in  /home/<user>. This avoids
       security issues.
    2. it must comply with ssh permission requirements, otherwise the ssh
       agent won't use that file.

    If it doesn't meet either of those conditions, write to

    We also need to consider the case when the chosen authorized_keys file
    does not exist. In this case, the existing behavior of cloud-init is
    to create the new file. We therefore need to be sure that the file
    complies with ssh permissions too, by setting:
    - the actual file to permission 600, and owned by the user
    - the directories in the path that do not exist must be root owned and
      with permission 755.

Signed-off-by: Emanuele Giuseppe Esposito <>
 cloudinit/           | 133 ++++-
 cloudinit/               |  51 +-
 tests/unittests/ | 952 +++++++++++++++++++++++++-------
 3 files changed, 920 insertions(+), 216 deletions(-)

diff --git a/cloudinit/ b/cloudinit/
index 89057262..b8a3c8f7 100644
--- a/cloudinit/
+++ b/cloudinit/
@@ -249,6 +249,113 @@ def render_authorizedkeysfile_paths(value, homedir, username):
     return rendered
+# Inspired from safe_path() in openssh source code (misc.c).
+def check_permissions(username, current_path, full_path, is_file, strictmodes):
+    """Check if the file/folder in @current_path has the right permissions.
+    We need to check that:
+    1. If StrictMode is enabled, the owner is either root or the user
+    2. the user can access the file/folder, otherwise ssh won't use it
+    3. If StrictMode is enabled, no write permission is given to group
+       and world users (022)
+    """
+    # group/world can only execute the folder (access)
+    minimal_permissions = 0o711
+    if is_file:
+        # group/world can only read the file
+        minimal_permissions = 0o644
+    # 1. owner must be either root or the user itself
+    owner = util.get_owner(current_path)
+    if strictmodes and owner != username and owner != "root":
+        LOG.debug("Path %s in %s must be own by user %s or"
+                  " by root, but instead is own by %s. Ignoring key.",
+                  current_path, full_path, username, owner)
+        return False
+    parent_permission = util.get_permissions(current_path)
+    # 2. the user can access the file/folder, otherwise ssh won't use it
+    if owner == username:
+        # need only the owner permissions
+        minimal_permissions &= 0o700
+    else:
+        group_owner = util.get_group(current_path)
+        user_groups = util.get_user_groups(username)
+        if group_owner in user_groups:
+            # need only the group permissions
+            minimal_permissions &= 0o070
+        else:
+            # need only the world permissions
+            minimal_permissions &= 0o007
+    if parent_permission & minimal_permissions == 0:
+        LOG.debug("Path %s in %s must be accessible by user %s,"
+                  " check its permissions",
+                  current_path, full_path, username)
+        return False
+    # 3. no write permission (w) is given to group and world users (022)
+    # Group and world user can still have +rx.
+    if strictmodes and parent_permission & 0o022 != 0:
+        LOG.debug("Path %s in %s must not give write"
+                  "permission to group or world users. Ignoring key.",
+                  current_path, full_path)
+        return False
+    return True
+def check_create_path(username, filename, strictmodes):
+    user_pwent = users_ssh_info(username)[1]
+    root_pwent = users_ssh_info("root")[1]
+    try:
+        # check the directories first
+        directories = filename.split("/")[1:-1]
+        # scan in order, from root to file name
+        parent_folder = ""
+        # this is to comply also with unit tests, and
+        # strange home directories
+        home_folder = os.path.dirname(user_pwent.pw_dir)
+        for directory in directories:
+            parent_folder += "/" + directory
+            if home_folder.startswith(parent_folder):
+                continue
+            if not os.path.isdir(parent_folder):
+                # directory does not exist, and permission so far are good:
+                # create the directory, and make it accessible by everyone
+                # but owned by root, as it might be used by many users.
+                with util.SeLinuxGuard(parent_folder):
+                    os.makedirs(parent_folder, mode=0o755, exist_ok=True)
+                    util.chownbyid(parent_folder, root_pwent.pw_uid,
+                                   root_pwent.pw_gid)
+            permissions = check_permissions(username, parent_folder,
+                                            filename, False, strictmodes)
+            if not permissions:
+                return False
+        # check the file
+        if not os.path.exists(filename):
+            # if file does not exist: we need to create it, since the
+            # folders at this point exist and have right permissions
+            util.write_file(filename, '', mode=0o600, ensure_dir_exists=True)
+            util.chownbyid(filename, user_pwent.pw_uid, user_pwent.pw_gid)
+        permissions = check_permissions(username, filename,
+                                        filename, True, strictmodes)
+        if not permissions:
+            return False
+    except (IOError, OSError) as e:
+        util.logexc(LOG, str(e))
+        return False
+    return True
 def extract_authorized_keys(username, sshd_cfg_file=DEF_SSHD_CFG):
     (ssh_dir, pw_ent) = users_ssh_info(username)
     default_authorizedkeys_file = os.path.join(ssh_dir, 'authorized_keys')
@@ -259,6 +366,7 @@ def extract_authorized_keys(username, sshd_cfg_file=DEF_SSHD_CFG):
             ssh_cfg = parse_ssh_config_map(sshd_cfg_file)
             key_paths = ssh_cfg.get("authorizedkeysfile",
+            strictmodes = ssh_cfg.get("strictmodes", "yes")
             auth_key_fns = render_authorizedkeysfile_paths(
                 key_paths, pw_ent.pw_dir, username)
@@ -269,31 +377,31 @@ def extract_authorized_keys(username, sshd_cfg_file=DEF_SSHD_CFG):
                         "config from %r, using 'AuthorizedKeysFile' file "
                         "%r instead", DEF_SSHD_CFG, auth_key_fns[0])
-    # check if one of the keys is the user's one
+    # check if one of the keys is the user's one and has the right permissions
     for key_path, auth_key_fn in zip(key_paths.split(), auth_key_fns):
         if any([
             '%u' in key_path,
             '%h' in key_path,
-            user_authorizedkeys_file = auth_key_fn
+            permissions_ok = check_create_path(username, auth_key_fn,
+                                               strictmodes == "yes")
+            if permissions_ok:
+                user_authorizedkeys_file = auth_key_fn
+                break
     if user_authorizedkeys_file != default_authorizedkeys_file:
             "AuthorizedKeysFile has an user-specific authorized_keys, "
             "using %s", user_authorizedkeys_file)
-    # always store all the keys in the user's private file
-    return (user_authorizedkeys_file, parse_authorized_keys(auth_key_fns))
+    return (
+        user_authorizedkeys_file,
+        parse_authorized_keys([user_authorizedkeys_file])
+    )
 def setup_user_keys(keys, username, options=None):
-    # Make sure the users .ssh dir is setup accordingly
-    (ssh_dir, pwent) = users_ssh_info(username)
-    if not os.path.isdir(ssh_dir):
-        util.ensure_dir(ssh_dir, mode=0o700)
-        util.chownbyid(ssh_dir, pwent.pw_uid, pwent.pw_gid)
     # Turn the 'update' keys given into actual entries
     parser = AuthKeyLineParser()
     key_entries = []
@@ -302,11 +410,10 @@ def setup_user_keys(keys, username, options=None):
     # Extract the old and make the new
     (auth_key_fn, auth_key_entries) = extract_authorized_keys(username)
+    ssh_dir = os.path.dirname(auth_key_fn)
     with util.SeLinuxGuard(ssh_dir, recursive=True):
         content = update_authorized_keys(auth_key_entries, key_entries)
-        util.ensure_dir(os.path.dirname(auth_key_fn), mode=0o700)
-        util.write_file(auth_key_fn, content, mode=0o600)
-        util.chownbyid(auth_key_fn, pwent.pw_uid, pwent.pw_gid)
+        util.write_file(auth_key_fn, content, preserve_mode=True)
 class SshdConfigLine(object):
diff --git a/cloudinit/ b/cloudinit/
index 4e0a72db..343976ad 100644
--- a/cloudinit/
+++ b/cloudinit/
@@ -35,6 +35,7 @@ from base64 import b64decode, b64encode
 from errno import ENOENT
 from functools import lru_cache
 from urllib import parse
+from typing import List
 from cloudinit import importer
 from cloudinit import log as logging
@@ -1830,6 +1831,53 @@ def chmod(path, mode):
             os.chmod(path, real_mode)
+def get_permissions(path: str) -> int:
+    """
+    Returns the octal permissions of the file/folder pointed by the path,
+    encoded as an int.
+    @param path: The full path of the file/folder.
+    """
+    return stat.S_IMODE(os.stat(path).st_mode)
+def get_owner(path: str) -> str:
+    """
+    Returns the owner of the file/folder pointed by the path.
+    @param path: The full path of the file/folder.
+    """
+    st = os.stat(path)
+    return pwd.getpwuid(st.st_uid).pw_name
+def get_group(path: str) -> str:
+    """
+    Returns the group of the file/folder pointed by the path.
+    @param path: The full path of the file/folder.
+    """
+    st = os.stat(path)
+    return grp.getgrgid(st.st_gid).gr_name
+def get_user_groups(username: str) -> List[str]:
+    """
+    Returns a list of all groups to which the user belongs
+    @param username: the user we want to check
+    """
+    groups = []
+    for group in grp.getgrall():
+        if username in group.gr_mem:
+            groups.append(group.gr_name)
+    gid = pwd.getpwnam(username).pw_gid
+    groups.append(grp.getgrgid(gid).gr_name)
+    return groups
 def write_file(
@@ -1856,8 +1904,7 @@ def write_file(
     if preserve_mode:
-            file_stat = os.stat(filename)
-            mode = stat.S_IMODE(file_stat.st_mode)
+            mode = get_permissions(filename)
         except OSError:
diff --git a/tests/unittests/ b/tests/unittests/
index bcb8044f..a66788bf 100644
--- a/tests/unittests/
+++ b/tests/unittests/
@@ -1,6 +1,9 @@
 # This file is part of cloud-init. See LICENSE file for license information.
+import os
 from collections import namedtuple
+from functools import partial
 from unittest.mock import patch
 from cloudinit import ssh_util
@@ -8,13 +11,48 @@ from cloudinit.tests import helpers as test_helpers
 from cloudinit import util
-FakePwEnt = namedtuple(
-    'FakePwEnt',
-    ['pw_dir', 'pw_gecos', 'pw_name', 'pw_passwd', 'pw_shell', 'pwd_uid'])
+FakePwEnt = namedtuple('FakePwEnt', [
+    'pw_name',
+    'pw_passwd',
+    'pw_uid',
+    'pw_gid',
+    'pw_gecos',
+    'pw_dir',
+    'pw_shell',
 FakePwEnt.__new__.__defaults__ = tuple(
     "UNSET_%s" % n for n in FakePwEnt._fields)
+def mock_get_owner(updated_permissions, value):
+    try:
+        return updated_permissions[value][0]
+    except ValueError:
+        return util.get_owner(value)
+def mock_get_group(updated_permissions, value):
+    try:
+        return updated_permissions[value][1]
+    except ValueError:
+        return util.get_group(value)
+def mock_get_user_groups(username):
+    return username
+def mock_get_permissions(updated_permissions, value):
+    try:
+        return updated_permissions[value][2]
+    except ValueError:
+        return util.get_permissions(value)
+def mock_getpwnam(users, username):
+    return users[username]
 # Do not use these public keys, most of them are fetched from
 # the testdata for OpenSSH, and their private keys are available
@@ -552,12 +590,30 @@ class TestBasicAuthorizedKeyParse(test_helpers.CiTestCase):
                 "/opt/%u/keys", "/home/bobby", "bobby"))
+    def test_user_file(self):
+        self.assertEqual(
+            ["/opt/bobby"],
+            ssh_util.render_authorizedkeysfile_paths(
+                "/opt/%u", "/home/bobby", "bobby"))
+    def test_user_file2(self):
+        self.assertEqual(
+            ["/opt/bobby/bobby"],
+            ssh_util.render_authorizedkeysfile_paths(
+                "/opt/%u/%u", "/home/bobby", "bobby"))
     def test_multiple(self):
             ["/keys/path1", "/keys/path2"],
                 "/keys/path1 /keys/path2", "/home/bobby", "bobby"))
+    def test_multiple2(self):
+        self.assertEqual(
+            ["/keys/path1", "/keys/bobby"],
+            ssh_util.render_authorizedkeysfile_paths(
+                "/keys/path1 /keys/%u", "/home/bobby", "bobby"))
     def test_relative(self):
@@ -581,269 +637,763 @@ class TestBasicAuthorizedKeyParse(test_helpers.CiTestCase):
 class TestMultipleSshAuthorizedKeysFile(test_helpers.CiTestCase):
-    @patch("cloudinit.ssh_util.pwd.getpwnam")
-    def test_multiple_authorizedkeys_file_order1(self, m_getpwnam):
-        fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby')
-        m_getpwnam.return_value = fpw
-        user_ssh_folder = "%s/.ssh" % fpw.pw_dir
-        # /tmp/home2/bobby/.ssh/authorized_keys = rsa
-        authorized_keys = self.tmp_path('authorized_keys', dir=user_ssh_folder)
-        util.write_file(authorized_keys, VALID_CONTENT['rsa'])
-        # /tmp/home2/bobby/.ssh/user_keys = dsa
-        user_keys = self.tmp_path('user_keys', dir=user_ssh_folder)
-        util.write_file(user_keys, VALID_CONTENT['dsa'])
-        # /tmp/sshd_config
+    def create_fake_users(self, names, mock_permissions,
+                          m_get_group, m_get_owner, m_get_permissions,
+                          m_getpwnam, users):
+        homes = []
+        root = '/tmp/root'
+        fpw = FakePwEnt(pw_name="root", pw_dir=root)
+        users["root"] = fpw
+        for name in names:
+            home = '/tmp/home/' + name
+            fpw = FakePwEnt(pw_name=name, pw_dir=home)
+            users[name] = fpw
+            homes.append(home)
+        m_get_permissions.side_effect = partial(
+            mock_get_permissions, mock_permissions)
+        m_get_owner.side_effect = partial(mock_get_owner, mock_permissions)
+        m_get_group.side_effect = partial(mock_get_group, mock_permissions)
+        m_getpwnam.side_effect = partial(mock_getpwnam, users)
+        return homes
+    def create_user_authorized_file(self, home, filename, content_key, keys):
+        user_ssh_folder = "%s/.ssh" % home
+        # /tmp/home/<user>/.ssh/authorized_keys = content_key
+        authorized_keys = self.tmp_path(filename, dir=user_ssh_folder)
+        util.write_file(authorized_keys, VALID_CONTENT[content_key])
+        keys[authorized_keys] = content_key
+        return authorized_keys
+    def create_global_authorized_file(self, filename, content_key, keys):
+        authorized_keys = self.tmp_path(filename, dir='/tmp')
+        util.write_file(authorized_keys, VALID_CONTENT[content_key])
+        keys[authorized_keys] = content_key
+        return authorized_keys
+    def create_sshd_config(self, authorized_keys_files):
         sshd_config = self.tmp_path('sshd_config', dir="/tmp")
-            "AuthorizedKeysFile %s %s" % (authorized_keys, user_keys)
+            "AuthorizedKeysFile " + authorized_keys_files
+        return sshd_config
+    def execute_and_check(self, user, sshd_config, solution, keys,
+                          delete_keys=True):
         (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys(
-            fpw.pw_name, sshd_config)
+            user, sshd_config)
         content = ssh_util.update_authorized_keys(auth_key_entries, [])
-        self.assertEqual(user_keys, auth_key_fn)
-        self.assertTrue(VALID_CONTENT['rsa'] in content)
-        self.assertTrue(VALID_CONTENT['dsa'] in content)
+        self.assertEqual(auth_key_fn, solution)
+        for path, key in keys.items():
+            if path == solution:
+                self.assertTrue(VALID_CONTENT[key] in content)
+            else:
+                self.assertFalse(VALID_CONTENT[key] in content)
+        if delete_keys and os.path.isdir("/tmp/home/"):
+            util.delete_dir_contents("/tmp/home/")
-    def test_multiple_authorizedkeys_file_order2(self, m_getpwnam):
-        fpw = FakePwEnt(pw_name='suzie', pw_dir='/tmp/home/suzie')
-        m_getpwnam.return_value = fpw
-        user_ssh_folder = "%s/.ssh" % fpw.pw_dir
+    @patch("cloudinit.util.get_permissions")
+    @patch("cloudinit.util.get_owner")
+    @patch("cloudinit.util.get_group")
+    def test_single_user_two_local_files(
+        self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam
+    ):
+        user_bobby = 'bobby'
+        keys = {}
+        users = {}
+        mock_permissions = {
+            '/tmp/home/bobby': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh/user_keys': ('bobby', 'bobby', 0o600),
+            '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600),
+        }
+        homes = self.create_fake_users(
+            [user_bobby], mock_permissions, m_get_group, m_get_owner,
+            m_get_permissions, m_getpwnam, users
+        )
+        home = homes[0]
-        # /tmp/home/suzie/.ssh/authorized_keys = rsa
-        authorized_keys = self.tmp_path('authorized_keys', dir=user_ssh_folder)
-        util.write_file(authorized_keys, VALID_CONTENT['rsa'])
+        # /tmp/home/bobby/.ssh/authorized_keys = rsa
+        authorized_keys = self.create_user_authorized_file(
+            home, 'authorized_keys', 'rsa', keys
+        )
-        # /tmp/home/suzie/.ssh/user_keys = dsa
-        user_keys = self.tmp_path('user_keys', dir=user_ssh_folder)
-        util.write_file(user_keys, VALID_CONTENT['dsa'])
+        # /tmp/home/bobby/.ssh/user_keys = dsa
+        user_keys = self.create_user_authorized_file(
+            home, 'user_keys', 'dsa', keys
+        )
         # /tmp/sshd_config
-        sshd_config = self.tmp_path('sshd_config', dir="/tmp")
-        util.write_file(
-            sshd_config,
-            "AuthorizedKeysFile %s %s" % (user_keys, authorized_keys)
+        options = "%s %s" % (authorized_keys, user_keys)
+        sshd_config = self.create_sshd_config(options)
+        self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys)
+    @patch("cloudinit.ssh_util.pwd.getpwnam")
+    @patch("cloudinit.util.get_permissions")
+    @patch("cloudinit.util.get_owner")
+    @patch("cloudinit.util.get_group")
+    def test_single_user_two_local_files_inverted(
+        self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam
+    ):
+        user_bobby = 'bobby'
+        keys = {}
+        users = {}
+        mock_permissions = {
+            '/tmp/home/bobby': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh/user_keys': ('bobby', 'bobby', 0o600),
+            '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600),
+        }
+        homes = self.create_fake_users(
+            [user_bobby], mock_permissions, m_get_group, m_get_owner,
+            m_get_permissions, m_getpwnam, users
+        home = homes[0]
-        (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys(
-            fpw.pw_name, sshd_config)
-        content = ssh_util.update_authorized_keys(auth_key_entries, [])
+        # /tmp/home/bobby/.ssh/authorized_keys = rsa
+        authorized_keys = self.create_user_authorized_file(
+            home, 'authorized_keys', 'rsa', keys
+        )
-        self.assertEqual(authorized_keys, auth_key_fn)
-        self.assertTrue(VALID_CONTENT['rsa'] in content)
-        self.assertTrue(VALID_CONTENT['dsa'] in content)
+        # /tmp/home/bobby/.ssh/user_keys = dsa
+        user_keys = self.create_user_authorized_file(
+            home, 'user_keys', 'dsa', keys
+        )
-    @patch("cloudinit.ssh_util.pwd.getpwnam")
-    def test_multiple_authorizedkeys_file_local_global(self, m_getpwnam):
-        fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby')
-        m_getpwnam.return_value = fpw
-        user_ssh_folder = "%s/.ssh" % fpw.pw_dir
+        # /tmp/sshd_config
+        options = "%s %s" % (user_keys, authorized_keys)
+        sshd_config = self.create_sshd_config(options)
-        # /tmp/home2/bobby/.ssh/authorized_keys = rsa
-        authorized_keys = self.tmp_path('authorized_keys', dir=user_ssh_folder)
-        util.write_file(authorized_keys, VALID_CONTENT['rsa'])
+        self.execute_and_check(user_bobby, sshd_config, user_keys, keys)
-        # /tmp/home2/bobby/.ssh/user_keys = dsa
-        user_keys = self.tmp_path('user_keys', dir=user_ssh_folder)
-        util.write_file(user_keys, VALID_CONTENT['dsa'])
+    @patch("cloudinit.ssh_util.pwd.getpwnam")
+    @patch("cloudinit.util.get_permissions")
+    @patch("cloudinit.util.get_owner")
+    @patch("cloudinit.util.get_group")
+    def test_single_user_local_global_files(
+        self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam
+    ):
+        user_bobby = 'bobby'
+        keys = {}
+        users = {}
+        mock_permissions = {
+            '/tmp/home/bobby': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh/user_keys': ('bobby', 'bobby', 0o600),
+            '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600),
+        }
+        homes = self.create_fake_users(
+            [user_bobby], mock_permissions, m_get_group, m_get_owner,
+            m_get_permissions, m_getpwnam, users
+        )
+        home = homes[0]
-        # /tmp/etc/ssh/authorized_keys = ecdsa
-        authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys',
-                                               dir="/tmp")
-        util.write_file(authorized_keys_global, VALID_CONTENT['ecdsa'])
+        # /tmp/home/bobby/.ssh/authorized_keys = rsa
+        authorized_keys = self.create_user_authorized_file(
+            home, 'authorized_keys', 'rsa', keys
+        )
-        # /tmp/sshd_config
-        sshd_config = self.tmp_path('sshd_config', dir="/tmp")
-        util.write_file(
-            sshd_config,
-            "AuthorizedKeysFile %s %s %s" % (authorized_keys_global,
-                                             user_keys, authorized_keys)
+        # /tmp/home/bobby/.ssh/user_keys = dsa
+        user_keys = self.create_user_authorized_file(
+            home, 'user_keys', 'dsa', keys
-        (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys(
-            fpw.pw_name, sshd_config)
-        content = ssh_util.update_authorized_keys(auth_key_entries, [])
+        authorized_keys_global = self.create_global_authorized_file(
+            'etc/ssh/authorized_keys', 'ecdsa', keys
+        )
-        self.assertEqual(authorized_keys, auth_key_fn)
-        self.assertTrue(VALID_CONTENT['rsa'] in content)
-        self.assertTrue(VALID_CONTENT['ecdsa'] in content)
-        self.assertTrue(VALID_CONTENT['dsa'] in content)
+        options = "%s %s %s" % (authorized_keys_global, user_keys,
+                                authorized_keys)
+        sshd_config = self.create_sshd_config(options)
-    @patch("cloudinit.ssh_util.pwd.getpwnam")
-    def test_multiple_authorizedkeys_file_local_global2(self, m_getpwnam):
-        fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby')
-        m_getpwnam.return_value = fpw
-        user_ssh_folder = "%s/.ssh" % fpw.pw_dir
+        self.execute_and_check(user_bobby, sshd_config, user_keys, keys)
-        # /tmp/home2/bobby/.ssh/authorized_keys2 = rsa
-        authorized_keys = self.tmp_path('authorized_keys2',
-                                        dir=user_ssh_folder)
-        util.write_file(authorized_keys, VALID_CONTENT['rsa'])
+    @patch("cloudinit.ssh_util.pwd.getpwnam")
+    @patch("cloudinit.util.get_permissions")
+    @patch("cloudinit.util.get_owner")
+    @patch("cloudinit.util.get_group")
+    def test_single_user_local_global_files_inverted(
+        self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam
+    ):
+        user_bobby = 'bobby'
+        keys = {}
+        users = {}
+        mock_permissions = {
+            '/tmp/home/bobby': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh/user_keys3': ('bobby', 'bobby', 0o600),
+            '/tmp/home/bobby/.ssh/authorized_keys2': ('bobby', 'bobby', 0o600),
+        }
+        homes = self.create_fake_users(
+            [user_bobby], mock_permissions, m_get_group, m_get_owner,
+            m_get_permissions, m_getpwnam, users
+        )
+        home = homes[0]
-        # /tmp/home2/bobby/.ssh/user_keys3 = dsa
-        user_keys = self.tmp_path('user_keys3', dir=user_ssh_folder)
-        util.write_file(user_keys, VALID_CONTENT['dsa'])
+        # /tmp/home/bobby/.ssh/authorized_keys = rsa
+        authorized_keys = self.create_user_authorized_file(
+            home, 'authorized_keys2', 'rsa', keys
+        )
-        # /tmp/etc/ssh/authorized_keys = ecdsa
-        authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys',
-                                               dir="/tmp")
-        util.write_file(authorized_keys_global, VALID_CONTENT['ecdsa'])
+        # /tmp/home/bobby/.ssh/user_keys = dsa
+        user_keys = self.create_user_authorized_file(
+            home, 'user_keys3', 'dsa', keys
+        )
-        # /tmp/sshd_config
-        sshd_config = self.tmp_path('sshd_config', dir="/tmp")
-        util.write_file(
-            sshd_config,
-            "AuthorizedKeysFile %s %s %s" % (authorized_keys_global,
-                                             authorized_keys, user_keys)
+        authorized_keys_global = self.create_global_authorized_file(
+            'etc/ssh/authorized_keys', 'ecdsa', keys
-        (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys(
-            fpw.pw_name, sshd_config)
-        content = ssh_util.update_authorized_keys(auth_key_entries, [])
+        options = "%s %s %s" % (authorized_keys_global, authorized_keys,
+                                user_keys)
+        sshd_config = self.create_sshd_config(options)
-        self.assertEqual(user_keys, auth_key_fn)
-        self.assertTrue(VALID_CONTENT['rsa'] in content)
-        self.assertTrue(VALID_CONTENT['ecdsa'] in content)
-        self.assertTrue(VALID_CONTENT['dsa'] in content)
+        self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys)
-    def test_multiple_authorizedkeys_file_global(self, m_getpwnam):
-        fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby')
-        m_getpwnam.return_value = fpw
+    @patch("cloudinit.util.get_permissions")
+    @patch("cloudinit.util.get_owner")
+    @patch("cloudinit.util.get_group")
+    def test_single_user_global_file(
+        self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam
+    ):
+        user_bobby = 'bobby'
+        keys = {}
+        users = {}
+        mock_permissions = {
+            '/tmp/home/bobby': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600),
+        }
+        homes = self.create_fake_users(
+            [user_bobby], mock_permissions, m_get_group, m_get_owner,
+            m_get_permissions, m_getpwnam, users
+        )
+        home = homes[0]
         # /tmp/etc/ssh/authorized_keys = rsa
-        authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys',
-                                               dir="/tmp")
-        util.write_file(authorized_keys_global, VALID_CONTENT['rsa'])
+        authorized_keys_global = self.create_global_authorized_file(
+            'etc/ssh/authorized_keys', 'rsa', keys
+        )
-        # /tmp/sshd_config
-        sshd_config = self.tmp_path('sshd_config')
-        util.write_file(
-            sshd_config,
-            "AuthorizedKeysFile %s" % (authorized_keys_global)
+        options = "%s" % authorized_keys_global
+        sshd_config = self.create_sshd_config(options)
+        default = "%s/.ssh/authorized_keys" % home
+        self.execute_and_check(user_bobby, sshd_config, default, keys)
+    @patch("cloudinit.ssh_util.pwd.getpwnam")
+    @patch("cloudinit.util.get_permissions")
+    @patch("cloudinit.util.get_owner")
+    @patch("cloudinit.util.get_group")
+    def test_two_users_local_file_standard(
+        self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam
+    ):
+        keys = {}
+        users = {}
+        mock_permissions = {
+            '/tmp/home/bobby': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600),
+            '/tmp/home/suzie': ('suzie', 'suzie', 0o700),
+            '/tmp/home/suzie/.ssh': ('suzie', 'suzie', 0o700),
+            '/tmp/home/suzie/.ssh/authorized_keys': ('suzie', 'suzie', 0o600),
+        }
+        user_bobby = 'bobby'
+        user_suzie = 'suzie'
+        homes = self.create_fake_users(
+            [user_bobby, user_suzie], mock_permissions, m_get_group,
+            m_get_owner, m_get_permissions, m_getpwnam, users
+        home_bobby = homes[0]
+        home_suzie = homes[1]
-        (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys(
-            fpw.pw_name, sshd_config)
-        content = ssh_util.update_authorized_keys(auth_key_entries, [])
+        # /tmp/home/bobby/.ssh/authorized_keys = rsa
+        authorized_keys = self.create_user_authorized_file(
+            home_bobby, 'authorized_keys', 'rsa', keys
+        )
-        self.assertEqual("%s/.ssh/authorized_keys" % fpw.pw_dir, auth_key_fn)
-        self.assertTrue(VALID_CONTENT['rsa'] in content)
+        # /tmp/home/suzie/.ssh/authorized_keys = rsa
+        authorized_keys2 = self.create_user_authorized_file(
+            home_suzie, 'authorized_keys', '', keys
+        )
+        options = ".ssh/authorized_keys"
+        sshd_config = self.create_sshd_config(options)
+        self.execute_and_check(
+            user_bobby, sshd_config, authorized_keys, keys, delete_keys=False
+        )
+        self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys)
-    def test_multiple_authorizedkeys_file_multiuser(self, m_getpwnam):
-        fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby')
-        m_getpwnam.return_value = fpw
-        user_ssh_folder = "%s/.ssh" % fpw.pw_dir
-        # /tmp/home2/bobby/.ssh/authorized_keys2 = rsa
-        authorized_keys = self.tmp_path('authorized_keys2',
-                                        dir=user_ssh_folder)
-        util.write_file(authorized_keys, VALID_CONTENT['rsa'])
-        # /tmp/home2/bobby/.ssh/user_keys3 = dsa
-        user_keys = self.tmp_path('user_keys3', dir=user_ssh_folder)
-        util.write_file(user_keys, VALID_CONTENT['dsa'])
-        fpw2 = FakePwEnt(pw_name='suzie', pw_dir='/tmp/home/suzie')
-        user_ssh_folder = "%s/.ssh" % fpw2.pw_dir
-        # /tmp/home/suzie/.ssh/authorized_keys2 =
-        authorized_keys2 = self.tmp_path('authorized_keys2',
-                                         dir=user_ssh_folder)
-        util.write_file(authorized_keys2,
-                        VALID_CONTENT[''])
+    @patch("cloudinit.util.get_permissions")
+    @patch("cloudinit.util.get_owner")
+    @patch("cloudinit.util.get_group")
+    def test_two_users_local_file_custom(
+        self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam
+    ):
+        keys = {}
+        users = {}
+        mock_permissions = {
+            '/tmp/home/bobby': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh/authorized_keys2': ('bobby', 'bobby', 0o600),
+            '/tmp/home/suzie': ('suzie', 'suzie', 0o700),
+            '/tmp/home/suzie/.ssh': ('suzie', 'suzie', 0o700),
+            '/tmp/home/suzie/.ssh/authorized_keys2': ('suzie', 'suzie', 0o600),
+        }
+        user_bobby = 'bobby'
+        user_suzie = 'suzie'
+        homes = self.create_fake_users(
+            [user_bobby, user_suzie], mock_permissions, m_get_group,
+            m_get_owner, m_get_permissions, m_getpwnam, users
+        )
+        home_bobby = homes[0]
+        home_suzie = homes[1]
-        # /tmp/etc/ssh/authorized_keys = ecdsa
-        authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys2',
-                                               dir="/tmp")
-        util.write_file(authorized_keys_global, VALID_CONTENT['ecdsa'])
+        # /tmp/home/bobby/.ssh/authorized_keys2 = rsa
+        authorized_keys = self.create_user_authorized_file(
+            home_bobby, 'authorized_keys2', 'rsa', keys
+        )
-        # /tmp/sshd_config
-        sshd_config = self.tmp_path('sshd_config', dir="/tmp")
-        util.write_file(
-            sshd_config,
-            "AuthorizedKeysFile %s %%h/.ssh/authorized_keys2 %s" %
-            (authorized_keys_global, user_keys)
+        # /tmp/home/suzie/.ssh/authorized_keys2 = rsa
+        authorized_keys2 = self.create_user_authorized_file(
+            home_suzie, 'authorized_keys2', '', keys
-        # process first user
-        (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys(
-            fpw.pw_name, sshd_config)
-        content = ssh_util.update_authorized_keys(auth_key_entries, [])
+        options = ".ssh/authorized_keys2"
+        sshd_config = self.create_sshd_config(options)
+        self.execute_and_check(
+            user_bobby, sshd_config, authorized_keys, keys, delete_keys=False
+        )
+        self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys)
-        self.assertEqual(user_keys, auth_key_fn)
-        self.assertTrue(VALID_CONTENT['rsa'] in content)
-        self.assertTrue(VALID_CONTENT['ecdsa'] in content)
-        self.assertTrue(VALID_CONTENT['dsa'] in content)
-        self.assertFalse(VALID_CONTENT[''] in content)
+    @patch("cloudinit.ssh_util.pwd.getpwnam")
+    @patch("cloudinit.util.get_permissions")
+    @patch("cloudinit.util.get_owner")
+    @patch("cloudinit.util.get_group")
+    def test_two_users_local_global_files(
+        self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam
+    ):
+        keys = {}
+        users = {}
+        mock_permissions = {
+            '/tmp/home/bobby': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh/authorized_keys2': ('bobby', 'bobby', 0o600),
+            '/tmp/home/bobby/.ssh/user_keys3': ('bobby', 'bobby', 0o600),
+            '/tmp/home/suzie': ('suzie', 'suzie', 0o700),
+            '/tmp/home/suzie/.ssh': ('suzie', 'suzie', 0o700),
+            '/tmp/home/suzie/.ssh/authorized_keys2': ('suzie', 'suzie', 0o600),
+            '/tmp/home/suzie/.ssh/user_keys3': ('suzie', 'suzie', 0o600),
+        }
+        user_bobby = 'bobby'
+        user_suzie = 'suzie'
+        homes = self.create_fake_users(
+            [user_bobby, user_suzie], mock_permissions, m_get_group,
+            m_get_owner, m_get_permissions, m_getpwnam, users
+        )
+        home_bobby = homes[0]
+        home_suzie = homes[1]
-        m_getpwnam.return_value = fpw2
-        # process second user
-        (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys(
-            fpw2.pw_name, sshd_config)
-        content = ssh_util.update_authorized_keys(auth_key_entries, [])
+        # /tmp/home/bobby/.ssh/authorized_keys2 = rsa
+        self.create_user_authorized_file(
+            home_bobby, 'authorized_keys2', 'rsa', keys
+        )
+        # /tmp/home/bobby/.ssh/user_keys3 = dsa
+        user_keys = self.create_user_authorized_file(
+            home_bobby, 'user_keys3', 'dsa', keys
+        )
+        # /tmp/home/suzie/.ssh/authorized_keys2 = rsa
+        authorized_keys2 = self.create_user_authorized_file(
+            home_suzie, 'authorized_keys2', '', keys
+        )
+        # /tmp/etc/ssh/authorized_keys = ecdsa
+        authorized_keys_global = self.create_global_authorized_file(
+            'etc/ssh/authorized_keys2', 'ecdsa', keys
+        )
+        options = "%s %s %%h/.ssh/authorized_keys2" % \
+            (authorized_keys_global, user_keys)
+        sshd_config = self.create_sshd_config(options)
-        self.assertEqual(authorized_keys2, auth_key_fn)
-        self.assertTrue(VALID_CONTENT[''] in content)
-        self.assertTrue(VALID_CONTENT['ecdsa'] in content)
-        self.assertTrue(VALID_CONTENT['dsa'] in content)
-        self.assertFalse(VALID_CONTENT['rsa'] in content)
+        self.execute_and_check(
+            user_bobby, sshd_config, user_keys, keys, delete_keys=False
+        )
+        self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys)
+    @patch("cloudinit.util.get_user_groups")
-    def test_multiple_authorizedkeys_file_multiuser2(self, m_getpwnam):
-        fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home/bobby')
-        m_getpwnam.return_value = fpw
-        user_ssh_folder = "%s/.ssh" % fpw.pw_dir
+    @patch("cloudinit.util.get_permissions")
+    @patch("cloudinit.util.get_owner")
+    @patch("cloudinit.util.get_group")
+    def test_two_users_local_global_files_badguy(
+        self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam,
+        m_get_user_groups
+    ):
+        keys = {}
+        users = {}
+        mock_permissions = {
+            '/tmp/home/bobby': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh/authorized_keys2': ('bobby', 'bobby', 0o600),
+            '/tmp/home/bobby/.ssh/user_keys3': ('bobby', 'bobby', 0o600),
+            '/tmp/home/badguy': ('root', 'root', 0o755),
+            '/tmp/home/badguy/home': ('root', 'root', 0o755),
+            '/tmp/home/badguy/home/bobby': ('root', 'root', 0o655),
+        }
+        user_bobby = 'bobby'
+        user_badguy = 'badguy'
+        home_bobby, *_ = self.create_fake_users(
+            [user_bobby, user_badguy], mock_permissions, m_get_group,
+            m_get_owner, m_get_permissions, m_getpwnam, users
+        )
+        m_get_user_groups.side_effect = mock_get_user_groups
         # /tmp/home/bobby/.ssh/authorized_keys2 = rsa
-        authorized_keys = self.tmp_path('authorized_keys2',
-                                        dir=user_ssh_folder)
-        util.write_file(authorized_keys, VALID_CONTENT['rsa'])
+        authorized_keys = self.create_user_authorized_file(
+            home_bobby, 'authorized_keys2', 'rsa', keys
+        )
         # /tmp/home/bobby/.ssh/user_keys3 = dsa
-        user_keys = self.tmp_path('user_keys3', dir=user_ssh_folder)
-        util.write_file(user_keys, VALID_CONTENT['dsa'])
+        user_keys = self.create_user_authorized_file(
+            home_bobby, 'user_keys3', 'dsa', keys
+        )
-        fpw2 = FakePwEnt(pw_name='badguy', pw_dir='/tmp/home/badguy')
-        user_ssh_folder = "%s/.ssh" % fpw2.pw_dir
         # /tmp/home/badguy/home/bobby = ""
         authorized_keys2 = self.tmp_path('home/bobby', dir="/tmp/home/badguy")
+        util.write_file(authorized_keys2, '')
         # /tmp/etc/ssh/authorized_keys = ecdsa
-        authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys2',
-                                               dir="/tmp")
-        util.write_file(authorized_keys_global, VALID_CONTENT['ecdsa'])
+        authorized_keys_global = self.create_global_authorized_file(
+            'etc/ssh/authorized_keys2', 'ecdsa', keys
+        )
         # /tmp/sshd_config
-        sshd_config = self.tmp_path('sshd_config', dir="/tmp")
-        util.write_file(
-            sshd_config,
-            "AuthorizedKeysFile %s %%h/.ssh/authorized_keys2 %s %s" %
-            (authorized_keys_global, user_keys, authorized_keys2)
+        options = "%s %%h/.ssh/authorized_keys2 %s %s" % \
+            (authorized_keys2, authorized_keys_global, user_keys)
+        sshd_config = self.create_sshd_config(options)
+        self.execute_and_check(
+            user_bobby, sshd_config, authorized_keys, keys, delete_keys=False
+        )
+        self.execute_and_check(
+            user_badguy, sshd_config, authorized_keys2, keys
-        # process first user
-        (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys(
-            fpw.pw_name, sshd_config)
-        content = ssh_util.update_authorized_keys(auth_key_entries, [])
+    @patch("cloudinit.util.get_user_groups")
+    @patch("cloudinit.ssh_util.pwd.getpwnam")
+    @patch("cloudinit.util.get_permissions")
+    @patch("cloudinit.util.get_owner")
+    @patch("cloudinit.util.get_group")
+    def test_two_users_unaccessible_file(
+        self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam,
+        m_get_user_groups
+    ):
+        keys = {}
+        users = {}
+        mock_permissions = {
+            '/tmp/home/bobby': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600),
+            '/tmp/etc': ('root', 'root', 0o755),
+            '/tmp/etc/ssh': ('root', 'root', 0o755),
+            '/tmp/etc/ssh/userkeys': ('root', 'root', 0o700),
+            '/tmp/etc/ssh/userkeys/bobby': ('bobby', 'bobby', 0o600),
+            '/tmp/etc/ssh/userkeys/badguy': ('badguy', 'badguy', 0o600),
+            '/tmp/home/badguy': ('badguy', 'badguy', 0o700),
+            '/tmp/home/badguy/.ssh': ('badguy', 'badguy', 0o700),
+            '/tmp/home/badguy/.ssh/authorized_keys':
+                ('badguy', 'badguy', 0o600),
+        }
+        user_bobby = 'bobby'
+        user_badguy = 'badguy'
+        homes = self.create_fake_users(
+            [user_bobby, user_badguy], mock_permissions, m_get_group,
+            m_get_owner, m_get_permissions, m_getpwnam, users
+        )
+        m_get_user_groups.side_effect = mock_get_user_groups
+        home_bobby = homes[0]
+        home_badguy = homes[1]
-        self.assertEqual(user_keys, auth_key_fn)
-        self.assertTrue(VALID_CONTENT['rsa'] in content)
-        self.assertTrue(VALID_CONTENT['ecdsa'] in content)
-        self.assertTrue(VALID_CONTENT['dsa'] in content)
+        # /tmp/home/bobby/.ssh/authorized_keys = rsa
+        authorized_keys = self.create_user_authorized_file(
+            home_bobby, 'authorized_keys', 'rsa', keys
+        )
+        # /tmp/etc/ssh/userkeys/bobby = dsa
+        # assume here that we can bypass userkeys, despite permissions
+        self.create_global_authorized_file(
+            'etc/ssh/userkeys/bobby', 'dsa', keys
+        )
-        m_getpwnam.return_value = fpw2
-        # process second user
-        (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys(
-            fpw2.pw_name, sshd_config)
-        content = ssh_util.update_authorized_keys(auth_key_entries, [])
+        # /tmp/home/badguy/.ssh/authorized_keys =
+        authorized_keys2 = self.create_user_authorized_file(
+            home_badguy, 'authorized_keys', '', keys
+        )
-        # badguy should not take the key from the other user!
-        self.assertEqual(authorized_keys2, auth_key_fn)
-        self.assertTrue(VALID_CONTENT['ecdsa'] in content)
-        self.assertTrue(VALID_CONTENT['dsa'] in content)
-        self.assertFalse(VALID_CONTENT['rsa'] in content)
+        # /tmp/etc/ssh/userkeys/badguy = ecdsa
+        self.create_global_authorized_file(
+            'etc/ssh/userkeys/badguy', 'ecdsa', keys
+        )
+        # /tmp/sshd_config
+        options = "/tmp/etc/ssh/userkeys/%u .ssh/authorized_keys"
+        sshd_config = self.create_sshd_config(options)
+        self.execute_and_check(
+            user_bobby, sshd_config, authorized_keys, keys, delete_keys=False
+        )
+        self.execute_and_check(
+            user_badguy, sshd_config, authorized_keys2, keys
+        )
+    @patch("cloudinit.util.get_user_groups")
+    @patch("cloudinit.ssh_util.pwd.getpwnam")
+    @patch("cloudinit.util.get_permissions")
+    @patch("cloudinit.util.get_owner")
+    @patch("cloudinit.util.get_group")
+    def test_two_users_accessible_file(
+        self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam,
+        m_get_user_groups
+    ):
+        keys = {}
+        users = {}
+        mock_permissions = {
+            '/tmp/home/bobby': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600),
+            '/tmp/etc': ('root', 'root', 0o755),
+            '/tmp/etc/ssh': ('root', 'root', 0o755),
+            '/tmp/etc/ssh/userkeys': ('root', 'root', 0o755),
+            '/tmp/etc/ssh/userkeys/bobby': ('bobby', 'bobby', 0o600),
+            '/tmp/etc/ssh/userkeys/badguy': ('badguy', 'badguy', 0o600),
+            '/tmp/home/badguy': ('badguy', 'badguy', 0o700),
+            '/tmp/home/badguy/.ssh': ('badguy', 'badguy', 0o700),
+            '/tmp/home/badguy/.ssh/authorized_keys':
+                ('badguy', 'badguy', 0o600),
+        }
+        user_bobby = 'bobby'
+        user_badguy = 'badguy'
+        homes = self.create_fake_users(
+            [user_bobby, user_badguy], mock_permissions, m_get_group,
+            m_get_owner, m_get_permissions, m_getpwnam, users
+        )
+        m_get_user_groups.side_effect = mock_get_user_groups
+        home_bobby = homes[0]
+        home_badguy = homes[1]
+        # /tmp/home/bobby/.ssh/authorized_keys = rsa
+        self.create_user_authorized_file(
+            home_bobby, 'authorized_keys', 'rsa', keys
+        )
+        # /tmp/etc/ssh/userkeys/bobby = dsa
+        # assume here that we can bypass userkeys, despite permissions
+        authorized_keys = self.create_global_authorized_file(
+            'etc/ssh/userkeys/bobby', 'dsa', keys
+        )
+        # /tmp/home/badguy/.ssh/authorized_keys =
+        self.create_user_authorized_file(
+            home_badguy, 'authorized_keys', '', keys
+        )
+        # /tmp/etc/ssh/userkeys/badguy = ecdsa
+        authorized_keys2 = self.create_global_authorized_file(
+            'etc/ssh/userkeys/badguy', 'ecdsa', keys
+        )
+        # /tmp/sshd_config
+        options = "/tmp/etc/ssh/userkeys/%u .ssh/authorized_keys"
+        sshd_config = self.create_sshd_config(options)
+        self.execute_and_check(
+            user_bobby, sshd_config, authorized_keys, keys, delete_keys=False
+        )
+        self.execute_and_check(
+            user_badguy, sshd_config, authorized_keys2, keys
+        )
+    @patch("cloudinit.util.get_user_groups")
+    @patch("cloudinit.ssh_util.pwd.getpwnam")
+    @patch("cloudinit.util.get_permissions")
+    @patch("cloudinit.util.get_owner")
+    @patch("cloudinit.util.get_group")
+    def test_two_users_hardcoded_single_user_file(
+        self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam,
+        m_get_user_groups
+    ):
+        keys = {}
+        users = {}
+        mock_permissions = {
+            '/tmp/home/bobby': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600),
+            '/tmp/home/suzie': ('suzie', 'suzie', 0o700),
+            '/tmp/home/suzie/.ssh': ('suzie', 'suzie', 0o700),
+            '/tmp/home/suzie/.ssh/authorized_keys': ('suzie', 'suzie', 0o600),
+        }
+        user_bobby = 'bobby'
+        user_suzie = 'suzie'
+        homes = self.create_fake_users(
+            [user_bobby, user_suzie], mock_permissions, m_get_group,
+            m_get_owner, m_get_permissions, m_getpwnam, users
+        )
+        home_bobby = homes[0]
+        home_suzie = homes[1]
+        m_get_user_groups.side_effect = mock_get_user_groups
+        # /tmp/home/bobby/.ssh/authorized_keys = rsa
+        authorized_keys = self.create_user_authorized_file(
+            home_bobby, 'authorized_keys', 'rsa', keys
+        )
+        # /tmp/home/suzie/.ssh/authorized_keys =
+        self.create_user_authorized_file(
+            home_suzie, 'authorized_keys', '', keys
+        )
+        # /tmp/sshd_config
+        options = "%s" % (authorized_keys)
+        sshd_config = self.create_sshd_config(options)
+        self.execute_and_check(
+            user_bobby, sshd_config, authorized_keys, keys, delete_keys=False
+        )
+        default = "%s/.ssh/authorized_keys" % home_suzie
+        self.execute_and_check(user_suzie, sshd_config, default, keys)
+    @patch("cloudinit.util.get_user_groups")
+    @patch("cloudinit.ssh_util.pwd.getpwnam")
+    @patch("cloudinit.util.get_permissions")
+    @patch("cloudinit.util.get_owner")
+    @patch("cloudinit.util.get_group")
+    def test_two_users_hardcoded_single_user_file_inverted(
+        self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam,
+        m_get_user_groups
+    ):
+        keys = {}
+        users = {}
+        mock_permissions = {
+            '/tmp/home/bobby': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600),
+            '/tmp/home/suzie': ('suzie', 'suzie', 0o700),
+            '/tmp/home/suzie/.ssh': ('suzie', 'suzie', 0o700),
+            '/tmp/home/suzie/.ssh/authorized_keys': ('suzie', 'suzie', 0o600),
+        }
+        user_bobby = 'bobby'
+        user_suzie = 'suzie'
+        homes = self.create_fake_users(
+            [user_bobby, user_suzie], mock_permissions, m_get_group,
+            m_get_owner, m_get_permissions, m_getpwnam, users
+        )
+        home_bobby = homes[0]
+        home_suzie = homes[1]
+        m_get_user_groups.side_effect = mock_get_user_groups
+        # /tmp/home/bobby/.ssh/authorized_keys = rsa
+        self.create_user_authorized_file(
+            home_bobby, 'authorized_keys', 'rsa', keys
+        )
+        # /tmp/home/suzie/.ssh/authorized_keys =
+        authorized_keys2 = self.create_user_authorized_file(
+            home_suzie, 'authorized_keys', '', keys
+        )
+        # /tmp/sshd_config
+        options = "%s" % (authorized_keys2)
+        sshd_config = self.create_sshd_config(options)
+        default = "%s/.ssh/authorized_keys" % home_bobby
+        self.execute_and_check(
+            user_bobby, sshd_config, default, keys, delete_keys=False
+        )
+        self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys)
+    @patch("cloudinit.util.get_user_groups")
+    @patch("cloudinit.ssh_util.pwd.getpwnam")
+    @patch("cloudinit.util.get_permissions")
+    @patch("cloudinit.util.get_owner")
+    @patch("cloudinit.util.get_group")
+    def test_two_users_hardcoded_user_files(
+        self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam,
+        m_get_user_groups
+    ):
+        keys = {}
+        users = {}
+        mock_permissions = {
+            '/tmp/home/bobby': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700),
+            '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600),
+            '/tmp/home/suzie': ('suzie', 'suzie', 0o700),
+            '/tmp/home/suzie/.ssh': ('suzie', 'suzie', 0o700),
+            '/tmp/home/suzie/.ssh/authorized_keys': ('suzie', 'suzie', 0o600),
+        }
+        user_bobby = 'bobby'
+        user_suzie = 'suzie'
+        homes = self.create_fake_users(
+            [user_bobby, user_suzie], mock_permissions, m_get_group,
+            m_get_owner, m_get_permissions, m_getpwnam, users
+        )
+        home_bobby = homes[0]
+        home_suzie = homes[1]
+        m_get_user_groups.side_effect = mock_get_user_groups
+        # /tmp/home/bobby/.ssh/authorized_keys = rsa
+        authorized_keys = self.create_user_authorized_file(
+            home_bobby, 'authorized_keys', 'rsa', keys
+        )
+        # /tmp/home/suzie/.ssh/authorized_keys =
+        authorized_keys2 = self.create_user_authorized_file(
+            home_suzie, 'authorized_keys', '', keys
+        )
+        # /tmp/etc/ssh/authorized_keys = ecdsa
+        authorized_keys_global = self.create_global_authorized_file(
+            'etc/ssh/authorized_keys', 'ecdsa', keys
+        )
+        # /tmp/sshd_config
+        options = "%s %s %s" % \
+            (authorized_keys_global, authorized_keys, authorized_keys2)
+        sshd_config = self.create_sshd_config(options)
+        self.execute_and_check(
+            user_bobby, sshd_config, authorized_keys, keys, delete_keys=False
+        )
+        self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys)
 # vi: ts=4 expandtab