Blame SOURCES/0003-Backport-upsteam-PR-1156.patch

c49324
From 57c67e2b359e9544ecd5a0ba264adf7aa7c67991 Mon Sep 17 00:00:00 2001
c49324
From: rpm-build <rpm-build>
c49324
Date: Mon, 14 Nov 2022 21:47:40 -0300
c49324
Subject: [PATCH 3/3] Backport upsteam PR#1156
c49324
c49324
From: https://github.com/keylime/keylime/pull/1156
c49324
c49324
We had partially addressed the issue we encountered when parsing some
c49324
certificates with python-cryptography by writing cert_utils module that
c49324
provided a single helper to parse the pubkey from a certificate.
c49324
c49324
However, we still were doing the EK validation in tpm_main using
c49324
python-cryptography, which means we would fall into the same parsing
c49324
problem again, since during the validation it would read all the certs
c49324
in the tpm cert store to check if it is signing the presented EK.
c49324
c49324
We address this issue by moving the EK cert verification to cert_utils:
c49324
we use the same approach as before, i.e. we parse the cert with pyasn1
c49324
when python-cryptography fails, and then use python-cryptography to do
c49324
the actual signature verification, as before.
c49324
c49324
By moving the method to cert_utils, it also becomes simpler to test it,
c49324
so in this commit we more tests to verify the methods work as expected.
c49324
c49324
Additionally, the updated EK verification is also capable of handling
c49324
ECDSA signatures
c49324
---
c49324
 keylime.conf                |   2 +
c49324
 keylime/cert_utils.py       | 144 ++++++++++++++++++++++++++++++++++--
c49324
 keylime/registrar_common.py |   7 +-
c49324
 keylime/tenant.py           |  21 ++----
c49324
 keylime/tpm/tpm_main.py     |  47 +-----------
c49324
 keylime/tpm_ek_ca.py        |  10 +--
c49324
 scripts/ek-openssl-verify   |  98 ++++++++++++++++++++++++
c49324
 test/run_tests.sh           |   8 +-
c49324
 test/test_cert_utils.py     | 142 ++++++++++++++++++++++++++++++-----
c49324
 9 files changed, 380 insertions(+), 99 deletions(-)
c49324
 create mode 100755 scripts/ek-openssl-verify
c49324
c49324
diff --git a/keylime.conf b/keylime.conf
c49324
index 331e57a..d896f9f 100644
c49324
--- a/keylime.conf
c49324
+++ b/keylime.conf
c49324
@@ -501,6 +501,8 @@ require_ek_cert = True
c49324
 # PROVKEYS - contains a json document containing EK, EKcert, and AIK from the
c49324
 # provider.  EK and AIK are in PEM format.  The EKcert is in base64 encoded
c49324
 # DER format.
c49324
+# TPM_CERT_STORE - contains the path to the TPM certificates store, e.g.:
c49324
+# "/var/lib/keylime/tpm_cert_store".
c49324
 #
c49324
 # Set to blank to disable this check.  See warning above if require_ek_cert
c49324
 # is "False".
c49324
diff --git a/keylime/cert_utils.py b/keylime/cert_utils.py
c49324
index d014aed..d2fc54d 100644
c49324
--- a/keylime/cert_utils.py
c49324
+++ b/keylime/cert_utils.py
c49324
@@ -1,13 +1,143 @@
c49324
-from cryptography.hazmat.primitives.serialization import load_der_public_key
c49324
+import io
c49324
+import os.path
c49324
+import subprocess
c49324
+import sys
c49324
+
c49324
+from cryptography import exceptions as crypto_exceptions
c49324
+from cryptography import x509
c49324
+from cryptography.hazmat.backends import default_backend
c49324
+from cryptography.hazmat.primitives.asymmetric import ec, padding
c49324
+from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
c49324
+from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
c49324
 from pyasn1.codec.der import decoder, encoder
c49324
-from pyasn1_modules import rfc2459
c49324
+from pyasn1_modules import pem, rfc2459
c49324
 
c49324
+from keylime import config, keylime_logging, tpm_ek_ca
c49324
 
c49324
 # Issue #944 -- python-cryptography won't parse malformed certs,
c49324
 # such as some Nuvoton ones we have encountered in the field.
c49324
 # Unfortunately, we still have to deal with such certs anyway.
c49324
-# Let's read the EK cert with pyasn1 instead of python-cryptography.
c49324
-def read_x509_der_cert_pubkey(der_cert_data):
c49324
-    """Returns the public key of a DER-encoded X.509 certificate"""
c49324
-    der509 = decoder.decode(der_cert_data, asn1Spec=rfc2459.Certificate())[0]
c49324
-    return load_der_public_key(encoder.encode(der509["tbsCertificate"]["subjectPublicKeyInfo"]))
c49324
+
c49324
+# Here we provide some helpers that use pyasn1 to parse the certificates
c49324
+# when parsing them with python-cryptography fails, and in this case, we
c49324
+# try to read the parsed certificate again into python-cryptograhy.
c49324
+
c49324
+logger = keylime_logging.init_logging("cert_utils")
c49324
+
c49324
+
c49324
+def x509_der_cert(der_cert_data: bytes):
c49324
+    """Load an x509 certificate provided in DER format
c49324
+    :param der_cert_data: the DER bytes of the certificate
c49324
+    :type der_cert_data: bytes
c49324
+    :returns: cryptography.x509.Certificate
c49324
+    """
c49324
+    try:
c49324
+        return x509.load_der_x509_certificate(data=der_cert_data, backend=default_backend())
c49324
+    except Exception as e:
c49324
+        logger.warning("Failed to parse DER data with python-cryptography: %s", e)
c49324
+        pyasn1_cert = decoder.decode(der_cert_data, asn1Spec=rfc2459.Certificate())[0]
c49324
+        return x509.load_der_x509_certificate(data=encoder.encode(pyasn1_cert), backend=default_backend())
c49324
+
c49324
+
c49324
+def x509_pem_cert(pem_cert_data: str):
c49324
+    """Load an x509 certificate provided in PEM format
c49324
+    :param pem_cert_data: the base-64 encoded PEM certificate
c49324
+    :type pem_cert_data: str
c49324
+    :returns: cryptography.x509.Certificate
c49324
+    """
c49324
+    try:
c49324
+        return x509.load_pem_x509_certificate(data=pem_cert_data.encode("utf-8"), backend=default_backend())
c49324
+    except Exception as e:
c49324
+        logger.warning("Failed to parse PEM data with python-cryptography: %s", e)
c49324
+        # Let's read the DER bytes from the base-64 PEM.
c49324
+        der_data = pem.readPemFromFile(io.StringIO(pem_cert_data))
c49324
+        # Now we can load it as we do in x509_der_cert().
c49324
+        pyasn1_cert = decoder.decode(der_data, asn1Spec=rfc2459.Certificate())[0]
c49324
+        return x509.load_der_x509_certificate(data=encoder.encode(pyasn1_cert), backend=default_backend())
c49324
+
c49324
+
c49324
+def verify_ek(ekcert, tpm_cert_store=config.get("tenant", "tpm_cert_store")):
c49324
+    """Verify that the provided EK certificate is signed by a trusted root
c49324
+    :param ekcert: The Endorsement Key certificate in DER format
c49324
+    :returns: True if the certificate can be verified, False otherwise
c49324
+    """
c49324
+    try:
c49324
+        trusted_certs = tpm_ek_ca.cert_loader(tpm_cert_store)
c49324
+    except Exception as e:
c49324
+        logger.warning("Error loading trusted certificates from the TPM cert store: %s", e)
c49324
+        return False
c49324
+
c49324
+    try:
c49324
+        ek509 = x509_der_cert(ekcert)
c49324
+        for cert_file, pem_cert in trusted_certs.items():
c49324
+            signcert = x509_pem_cert(pem_cert)
c49324
+            if ek509.issuer != signcert.subject:
c49324
+                continue
c49324
+
c49324
+            signcert_pubkey = signcert.public_key()
c49324
+            try:
c49324
+                if isinstance(signcert_pubkey, RSAPublicKey):
c49324
+                    signcert_pubkey.verify(
c49324
+                        ek509.signature,
c49324
+                        ek509.tbs_certificate_bytes,
c49324
+                        padding.PKCS1v15(),
c49324
+                        ek509.signature_hash_algorithm,
c49324
+                    )
c49324
+                elif isinstance(signcert_pubkey, EllipticCurvePublicKey):
c49324
+                    signcert_pubkey.verify(
c49324
+                        ek509.signature,
c49324
+                        ek509.tbs_certificate_bytes,
c49324
+                        ec.ECDSA(ek509.signature_hash_algorithm),
c49324
+                    )
c49324
+                else:
c49324
+                    logger.warning("Unsupported public key type: %s", type(signcert_pubkey))
c49324
+                    continue
c49324
+            except crypto_exceptions.InvalidSignature:
c49324
+                continue
c49324
+
c49324
+            logger.debug("EK cert matched cert: %s", cert_file)
c49324
+            return True
c49324
+    except Exception as e:
c49324
+        # Log the exception so we don't lose the raw message
c49324
+        logger.exception(e)
c49324
+        raise Exception("Error processing ek/ekcert. Does this TPM have a valid EK?").with_traceback(sys.exc_info()[2])
c49324
+
c49324
+    logger.error("No Root CA matched EK Certificate")
c49324
+    return False
c49324
+
c49324
+
c49324
+def verify_ek_script(script, env, cwd):
c49324
+    if script is None:
c49324
+        logger.warning("External check script (%s) not specified", script)
c49324
+        return False
c49324
+
c49324
+    script_path = os.path.abspath(script)
c49324
+    if not os.path.isfile(script_path):
c49324
+        if cwd is None or not os.path.isfile(os.path.abspath(os.path.join(cwd, script))):
c49324
+            logger.warning("External check script (%s) not found; please make sure its path is correct", script)
c49324
+            return False
c49324
+        script_path = os.path.abspath(os.path.join(cwd, script))
c49324
+
c49324
+    try:
c49324
+        proc = subprocess.run(
c49324
+            [script_path],
c49324
+            env=env,
c49324
+            shell=False,
c49324
+            cwd=cwd,
c49324
+            stderr=subprocess.STDOUT,
c49324
+            stdout=subprocess.PIPE,
c49324
+            check=False,
c49324
+        )
c49324
+        if proc.returncode != 0:
c49324
+            errmsg = ""
c49324
+            if proc.stdout is not None:
c49324
+                errmsg = proc.stdout.decode("utf-8")
c49324
+            logger.error("External check script failed to validate EK: %s", errmsg)
c49324
+            return False
c49324
+        logger.debug("External check script successfully to validated EK")
c49324
+        if proc.stdout is not None:
c49324
+            logger.info("ek_check output: %s", proc.stdout.decode("utf-8"))
c49324
+    except subprocess.CalledProcessError as e:
c49324
+        logger.error("Error while trying to run external check script to validate EK: %s", e)
c49324
+        return False
c49324
+    return True
c49324
diff --git a/keylime/registrar_common.py b/keylime/registrar_common.py
c49324
index 2c32d19..fb37e5b 100644
c49324
--- a/keylime/registrar_common.py
c49324
+++ b/keylime/registrar_common.py
c49324
@@ -261,11 +261,8 @@ class UnprotectedHandler(BaseHTTPRequestHandler, SessionManager):
c49324
                 # Note, we don't validate the EKCert here, other than the implicit
c49324
                 #  "is it a valid x509 cert" check. So it's still untrusted.
c49324
                 # This will be validated by the tenant.
c49324
-                ek_tpm = base64.b64encode(
c49324
-                    tpm2_objects.ek_low_tpm2b_public_from_pubkey(
c49324
-                        cert_utils.read_x509_der_cert_pubkey(base64.b64decode(ekcert))
c49324
-                    )
c49324
-                ).decode()
c49324
+                cert = cert_utils.x509_der_cert(base64.b64decode(ekcert))
c49324
+                ek_tpm = base64.b64encode(tpm2_objects.ek_low_tpm2b_public_from_pubkey(cert.public_key())).decode()
c49324
 
c49324
             aik_attrs = tpm2_objects.get_tpm2b_public_object_attributes(
c49324
                 base64.b64decode(aik_tpm),
c49324
diff --git a/keylime/tenant.py b/keylime/tenant.py
c49324
index cc53623..dd9c09c 100644
c49324
--- a/keylime/tenant.py
c49324
+++ b/keylime/tenant.py
c49324
@@ -5,7 +5,6 @@ import io
c49324
 import json
c49324
 import logging
c49324
 import os
c49324
-import subprocess
c49324
 import sys
c49324
 import tempfile
c49324
 import time
c49324
@@ -15,7 +14,7 @@ import requests
c49324
 from cryptography.hazmat.primitives import serialization as crypto_serialization
c49324
 
c49324
 from keylime import api_version as keylime_api_version
c49324
-from keylime import ca_util, config, crypto, keylime_logging, registrar_client, signing, web_util
c49324
+from keylime import ca_util, cert_utils, config, crypto, keylime_logging, registrar_client, signing, web_util
c49324
 from keylime.agentstates import AgentAttestState
c49324
 from keylime.cli import options, policies
c49324
 from keylime.cmd import user_data_encrypt
c49324
@@ -516,19 +515,11 @@ class Tenant:
c49324
             env["EK_CERT"] = ""
c49324
 
c49324
         env["PROVKEYS"] = json.dumps(self.registrar_data.get("provider_keys", {}))
c49324
-        with subprocess.Popen(
c49324
-            script, env=env, shell=True, cwd=config.WORK_DIR, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
c49324
-        ) as proc:
c49324
-            retval = proc.wait()
c49324
-            if retval != 0:
c49324
-                raise UserError("External check script failed to validate EK")
c49324
-            logger.debug("External check script successfully to validated EK")
c49324
-            while True:
c49324
-                line = proc.stdout.readline().decode()
c49324
-                if line == "":
c49324
-                    break
c49324
-                logger.debug("ek_check output: %s", line.strip())
c49324
-        return True
c49324
+
c49324
+        # Define the TPM cert store for the external script.
c49324
+        env["TPM_CERT_STORE"] = config.get("tenant", "tpm_cert_store")
c49324
+
c49324
+        return cert_utils.verify_ek_script(script, env, config.WORK_DIR)
c49324
 
c49324
     def do_cv(self):
c49324
         """Initiate v, agent_id and ip and initiate the cloudinit sequence"""
c49324
diff --git a/keylime/tpm/tpm_main.py b/keylime/tpm/tpm_main.py
c49324
index 7bab4fd..35f0a2f 100644
c49324
--- a/keylime/tpm/tpm_main.py
c49324
+++ b/keylime/tpm/tpm_main.py
c49324
@@ -12,14 +12,10 @@ import time
c49324
 import typing
c49324
 import zlib
c49324
 
c49324
-from cryptography import exceptions as crypto_exceptions
c49324
-from cryptography import x509
c49324
-from cryptography.hazmat.backends import default_backend
c49324
 from cryptography.hazmat.primitives import serialization as crypto_serialization
c49324
-from cryptography.hazmat.primitives.asymmetric import padding
c49324
 from packaging.version import Version
c49324
 
c49324
-from keylime import cmd_exec, config, keylime_logging, secure_mount, tpm_ek_ca
c49324
+from keylime import cert_utils, cmd_exec, config, keylime_logging, secure_mount
c49324
 from keylime.agentstates import TPMClockInfo
c49324
 from keylime.common import algorithms, retry
c49324
 from keylime.failure import Component, Failure
c49324
@@ -785,46 +781,7 @@ class tpm(tpm_abstract.AbstractTPM):
c49324
         :param ekcert: The Endorsement Key certificate in DER format
c49324
         :returns: True if the certificate can be verified, false otherwise
c49324
         """
c49324
-        # openssl x509 -inform der -in certificate.cer -out certificate.pem
c49324
-        try:
c49324
-            tpm_ek_ca.check_tpm_cert_store()
c49324
-
c49324
-            ek509 = x509.load_der_x509_certificate(
c49324
-                data=ekcert,
c49324
-                backend=default_backend(),
c49324
-            )
c49324
-
c49324
-            trusted_certs = tpm_ek_ca.cert_loader()
c49324
-            for cert in trusted_certs:
c49324
-                signcert = x509.load_pem_x509_certificate(
c49324
-                    data=cert.encode(),
c49324
-                    backend=default_backend(),
c49324
-                )
c49324
-
c49324
-                if ek509.issuer.rfc4514_string() != signcert.subject.rfc4514_string():
c49324
-                    continue
c49324
-
c49324
-                try:
c49324
-                    signcert.public_key().verify(
c49324
-                        ek509.signature,
c49324
-                        ek509.tbs_certificate_bytes,
c49324
-                        padding.PKCS1v15(),
c49324
-                        ek509.signature_hash_algorithm,
c49324
-                    )
c49324
-                except crypto_exceptions.InvalidSignature:
c49324
-                    continue
c49324
-
c49324
-                logger.debug("EK cert matched cert: %s", cert)
c49324
-                return True
c49324
-        except Exception as e:
c49324
-            # Log the exception so we don't lose the raw message
c49324
-            logger.exception(e)
c49324
-            raise Exception("Error processing ek/ekcert. Does this TPM have a valid EK?").with_traceback(
c49324
-                sys.exc_info()[2]
c49324
-            )
c49324
-
c49324
-        logger.error("No Root CA matched EK Certificate")
c49324
-        return False
c49324
+        return cert_utils.verify_ek(ekcert)
c49324
 
c49324
     def get_tpm_manufacturer(self, output=None):
c49324
         vendorStr = None
c49324
diff --git a/keylime/tpm_ek_ca.py b/keylime/tpm_ek_ca.py
c49324
index 3695f0b..fb66c07 100644
c49324
--- a/keylime/tpm_ek_ca.py
c49324
+++ b/keylime/tpm_ek_ca.py
c49324
@@ -7,8 +7,7 @@ logger = keylime_logging.init_logging("tpm_ek_ca")
c49324
 trusted_certs = {}
c49324
 
c49324
 
c49324
-def check_tpm_cert_store():
c49324
-    tpm_cert_store = config.get("tenant", "tpm_cert_store")
c49324
+def check_tpm_cert_store(tpm_cert_store=config.get("tenant", "tpm_cert_store")):
c49324
     if not os.path.isdir(tpm_cert_store):
c49324
         logger.error("The directory %s does not exist.", tpm_cert_store)
c49324
         raise Exception(f"The directory {tpm_cert_store} does not exist.")
c49324
@@ -21,11 +20,10 @@ def check_tpm_cert_store():
c49324
         raise Exception(f"The directory {tpm_cert_store} does not contain " f"any .pem files")
c49324
 
c49324
 
c49324
-def cert_loader():
c49324
-    tpm_cert_store = config.get("tenant", "tpm_cert_store")
c49324
+def cert_loader(tpm_cert_store=config.get("tenant", "tpm_cert_store")):
c49324
     file_list = glob.glob(os.path.join(tpm_cert_store, "*.pem"))
c49324
-    my_trusted_certs = []
c49324
+    my_trusted_certs = {}
c49324
     for file_path in file_list:
c49324
         with open(file_path, encoding="utf-8") as f_input:
c49324
-            my_trusted_certs.append(f_input.read())
c49324
+            my_trusted_certs[file_path] = f_input.read()
c49324
     return my_trusted_certs
c49324
diff --git a/scripts/ek-openssl-verify b/scripts/ek-openssl-verify
c49324
new file mode 100755
c49324
index 0000000..f91e9b5
c49324
--- /dev/null
c49324
+++ b/scripts/ek-openssl-verify
c49324
@@ -0,0 +1,98 @@
c49324
+#!/bin/sh
c49324
+
c49324
+# This script can be used as the `ek_check_script' (tenant configuration),
c49324
+# to attempt to verify a provided EK_CERT via env var using openssl.
c49324
+
c49324
+# EK             - contains a PEM-encoded version of the public EK
c49324
+# EK_CERT        - contains a base64 DER-encoded EK certificate if one is
c49324
+#                  available.
c49324
+# PROVKEYS       - contains a json document containing EK, EKcert, and AIK
c49324
+#                  from the provider. EK and AIK are in PEM format. The
c49324
+#                  EKcert is in base64-encoded DER format
c49324
+# TPM_CERT_STORE - contains the path of the TPM certificate store.
c49324
+EK=${EK:-}
c49324
+EK_CERT=${EK_CERT:-}
c49324
+PROVKEYS=${PROVKEYS:-}
c49324
+
c49324
+EK_VERIFICATION_LOG=${EK_VERIFICATION_LOG:-/var/log/keylime/ek-verification.log}
c49324
+LOG="${EK_VERIFICATION_LOG}"
c49324
+
c49324
+# Setting log fallback in case we cannot write to the specified file.
c49324
+touch "${LOG}" 2>/dev/null || LOG=/dev/stderr
c49324
+
c49324
+log() {
c49324
+  _stderr=${2:-}
c49324
+  echo "[$(date)] ${1}" >&2 >> "${LOG}"
c49324
+  [ -n "${_stderr}" ] && [ "${LOG}" != '/dev/stderr' ] && echo "${1}" >&2
c49324
+}
c49324
+
c49324
+die() {
c49324
+  log "ERROR: ${1}" _
c49324
+  exit 1
c49324
+}
c49324
+
c49324
+command -v openssl >/dev/null \
c49324
+  || die "openssl CLI was not found in the PATH; please make sure it is installed"
c49324
+
c49324
+[ -n "${EK_CERT}" ] || die "EK_CERT was not provided as an env var"
c49324
+
c49324
+# Cert store directory. If one is not provided via TPM_CERT_STORE env var,
c49324
+# we start by attempting to read tenant.conf.
c49324
+CERT_STORE=${TPM_CERT_STORE:-}
c49324
+
c49324
+if [ -z "${CERT_STORE}" ]; then
c49324
+  KEYLIME_CONFIG_DIR=${KEYLIME_CONFIG_DIR:-/etc/keylime}
c49324
+  [ -d "${KEYLIME_CONFIG_DIR}" ] \
c49324
+    || die "KEYLIME_CONFIG_DIR (${KEYLIME_CONFIG_DIR}) does not seem to exist"
c49324
+
c49324
+  if [ -r "${KEYLIME_CONFIG_DIR}"/tenant.conf ]; then
c49324
+    CERT_STORE="$(grep -w ^tpm_cert_store "${KEYLIME_CONFIG_DIR}"/tenant.conf \
c49324
+                  | tail -1 | cut -d'=' -f2 | tr -d "[:blank:]")"
c49324
+  fi
c49324
+
c49324
+  # Next we try to read any snippets in tenant.conf.d/
c49324
+  if [ -d "${KEYLIME_CONFIG_DIR}"/tenant.conf.d ]; then
c49324
+    for _s in "${KEYLIME_CONFIG_DIR}"/tenant.conf.d/*.conf; do
c49324
+      [ -e "${_s}" ] || continue
c49324
+      _store="$(grep -w ^tpm_cert_store "${_s}" \
c49324
+                | tail -1 | cut -d'=' -f2 | tr -d "[:blank:]")"
c49324
+      [ -n "${_store}" ] && CERT_STORE="${_store}"
c49324
+    done
c49324
+  fi
c49324
+fi
c49324
+
c49324
+[ -n "${CERT_STORE}" ] \
c49324
+  || die "It was not possible to determine the TPM cert store dir from tenant.conf or tenant.conf.d/ snippets"
c49324
+[ -d "${CERT_STORE}" ] \
c49324
+  || die "TPM cert store is not a valid directory (${CERT_STORE})"
c49324
+
c49324
+EK_VERIFICATION_TMPDIR=
c49324
+ek_verification_cleanup() {
c49324
+  [ -d "${EK_VERIFICATION_TMPDIR}" ] || return 0
c49324
+  rm -rf "${EK_VERIFICATION_TMPDIR}"
c49324
+}
c49324
+trap ek_verification_cleanup EXIT
c49324
+
c49324
+mkdir -p "${TMPDIR:-/tmp}"
c49324
+EK_VERIFICATION_TMPDIR="$(mktemp -d)" || \
c49324
+  die "Creating a temp dir for EK verification failed"
c49324
+
c49324
+EK_CERT_FILE_DER="${EK_VERIFICATION_TMPDIR}"/ek.der
c49324
+EK_CERT_FILE_PEM="${EK_VERIFICATION_TMPDIR}"/ek.pem
c49324
+
c49324
+printf '%s' "${EK_CERT}" > "${EK_CERT_FILE_DER}".b64
c49324
+base64 -d "${EK_CERT_FILE_DER}".b64 > "${EK_CERT_FILE_DER}"
c49324
+openssl x509 -inform der -in "${EK_CERT_FILE_DER}" > "${EK_CERT_FILE_PEM}"
c49324
+
c49324
+for c in "${CERT_STORE}"/*.pem; do
c49324
+  [ -e "${c}" ] || continue
c49324
+  log "Checking if ${c} is the issuer of EK cert..."
c49324
+  if openssl verify -partial_chain -CAfile "${c}" "${EK_CERT_FILE_PEM}" \
c49324
+                                           >>"${LOG}" 2>>"${LOG}"; then
c49324
+    log "${EK_CERT} successfully verified by $(basename "${c}")" _
c49324
+    exit 0
c49324
+  fi
c49324
+done
c49324
+
c49324
+die "EK signature did not match certificates from TPM cert store"
c49324
+# vim:set ts=2 sw=2 et:
c49324
diff --git a/test/run_tests.sh b/test/run_tests.sh
c49324
index fc43113..81a6dff 100755
c49324
--- a/test/run_tests.sh
c49324
+++ b/test/run_tests.sh
c49324
@@ -107,19 +107,19 @@ fi
c49324
 # Set correct dependencies
c49324
 # Fedora
c49324
 if [ $PACKAGE_MGR = "dnf" ]; then
c49324
-    PYTHON_PREIN="python3"
c49324
+    PYTHON_PREIN="python3 openssl"
c49324
     PYTHON_DEPS="python3-pip python3-dbus"
c49324
 # RHEL / CentOS etc
c49324
 elif [ $PACKAGE_MGR = "yum" ]; then
c49324
-    PYTHON_PREIN="epel-release python36"
c49324
+    PYTHON_PREIN="epel-release python36 openssl"
c49324
     PYTHON_DEPS="python36-pip python36-dbus"
c49324
 # Ubuntu / Debian
c49324
 elif [ $PACKAGE_MGR = "apt-get" ]; then
c49324
-    PYTHON_PREIN="python3"
c49324
+    PYTHON_PREIN="python3 openssl"
c49324
     PYTHON_DEPS="python3-pip python3-dbus"
c49324
 # SUSE
c49324
 elif [ $PACKAGE_MGR = "zypper" ]; then
c49324
-    PYTHON_PREIN="python3"
c49324
+    PYTHON_PREIN="python3 openssl"
c49324
     PYTHON_DEPS="python3-pip python3-dbus"
c49324
 else
c49324
     echo "No recognized package manager found on this system!" 1>&2
c49324
diff --git a/test/test_cert_utils.py b/test/test_cert_utils.py
c49324
index 4666c0f..bdf6090 100644
c49324
--- a/test/test_cert_utils.py
c49324
+++ b/test/test_cert_utils.py
c49324
@@ -4,17 +4,19 @@ Copyright 2022 Red Hat, Inc.
c49324
 """
c49324
 
c49324
 import base64
c49324
+import os
c49324
 import unittest
c49324
 
c49324
-from keylime import cert_utils
c49324
+import cryptography
c49324
 
c49324
+from keylime import cert_utils, tpm_ek_ca
c49324
 
c49324
-class Cert_Utils_Test(unittest.TestCase):
c49324
-    def test_read_x509_der_cert_pubkey(self):
c49324
-        # The certificate listed in issue #944, from Nuvoton. It fails to
c49324
-        # be parsed by python-cryptography with the following error:
c49324
-        # ValueError: error parsing asn1 value: ParseError { kind: InvalidSetOrdering, location: ["RawCertificate::tbs_cert", "TbsCertificate::issuer", "0", "2"] }
c49324
-        nuvoton_ecdsa_sha256_der = """\
c49324
+CERT_STORE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "tpm_cert_store"))
c49324
+
c49324
+# The certificate listed in issue #944, from Nuvoton. It fails to
c49324
+# be parsed by python-cryptography with the following error:
c49324
+# ValueError: error parsing asn1 value: ParseError { kind: InvalidSetOrdering, location: ["RawCertificate::tbs_cert", "TbsCertificate::issuer", "0", "2"] }
c49324
+nuvoton_ecdsa_sha256_der = """\
c49324
 MIICBjCCAaygAwIBAgIIP5MvnZk8FrswCgYIKoZIzj0EAwIwVTFTMB8GA1UEAxMYTnV2b3RvbiBU
c49324
 UE0gUm9vdCBDQSAyMTEwMCUGA1UEChMeTnV2b3RvbiBUZWNobm9sb2d5IENvcnBvcmF0aW9uMAkG
c49324
 A1UEBhMCVFcwHhcNMTUxMDE5MDQzMjAwWhcNMzUxMDE1MDQzMjAwWjBVMVMwHwYDVQQDExhOdXZv
c49324
@@ -26,10 +28,10 @@ ajW+9zAfBgNVHSMEGDAWgBSfu3mqD1JieL7RUJKacXHpajW+9zAKBggqhkjOPQQDAgNIADBFAiEA
c49324
 /jiywhOKpiMOUnTfDmXsXfDFokhKVNTXB6Xtqm7J8L4CICjT3/Y+rrSnf8zrBXqWeHDh8Wi41+w2
c49324
 ppq6Ev9orZFI
c49324
 """
c49324
-        # This cert from STMicroelectronics presents a different issue when
c49324
-        # parsed by python-cryptography:
c49324
-        # ValueError: error parsing asn1 value: ParseError { kind: ExtraData }
c49324
-        st_sha256_with_rsa_der = """\
c49324
+# This cert from STMicroelectronics presents a different issue when
c49324
+# parsed by python-cryptography:
c49324
+# ValueError: error parsing asn1 value: ParseError { kind: ExtraData }
c49324
+st_sha256_with_rsa_der = """\
c49324
 MIIEjTCCA3WgAwIBAgIUTL0P5h7nYu2yjVCyaPw1hv89XoIwDQYJKoZIhvcNAQELBQAwVTELMAkG
c49324
 A1UEBhMCQ0gxHjAcBgNVBAoTFVNUTWljcm9lbGVjdHJvbmljcyBOVjEmMCQGA1UEAxMdU1RNIFRQ
c49324
 TSBFSyBJbnRlcm1lZGlhdGUgQ0EgMDUwHhcNMTgwNzExMDAwMDAwWhcNMjgwNzExMDAwMDAwWjAA
c49324
@@ -60,10 +62,116 @@ rTJ1x4NA2ZtQMYyT29Yy1UlkjocAaXL5u0m3Hvz/////////////////////////////////////
c49324
 ////////////////////////////////////////////////////////////////////////////
c49324
 /////w==
c49324
 """
c49324
-        certs = [nuvoton_ecdsa_sha256_der, st_sha256_with_rsa_der]
c49324
-        for c in certs:
c49324
+
c49324
+st_ecdsa_sha256_der = """\
c49324
+MIIDAzCCAqmgAwIBAgIUIymn2ai+UaVx1bM26/wU7I+sJd8wCgYIKoZIzj0EAwIwVjELMAkGA1UE
c49324
+BhMCQ0gxHjAcBgNVBAoTFVNUTWljcm9lbGVjdHJvbmljcyBOVjEnMCUGA1UEAxMeU1RNIFRQTSBF
c49324
+Q0MgSW50ZXJtZWRpYXRlIENBIDAxMB4XDTE4MDcyNjAwMDAwMFoXDTI4MDcyNjAwMDAwMFowADBZ
c49324
+MBMGByqGSM49AgEGCCqGSM49AwEHA0IABBsTz5y2cedVZxG/GsbXQ9bL6EQylWNjx1b/SSp2EHlN
c49324
+aJjtn43iz2zb+qot2UOhQIwPxS5hMCXhasw4XsFXgnijggGpMIIBpTAfBgNVHSMEGDAWgBR+uDbO
c49324
++9+KY3H/czP5utcUYWyWyzBCBgNVHSAEOzA5MDcGBFUdIAAwLzAtBggrBgEFBQcCARYhaHR0cDov
c49324
+L3d3dy5zdC5jb20vVFBNL3JlcG9zaXRvcnkvMFkGA1UdEQEB/wRPME2kSzBJMRYwFAYFZ4EFAgEM
c49324
+C2lkOjUzNTQ0RDIwMRcwFQYFZ4EFAgIMDFNUMzNIVFBIQUhCNDEWMBQGBWeBBQIDDAtpZDowMDQ5
c49324
+MDAwNDBmBgNVHQkEXzBdMBYGBWeBBQIQMQ0wCwwDMi4wAgEAAgF0MEMGBWeBBQISMTowOAIBAAEB
c49324
+/6ADCgEBoQMKAQCiAwoBAKMQMA4WAzMuMQoBBAoBAgEB/6QPMA0WBTE0MC0yCgECAQEAMAwGA1Ud
c49324
+EwEB/wQCMAAwEAYDVR0lBAkwBwYFZ4EFCAEwDgYDVR0PAQH/BAQDAgMIMEsGCCsGAQUFBwEBBD8w
c49324
+PTA7BggrBgEFBQcwAoYvaHR0cDovL3NlY3VyZS5nbG9iYWxzaWduLmNvbS9zdG10cG1lY2NpbnQw
c49324
+MS5jcnQwCgYIKoZIzj0EAwIDSAAwRQIgcNiZkn7poyk6J8Y1Cnwz4nV7YGPb5pBesBg6bk9n6KIC
c49324
+IQCE/jkHb/aPP/T3GtfLNHAdHL4JnofAbsDEuLQxAseeZA==
c49324
+"""
c49324
+
c49324
+
c49324
+def has_strict_x509_parsing():
c49324
+    """Indicates whether python-cryptography has strict x509 parsing."""
c49324
+
c49324
+    # Major release where python-cryptography started being strict
c49324
+    # when parsing x509 certificates.
c49324
+    PYCRYPTO_STRICT_X509_MAJOR = 35
c49324
+    return int(cryptography.__version__.split(".", maxsplit=1)[0]) >= PYCRYPTO_STRICT_X509_MAJOR
c49324
+
c49324
+
c49324
+def expectedFailureIf(condition):
c49324
+    """The test is marked as an expectedFailure if the condition is satisfied."""
c49324
+
c49324
+    def wrapper(func):
c49324
+        if condition:
c49324
+            return unittest.expectedFailure(func)
c49324
+        return func
c49324
+
c49324
+    return wrapper
c49324
+
c49324
+
c49324
+class Cert_Utils_Test(unittest.TestCase):
c49324
+    def test_tpm_cert_store(self):
c49324
+        tpm_ek_ca.check_tpm_cert_store(CERT_STORE_DIR)
c49324
+        my_trusted_certs = tpm_ek_ca.cert_loader(CERT_STORE_DIR)
c49324
+
c49324
+        self.assertNotEqual(len(my_trusted_certs), 0)
c49324
+
c49324
+    def test_cert_store_certs(self):
c49324
+        my_trusted_certs = tpm_ek_ca.cert_loader(CERT_STORE_DIR)
c49324
+        for fname, pem_cert in my_trusted_certs.items():
c49324
             try:
c49324
-                pubkey = cert_utils.read_x509_der_cert_pubkey(base64.b64decode(c))
c49324
-            except Exception:
c49324
-                self.fail("read_x509_der_cert_pubkey() is not expected to raise an exception here")
c49324
-            self.assertIsNotNone(pubkey)
c49324
+                cert = cert_utils.x509_pem_cert(pem_cert)
c49324
+            except Exception as e:
c49324
+                self.fail(f"Failed to load certificate {fname}: {e}")
c49324
+            self.assertIsNotNone(cert)
c49324
+
c49324
+    def test_verify_ek(self):
c49324
+        tests = [
c49324
+            {"cert": st_sha256_with_rsa_der, "expected": True},  # RSA, signed by STM_RSA_05I.pem.
c49324
+            {"cert": st_ecdsa_sha256_der, "expected": True},  # ECC, signed by STM_ECC_01I.pem.
c49324
+        ]
c49324
+        for t in tests:
c49324
+            self.assertEqual(
c49324
+                cert_utils.verify_ek(base64.b64decode(t["cert"]), CERT_STORE_DIR),
c49324
+                t["expected"],
c49324
+                msg=f"Test failed for cert {t['cert']}; expected: {t['expected']}",
c49324
+            )
c49324
+
c49324
+    @expectedFailureIf(has_strict_x509_parsing())
c49324
+    def test_verify_ek_expected_failures(self):
c49324
+        # The following certificates are not compliant, and will fail the
c49324
+        # signature verification with python-cryptography, even though they
c49324
+        # should validate. Marking as expected failure for now.
c49324
+        tests = [
c49324
+            {"cert": nuvoton_ecdsa_sha256_der, "expected": True},  # ECC, signed by NUVO_2110.pem.
c49324
+        ]
c49324
+        for t in tests:
c49324
+            self.assertEqual(
c49324
+                cert_utils.verify_ek(base64.b64decode(t["cert"]), CERT_STORE_DIR),
c49324
+                t["expected"],
c49324
+                msg=f"Test failed for cert {t['cert']}; expected: {t['expected']}",
c49324
+            )
c49324
+
c49324
+    def test_verify_ek_script(self):
c49324
+        # We will be using `nuvoton_ecdsa_sha256_der', which is signed by
c49324
+        # NUVO_2110.pem but fails verification when using python-cryptography
c49324
+        # as it is a malformed cert -- it is the same one we use in
c49324
+        # test_verify_ek_expected_failures().
c49324
+        # With an external script `ek_script_check' that uses openssl, the
c49324
+        # validation works.
c49324
+        cert = nuvoton_ecdsa_sha256_der.replace("\n", "")
c49324
+
c49324
+        self.assertFalse(cert_utils.verify_ek_script(None, None, None))
c49324
+        self.assertFalse(cert_utils.verify_ek_script("/foo/bar", None, None))
c49324
+
c49324
+        script = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "scripts", "ek-openssl-verify"))
c49324
+        # Testing ek-openssl-verify script, but without specifying the
c49324
+        # EK_CERT env var.
c49324
+        self.assertFalse(cert_utils.verify_ek_script(script, None, None))
c49324
+
c49324
+        # Now let's specify the EK_CERT.
c49324
+        env = os.environ.copy()
c49324
+        env["EK_CERT"] = cert
c49324
+        env["TPM_CERT_STORE"] = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "tpm_cert_store"))
c49324
+        self.assertTrue(cert_utils.verify_ek_script(script, env, None))
c49324
+
c49324
+        # Now, let us specify the ek_check_script with a relative path.
c49324
+        script = "ek-openssl-verify"
c49324
+        cwd = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "scripts"))
c49324
+        self.assertTrue(cert_utils.verify_ek_script(script, env, cwd))
c49324
+
c49324
+        # And now we try a bad TPM cert store.
c49324
+        env["TPM_CERT_STORE"] = "/some/bad/directory"
c49324
+        self.assertFalse(cert_utils.verify_ek_script(script, env, cwd))
c49324
-- 
c49324
2.38.1
c49324