diff -N -u -r python-nss-0.14.0/doc/ChangeLog python-nss-0.14.1/doc/ChangeLog
--- python-nss-0.14.0/doc/ChangeLog 2013-05-02 17:29:27.000000000 -0400
+++ python-nss-0.14.1/doc/ChangeLog 2013-10-09 09:35:29.000000000 -0400
@@ -1,5 +1,37 @@
-2013-04-24 John Dennis <jdennis@redhat.com> 0.14.0
- External Changes
+2013-10-09 John Dennis <jdennis@redhat.com> 0.14.1
+
+ Modifications only to tests and examples.
+
+ * Fix bug in ssl_example.py and test_client_server.py where complete
+ data was not read from socket. The Beast CVE fix in NSS causes
+ only one octet to be sent in the first socket packet and then the
+ remaining data is sent normally, this is known as 1/n-1 record
+ splitting. The example and test SSL code sent short messages and
+ then did a sock.recv(1024). We had always received the entire
+ message in one sock.recv() call because it was so short. But
+ sock.recv() does not guarantee how much data will be received,
+ thus this was a coding mistake. The solution is straight forward,
+ use newlines as a record separator and call sock.readline()
+ instead of sock.recv(). sock.readline() calls sock.recv()
+ internally until a complete line is read or the socket is closed.
+
+ * Rewrite setup_certs.py, it was written like an expect script
+ reacting to prompts read from a pseudo terminal but it was fragile
+ and would hang on some systems. New version uses temporary
+ password file and writes hardcoded responses to the stdin of
+ certuil and modutil.
+
+ * setup_certs now creates a new sql sytle NSS database (sql:pki)
+
+ * All tests and examples now load the sql:pki database. Command line
+ arg and variable changed from dbdir to db_name to reflect the
+ database specification is no longer just a directory.
+
+ * All command line process in test and examples now uses modern
+ argparse module instead of deprecated getopt and optparse. Some
+ command line args were tweaked.
+
+2013-04-24 John Dennis <jdennis@redhat.com> 0.14.0 External Changes
----------------
The primary enhancements in this version is support of certifcate
diff -N -u -r python-nss-0.14.0/doc/examples/cert_dump.py python-nss-0.14.1/doc/examples/cert_dump.py
--- python-nss-0.14.0/doc/examples/cert_dump.py 2013-04-23 13:36:53.000000000 -0400
+++ python-nss-0.14.1/doc/examples/cert_dump.py 2013-10-08 12:59:24.000000000 -0400
@@ -18,10 +18,10 @@
components of a cert.
'''
+import argparse
+import getpass
import os
import sys
-import getopt
-import getpass
from nss.error import NSPRError
import nss.io as io
@@ -93,57 +93,34 @@
# -----------------------------------------------------------------------------
-usage_str = '''
--p --pem read the certifcate in PEM ascii format (default)
--d --der read the certifcate in DER binary format
--P --print-cert print the cert using the internal rendering code
-'''
-
-def usage():
- print usage_str
-
-try:
- opts, args = getopt.getopt(sys.argv[1:], "hpdP",
- ["help", "pem", "der", "print-cert"])
-except getopt.GetoptError:
- # print help information and exit:
- usage()
- sys.exit(2)
-
-
-filename = 'cert.der'
-is_pem_format = True
-print_cert = False
-
-for o, a in opts:
- if o in ("-H", "--help"):
- usage()
- sys.exit()
- elif o in ("-p", "--pem"):
- is_pem_format = True
- elif o in ("-d", "--der"):
- is_pem_format = False
- elif o in ("-P", "--print-cert"):
- print_cert = True
-
-
-filename = sys.argv[1]
+parser = argparse.ArgumentParser(description='cert formatting example',
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+parser.add_argument('-f', '--cert-format', choices=['pem', 'der'],
+ help='format of input cert')
+parser.add_argument('-p', '--print-cert', action='store_true',
+ help='print the cert using the internal rendering code')
+parser.add_argument('cert_file', nargs=1,
+ help='input cert file to process')
+
+parser.set_defaults(cert_format='pem',
+ print_cert=False
+ )
+options = parser.parse_args()
# Perform basic configuration and setup
nss.nss_init_nodb()
-if len(args):
- filename = args[0]
+filename = options.cert_file[0]
print "certificate filename=%s" % (filename)
# Read the certificate as DER encoded data
-si = nss.read_der_from_file(filename, is_pem_format)
+si = nss.read_der_from_file(filename, options.cert_format == 'pem')
# Parse the DER encoded data returning a Certificate object
cert = nss.Certificate(si)
# Useful for comparing the internal cert rendering to what this script generates.
-if print_cert:
+if options.print_cert:
print cert
# Get the extension list from the certificate
diff -N -u -r python-nss-0.14.0/doc/examples/httplib_example.py python-nss-0.14.1/doc/examples/httplib_example.py
--- python-nss-0.14.0/doc/examples/httplib_example.py 2013-04-15 13:05:46.000000000 -0400
+++ python-nss-0.14.1/doc/examples/httplib_example.py 2013-10-08 13:04:03.000000000 -0400
@@ -4,13 +4,13 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
-import sys
+import argparse
import errno
-import getopt
-import urlparse
-import httplib
import getpass
+import httplib
import logging
+import sys
+import urlparse
from nss.error import NSPRError
import nss.io as io
@@ -19,14 +19,6 @@
#------------------------------------------------------------------------------
-httplib_debug_level = 0
-logging_debug_level = logging.INFO
-certdir = 'pki'
-password = ''
-nickname = ''
-url = 'https://sourceforge.net/projects/python'
-use_ssl = True
-use_connection_class = True
timeout_secs = 3
#------------------------------------------------------------------------------
@@ -94,28 +86,6 @@
logging.debug('cert valid %s for "%s"', cert_is_valid, cert.subject)
return cert_is_valid
-def client_auth_data_callback(ca_names, chosen_nickname, password, certdb):
- cert = None
- if chosen_nickname:
- try:
- cert = nss.find_cert_from_nickname(chosen_nickname, password)
- priv_key = nss.find_key_by_any_cert(cert, password)
- return cert, priv_key
- except NSPRError:
- return False
- else:
- nicknames = nss.get_cert_nicknames(certdb, nss.SEC_CERT_NICKNAMES_USER)
- for nickname in nicknames:
- try:
- cert = nss.find_cert_from_nickname(nickname, password)
- if cert.check_valid_times():
- if cert.has_signer_in_ca_names(ca_names):
- priv_key = nss.find_key_by_any_cert(cert, password)
- return cert, priv_key
- except NSPRError:
- return False
- return False
-
def password_callback(slot, retry, password):
if not retry and password: return password
return getpass.getpass("Enter password for %s: " % slot.token_name);
@@ -136,7 +106,7 @@
if not dbdir:
raise RuntimeError("dbdir is required")
- logging.debug('%s init %s', self.__class__.__name__, host)
+ logging.debug('%s init host=%s dbdir=%s', self.__class__.__name__, host, dbdir)
if not nss.nss_is_initialized(): nss.nss_init(dbdir)
self.sock = None
ssl.set_domestic_policy()
@@ -231,33 +201,46 @@
#------------------------------------------------------------------------------
-opts, args = getopt.getopt(sys.argv[1:],
- 'Dd:n:w:sScC',
- ['debuglevel','certdir=','nickname=','password=',
- 'use-ssl', 'no-ssl', 'use-connection-class', 'no-connection-class'])
-for o, a in opts:
- if o in('-D', '--httplib_debug_level'):
- httplib_debug_level = httplib_debug_level + 1
- elif o in ("-d", "--certdir"):
- certdir = a
- elif o in ("-n", "--nickname"):
- nickname = a
- elif o in ("-w", "--password"):
- password = a
- elif o in ("-s", "--use-ssl"):
- use_ssl = True
- elif o in ("-S", "--no-ssl"):
- use_ssl = False
- elif o in ("-c", "--use-connection-class"):
- use_connection_class = True
- elif o in ("-C", "--no-connection-class"):
- use_connection_class = False
+parser = argparse.ArgumentParser(description='httplib example')
-if len(args) > 0:
- url = args[0]
+parser.add_argument('-d', '--db-name',
+ help='NSS database name (e.g. "sql:pki")')
+parser.add_argument('--db-passwd',
+ help='NSS database password')
-if httplib_debug_level > 0:
+parser.add_argument('-s', '--ssl', dest='use_ssl', action='store_true',
+ help='use SSL connection')
+
+parser.add_argument('-S', '--no-ssl', dest='use_ssl', action='store_false',
+ help='do not use SSL connection')
+
+parser.add_argument('-c', '--connection-class', dest='use_connection_class', action='store_true',
+ help='use connection class')
+
+parser.add_argument('-C', '--no-connection-class', dest='use_connection_class', action='store_false',
+ help='do not use connection class')
+
+parser.add_argument('-D', '--httplib-debug-level', action='count',
+ help='httplib debug level')
+
+parser.add_argument('url', nargs=1,
+ help='URL to open (e.g. "https://sourceforge.net/projects/python"')
+
+parser.set_defaults(db_name = 'sql:pki',
+ db_passwd = 'db_passwd',
+ httplib_debug_level = 0,
+ use_ssl = True,
+ use_connection_class = True,
+ )
+
+options = parser.parse_args()
+
+
+url = options.url[0]
+
+logging_debug_level = logging.INFO
+if options.httplib_debug_level > 0:
logging_debug_level = logging.DEBUG
else:
logging_debug_level = logging.INFO
@@ -269,7 +252,7 @@
# Perform basic configuration and setup
url_components = urlparse.urlsplit(url)
-if use_ssl:
+if options.use_ssl:
url_components.schema = 'https'
else:
url_components.schema = 'http'
@@ -280,14 +263,14 @@
print "ERROR: bad url \"%s\"" % (url)
sys.exit(1)
-if use_connection_class:
- if use_ssl:
+if options.use_connection_class:
+ if options.use_ssl:
logging.info("Start (using NSSConnection class) %s", url)
- conn = NSSConnection(url_components.netloc, 443, dbdir="/etc/pki/nssdb")
+ conn = NSSConnection(url_components.netloc, 443, dbdir=options.db_name)
else:
logging.info("Start (using NSPRConnection class) %s", url)
conn = NSPRConnection(url_components.netloc, 80)
- conn.set_debuglevel(httplib_debug_level)
+ conn.set_debuglevel(options.httplib_debug_level)
conn.connect()
conn.request("GET", "/")
response = conn.getresponse()
@@ -302,13 +285,13 @@
print data
conn.close()
else:
- if use_ssl:
+ if options.use_ssl:
logging.info("Start (using NSSHTTPS class) %s", url)
- h = NSSHTTPS(url_components.netloc, 443, dbdir="/etc/pki/nssdb")
+ h = NSSHTTPS(url_components.netloc, 443, dbdir=options.db_name)
else:
logging.info("Start (using NSPRHTTP class) %s", url)
h = NSPRHTTP(url_components.netloc, 80)
- h.set_debuglevel(httplib_debug_level)
+ h.set_debuglevel(options.httplib_debug_level)
h.connect()
h.putrequest('GET', '/')
h.endheaders()
diff -N -u -r python-nss-0.14.0/doc/examples/ssl_example.py python-nss-0.14.1/doc/examples/ssl_example.py
--- python-nss-0.14.0/doc/examples/ssl_example.py 2013-04-15 13:05:46.000000000 -0400
+++ python-nss-0.14.1/doc/examples/ssl_example.py 2013-10-09 00:07:54.000000000 -0400
@@ -7,10 +7,10 @@
import warnings
warnings.simplefilter( "always", DeprecationWarning)
+import argparse
+import getpass
import os
import sys
-import getopt
-import getpass
from nss.error import NSPRError
import nss.io as io
@@ -24,19 +24,7 @@
REQUEST_CLIENT_CERT_ALWAYS = 3
REQUIRE_CLIENT_CERT_ALWAYS = 4
-# command line parameters, default them to something reasonable
-client = False
-server = False
-password = 'db_passwd'
-use_ssl = True
-client_cert_action = NO_CLIENT_CERT
-certdir = 'pki'
-hostname = os.uname()[1]
-server_nickname = 'test_server'
-client_nickname = 'test_user'
-port = 1234
timeout_secs = 3
-family = io.PR_AF_UNSPEC
# -----------------------------------------------------------------------------
# Utility Functions
@@ -63,7 +51,7 @@
if pin_args is None:
pin_args = ()
- print "cert:\n%s" % cert
+ print "peer cert:\n%s" % cert
# Define how the cert is being used based upon the is_server flag. This may
# seem backwards, but isn't. If we're a server we're trying to validate a
@@ -149,30 +137,30 @@
valid_addr = False
# Get the IP Address of our server
try:
- addr_info = io.AddrInfo(hostname)
+ addr_info = io.AddrInfo(options.hostname)
except Exception, e:
- print "could not resolve host address \"%s\"" % hostname
+ print "could not resolve host address \"%s\"" % options.hostname
return
for net_addr in addr_info:
- if family != io.PR_AF_UNSPEC:
- if net_addr.family != family: continue
- net_addr.port = port
+ if options.family != io.PR_AF_UNSPEC:
+ if net_addr.family != options.family: continue
+ net_addr.port = options.port
- if use_ssl:
+ if options.use_ssl:
sock = ssl.SSLSocket(net_addr.family)
# Set client SSL socket options
sock.set_ssl_option(ssl.SSL_SECURITY, True)
sock.set_ssl_option(ssl.SSL_HANDSHAKE_AS_CLIENT, True)
- sock.set_hostname(hostname)
+ sock.set_hostname(options.hostname)
# Provide a callback which notifies us when the SSL handshake is complete
sock.set_handshake_callback(handshake_callback)
# Provide a callback to supply our client certificate info
- sock.set_client_auth_data_callback(client_auth_data_callback, client_nickname,
- password, nss.get_default_certdb())
+ sock.set_client_auth_data_callback(client_auth_data_callback, options.client_nickname,
+ options.password, nss.get_default_certdb())
# Provide a callback to verify the servers certificate
sock.set_auth_certificate_callback(auth_certificate_callback,
@@ -192,17 +180,18 @@
if not valid_addr:
print "Could not establish valid address for \"%s\" in family %s" % \
- (hostname, io.addr_family_name(family))
+ (options.hostname, io.addr_family_name(options.family))
return
# Talk to the server
try:
- sock.send("Hello")
- buf = sock.recv(1024)
+ sock.send('Hello' + '\n') # newline is protocol record separator
+ buf = sock.readline()
if not buf:
print "client lost connection"
sock.close()
return
+ buf = buf.rstrip() # remove newline record separator
print "client received: %s" % (buf)
except Exception, e:
print e.strerror
@@ -221,7 +210,7 @@
try:
sock.close()
- if use_ssl:
+ if options.use_ssl:
ssl.clear_session_cache()
except Exception, e:
print e
@@ -231,36 +220,34 @@
# -----------------------------------------------------------------------------
def Server():
- global family
-
- # Perform basic SSL server configuration
- ssl.set_default_cipher_pref(ssl.SSL_RSA_WITH_NULL_MD5, True)
- ssl.config_server_session_id_cache()
-
- # Get our certificate and private key
- server_cert = nss.find_cert_from_nickname(server_nickname, password)
- priv_key = nss.find_key_by_any_cert(server_cert, password)
- server_cert_kea = server_cert.find_kea_type();
-
- print "server cert:\n%s" % server_cert
-
# Setup an IP Address to listen on any of our interfaces
- if family == io.PR_AF_UNSPEC:
- family = io.PR_AF_INET
- net_addr = io.NetworkAddress(io.PR_IpAddrAny, port, family)
+ if options.family == io.PR_AF_UNSPEC:
+ options.family = io.PR_AF_INET
+ net_addr = io.NetworkAddress(io.PR_IpAddrAny, options.port, options.family)
+
+ if options.use_ssl:
+ # Perform basic SSL server configuration
+ ssl.set_default_cipher_pref(ssl.SSL_RSA_WITH_NULL_MD5, True)
+ ssl.config_server_session_id_cache()
+
+ # Get our certificate and private key
+ server_cert = nss.find_cert_from_nickname(options.server_nickname, options.password)
+ priv_key = nss.find_key_by_any_cert(server_cert, options.password)
+ server_cert_kea = server_cert.find_kea_type();
+
+ print "server cert:\n%s" % server_cert
- if use_ssl:
sock = ssl.SSLSocket(net_addr.family)
# Set server SSL socket options
- sock.set_pkcs11_pin_arg(password)
+ sock.set_pkcs11_pin_arg(options.password)
sock.set_ssl_option(ssl.SSL_SECURITY, True)
sock.set_ssl_option(ssl.SSL_HANDSHAKE_AS_SERVER, True)
# If we're doing client authentication then set it up
- if client_cert_action >= REQUEST_CLIENT_CERT_ONCE:
+ if options.client_cert_action >= REQUEST_CLIENT_CERT_ONCE:
sock.set_ssl_option(ssl.SSL_REQUEST_CERTIFICATE, True)
- if client_cert_action == REQUIRE_CLIENT_CERT_ONCE:
+ if options.client_cert_action == REQUIRE_CLIENT_CERT_ONCE:
sock.set_ssl_option(ssl.SSL_REQUIRE_CERTIFICATE, True)
sock.set_auth_certificate_callback(auth_certificate_callback, nss.get_default_certdb())
@@ -278,7 +265,7 @@
while True:
# Accept a connection from a client
client_sock, client_addr = sock.accept()
- if use_ssl:
+ if options.use_ssl:
client_sock.set_handshake_callback(handshake_callback)
print "client connect from: %s" % (client_addr)
@@ -286,14 +273,15 @@
while True:
try:
# Handle the client connection
- buf = client_sock.recv(1024)
+ buf = client_sock.readline()
if not buf:
print "server lost lost connection to %s" % (client_addr)
break
+ buf = buf.rstrip() # remove newline record separator
print "server received: %s" % (buf)
- client_sock.send("Goodbye")
+ client_sock.send('Goodbye' + '\n') # newline is protocol record separator
try:
client_sock.shutdown(io.PR_SHUTDOWN_RCV)
client_sock.close()
@@ -308,7 +296,7 @@
try:
sock.shutdown()
sock.close()
- if use_ssl:
+ if options.use_ssl:
ssl.shutdown_server_session_id_cache()
except Exception, e:
print e
@@ -316,141 +304,121 @@
# -----------------------------------------------------------------------------
-usage_str = '''
--C --client run as the client (default: %(client)s)
--S --server run as the server (default: %(server)s)
--d --certdir certificate directory (default: %(certdir)s)
--h --hostname host to connect to (default: %(hostname)s)
--f --family may be inet|inet6|unspec (default: %(family)s)
- if unspec client tries all addresses returned by AddrInfo
- server binds to IPv4 "any" wildcard address
- if inet client tries IPv4 addresses returned by AddrInfo
- server binds to IPv4 "any" wildcard address
- if inet6 client tries IPv6 addresses returned by AddrInfo
- server binds to IPv6 "any" wildcard address
--4 --inet set family to inet (see family)
--6 --inet6 set family to inet6 (see family)
--n --server_nickname server certificate nickname (default: %(server_nickname)s)
--N --client_nickname client certificate nickname (default: %(client_nickname)s)
--w --password certificate database password (default: %(password)s)
--p --port host port (default: %(port)s)
--e --encrypt use SSL (default) (default: %(encrypt)s)
--E --noencrypt don't use SSL (default: %(noencrypt)s)
--f --require_cert_once (default: %(require_cert_once)s)
--F --require_cert_always (default: %(require_cert_always)s)
--r --request_cert_once (default: %(request_cert_once)s)
--R --request_cert_always (default: %(request_cert_always)s)
--H --help
-''' % {
- 'client' : client,
- 'server' : server,
- 'certdir' : certdir,
- 'hostname' : hostname,
- 'family' : io.addr_family_name(family),
- 'server_nickname' : server_nickname,
- 'client_nickname' : client_nickname,
- 'password' : password,
- 'port' : port,
- 'encrypt' : use_ssl is True,
- 'noencrypt' : use_ssl is False,
- 'require_cert_once' : client_cert_action == REQUIRE_CLIENT_CERT_ONCE,
- 'require_cert_always' : client_cert_action == REQUIRE_CLIENT_CERT_ALWAYS,
- 'request_cert_once' : client_cert_action == REQUEST_CLIENT_CERT_ONCE,
- 'request_cert_always' : client_cert_action == REQUEST_CLIENT_CERT_ALWAYS,
- }
-
-def usage():
- print usage_str
-
-try:
- opts, args = getopt.getopt(sys.argv[1:], "Hd:h:f:46n:N:w:p:CSeE",
- ["help", "certdir=", "hostname=",
- "family", "inet", "inet6",
- "server_nickname=", "client_nickname=",
- "password=", "port=",
- "client", "server", "encrypt", "noencrypt",
- "require_cert_once", "require_cert_always",
- "request_cert_once", "request_cert_always",
- ])
-except getopt.GetoptError:
- # print help information and exit:
- usage()
- sys.exit(2)
-
-
-for o, a in opts:
- if o in ("-d", "--certdir"):
- certdir = a
- elif o in ("-h", "--hostname"):
- hostname = a
- elif o in ("-f", "--family"):
- if a == "inet":
+class FamilyArgAction(argparse.Action):
+ def __call__(self, parser, namespace, values, option_string=None):
+ value = values[0]
+ if value == "inet":
family = io.PR_AF_INET
- elif a == "inet6":
+ elif value == "inet6":
family = io.PR_AF_INET6
- elif a == "unspec":
+ elif value == "unspec":
family = io.PR_AF_UNSPEC
else:
- print "unknown address family (%s)" % (a)
- usage()
- sys.exit()
- elif o in ("-4", "--inet"):
- family = io.PR_AF_INET
- elif o in ("-6", "--inet6"):
- family = io.PR_AF_INET6
- elif o in ("-n", "--server_nickname"):
- server_nickname = a
- elif o in ("-N", "--client_nickname"):
- client_nickname = a
- elif o in ("-w", "--password"):
- password = a
- elif o in ("-p", "--port"):
- port = int(a)
- elif o in ("-C", "--client"):
- client = True
- elif o in ("-S", "--server"):
- server = True
- elif o in ("-e", "--encrypt"):
- use_ssl = True
- elif o in ("-E", "--noencrypt"):
- use_ssl = False
- elif o in ("--require_cert_once"):
- client_cert_action = REQUIRE_CLIENT_CERT_ONCE
- elif o in ("--require_cert_always"):
- client_cert_action = REQUIRE_CLIENT_CERT_ALWAYS
- elif o in ("--request_cert_once"):
- client_cert_action = REQUEST_CLIENT_CERT_ONCE
- elif o in ("--request_cert_always"):
- client_cert_action = REQUEST_CLIENT_CERT_ALWAYS
- elif o in ("-H", "--help"):
- usage()
- sys.exit()
- else:
- usage()
- sys.exit()
+ raise argparse.ArgumentError(self, "unknown address family (%s)" % (value))
+ setattr(namespace, self.dest, family)
+
+parser = argparse.ArgumentParser(description='SSL example')
+
+parser.add_argument('-C', '--client', action='store_true',
+ help='run as the client')
+
+parser.add_argument('-S', '--server', action='store_true',
+ help='run as the server')
+
+parser.add_argument('-d', '--db-name',
+ help='NSS database name (e.g. "sql:pki")')
+
+parser.add_argument('-H', '--hostname',
+ help='host to connect to')
+
+parser.add_argument('-f', '--family',
+ choices=['unspec', 'inet', 'inet6'],
+ dest='family', action=FamilyArgAction, nargs=1,
+ help='''
+ If unspec client tries all addresses returned by AddrInfo,
+ server binds to IPv4 "any" wildcard address.
+
+ If inet client tries IPv4 addresses returned by AddrInfo,
+ server binds to IPv4 "any" wildcard address.
+
+ If inet6 client tries IPv6 addresses returned by AddrInfo,
+ server binds to IPv6 "any" wildcard address''')
-if client and server:
+parser.add_argument('-4', '--inet',
+ dest='family', action='store_const', const=io.PR_AF_INET,
+ help='set family to inet (see family)')
+
+parser.add_argument('-6', '--inet6',
+ dest='family', action='store_const', const=io.PR_AF_INET6,
+ help='set family to inet6 (see family)')
+
+parser.add_argument('-n', '--server-nickname',
+ help='server certificate nickname')
+
+parser.add_argument('-N', '--client-nickname',
+ help='client certificate nickname')
+
+parser.add_argument('-w', '--password',
+ help='certificate database password')
+
+parser.add_argument('-p', '--port', type=int,
+ help='host port')
+
+parser.add_argument('-e', '--encrypt', dest='use_ssl', action='store_true',
+ help='use SSL connection')
+
+parser.add_argument('-E', '--no-encrypt', dest='use_ssl', action='store_false',
+ help='do not use SSL connection')
+
+parser.add_argument('--require-cert-once', dest='client_cert_action',
+ action='store_const', const=REQUIRE_CLIENT_CERT_ONCE)
+
+parser.add_argument('--require-cert-always', dest='client_cert_action',
+ action='store_const', const=REQUIRE_CLIENT_CERT_ALWAYS)
+
+parser.add_argument('--request-cert-once', dest='client_cert_action',
+ action='store_const', const=REQUEST_CLIENT_CERT_ONCE)
+
+parser.add_argument('--request-cert-always', dest='client_cert_action',
+ action='store_const', const=REQUEST_CLIENT_CERT_ALWAYS)
+
+parser.set_defaults(client = False,
+ server = False,
+ db_name = 'sql:pki',
+ hostname = os.uname()[1],
+ family = io.PR_AF_UNSPEC,
+ server_nickname = 'test_server',
+ client_nickname = 'test_user',
+ password = 'db_passwd',
+ port = 1234,
+ use_ssl = True,
+ client_cert_action = NO_CLIENT_CERT,
+ )
+
+options = parser.parse_args()
+
+if options.client and options.server:
print "can't be both client and server"
sys.exit(1)
-if not (client or server):
+if not (options.client or options.server):
print "must be one of client or server"
sys.exit(1)
# Perform basic configuration and setup
-if certdir is None:
- nss.nss_init_nodb()
+if options.use_ssl:
+ nss.nss_init(options.db_name)
else:
- nss.nss_init(certdir)
+ nss.nss_init_nodb()
ssl.set_domestic_policy()
nss.set_password_callback(password_callback)
# Run as a client or as a server
-if client:
+if options.client:
print "starting as client"
Client()
-if server:
+if options.server:
print "starting as server"
Server()
@@ -458,4 +426,3 @@
nss.nss_shutdown()
except Exception, e:
print e
-
diff -N -u -r python-nss-0.14.0/doc/examples/verify_cert.py python-nss-0.14.1/doc/examples/verify_cert.py
--- python-nss-0.14.0/doc/examples/verify_cert.py 2013-04-15 13:05:46.000000000 -0400
+++ python-nss-0.14.1/doc/examples/verify_cert.py 2013-10-08 13:04:51.000000000 -0400
@@ -1,7 +1,7 @@
#!/usr/bin/python
+import argparse
import sys
-import optparse
import nss.nss as nss
import nss.error as nss_error
@@ -75,75 +75,74 @@
lines.extend(nss.make_line_fmt_tuples(level, msg))
lines.extend(obj.format_lines(level+1))
return nss.indented_format(lines)
-
+
#-------------------------------------------------------------------------------
def main():
- # Command line argument processing
- parser = optparse.OptionParser()
-
- parser.set_defaults(dbdir = '/etc/pki/nssdb',
- db_passwd = 'db_passwd',
- input_format = 'pem',
- check_sig = True,
- print_cert = False,
- with_log = True,
- check_ca = True,
- )
+ global options
- param_group = optparse.OptionGroup(parser, 'NSS Database',
- 'Specify & control the NSS Database')
+ parser = argparse.ArgumentParser(description='certificate validation example')
- param_group.add_option('-d', '--dbdir', dest='dbdir',
- help='NSS database directory, default="%default"')
- param_group.add_option('-P', '--db-passwd', dest='db_passwd',
- help='NSS database password, default="%default"')
+ # === NSS Database Group ===
+ group = parser.add_argument_group('NSS Database',
+ 'Specify & control the NSS Database')
+ group.add_argument('-d', '--db-name',
+ help='NSS database name (e.g. "sql:pki")')
- parser.add_option_group(param_group)
+ group.add_argument('-P', '--db-passwd',
+ help='NSS database password')
- param_group = optparse.OptionGroup(parser, 'Certificate',
- 'Specify how the certificate is loaded')
+ # === Certificate Group ===
+ group = parser.add_argument_group('Certificate',
+ 'Specify how the certificate is loaded')
- param_group.add_option('-f', '--file', dest='cert_filename',
- help='read cert from file')
- param_group.add_option('--format', dest='input_format', choices=['pem', 'der'],
- help='import format for certificate (der|pem) default="%default"')
- param_group.add_option('-n', '--nickname', dest='cert_nickname',
- help='load cert from NSS database by looking it up under this nickname')
+ group.add_argument('-f', '--file', dest='cert_filename',
+ help='read cert from file')
+ group.add_argument('-F', '--input-format', choices=['pem', 'der'],
+ help='format of input cert')
- parser.add_option_group(param_group)
+ group.add_argument('-n', '--nickname', dest='cert_nickname',
+ help='load cert from NSS database by looking it up under this nickname')
- param_group = optparse.OptionGroup(parser, 'Validation',
- 'Control the validation')
+ # === Validation Group ===
+ group = parser.add_argument_group('Validation',
+ 'Control the validation')
- param_group.add_option('-u', '--usage', dest='cert_usage', action='append', choices=cert_usage_map.keys(),
- help='may be specified multiple times, default="CheckAllUsages", may be one of: %s' % ', '.join(sorted(cert_usage_map.keys())))
- param_group.add_option('-c', '--check-sig', action='store_true', dest='check_sig',
- help='check signature default=%default')
- param_group.add_option('-C', '--no-check-sig', action='store_false', dest='check_sig',
+ group.add_argument('-u', '--usage', dest='cert_usage', action='append', choices=cert_usage_map.keys(),
+ help='certificate usage flags, may be specified multiple times')
+ group.add_argument('-c', '--check-sig', action='store_true', dest='check_sig',
help='check signature')
- param_group.add_option('-l', '--log', action='store_true', dest='with_log',
- help='use verify log, default=%default')
- param_group.add_option('-L', '--no-log', action='store_false', dest='with_log',
- help='use verify log, default=%default')
- param_group.add_option('-a', '--check-ca', action='store_true', dest='check_ca',
- help='check if cert is CA, default=%default')
- param_group.add_option('-A', '--no-check-ca', action='store_false', dest='check_ca',
- help='check if cert is CA, default=%default')
-
- parser.add_option_group(param_group)
-
- param_group = optparse.OptionGroup(parser, 'Miscellaneous',
- 'Miscellaneous options')
+ group.add_argument('-C', '--no-check-sig', action='store_false', dest='check_sig',
+ help='do not check signature')
+ group.add_argument('-l', '--log', action='store_true', dest='with_log',
+ help='use verify log')
+ group.add_argument('-L', '--no-log', action='store_false', dest='with_log',
+ help='do not use verify log')
+ group.add_argument('-a', '--check-ca', action='store_true', dest='check_ca',
+ help='check if cert is CA')
+ group.add_argument('-A', '--no-check-ca', action='store_false', dest='check_ca',
+ help='do not check if cert is CA')
+
+ # === Miscellaneous Group ===
+ group = parser.add_argument_group('Miscellaneous',
+ 'Miscellaneous options')
- param_group.add_option('-p', '--print-cert', action='store_true', dest='print_cert',
- help='print the certificate in a friendly fashion, default=%default')
+ group.add_argument('-p', '--print-cert', action='store_true', dest='print_cert',
+ help='print the certificate in a friendly fashion')
- parser.add_option_group(param_group)
- options, args = parser.parse_args()
+ parser.set_defaults(db_name = 'sql:pki',
+ db_passwd = 'db_passwd',
+ input_format = 'pem',
+ check_sig = True,
+ with_log = True,
+ check_ca = True,
+ print_cert = False,
+ )
+
+ options = parser.parse_args()
# Process the command line arguments
@@ -175,9 +174,9 @@
return 1
# Initialize NSS.
- print indented_output('NSS Database', options.dbdir)
+ print indented_output('NSS Database', options.db_name)
print
- nss.nss_init(options.dbdir)
+ nss.nss_init(options.db_name)
certdb = nss.get_default_certdb()
nss.set_password_callback(password_callback)
@@ -194,7 +193,7 @@
except Exception, e:
print e
print >>sys.stderr, 'Unable to load cert nickname "%s" from database "%s"' % \
- (options.cert_nickname, options.dbdir)
+ (options.cert_nickname, options.db_name)
return 1
# Dump the cert if the user wants to see it
@@ -282,5 +281,3 @@
#-------------------------------------------------------------------------------
if __name__ == "__main__":
sys.exit(main())
-
-
diff -N -u -r python-nss-0.14.0/doc/examples/verify_server.py python-nss-0.14.1/doc/examples/verify_server.py
--- python-nss-0.14.0/doc/examples/verify_server.py 2013-04-15 13:05:46.000000000 -0400
+++ python-nss-0.14.1/doc/examples/verify_server.py 2013-10-08 13:00:37.000000000 -0400
@@ -4,10 +4,10 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+import argparse
+import getpass
import os
import sys
-import getopt
-import getpass
from nss.error import NSPRError
import nss.io as io
@@ -16,11 +16,6 @@
# -----------------------------------------------------------------------------
-# command line parameters, default them to something reasonable
-#certdir = '/etc/httpd/alias'
-certdir = '/etc/pki/nssdb'
-hostname = 'www.verisign.com'
-port = 443
timeout_secs = 3
request = '''\
@@ -104,18 +99,18 @@
valid_addr = False
# Get the IP Address of our server
try:
- addr_info = io.AddrInfo(hostname)
+ addr_info = io.AddrInfo(options.hostname)
except:
- print "ERROR: could not resolve hostname \"%s\"" % hostname
+ print "ERROR: could not resolve hostname \"%s\"" % options.hostname
return
for net_addr in addr_info:
- net_addr.port = port
+ net_addr.port = options.port
sock = ssl.SSLSocket(net_addr.family)
# Set client SSL socket options
sock.set_ssl_option(ssl.SSL_SECURITY, True)
sock.set_ssl_option(ssl.SSL_HANDSHAKE_AS_CLIENT, True)
- sock.set_hostname(hostname)
+ sock.set_hostname(options.hostname)
# Provide a callback which notifies us when the SSL handshake is
# complete
@@ -135,7 +130,7 @@
continue
if not valid_addr:
- print "ERROR: could not connect to \"%s\"" % hostname
+ print "ERROR: could not connect to \"%s\"" % options.hostname
return
try:
@@ -158,48 +153,31 @@
# -----------------------------------------------------------------------------
-usage_str = '''
--d --certdir certificate directory (default: %(certdir)s)
--h --hostname host to connect to (default: %(hostname)s)
--p --port host port (default: %(port)s)
-''' % {
- 'certdir' : certdir,
- 'hostname' : hostname,
- 'port' : port,
- }
+parser = argparse.ArgumentParser(description='certificate verification example',
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter)
-def usage():
- print usage_str
+parser.add_argument('-d', '--db-name',
+ help='NSS database name (e.g. "sql:pki")')
-try:
- opts, args = getopt.getopt(sys.argv[1:], "Hd:h:p:",
- ["help", "certdir=", "hostname=",
- "port=",
- ])
-except getopt.GetoptError:
- # print help information and exit:
- usage()
- sys.exit(2)
-
-
-for o, a in opts:
- if o in ("-d", "--certdir"):
- certdir = a
- if o in ("-h", "--hostname"):
- hostname = a
- if o in ("-p", "--port"):
- port = int(a)
- if o in ("-H", "--help"):
- usage()
- sys.exit()
+parser.add_argument('-H', '--hostname',
+ help='host to connect to')
+
+parser.add_argument('-p', '--port', type=int,
+ help='host port')
+
+parser.set_defaults(db_name = 'sql:pki',
+ hostname = 'www.verisign.com',
+ port = 443,
+ )
+
+options = parser.parse_args()
# Perform basic configuration and setup
try:
- nss.nss_init(certdir)
+ nss.nss_init(options.db_name)
ssl.set_domestic_policy()
except Exception, e:
print >>sys.stderr, e.strerror
sys.exit(1)
client()
-
diff -N -u -r python-nss-0.14.0/src/__init__.py python-nss-0.14.1/src/__init__.py
--- python-nss-0.14.0/src/__init__.py 2013-04-24 16:30:26.000000000 -0400
+++ python-nss-0.14.1/src/__init__.py 2013-10-08 14:38:28.000000000 -0400
@@ -163,8 +163,8 @@
- Initialize NSS and indicate the certficate database (CertDB)::
- certdir = './pki'
- ssl.nssinit(certdir)
+ db_name = 'sql:pki'
+ ssl.nssinit(db_name)
- If you are implementing an SSL server call config_secure_server()
(see ssl_example.py)::
@@ -244,7 +244,7 @@
future we can find a solution but the immediate goal of the NSS
Python binding was to expose NSS through Python, not necessarily
to solve the larger integration issue of Python run-time and NSPR
- run-time.
+ run-time.
- NSPR would like to hide the underlying platform socket (in the
NSPR code this is called "osfd"). There are NSPR API's which
@@ -312,5 +312,4 @@
To be added
"""
-__version__ = '0.14.0'
-
+__version__ = '0.14.1'
diff -N -u -r python-nss-0.14.0/test/run_tests python-nss-0.14.1/test/run_tests
--- python-nss-0.14.0/test/run_tests 2013-04-30 11:03:54.000000000 -0400
+++ python-nss-0.14.1/test/run_tests 2013-10-08 12:57:56.000000000 -0400
@@ -1,21 +1,13 @@
#!/usr/bin/python
-import getopt
-import sys
+import argparse
import os
+import sys
import unittest
from util import get_build_dir
#-------------------------------------------------------------------------------
-prog_name = os.path.basename(sys.argv[0])
-
-config = {
- 'in_tree' : True,
-}
-
-#-------------------------------------------------------------------------------
-
def run_tests():
import setup_certs
@@ -23,9 +15,11 @@
import test_cipher
import test_digest
import test_pkcs12
+ import test_misc
+ import test_ocsp
import test_client_server
- setup_certs.setup_certs()
+ setup_certs.setup_certs([])
loader = unittest.TestLoader()
runner = unittest.TextTestRunner()
@@ -43,62 +37,19 @@
#-------------------------------------------------------------------------------
-class Usage(Exception):
- def __init__(self, msg):
- self.msg = msg
-
-def usage():
- 'Print command help.'
-
- return '''\
-%(prog_name)s [-i]
-
--h --help print help
--i --installed runs the test using installed libraries
- instead of "in tree" libraries
-
-Runs unit tests.
-By default test is done "in tree".
-
-Examples:
-
-Run test using libraries built in this tree
-%(prog_name)s
-
-Run post install test
-%(prog_name)s -i
-''' % {'prog_name' : prog_name,
- }
-
-#-------------------------------------------------------------------------------
+def main():
+ parser = argparse.ArgumentParser(description='run the units (installed or in tree)')
+ parser.add_argument('-i', '--installed', action='store_false', dest='in_tree',
+ help='run tests using installed library')
+ parser.add_argument('-t', '--in-tree', action='store_true', dest='in_tree',
+ help='run tests using devel tree')
-def main(argv=None):
- if argv is None:
- argv = sys.argv
-
- try:
- try:
- opts, args = getopt.getopt(argv[1:], 'hi',
- ['help', 'installed',])
- except getopt.GetoptError, e:
- raise Usage(e)
- return 2
-
- for o, a in opts:
- if o in ('-h', '--help'):
- print >>sys.stdout, usage()
- return 0
- elif o in ('-i', '--installed'):
- config['in_tree'] = False
- else:
- raise Usage("command argument '%s' not handled, internal error" % o)
- except Usage, e:
- print >>sys.stderr, e.msg
- print >>sys.stderr, "for help use --help"
- return 2
+ parser.set_defaults(in_tree = False,
+ )
+ options = parser.parse_args()
- if config['in_tree']:
+ if options.in_tree:
# Run the tests "in the tree"
# Rather than testing with installed versions run the test
# with the package built in this tree.
diff -N -u -r python-nss-0.14.0/test/setup_certs.py python-nss-0.14.1/test/setup_certs.py
--- python-nss-0.14.0/test/setup_certs.py 2013-04-18 12:28:05.000000000 -0400
+++ python-nss-0.14.1/test/setup_certs.py 2013-10-17 11:07:09.000000000 -0400
@@ -1,345 +1,506 @@
#!/usr/bin/python
-import traceback
-import getopt
-import sys
-import os
-import errno
+import argparse
+import atexit
import logging
-import subprocess
+import os
import shutil
-import shlex
-import pty
-import tty
-import re
-import time
-
-#-------------------------------------------------------------------------------
-
-__all__ = ["config", "setup_certs"]
-
-if __name__ == '__main__':
- prog_name = os.path.basename(sys.argv[0])
-else:
- prog_name = 'setup_certs'
-
-serial_number = 0
-hostname = os.uname()[1]
-client_username = 'test_user'
-
-config = {
- 'verbose' : False,
- 'debug' : False,
- 'logfile' : 'setup_certs.log',
- 'log_level' : logging.WARN,
- 'interactive' : sys.stdout.isatty(),
- 'dbdir' : os.path.join(os.path.dirname(sys.argv[0]), 'pki'),
- 'db_passwd' : 'db_passwd',
- 'noise_file' : 'noise_file',
- 'ca_subject' : 'CN=Test CA',
- 'ca_nickname' : 'test_ca',
- 'server_subject' : 'CN=%s' % hostname,
- 'server_nickname' : 'test_server',
- 'client_subject' : 'CN=%s' % client_username,
- 'client_nickname' : client_username,
-}
+import subprocess
+import sys
+from string import Template
+import tempfile
#-------------------------------------------------------------------------------
class CmdError(Exception):
- def __init__(self, cmd, exit_code, msg):
- self.cmd = cmd
- self.exit_code = exit_code
- self.msg = msg
+ def __init__(self, cmd_args, returncode, message=None, stdout=None, stderr=None):
+ self.cmd_args = cmd_args
+ self.returncode = returncode
+ if message is None:
+ self.message = 'Failed error=%s, ' % (returncode)
+ if stderr:
+ self.message += '"%s", ' % stderr
+ self.message += 'args=%s' % (cmd_args)
+ else:
+ self.message = message
+ self.stdout = stdout
+ self.stderr = stderr
def __str__(self):
- return "Command \"%s\"\nFailed with exit code = %s\nOutput was:\n%s\n" % \
- (self.cmd, self.exit_code, self.msg)
+ return self.message
-#-------------------------------------------------------------------------------
-def next_serial():
- global serial_number
- serial_number += 1
- return serial_number
+def run_cmd(cmd_args, input=None):
+ logging.debug(' '.join(cmd_args))
+ try:
+ p = subprocess.Popen(cmd_args,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ stdout, stderr = p.communicate(input)
+ returncode = p.returncode
+ if returncode != 0:
+ raise CmdError(cmd_args, returncode,
+ 'failed %s' % (', '.join(cmd_args)),
+ stdout, stderr)
+ return stdout, stderr
+ except OSError as e:
+ raise CmdError(cmd_args, e.errno, stderr=str(e))
+
+def exit_handler(options):
+ logging.debug('in exit handler')
+
+ if options.passwd_filename is not None:
+ logging.debug('removing passwd_filename=%s', options.passwd_filename)
+ os.remove(options.passwd_filename)
+
+ if options.noise_filename is not None:
+ logging.debug('removing noise_filename=%s', options.noise_filename)
+ os.remove(options.noise_filename)
+
+def write_serial(options, serial_number):
+ with open(options.serial_file, 'w') as f:
+ f.write('%x\n' % serial_number)
+
+
+def read_serial(options):
+ if not os.path.exists(options.serial_file):
+ write_serial(options, options.serial_number)
-def create_noise_file():
- """
- Generate a noise file to be used when creating a key
- """
- if os.path.exists(config['noise_file']):
- os.remove(config['noise_file'])
-
- f = open(config['noise_file'], "w")
- f.write(os.urandom(40))
- f.close()
+ with open(options.serial_file) as f:
+ serial_number = int(f.readline(), 16)
+ return serial_number
- return
-def run_cmd(cmd, input=None):
- logging.debug("running command: %s", cmd)
+def init_noise_file(options):
+ '''Generate a noise file to be used when creating a key
- if input is None:
- stdin = None
- else:
- stdin = subprocess.PIPE
+ We create a temporary file on first use and continue to use
+ the same temporary file for the duration of this process.
+ Each time this function is called it writes new random data
+ into the file.
+ '''
+ random_data = os.urandom(40)
- p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- stdout, stderr = p.communicate(input)
- status = p.returncode
- if config['verbose']:
- logging.debug("cmd status = %s", status)
- logging.debug("cmd stdout = %s", stdout)
- logging.debug("cmd stderr = %s", stderr)
- return status, stdout, stderr
-
-def run_cmd_with_prompts(cmd, prompts):
- logging.debug('running command: %s', cmd)
-
- argv = shlex.split(cmd)
-
- pid, master_fd = pty.fork()
- if pid == 0:
- os.execlp(argv[0], *argv)
-
- time.sleep(0.1) # FIXME: why is this necessary?
- output = ''
- search_position = 0
- cur_prompt = 0
- if cur_prompt < len(prompts):
- prompt_re = re.compile(prompts[cur_prompt][0])
- response = prompts[cur_prompt][1]
- cur_prompt += 1
+ if options.noise_filename is None:
+ fd, options.noise_filename = tempfile.mkstemp()
+ os.write(fd, random_data)
+ os.close(fd)
else:
- prompt_re = None
- response = None
-
- while True:
- try:
- new_data = os.read(master_fd, 1024)
- except OSError, e:
- if e.errno == errno.EIO: # process exited
- break
- else:
- raise
- if len(new_data) == 0:
- break # EOF
- output += new_data
- logging.debug('output="%s"', output[search_position:]);
- if prompt_re is not None:
- logging.debug('search pattern = "%s"', prompt_re.pattern)
- match = prompt_re.search(output, search_position)
- if match:
- search_position = match.end()
- parsed = output[match.start() : match.end()]
- logging.debug('found prompt: "%s"', parsed)
- logging.debug('writing response: "%s"', response)
- os.write(master_fd, response)
-
- if cur_prompt < len(prompts):
- prompt_re = re.compile(prompts[cur_prompt][0])
- response = prompts[cur_prompt][1]
- cur_prompt += 1
- else:
- prompt_re = None
- response = None
-
-
- exit_value = os.waitpid(pid, 0)[1]
- exit_signal = exit_value & 0xFF
- exit_code = exit_value >> 8
- #logging.debug('output="%s"' % output)
- logging.debug('cmd signal=%s, exit_code=%s' % (exit_signal, exit_code))
-
- return exit_code, output
+ with open(options.noise_filename, 'w') as f:
+ f.write(random_data)
+ return
+def create_passwd_file(options):
+ fd, options.passwd_filename = tempfile.mkstemp()
+ os.write(fd, options.db_passwd)
+ os.close(fd)
-#-------------------------------------------------------------------------------
-def setup_certs():
- print 'setting up certs ...'
-
- if os.path.exists(config['dbdir']):
- shutil.rmtree(config['dbdir'])
- os.makedirs(config['dbdir'])
+def db_has_cert(options, nickname):
+ cmd_args = ['/usr/bin/certutil',
+ '-d', options.db_name,
+ '-L',
+ '-n', nickname]
try:
+ run_cmd(cmd_args)
+ except CmdError as e:
+ if e.returncode == 255 and 'not found' in e.stderr:
+ return False
+ else:
+ raise
+ return True
+
+def format_cert(options, nickname):
+ cmd_args = ['/usr/bin/certutil',
+ '-L', # OPERATION: list
+ '-d', options.db_name, # NSS database
+ '-f', options.passwd_filename, # database password in file
+ '-n', nickname, # nickname of cert to list
+ ]
- create_noise_file()
-
- # 1. Create the database
- cmd = 'certutil -N -d %(dbdir)s' % config
- exit_code, output = run_cmd_with_prompts(cmd,
- [('Enter new password:\s*', config['db_passwd'] + '\n'),
- ('Re-enter password:\s*', config['db_passwd'] + '\n')])
- if exit_code != 0:
- raise CmdError(cmd, exit_code, output)
-
- # 2. Create a root CA certificate
- config['serial_number'] = next_serial()
- cmd = 'certutil -S -d %(dbdir)s -z %(noise_file)s -s "%(ca_subject)s" -n "%(ca_nickname)s" -x -t "CTu,C,C" -m %(serial_number)d' % config
- exit_code, output = run_cmd_with_prompts(cmd,
- [('Enter Password or Pin for "NSS Certificate DB":\s*', config['db_passwd'] + '\n')])
- if exit_code != 0:
- raise CmdError(cmd, exit_code, output)
-
- # 3. Create a server certificate and sign it.
- config['serial_number'] = next_serial()
- cmd = 'certutil -S -d %(dbdir)s -z %(noise_file)s -c %(ca_nickname)s -s "%(server_subject)s" -n "%(server_nickname)s" -t "u,u,u" -m %(serial_number)d' % config
- exit_code, output = run_cmd_with_prompts(cmd,
- [('Enter Password or Pin for "NSS Certificate DB":\s*', config['db_passwd'] + '\n')])
- if exit_code != 0:
- raise CmdError(cmd, exit_code, output)
-
- # 4. Create a client certificate and sign it.
- config['serial_number'] = next_serial()
- cmd = 'certutil -S -d %(dbdir)s -z %(noise_file)s -c %(ca_nickname)s -s "%(client_subject)s" -n "%(client_nickname)s" -t "u,u,u" -m %(serial_number)d' % config
- exit_code, output = run_cmd_with_prompts(cmd,
- [('Enter Password or Pin for "NSS Certificate DB":\s*', config['db_passwd'] + '\n')])
- if exit_code != 0:
- raise CmdError(cmd, exit_code, output)
-
- # 5. Import public root CA's
- cmd = 'modutil -dbdir %(dbdir)s -add ca_certs -libfile libnssckbi.so' % config
- exit_code, stdout, stderr = run_cmd(cmd)
- if exit_code != 0:
- raise CmdError(cmd, exit_code, output)
-
- # 6. Create a sub CA certificate
- config['serial_number'] = next_serial()
- config['subca_subject'] = 'CN=subca'
- config['subca_nickname'] = 'subca'
- cmd = 'certutil -S -d %(dbdir)s -z %(noise_file)s -c %(ca_nickname)s -s "%(subca_subject)s" -n "%(subca_nickname)s" -t "CTu,C,C" -m %(serial_number)d' % config
- exit_code, output = run_cmd_with_prompts(cmd,
- [('Enter Password or Pin for "NSS Certificate DB":\s*', config['db_passwd'] + '\n')])
- if exit_code != 0:
- raise CmdError(cmd, exit_code, output)
-
- # 7. Create a server certificate and sign it with the subca.
- config['serial_number'] = next_serial()
- config['server_subject'] = config['server_subject'] + "_" + config['subca_nickname']
- config['server_nickname'] = config['server_nickname'] + "_" + config['subca_nickname']
- cmd = 'certutil -S -d %(dbdir)s -z %(noise_file)s -c %(subca_nickname)s -s "%(server_subject)s" -n "%(server_nickname)s" -t "u,u,u" -m %(serial_number)d' % config
- exit_code, output = run_cmd_with_prompts(cmd,
- [('Enter Password or Pin for "NSS Certificate DB":\s*', config['db_passwd'] + '\n')])
- if exit_code != 0:
- raise CmdError(cmd, exit_code, output)
-
- finally:
- if os.path.exists(config['noise_file']):
- os.remove(config['noise_file'])
-
- logging.info('certifcate database password="%(db_passwd)s"', config)
- logging.info('CA nickname="%(ca_nickname)s", CA subject="%(ca_subject)s"', config)
- logging.info('server nickname="%(server_nickname)s", server subject="%(server_subject)s"', config)
- logging.info('client nickname="%(client_nickname)s", client subject="%(client_subject)s"', config)
+ stdout, stderr = run_cmd(cmd_args)
+ return stdout
#-------------------------------------------------------------------------------
-class Usage(Exception):
- def __init__(self, msg):
- self.msg = msg
+def create_database(options):
+ if os.path.exists(options.db_dir) and not os.path.isdir(options.db_dir):
+ raise ValueError('db_dir "%s" exists but is not a directory' % options.db_dir)
+
+ # Create resources
+ create_passwd_file(options)
+
+ if options.clean:
+ logging.info('Creating clean database directory: "%s"', options.db_dir)
+
+ if os.path.exists(options.db_dir):
+ shutil.rmtree(options.db_dir)
+ os.makedirs(options.db_dir)
+
+ cmd_args = ['/usr/bin/certutil',
+ '-N', # OPERATION: create database
+ '-d', options.db_name, # NSS database
+ '-f', options.passwd_filename, # database password in file
+ ]
-def usage():
- '''
- Print command help.
- '''
+ stdout, stderr = run_cmd(cmd_args)
+ else:
+ logging.info('Using existing database directory: "%s"', options.db_dir)
+
+def create_ca_cert(options):
+ serial_number = read_serial(options)
+ init_noise_file(options)
+
+ logging.info('creating ca cert: subject="%s", nickname="%s"',
+ options.ca_subject, options.ca_nickname)
+
+ cmd_args = ['/usr/bin/certutil',
+ '-S', # OPERATION: create signed cert
+ '-x', # self-sign the cert
+ '-d', options.db_name, # NSS database
+ '-f', options.passwd_filename, # database password in file
+ '-n', options.ca_nickname, # nickname of cert being created
+ '-s', options.ca_subject, # subject of cert being created
+ '-g', str(options.key_size), # keysize
+ '-t', 'CT,,CT', # trust
+ '-1', # add key usage extension
+ '-2', # add basic contraints extension
+ '-5', # add certificate type extension
+ '-m', str(serial_number), # cert serial number
+ '-v', str(options.valid_months), # validity in months
+ '-z', options.noise_filename, # noise file random seed
+ ]
+
+ # Provide input for extension creation prompting
+ input = ''
+
+ # >> Key Usage extension <<
+ # 0 - Digital Signature
+ # 1 - Non-repudiation
+ # 2 - Key encipherment
+ # 3 - Data encipherment
+ # 4 - Key agreement
+ # 5 - Cert signing key
+ # 6 - CRL signing key
+ # Other to finish
+ input += '0\n1\n5\n100\n'
+ # Is this a critical extension [y/N]?
+ input += 'y\n'
+
+ # >> Basic Constraints extension <<
+ # Is this a CA certificate [y/N]?
+ input += 'y\n'
+ # Enter the path length constraint, enter to skip [<0 for unlimited path]: > 2
+ input += '%d\n' % options.ca_path_len
+ # Is this a critical extension [y/N]?
+ input += 'y\n'
+
+ # >> NS Cert Type extension <<
+ # 0 - SSL Client
+ # 1 - SSL Server
+ # 2 - S/MIME
+ # 3 - Object Signing
+ # 4 - Reserved for future use
+ # 5 - SSL CA
+ # 6 - S/MIME CA
+ # 7 - Object Signing CA
+ # Other to finish
+ input += '5\n6\n7\n100\n'
+ # Is this a critical extension [y/N]?
+ input += 'n\n'
+
+ stdout, stderr = run_cmd(cmd_args, input)
+ write_serial(options, serial_number + 1)
+
+ return options.ca_nickname
+
+def create_server_cert(options):
+ serial_number = read_serial(options)
+ init_noise_file(options)
+
+ logging.info('creating server cert: subject="%s", nickname="%s"',
+ options.server_subject, options.server_nickname)
+
+ cmd_args = ['/usr/bin/certutil',
+ '-S', # OPERATION: create signed cert
+ '-d', options.db_name, # NSS database
+ '-f', options.passwd_filename, # database password in file
+ '-c', options.ca_nickname, # nickname of CA used to sign this cert
+ '-n', options.server_nickname, # nickname of cert being created
+ '-s', options.server_subject, # subject of cert being created
+ '-g', str(options.key_size), # keysize
+ '-t', 'u,u,u', # trust
+ '-5', # add certificate type extensionn
+ '-m', str(serial_number), # cert serial number
+ '-v', str(options.valid_months), # validity in months
+ '-z', options.noise_filename, # noise file random seed
+ ]
+
+ # Provide input for extension creation prompting
+ input = ''
+
+ # >> NS Cert Type extension <<
+ # 0 - SSL Client
+ # 1 - SSL Server
+ # 2 - S/MIME
+ # 3 - Object Signing
+ # 4 - Reserved for future use
+ # 5 - SSL CA
+ # 6 - S/MIME CA
+ # 7 - Object Signing CA
+ # Other to finish
+ input += '1\n100\n'
+ # Is this a critical extension [y/N]?
+ input += 'n\n'
+
+ stdout, stderr = run_cmd(cmd_args, input)
+ write_serial(options, serial_number + 1)
+
+ return options.server_nickname
+
+def create_client_cert(options):
+ serial_number = read_serial(options)
+ init_noise_file(options)
+
+ logging.info('creating client cert: subject="%s", nickname="%s"',
+ options.client_subject, options.client_nickname)
+
+ cmd_args = ['/usr/bin/certutil',
+ '-S', # OPERATION: create signed cert
+ '-d', options.db_name, # NSS database
+ '-f', options.passwd_filename, # database password in file
+ '-c', options.ca_nickname, # nickname of CA used to sign this cert
+ '-n', options.client_nickname, # nickname of cert being created
+ '-s', options.client_subject, # subject of cert being created
+ '-g', str(options.key_size), # keysize
+ '-t', 'u,u,u', # trust
+ '-5', # add certificate type extensionn
+ '-m', str(serial_number), # cert serial number
+ '-v', str(options.valid_months), # validity in months
+ '-z', options.noise_filename, # noise file random seed
+ ]
+
+ # Provide input for extension creation prompting
+ input = ''
+
+ # >> NS Cert Type extension <<
+ # 0 - SSL Client
+ # 1 - SSL Server
+ # 2 - S/MIME
+ # 3 - Object Signing
+ # 4 - Reserved for future use
+ # 5 - SSL CA
+ # 6 - S/MIME CA
+ # 7 - Object Signing CA
+ # Other to finish
+ input += '0\n100\n'
+ # Is this a critical extension [y/N]?
+ input += 'n\n'
+
+ stdout, stderr = run_cmd(cmd_args, input)
+ write_serial(options, serial_number + 1)
+
+ return options.client_nickname
+
+def add_trusted_certs(options):
+ name = 'ca_certs'
+ module = 'libnssckbi.so'
+ logging.info('adding system trusted certs: name="%s" module="%s"',
+ name, module)
+
+ cmd_args = ['/usr/bin/modutil',
+ '-dbdir', options.db_name, # NSS database
+ '-add', name, # module name
+ '-libfile', module, # module
+ ]
- return '''\
--h --help print help
--l --log-level level set the logging level, may be one of:
- debug, info, warn, error, critical
--L --logfile filename log to this file, empty string disables logging to a file
--v --verbose be chatty
--D --debug show run information
--w --password set the certificate database password
--d --dbdir set the datbase directory
--s --server-subject set the server's subject
-
-Examples:
-
-%(prog_name)s -m 10
-''' % {'prog_name' : prog_name,
- }
+ run_cmd(cmd_args)
+ return name
#-------------------------------------------------------------------------------
-def main(argv=None):
- if argv is None:
- argv = sys.argv
+def setup_certs(args):
- try:
- try:
- opts, args = getopt.getopt(argv[1:], 'hl:L:vDw:d:s:',
- ['help', 'logfile=', 'verbose', 'debug',
- 'password', 'dbdir', 'server-subject'])
- except getopt.GetoptError, e:
- raise Usage(e)
- return 2
-
- for o, a in opts:
- if o in ('-h', '--help'):
- print >>sys.stdout, usage()
- return 0
- elif o in ('-L', '--logfile'):
- if not a:
- config['logfile'] = None
- else:
- config['logfile'] = a
- elif o in ('-l', '--log-level'):
- if a.upper() in logging._levelNames:
- config['log_level'] = logging._levelNames[a.upper()]
- else:
- print >>sys.stderr, "ERROR: unknown log-level '%s'" % a
- elif o in ('-v', '--verbose'):
- config['verbose'] = True
- elif o in ('-D', '--debug'):
- config['debug'] = True
- elif o in ('-w', '--password'):
- config['db_passwd'] = a
- elif o in ('-d', '--dbdir'):
- config['dbdir'] = a
- elif o in ('-s', '--server-subject'):
- config['server_subject'] = 'CN=%s' % a
- else:
- raise Usage("command argument '%s' not handled, internal error" % o)
- except Usage, e:
- print >>sys.stderr, e.msg
- print >>sys.stderr, "for help use --help"
- return 2
-
- if config['verbose']:
- config['log_level'] = logging.INFO
- if config['debug']:
- config['log_level'] = logging.DEBUG
+ # --- cmd ---
+ parser = argparse.ArgumentParser(description='create certs for testing',
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+
+ parser.add_argument('--verbose', action='store_true',
+ help='provide info level messages')
+
+ parser.add_argument('--debug', action='store_true',
+ help='provide debug level messages')
+
+ parser.add_argument('--quiet', action='store_true',
+ help='do not display any messages')
+
+ parser.add_argument('--show-certs', action='store_true',
+ help='show the certificate details')
+
+ parser.add_argument('--no-clean', action='store_false', dest='clean',
+ help='do not remove existing db_dir')
+
+ parser.add_argument('--no-trusted-certs', dest='add_trusted_certs', action='store_false',
+ help='do not add trusted certs')
+
+ parser.add_argument('--hostname',
+ help='hostname used in cert subjects')
+
+ parser.add_argument('--db-type',
+ choices=['sql', ''],
+ help='NSS database type')
+
+ parser.add_argument('--db-dir',
+ help='NSS database directory')
+
+ parser.add_argument('--db-passwd',
+ help='NSS database password')
+
+ parser.add_argument('--ca-subject',
+ help='CA certificate subject')
+
+ parser.add_argument('--ca-nickname',
+ help='CA certificate nickname')
+
+ parser.add_argument('--server-subject',
+ help='server certificate subject')
+
+ parser.add_argument('--server-nickname',
+ help='server certificate nickname')
+
+ parser.add_argument('--client-username',
+ help='client user name, used in client cert subject')
+
+ parser.add_argument('--client-subject',
+ help='client certificate subject')
+
+ parser.add_argument('--client-nickname',
+ help='client certificate nickname')
+
+ parser.add_argument('--serial-number', type=int,
+ help='starting serial number for certificates')
+
+ parser.add_argument('--valid-months', dest='valid_months', type=int,
+ help='validity period in months')
+ parser.add_argument('--path-len', dest='ca_path_len', type=int,
+ help='basic constraints path length')
+ parser.add_argument('--key-type', dest='key_type',
+ help='key type, either rsa or dsa')
+ parser.add_argument('--key-size', dest='key_size',
+ help='number of bits in key (must be multiple of 8)')
+ parser.add_argument('--serial-file', dest='serial_file',
+ help='name of file used to track next serial number')
+
+ parser.set_defaults(verbose = False,
+ debug = False,
+ quiet = False,
+ show_certs = False,
+ clean = True,
+ add_trusted_certs = True,
+ hostname = os.uname()[1],
+ db_type = 'sql',
+ db_dir = 'pki',
+ db_passwd = 'db_passwd',
+ ca_subject = 'CN=Test CA',
+ ca_nickname = 'test_ca',
+ server_subject = 'CN=${hostname}',
+ server_nickname = 'test_server',
+ client_username = 'test_user',
+ client_subject = 'CN=${client_username}',
+ client_nickname = '${client_username}',
+ serial_number = 1,
+ key_type = 'rsa',
+ key_size = 1024,
+ valid_months = 12,
+ ca_path_len = 2,
+ serial_file = '${db_dir}/serial',
+ )
+
+
+ options = parser.parse_args(args)
+
+ # Do substitutions on option values.
+ # This is ugly because argparse does not expose an API which permits iterating over
+ # the contents of options nor a way to get the options as a dict, ugh :-(
+ # So we access options.__dict__ directly.
+ for key in options.__dict__.keys():
+ # Assume options never begin with underscore
+ if key.startswith('_'):
+ continue
+ value = getattr(options, key)
+ # Can't substitue on non-string values
+ if not isinstance(value, basestring):
+ continue
+ # Don't bother trying to substitute if $ substitution character isn't present
+ if '$' not in value:
+ continue
+ setattr(options, key, Template(value).substitute(options.__dict__))
+
+ # Set up logging
+ log_level = logging.INFO
+ if options.quiet:
+ log_level = logging.ERROR
+ if options.verbose:
+ log_level = logging.INFO
+ if options.debug:
+ log_level = logging.DEBUG
# Initialize logging
- logging.basicConfig(level=config['log_level'],
- format='%(asctime)s %(levelname)-8s %(message)s',
- datefmt='%m-%d %H:%M',
- filename=config['logfile'],
- filemode='a')
-
- if config['interactive']:
- # Create a seperate logger for the console
- console_logger = logging.StreamHandler()
- console_logger.setLevel(config['log_level'])
- # set a format which is simpler for console use
- formatter = logging.Formatter('%(message)s')
- console_logger.setFormatter(formatter)
- # add the handler to the root logger
- logging.getLogger('').addHandler(console_logger)
+ logging.basicConfig(level=log_level, format='%(levelname)s: %(message)s')
+ logger = logging.getLogger()
+
+ # Synthesize some useful options derived from specified options
+ if options.db_type == '':
+ options.db_name = options.db_dir
+ else:
+ options.db_name = '%s:%s' % (options.db_type, options.db_dir)
+ options.passwd_filename = None
+ options.noise_filename = None
+
+ # Set function to clean up on exit, bind fuction with options
+ def exit_handler_with_options():
+ exit_handler(options)
+ atexit.register(exit_handler_with_options)
+
+ cert_nicknames = []
try:
- setup_certs()
- except Exception, e:
- logging.error(traceback.format_exc())
- logging.error(str(e))
+ create_database(options)
+ cert_nicknames.append(create_ca_cert(options))
+ cert_nicknames.append(create_server_cert(options))
+ cert_nicknames.append(create_client_cert(options))
+ if options.add_trusted_certs:
+ add_trusted_certs(options)
+ except CmdError as e:
+ logging.error(e.message)
+ logging.error(e.stderr)
return 1
+ if options.show_certs:
+ if logger.getEffectiveLevel() > logging.INFO:
+ logger.setLevel(logging.INFO)
+ for nickname in cert_nicknames:
+ logging.info('Certificate nickname "%s"\n%s',
+ nickname, format_cert(options, nickname))
+
+ logging.info('---------- Summary ----------')
+ logging.info('NSS database name="%s", password="%s"',
+ options.db_name, options.db_passwd)
+ logging.info('CA nickname="%s", CA subject="%s"',
+ options.ca_nickname, options.ca_subject)
+ logging.info('server nickname="%s", server subject="%s"',
+ options.server_nickname, options.server_subject)
+ logging.info('client nickname="%s", client subject="%s"',
+ options.client_nickname, options.client_subject)
+
return 0
#-------------------------------------------------------------------------------
+def main():
+ return setup_certs(None)
+
if __name__ == '__main__':
sys.exit(main())
diff -N -u -r python-nss-0.14.0/test/test_client_server.py python-nss-0.14.1/test/test_client_server.py
--- python-nss-0.14.0/test/test_client_server.py 2013-04-30 13:00:23.000000000 -0400
+++ python-nss-0.14.1/test/test_client_server.py 2013-10-08 09:35:53.000000000 -0400
@@ -25,13 +25,12 @@
password = 'db_passwd'
use_ssl = True
client_cert_action = NO_CLIENT_CERT
-certdir = os.path.join(os.path.dirname(sys.argv[0]), 'pki')
+db_name = 'sql:pki'
hostname = os.uname()[1]
server_nickname = 'test_server'
client_nickname = 'test_user'
port = 1234
timeout_secs = 10
-family = io.PR_AF_INET
sleep_time = 5
@@ -142,7 +141,6 @@
if info: print "client: using SSL"
ssl.set_domestic_policy()
- valid_addr = False
# Get the IP Address of our server
try:
addr_info = io.AddrInfo(hostname)
@@ -151,8 +149,6 @@
return
for net_addr in addr_info:
- if family != io.PR_AF_UNSPEC:
- if net_addr.family != family: continue
net_addr.port = port
if use_ssl:
@@ -180,26 +176,21 @@
if verbose: print "client trying connection to: %s" % (net_addr)
sock.connect(net_addr, timeout=io.seconds_to_interval(timeout_secs))
if verbose: print "client connected to: %s" % (net_addr)
- valid_addr = True
break
except Exception, e:
sock.close()
print >>sys.stderr, "client: connection to: %s failed (%s)" % (net_addr, e)
- if not valid_addr:
- print >>sys.stderr, "Could not establish valid address for \"%s\" in family %s" % \
- (hostname, io.addr_family_name(family))
- return
-
# Talk to the server
try:
if info: print "client: sending \"%s\"" % (request)
- sock.send(request)
- buf = sock.recv(1024)
+ sock.send(request + '\n') # newline is protocol record separator
+ buf = sock.readline()
if not buf:
print >>sys.stderr, "client: lost connection"
sock.close()
return
+ buf = buf.rstrip() # remove newline record separator
if info: print "client: received \"%s\"" % (buf)
except Exception, e:
print >>sys.stderr, "client: %s" % e
@@ -228,15 +219,11 @@
# -----------------------------------------------------------------------------
def server():
- global family
-
if verbose: print "starting server:"
# Initialize
# Setup an IP Address to listen on any of our interfaces
- if family == io.PR_AF_UNSPEC:
- family = io.PR_AF_INET
- net_addr = io.NetworkAddress(io.PR_IpAddrAny, port, family)
+ net_addr = io.NetworkAddress(io.PR_IpAddrAny, port)
if use_ssl:
if info: print "server: using SSL"
@@ -290,15 +277,16 @@
while True:
try:
# Handle the client connection
- buf = client_sock.recv(1024)
+ buf = client_sock.readline() # newline is protocol record separator
if not buf:
print >>sys.stderr, "server: lost lost connection to %s" % (client_addr)
break
+ buf = buf.rstrip() # remove newline record separator
if info: print "server: received \"%s\"" % (buf)
- reply = "{%s}" % buf # echo
+ reply = "{%s}" % buf # echo embedded inside braces
if info: print "server: sending \"%s\"" % (reply)
- client_sock.send(reply) # echo
+ client_sock.send(reply + '\n') # send echo with record separator
time.sleep(sleep_time)
client_sock.shutdown()
@@ -320,7 +308,7 @@
def run_server():
pid = os.fork()
if pid == 0:
- nss.nss_init(certdir)
+ nss.nss_init(db_name)
server()
nss.nss_shutdown()
time.sleep(sleep_time)
@@ -348,7 +336,7 @@
def test_ssl(self):
request = "foo"
- nss.nss_init(certdir)
+ nss.nss_init(db_name)
reply = client(request)
nss.nss_shutdown()
self.assertEqual("{%s}" % request, reply)
diff -N -u -r python-nss-0.14.0/test/test_ocsp.py python-nss-0.14.1/test/test_ocsp.py
--- python-nss-0.14.0/test/test_ocsp.py 2013-04-24 12:36:07.000000000 -0400
+++ python-nss-0.14.1/test/test_ocsp.py 2013-10-08 09:09:18.000000000 -0400
@@ -7,7 +7,7 @@
import nss.nss as nss
from nss.error import NSPRError
-certdir = 'pki'
+db_name = 'sql:pki'
#-------------------------------------------------------------------------------
@@ -16,7 +16,7 @@
class TestAPI(unittest.TestCase):
def setUp(self):
- nss.nss_init_read_write(certdir)
+ nss.nss_init_read_write(db_name)
self.certdb = nss.get_default_certdb()
def tearDown(self):
diff -N -u -r python-nss-0.14.0/test/test_pkcs12.py python-nss-0.14.1/test/test_pkcs12.py
--- python-nss-0.14.0/test/test_pkcs12.py 2013-04-15 13:05:46.000000000 -0400
+++ python-nss-0.14.1/test/test_pkcs12.py 2013-10-17 10:29:09.000000000 -0400
@@ -2,6 +2,7 @@
import sys
import os
+import re
import subprocess
import shlex
import unittest
@@ -13,30 +14,55 @@
#-------------------------------------------------------------------------------
verbose = False
-certdir = 'pki'
+db_name = 'sql:pki'
db_passwd = 'db_passwd'
-pkcs12_file_password = 'pk12_passwd'
+pk12_passwd = 'pk12_passwd'
-read_nickname = 'test_user'
-read_pkcs12_file = '%s.p12' % read_nickname
-
-write_export_file = False
-export_nickname = 'test_server'
+cert_nickname = 'test_user'
+pk12_filename = '%s.p12' % cert_nickname
+exported_pk12_filename = 'exported_%s' % pk12_filename
#-------------------------------------------------------------------------------
-def run_cmd(cmd):
- if verbose: print "running command: %s" % cmd
-
- args = shlex.split(cmd)
- subprocess.check_call(args, stdout=subprocess.PIPE)
+class CmdError(Exception):
+ def __init__(self, cmd_args, returncode, message=None, stdout=None, stderr=None):
+ self.cmd_args = cmd_args
+ self.returncode = returncode
+ if message is None:
+ self.message = 'Failed error=%s, ' % (returncode)
+ if stderr:
+ self.message += '"%s", ' % stderr
+ self.message += 'args=%s' % (cmd_args)
+ else:
+ self.message = message
+ self.stdout = stdout
+ self.stderr = stderr
+
+ def __str__(self):
+ return self.message
+
+
+def run_cmd(cmd_args, input=None):
+ try:
+ p = subprocess.Popen(cmd_args,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ stdout, stderr = p.communicate(input)
+ returncode = p.returncode
+ if returncode != 0:
+ raise CmdError(cmd_args, returncode,
+ 'failed %s' % (', '.join(cmd_args)),
+ stdout, stderr)
+ return stdout, stderr
+ except OSError as e:
+ raise CmdError(cmd_args, e.errno, stderr=str(e))
#-------------------------------------------------------------------------------
def password_callback(slot, retry):
return db_passwd
-#-------------------------------------------------------------------------------
def nickname_collision_callback(old_nickname, cert):
cancel = False
@@ -44,6 +70,51 @@
return new_nickname, cancel
+def get_cert_der_from_db(nickname):
+ cmd_args = ['/usr/bin/certutil',
+ '-d', db_name,
+ '-L',
+ '-n', nickname]
+
+ try:
+ stdout, stderr = run_cmd(cmd_args)
+ except CmdError as e:
+ if e.returncode == 255 and 'not found' in e.stderr:
+ return None
+ else:
+ raise
+ return stdout
+
+def delete_cert_from_db(nickname):
+ cmd_args = ['/usr/bin/certutil',
+ '-d', db_name,
+ '-D',
+ '-n', nickname]
+
+ run_cmd(cmd_args)
+
+def create_pk12(nickname, filename):
+ cmd_args = ['/usr/bin/pk12util',
+ '-o', filename,
+ '-n', nickname,
+ '-d', db_name,
+ '-K', db_passwd,
+ '-W', pk12_passwd]
+ run_cmd(cmd_args)
+
+def list_pk12(filename):
+ cmd_args = ['/usr/bin/pk12util',
+ '-l', filename,
+ '-W', pk12_passwd]
+ stdout, stderr = run_cmd(cmd_args)
+ return stdout
+
+def strip_key_from_pk12_listing(text):
+ match = re.search(r'^Certificate:$', text, re.MULTILINE)
+ if not match:
+ raise ValueError('Could not file Key section in pk12 listing')
+ return text[match.start(0):]
+
#-------------------------------------------------------------------------------
def load_tests(loader, tests, pattern):
@@ -59,22 +130,23 @@
class TestPKCS12Decoder(unittest.TestCase):
def setUp(self):
- nss.nss_init_read_write(certdir)
+ nss.nss_init_read_write(db_name)
nss.set_password_callback(password_callback)
nss.pkcs12_set_nickname_collision_callback(nickname_collision_callback)
nss.pkcs12_enable_all_ciphers()
+ self.cert_der = get_cert_der_from_db(cert_nickname)
+ if self.cert_der is None:
+ raise ValueError('cert with nickname "%s" not in database "%s"' % (cert_nickname, db_name))
def tearDown(self):
nss.nss_shutdown()
def test_read(self):
if verbose: print "test_read"
- cmd='pk12util -o %s -n %s -d pki -K %s -W %s' % \
- (read_pkcs12_file, read_nickname, db_passwd, pkcs12_file_password)
- run_cmd(cmd)
+ create_pk12(cert_nickname, pk12_filename)
slot = nss.get_internal_key_slot()
- pkcs12 = nss.PKCS12Decoder(read_pkcs12_file, pkcs12_file_password, slot)
+ pkcs12 = nss.PKCS12Decoder(pk12_filename, pk12_passwd, slot)
self.assertEqual(len(pkcs12), 3)
cert_bag_count = 0
@@ -102,33 +174,43 @@
def test_import(self):
if verbose: print "test_import"
- cmd='certutil -d pki -D -n %s' % (read_nickname)
- run_cmd(cmd)
+ delete_cert_from_db(cert_nickname)
+ self.assertEqual(get_cert_der_from_db(cert_nickname), None)
slot = nss.get_internal_key_slot()
- pkcs12 = nss.PKCS12Decoder(read_pkcs12_file, pkcs12_file_password, slot)
+ pkcs12 = nss.PKCS12Decoder(pk12_filename, pk12_passwd, slot)
slot.authenticate()
pkcs12.database_import()
+ cert_der = get_cert_der_from_db(cert_nickname)
+ self.assertEqual(cert_der, self.cert_der)
#-------------------------------------------------------------------------------
class TestPKCS12Export(unittest.TestCase):
def setUp(self):
- nss.nss_init(certdir)
+ nss.nss_init(db_name)
nss.set_password_callback(password_callback)
nss.pkcs12_enable_all_ciphers()
+ self.cert_der = get_cert_der_from_db(cert_nickname)
+ if self.cert_der is None:
+ raise ValueError('cert with nickname "%s" not in database "%s"' % (cert_nickname, db_name))
def tearDown(self):
nss.nss_shutdown()
def test_export(self):
if verbose: print "test_export"
- pkcs12_data = nss.pkcs12_export(export_nickname, pkcs12_file_password)
- if write_export_file:
- p12_file_path = os.path.join(os.path.dirname(sys.argv[0]), "%s.p12" % export_nickname)
- f = open(p12_file_path, 'w')
+ pkcs12_data = nss.pkcs12_export(cert_nickname, pk12_passwd)
+ with open(exported_pk12_filename, 'w') as f:
f.write(pkcs12_data)
- f.close()
+
+ pk12_listing = list_pk12(pk12_filename)
+ pk12_listing = strip_key_from_pk12_listing(pk12_listing)
+
+ exported_pk12_listing = list_pk12(exported_pk12_filename)
+ exported_pk12_listing = strip_key_from_pk12_listing(exported_pk12_listing)
+
+ self.assertEqual(pk12_listing, exported_pk12_listing)
if __name__ == '__main__':
unittest.main()
diff -N -u -r python-nss-0.14.0/test/util.py python-nss-0.14.1/test/util.py
--- python-nss-0.14.0/test/util.py 1969-12-31 19:00:00.000000000 -0500
+++ python-nss-0.14.1/test/util.py 2013-06-13 09:58:32.000000000 -0400
@@ -0,0 +1,40 @@
+import sys
+import os
+from distutils.util import get_platform
+
+def get_build_dir():
+ '''
+ Walk from the current directory up until a directory is found
+ which contains a regular file called "setup.py" and a directory
+ called "build". If found return the fully qualified path to
+ the build directory's platform specific directory, this is where
+ the architecture specific build produced by setup.py is located.
+
+ There is no API in distutils to return the platform specific
+ directory so we use as much as distutils exposes, the rest was
+ determined by looking at the source code for distutils.
+
+ If the build directory cannont be found in the tree None is returned.
+ '''
+ cwd = os.getcwd()
+ path_components = cwd.split('/')
+ while (len(path_components)):
+ path = os.path.join('/', *path_components)
+ setup_path = os.path.join(path, 'setup.py')
+ build_path = os.path.join(path, 'build')
+ # Does this directory contain the file "setup.py" and the directory "build"?
+ if os.path.exists(setup_path) and os.path.exists(build_path) and \
+ os.path.isfile(setup_path) and os.path.isdir(build_path):
+ # Found, return the path contentated with the architecture
+ # specific build directory
+ platform_specifier = "lib.%s-%s" % (get_platform(), sys.version[0:3])
+ return os.path.join(build_path, platform_specifier)
+
+ # Not found, ascend to parent directory and try again
+ path_components.pop()
+
+ # Failed to find the build directory
+ return None
+
+def insert_build_dir_into_path():
+ sys.path.insert(0,get_build_dir())