diff -N -u -r python-nss-0.14.0/doc/ChangeLog python-nss-0.14.1/doc/ChangeLog --- python-nss-0.14.0/doc/ChangeLog 2013-05-02 17:29:27.000000000 -0400 +++ python-nss-0.14.1/doc/ChangeLog 2013-10-09 09:35:29.000000000 -0400 @@ -1,5 +1,37 @@ -2013-04-24 John Dennis 0.14.0 - External Changes +2013-10-09 John Dennis 0.14.1 + + Modifications only to tests and examples. + + * Fix bug in ssl_example.py and test_client_server.py where complete + data was not read from socket. The Beast CVE fix in NSS causes + only one octet to be sent in the first socket packet and then the + remaining data is sent normally, this is known as 1/n-1 record + splitting. The example and test SSL code sent short messages and + then did a sock.recv(1024). We had always received the entire + message in one sock.recv() call because it was so short. But + sock.recv() does not guarantee how much data will be received, + thus this was a coding mistake. The solution is straight forward, + use newlines as a record separator and call sock.readline() + instead of sock.recv(). sock.readline() calls sock.recv() + internally until a complete line is read or the socket is closed. + + * Rewrite setup_certs.py, it was written like an expect script + reacting to prompts read from a pseudo terminal but it was fragile + and would hang on some systems. New version uses temporary + password file and writes hardcoded responses to the stdin of + certuil and modutil. + + * setup_certs now creates a new sql sytle NSS database (sql:pki) + + * All tests and examples now load the sql:pki database. Command line + arg and variable changed from dbdir to db_name to reflect the + database specification is no longer just a directory. + + * All command line process in test and examples now uses modern + argparse module instead of deprecated getopt and optparse. Some + command line args were tweaked. + +2013-04-24 John Dennis 0.14.0 External Changes ---------------- The primary enhancements in this version is support of certifcate diff -N -u -r python-nss-0.14.0/doc/examples/cert_dump.py python-nss-0.14.1/doc/examples/cert_dump.py --- python-nss-0.14.0/doc/examples/cert_dump.py 2013-04-23 13:36:53.000000000 -0400 +++ python-nss-0.14.1/doc/examples/cert_dump.py 2013-10-08 12:59:24.000000000 -0400 @@ -18,10 +18,10 @@ components of a cert. ''' +import argparse +import getpass import os import sys -import getopt -import getpass from nss.error import NSPRError import nss.io as io @@ -93,57 +93,34 @@ # ----------------------------------------------------------------------------- -usage_str = ''' --p --pem read the certifcate in PEM ascii format (default) --d --der read the certifcate in DER binary format --P --print-cert print the cert using the internal rendering code -''' - -def usage(): - print usage_str - -try: - opts, args = getopt.getopt(sys.argv[1:], "hpdP", - ["help", "pem", "der", "print-cert"]) -except getopt.GetoptError: - # print help information and exit: - usage() - sys.exit(2) - - -filename = 'cert.der' -is_pem_format = True -print_cert = False - -for o, a in opts: - if o in ("-H", "--help"): - usage() - sys.exit() - elif o in ("-p", "--pem"): - is_pem_format = True - elif o in ("-d", "--der"): - is_pem_format = False - elif o in ("-P", "--print-cert"): - print_cert = True - - -filename = sys.argv[1] +parser = argparse.ArgumentParser(description='cert formatting example', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) +parser.add_argument('-f', '--cert-format', choices=['pem', 'der'], + help='format of input cert') +parser.add_argument('-p', '--print-cert', action='store_true', + help='print the cert using the internal rendering code') +parser.add_argument('cert_file', nargs=1, + help='input cert file to process') + +parser.set_defaults(cert_format='pem', + print_cert=False + ) +options = parser.parse_args() # Perform basic configuration and setup nss.nss_init_nodb() -if len(args): - filename = args[0] +filename = options.cert_file[0] print "certificate filename=%s" % (filename) # Read the certificate as DER encoded data -si = nss.read_der_from_file(filename, is_pem_format) +si = nss.read_der_from_file(filename, options.cert_format == 'pem') # Parse the DER encoded data returning a Certificate object cert = nss.Certificate(si) # Useful for comparing the internal cert rendering to what this script generates. -if print_cert: +if options.print_cert: print cert # Get the extension list from the certificate diff -N -u -r python-nss-0.14.0/doc/examples/httplib_example.py python-nss-0.14.1/doc/examples/httplib_example.py --- python-nss-0.14.0/doc/examples/httplib_example.py 2013-04-15 13:05:46.000000000 -0400 +++ python-nss-0.14.1/doc/examples/httplib_example.py 2013-10-08 13:04:03.000000000 -0400 @@ -4,13 +4,13 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. -import sys +import argparse import errno -import getopt -import urlparse -import httplib import getpass +import httplib import logging +import sys +import urlparse from nss.error import NSPRError import nss.io as io @@ -19,14 +19,6 @@ #------------------------------------------------------------------------------ -httplib_debug_level = 0 -logging_debug_level = logging.INFO -certdir = 'pki' -password = '' -nickname = '' -url = 'https://sourceforge.net/projects/python' -use_ssl = True -use_connection_class = True timeout_secs = 3 #------------------------------------------------------------------------------ @@ -94,28 +86,6 @@ logging.debug('cert valid %s for "%s"', cert_is_valid, cert.subject) return cert_is_valid -def client_auth_data_callback(ca_names, chosen_nickname, password, certdb): - cert = None - if chosen_nickname: - try: - cert = nss.find_cert_from_nickname(chosen_nickname, password) - priv_key = nss.find_key_by_any_cert(cert, password) - return cert, priv_key - except NSPRError: - return False - else: - nicknames = nss.get_cert_nicknames(certdb, nss.SEC_CERT_NICKNAMES_USER) - for nickname in nicknames: - try: - cert = nss.find_cert_from_nickname(nickname, password) - if cert.check_valid_times(): - if cert.has_signer_in_ca_names(ca_names): - priv_key = nss.find_key_by_any_cert(cert, password) - return cert, priv_key - except NSPRError: - return False - return False - def password_callback(slot, retry, password): if not retry and password: return password return getpass.getpass("Enter password for %s: " % slot.token_name); @@ -136,7 +106,7 @@ if not dbdir: raise RuntimeError("dbdir is required") - logging.debug('%s init %s', self.__class__.__name__, host) + logging.debug('%s init host=%s dbdir=%s', self.__class__.__name__, host, dbdir) if not nss.nss_is_initialized(): nss.nss_init(dbdir) self.sock = None ssl.set_domestic_policy() @@ -231,33 +201,46 @@ #------------------------------------------------------------------------------ -opts, args = getopt.getopt(sys.argv[1:], - 'Dd:n:w:sScC', - ['debuglevel','certdir=','nickname=','password=', - 'use-ssl', 'no-ssl', 'use-connection-class', 'no-connection-class']) -for o, a in opts: - if o in('-D', '--httplib_debug_level'): - httplib_debug_level = httplib_debug_level + 1 - elif o in ("-d", "--certdir"): - certdir = a - elif o in ("-n", "--nickname"): - nickname = a - elif o in ("-w", "--password"): - password = a - elif o in ("-s", "--use-ssl"): - use_ssl = True - elif o in ("-S", "--no-ssl"): - use_ssl = False - elif o in ("-c", "--use-connection-class"): - use_connection_class = True - elif o in ("-C", "--no-connection-class"): - use_connection_class = False +parser = argparse.ArgumentParser(description='httplib example') -if len(args) > 0: - url = args[0] +parser.add_argument('-d', '--db-name', + help='NSS database name (e.g. "sql:pki")') +parser.add_argument('--db-passwd', + help='NSS database password') -if httplib_debug_level > 0: +parser.add_argument('-s', '--ssl', dest='use_ssl', action='store_true', + help='use SSL connection') + +parser.add_argument('-S', '--no-ssl', dest='use_ssl', action='store_false', + help='do not use SSL connection') + +parser.add_argument('-c', '--connection-class', dest='use_connection_class', action='store_true', + help='use connection class') + +parser.add_argument('-C', '--no-connection-class', dest='use_connection_class', action='store_false', + help='do not use connection class') + +parser.add_argument('-D', '--httplib-debug-level', action='count', + help='httplib debug level') + +parser.add_argument('url', nargs=1, + help='URL to open (e.g. "https://sourceforge.net/projects/python"') + +parser.set_defaults(db_name = 'sql:pki', + db_passwd = 'db_passwd', + httplib_debug_level = 0, + use_ssl = True, + use_connection_class = True, + ) + +options = parser.parse_args() + + +url = options.url[0] + +logging_debug_level = logging.INFO +if options.httplib_debug_level > 0: logging_debug_level = logging.DEBUG else: logging_debug_level = logging.INFO @@ -269,7 +252,7 @@ # Perform basic configuration and setup url_components = urlparse.urlsplit(url) -if use_ssl: +if options.use_ssl: url_components.schema = 'https' else: url_components.schema = 'http' @@ -280,14 +263,14 @@ print "ERROR: bad url \"%s\"" % (url) sys.exit(1) -if use_connection_class: - if use_ssl: +if options.use_connection_class: + if options.use_ssl: logging.info("Start (using NSSConnection class) %s", url) - conn = NSSConnection(url_components.netloc, 443, dbdir="/etc/pki/nssdb") + conn = NSSConnection(url_components.netloc, 443, dbdir=options.db_name) else: logging.info("Start (using NSPRConnection class) %s", url) conn = NSPRConnection(url_components.netloc, 80) - conn.set_debuglevel(httplib_debug_level) + conn.set_debuglevel(options.httplib_debug_level) conn.connect() conn.request("GET", "/") response = conn.getresponse() @@ -302,13 +285,13 @@ print data conn.close() else: - if use_ssl: + if options.use_ssl: logging.info("Start (using NSSHTTPS class) %s", url) - h = NSSHTTPS(url_components.netloc, 443, dbdir="/etc/pki/nssdb") + h = NSSHTTPS(url_components.netloc, 443, dbdir=options.db_name) else: logging.info("Start (using NSPRHTTP class) %s", url) h = NSPRHTTP(url_components.netloc, 80) - h.set_debuglevel(httplib_debug_level) + h.set_debuglevel(options.httplib_debug_level) h.connect() h.putrequest('GET', '/') h.endheaders() diff -N -u -r python-nss-0.14.0/doc/examples/ssl_example.py python-nss-0.14.1/doc/examples/ssl_example.py --- python-nss-0.14.0/doc/examples/ssl_example.py 2013-04-15 13:05:46.000000000 -0400 +++ python-nss-0.14.1/doc/examples/ssl_example.py 2013-10-09 00:07:54.000000000 -0400 @@ -7,10 +7,10 @@ import warnings warnings.simplefilter( "always", DeprecationWarning) +import argparse +import getpass import os import sys -import getopt -import getpass from nss.error import NSPRError import nss.io as io @@ -24,19 +24,7 @@ REQUEST_CLIENT_CERT_ALWAYS = 3 REQUIRE_CLIENT_CERT_ALWAYS = 4 -# command line parameters, default them to something reasonable -client = False -server = False -password = 'db_passwd' -use_ssl = True -client_cert_action = NO_CLIENT_CERT -certdir = 'pki' -hostname = os.uname()[1] -server_nickname = 'test_server' -client_nickname = 'test_user' -port = 1234 timeout_secs = 3 -family = io.PR_AF_UNSPEC # ----------------------------------------------------------------------------- # Utility Functions @@ -63,7 +51,7 @@ if pin_args is None: pin_args = () - print "cert:\n%s" % cert + print "peer cert:\n%s" % cert # Define how the cert is being used based upon the is_server flag. This may # seem backwards, but isn't. If we're a server we're trying to validate a @@ -149,30 +137,30 @@ valid_addr = False # Get the IP Address of our server try: - addr_info = io.AddrInfo(hostname) + addr_info = io.AddrInfo(options.hostname) except Exception, e: - print "could not resolve host address \"%s\"" % hostname + print "could not resolve host address \"%s\"" % options.hostname return for net_addr in addr_info: - if family != io.PR_AF_UNSPEC: - if net_addr.family != family: continue - net_addr.port = port + if options.family != io.PR_AF_UNSPEC: + if net_addr.family != options.family: continue + net_addr.port = options.port - if use_ssl: + if options.use_ssl: sock = ssl.SSLSocket(net_addr.family) # Set client SSL socket options sock.set_ssl_option(ssl.SSL_SECURITY, True) sock.set_ssl_option(ssl.SSL_HANDSHAKE_AS_CLIENT, True) - sock.set_hostname(hostname) + sock.set_hostname(options.hostname) # Provide a callback which notifies us when the SSL handshake is complete sock.set_handshake_callback(handshake_callback) # Provide a callback to supply our client certificate info - sock.set_client_auth_data_callback(client_auth_data_callback, client_nickname, - password, nss.get_default_certdb()) + sock.set_client_auth_data_callback(client_auth_data_callback, options.client_nickname, + options.password, nss.get_default_certdb()) # Provide a callback to verify the servers certificate sock.set_auth_certificate_callback(auth_certificate_callback, @@ -192,17 +180,18 @@ if not valid_addr: print "Could not establish valid address for \"%s\" in family %s" % \ - (hostname, io.addr_family_name(family)) + (options.hostname, io.addr_family_name(options.family)) return # Talk to the server try: - sock.send("Hello") - buf = sock.recv(1024) + sock.send('Hello' + '\n') # newline is protocol record separator + buf = sock.readline() if not buf: print "client lost connection" sock.close() return + buf = buf.rstrip() # remove newline record separator print "client received: %s" % (buf) except Exception, e: print e.strerror @@ -221,7 +210,7 @@ try: sock.close() - if use_ssl: + if options.use_ssl: ssl.clear_session_cache() except Exception, e: print e @@ -231,36 +220,34 @@ # ----------------------------------------------------------------------------- def Server(): - global family - - # Perform basic SSL server configuration - ssl.set_default_cipher_pref(ssl.SSL_RSA_WITH_NULL_MD5, True) - ssl.config_server_session_id_cache() - - # Get our certificate and private key - server_cert = nss.find_cert_from_nickname(server_nickname, password) - priv_key = nss.find_key_by_any_cert(server_cert, password) - server_cert_kea = server_cert.find_kea_type(); - - print "server cert:\n%s" % server_cert - # Setup an IP Address to listen on any of our interfaces - if family == io.PR_AF_UNSPEC: - family = io.PR_AF_INET - net_addr = io.NetworkAddress(io.PR_IpAddrAny, port, family) + if options.family == io.PR_AF_UNSPEC: + options.family = io.PR_AF_INET + net_addr = io.NetworkAddress(io.PR_IpAddrAny, options.port, options.family) + + if options.use_ssl: + # Perform basic SSL server configuration + ssl.set_default_cipher_pref(ssl.SSL_RSA_WITH_NULL_MD5, True) + ssl.config_server_session_id_cache() + + # Get our certificate and private key + server_cert = nss.find_cert_from_nickname(options.server_nickname, options.password) + priv_key = nss.find_key_by_any_cert(server_cert, options.password) + server_cert_kea = server_cert.find_kea_type(); + + print "server cert:\n%s" % server_cert - if use_ssl: sock = ssl.SSLSocket(net_addr.family) # Set server SSL socket options - sock.set_pkcs11_pin_arg(password) + sock.set_pkcs11_pin_arg(options.password) sock.set_ssl_option(ssl.SSL_SECURITY, True) sock.set_ssl_option(ssl.SSL_HANDSHAKE_AS_SERVER, True) # If we're doing client authentication then set it up - if client_cert_action >= REQUEST_CLIENT_CERT_ONCE: + if options.client_cert_action >= REQUEST_CLIENT_CERT_ONCE: sock.set_ssl_option(ssl.SSL_REQUEST_CERTIFICATE, True) - if client_cert_action == REQUIRE_CLIENT_CERT_ONCE: + if options.client_cert_action == REQUIRE_CLIENT_CERT_ONCE: sock.set_ssl_option(ssl.SSL_REQUIRE_CERTIFICATE, True) sock.set_auth_certificate_callback(auth_certificate_callback, nss.get_default_certdb()) @@ -278,7 +265,7 @@ while True: # Accept a connection from a client client_sock, client_addr = sock.accept() - if use_ssl: + if options.use_ssl: client_sock.set_handshake_callback(handshake_callback) print "client connect from: %s" % (client_addr) @@ -286,14 +273,15 @@ while True: try: # Handle the client connection - buf = client_sock.recv(1024) + buf = client_sock.readline() if not buf: print "server lost lost connection to %s" % (client_addr) break + buf = buf.rstrip() # remove newline record separator print "server received: %s" % (buf) - client_sock.send("Goodbye") + client_sock.send('Goodbye' + '\n') # newline is protocol record separator try: client_sock.shutdown(io.PR_SHUTDOWN_RCV) client_sock.close() @@ -308,7 +296,7 @@ try: sock.shutdown() sock.close() - if use_ssl: + if options.use_ssl: ssl.shutdown_server_session_id_cache() except Exception, e: print e @@ -316,141 +304,121 @@ # ----------------------------------------------------------------------------- -usage_str = ''' --C --client run as the client (default: %(client)s) --S --server run as the server (default: %(server)s) --d --certdir certificate directory (default: %(certdir)s) --h --hostname host to connect to (default: %(hostname)s) --f --family may be inet|inet6|unspec (default: %(family)s) - if unspec client tries all addresses returned by AddrInfo - server binds to IPv4 "any" wildcard address - if inet client tries IPv4 addresses returned by AddrInfo - server binds to IPv4 "any" wildcard address - if inet6 client tries IPv6 addresses returned by AddrInfo - server binds to IPv6 "any" wildcard address --4 --inet set family to inet (see family) --6 --inet6 set family to inet6 (see family) --n --server_nickname server certificate nickname (default: %(server_nickname)s) --N --client_nickname client certificate nickname (default: %(client_nickname)s) --w --password certificate database password (default: %(password)s) --p --port host port (default: %(port)s) --e --encrypt use SSL (default) (default: %(encrypt)s) --E --noencrypt don't use SSL (default: %(noencrypt)s) --f --require_cert_once (default: %(require_cert_once)s) --F --require_cert_always (default: %(require_cert_always)s) --r --request_cert_once (default: %(request_cert_once)s) --R --request_cert_always (default: %(request_cert_always)s) --H --help -''' % { - 'client' : client, - 'server' : server, - 'certdir' : certdir, - 'hostname' : hostname, - 'family' : io.addr_family_name(family), - 'server_nickname' : server_nickname, - 'client_nickname' : client_nickname, - 'password' : password, - 'port' : port, - 'encrypt' : use_ssl is True, - 'noencrypt' : use_ssl is False, - 'require_cert_once' : client_cert_action == REQUIRE_CLIENT_CERT_ONCE, - 'require_cert_always' : client_cert_action == REQUIRE_CLIENT_CERT_ALWAYS, - 'request_cert_once' : client_cert_action == REQUEST_CLIENT_CERT_ONCE, - 'request_cert_always' : client_cert_action == REQUEST_CLIENT_CERT_ALWAYS, - } - -def usage(): - print usage_str - -try: - opts, args = getopt.getopt(sys.argv[1:], "Hd:h:f:46n:N:w:p:CSeE", - ["help", "certdir=", "hostname=", - "family", "inet", "inet6", - "server_nickname=", "client_nickname=", - "password=", "port=", - "client", "server", "encrypt", "noencrypt", - "require_cert_once", "require_cert_always", - "request_cert_once", "request_cert_always", - ]) -except getopt.GetoptError: - # print help information and exit: - usage() - sys.exit(2) - - -for o, a in opts: - if o in ("-d", "--certdir"): - certdir = a - elif o in ("-h", "--hostname"): - hostname = a - elif o in ("-f", "--family"): - if a == "inet": +class FamilyArgAction(argparse.Action): + def __call__(self, parser, namespace, values, option_string=None): + value = values[0] + if value == "inet": family = io.PR_AF_INET - elif a == "inet6": + elif value == "inet6": family = io.PR_AF_INET6 - elif a == "unspec": + elif value == "unspec": family = io.PR_AF_UNSPEC else: - print "unknown address family (%s)" % (a) - usage() - sys.exit() - elif o in ("-4", "--inet"): - family = io.PR_AF_INET - elif o in ("-6", "--inet6"): - family = io.PR_AF_INET6 - elif o in ("-n", "--server_nickname"): - server_nickname = a - elif o in ("-N", "--client_nickname"): - client_nickname = a - elif o in ("-w", "--password"): - password = a - elif o in ("-p", "--port"): - port = int(a) - elif o in ("-C", "--client"): - client = True - elif o in ("-S", "--server"): - server = True - elif o in ("-e", "--encrypt"): - use_ssl = True - elif o in ("-E", "--noencrypt"): - use_ssl = False - elif o in ("--require_cert_once"): - client_cert_action = REQUIRE_CLIENT_CERT_ONCE - elif o in ("--require_cert_always"): - client_cert_action = REQUIRE_CLIENT_CERT_ALWAYS - elif o in ("--request_cert_once"): - client_cert_action = REQUEST_CLIENT_CERT_ONCE - elif o in ("--request_cert_always"): - client_cert_action = REQUEST_CLIENT_CERT_ALWAYS - elif o in ("-H", "--help"): - usage() - sys.exit() - else: - usage() - sys.exit() + raise argparse.ArgumentError(self, "unknown address family (%s)" % (value)) + setattr(namespace, self.dest, family) + +parser = argparse.ArgumentParser(description='SSL example') + +parser.add_argument('-C', '--client', action='store_true', + help='run as the client') + +parser.add_argument('-S', '--server', action='store_true', + help='run as the server') + +parser.add_argument('-d', '--db-name', + help='NSS database name (e.g. "sql:pki")') + +parser.add_argument('-H', '--hostname', + help='host to connect to') + +parser.add_argument('-f', '--family', + choices=['unspec', 'inet', 'inet6'], + dest='family', action=FamilyArgAction, nargs=1, + help=''' + If unspec client tries all addresses returned by AddrInfo, + server binds to IPv4 "any" wildcard address. + + If inet client tries IPv4 addresses returned by AddrInfo, + server binds to IPv4 "any" wildcard address. + + If inet6 client tries IPv6 addresses returned by AddrInfo, + server binds to IPv6 "any" wildcard address''') -if client and server: +parser.add_argument('-4', '--inet', + dest='family', action='store_const', const=io.PR_AF_INET, + help='set family to inet (see family)') + +parser.add_argument('-6', '--inet6', + dest='family', action='store_const', const=io.PR_AF_INET6, + help='set family to inet6 (see family)') + +parser.add_argument('-n', '--server-nickname', + help='server certificate nickname') + +parser.add_argument('-N', '--client-nickname', + help='client certificate nickname') + +parser.add_argument('-w', '--password', + help='certificate database password') + +parser.add_argument('-p', '--port', type=int, + help='host port') + +parser.add_argument('-e', '--encrypt', dest='use_ssl', action='store_true', + help='use SSL connection') + +parser.add_argument('-E', '--no-encrypt', dest='use_ssl', action='store_false', + help='do not use SSL connection') + +parser.add_argument('--require-cert-once', dest='client_cert_action', + action='store_const', const=REQUIRE_CLIENT_CERT_ONCE) + +parser.add_argument('--require-cert-always', dest='client_cert_action', + action='store_const', const=REQUIRE_CLIENT_CERT_ALWAYS) + +parser.add_argument('--request-cert-once', dest='client_cert_action', + action='store_const', const=REQUEST_CLIENT_CERT_ONCE) + +parser.add_argument('--request-cert-always', dest='client_cert_action', + action='store_const', const=REQUEST_CLIENT_CERT_ALWAYS) + +parser.set_defaults(client = False, + server = False, + db_name = 'sql:pki', + hostname = os.uname()[1], + family = io.PR_AF_UNSPEC, + server_nickname = 'test_server', + client_nickname = 'test_user', + password = 'db_passwd', + port = 1234, + use_ssl = True, + client_cert_action = NO_CLIENT_CERT, + ) + +options = parser.parse_args() + +if options.client and options.server: print "can't be both client and server" sys.exit(1) -if not (client or server): +if not (options.client or options.server): print "must be one of client or server" sys.exit(1) # Perform basic configuration and setup -if certdir is None: - nss.nss_init_nodb() +if options.use_ssl: + nss.nss_init(options.db_name) else: - nss.nss_init(certdir) + nss.nss_init_nodb() ssl.set_domestic_policy() nss.set_password_callback(password_callback) # Run as a client or as a server -if client: +if options.client: print "starting as client" Client() -if server: +if options.server: print "starting as server" Server() @@ -458,4 +426,3 @@ nss.nss_shutdown() except Exception, e: print e - diff -N -u -r python-nss-0.14.0/doc/examples/verify_cert.py python-nss-0.14.1/doc/examples/verify_cert.py --- python-nss-0.14.0/doc/examples/verify_cert.py 2013-04-15 13:05:46.000000000 -0400 +++ python-nss-0.14.1/doc/examples/verify_cert.py 2013-10-08 13:04:51.000000000 -0400 @@ -1,7 +1,7 @@ #!/usr/bin/python +import argparse import sys -import optparse import nss.nss as nss import nss.error as nss_error @@ -75,75 +75,74 @@ lines.extend(nss.make_line_fmt_tuples(level, msg)) lines.extend(obj.format_lines(level+1)) return nss.indented_format(lines) - + #------------------------------------------------------------------------------- def main(): - # Command line argument processing - parser = optparse.OptionParser() - - parser.set_defaults(dbdir = '/etc/pki/nssdb', - db_passwd = 'db_passwd', - input_format = 'pem', - check_sig = True, - print_cert = False, - with_log = True, - check_ca = True, - ) + global options - param_group = optparse.OptionGroup(parser, 'NSS Database', - 'Specify & control the NSS Database') + parser = argparse.ArgumentParser(description='certificate validation example') - param_group.add_option('-d', '--dbdir', dest='dbdir', - help='NSS database directory, default="%default"') - param_group.add_option('-P', '--db-passwd', dest='db_passwd', - help='NSS database password, default="%default"') + # === NSS Database Group === + group = parser.add_argument_group('NSS Database', + 'Specify & control the NSS Database') + group.add_argument('-d', '--db-name', + help='NSS database name (e.g. "sql:pki")') - parser.add_option_group(param_group) + group.add_argument('-P', '--db-passwd', + help='NSS database password') - param_group = optparse.OptionGroup(parser, 'Certificate', - 'Specify how the certificate is loaded') + # === Certificate Group === + group = parser.add_argument_group('Certificate', + 'Specify how the certificate is loaded') - param_group.add_option('-f', '--file', dest='cert_filename', - help='read cert from file') - param_group.add_option('--format', dest='input_format', choices=['pem', 'der'], - help='import format for certificate (der|pem) default="%default"') - param_group.add_option('-n', '--nickname', dest='cert_nickname', - help='load cert from NSS database by looking it up under this nickname') + group.add_argument('-f', '--file', dest='cert_filename', + help='read cert from file') + group.add_argument('-F', '--input-format', choices=['pem', 'der'], + help='format of input cert') - parser.add_option_group(param_group) + group.add_argument('-n', '--nickname', dest='cert_nickname', + help='load cert from NSS database by looking it up under this nickname') - param_group = optparse.OptionGroup(parser, 'Validation', - 'Control the validation') + # === Validation Group === + group = parser.add_argument_group('Validation', + 'Control the validation') - param_group.add_option('-u', '--usage', dest='cert_usage', action='append', choices=cert_usage_map.keys(), - help='may be specified multiple times, default="CheckAllUsages", may be one of: %s' % ', '.join(sorted(cert_usage_map.keys()))) - param_group.add_option('-c', '--check-sig', action='store_true', dest='check_sig', - help='check signature default=%default') - param_group.add_option('-C', '--no-check-sig', action='store_false', dest='check_sig', + group.add_argument('-u', '--usage', dest='cert_usage', action='append', choices=cert_usage_map.keys(), + help='certificate usage flags, may be specified multiple times') + group.add_argument('-c', '--check-sig', action='store_true', dest='check_sig', help='check signature') - param_group.add_option('-l', '--log', action='store_true', dest='with_log', - help='use verify log, default=%default') - param_group.add_option('-L', '--no-log', action='store_false', dest='with_log', - help='use verify log, default=%default') - param_group.add_option('-a', '--check-ca', action='store_true', dest='check_ca', - help='check if cert is CA, default=%default') - param_group.add_option('-A', '--no-check-ca', action='store_false', dest='check_ca', - help='check if cert is CA, default=%default') - - parser.add_option_group(param_group) - - param_group = optparse.OptionGroup(parser, 'Miscellaneous', - 'Miscellaneous options') + group.add_argument('-C', '--no-check-sig', action='store_false', dest='check_sig', + help='do not check signature') + group.add_argument('-l', '--log', action='store_true', dest='with_log', + help='use verify log') + group.add_argument('-L', '--no-log', action='store_false', dest='with_log', + help='do not use verify log') + group.add_argument('-a', '--check-ca', action='store_true', dest='check_ca', + help='check if cert is CA') + group.add_argument('-A', '--no-check-ca', action='store_false', dest='check_ca', + help='do not check if cert is CA') + + # === Miscellaneous Group === + group = parser.add_argument_group('Miscellaneous', + 'Miscellaneous options') - param_group.add_option('-p', '--print-cert', action='store_true', dest='print_cert', - help='print the certificate in a friendly fashion, default=%default') + group.add_argument('-p', '--print-cert', action='store_true', dest='print_cert', + help='print the certificate in a friendly fashion') - parser.add_option_group(param_group) - options, args = parser.parse_args() + parser.set_defaults(db_name = 'sql:pki', + db_passwd = 'db_passwd', + input_format = 'pem', + check_sig = True, + with_log = True, + check_ca = True, + print_cert = False, + ) + + options = parser.parse_args() # Process the command line arguments @@ -175,9 +174,9 @@ return 1 # Initialize NSS. - print indented_output('NSS Database', options.dbdir) + print indented_output('NSS Database', options.db_name) print - nss.nss_init(options.dbdir) + nss.nss_init(options.db_name) certdb = nss.get_default_certdb() nss.set_password_callback(password_callback) @@ -194,7 +193,7 @@ except Exception, e: print e print >>sys.stderr, 'Unable to load cert nickname "%s" from database "%s"' % \ - (options.cert_nickname, options.dbdir) + (options.cert_nickname, options.db_name) return 1 # Dump the cert if the user wants to see it @@ -282,5 +281,3 @@ #------------------------------------------------------------------------------- if __name__ == "__main__": sys.exit(main()) - - diff -N -u -r python-nss-0.14.0/doc/examples/verify_server.py python-nss-0.14.1/doc/examples/verify_server.py --- python-nss-0.14.0/doc/examples/verify_server.py 2013-04-15 13:05:46.000000000 -0400 +++ python-nss-0.14.1/doc/examples/verify_server.py 2013-10-08 13:00:37.000000000 -0400 @@ -4,10 +4,10 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. +import argparse +import getpass import os import sys -import getopt -import getpass from nss.error import NSPRError import nss.io as io @@ -16,11 +16,6 @@ # ----------------------------------------------------------------------------- -# command line parameters, default them to something reasonable -#certdir = '/etc/httpd/alias' -certdir = '/etc/pki/nssdb' -hostname = 'www.verisign.com' -port = 443 timeout_secs = 3 request = '''\ @@ -104,18 +99,18 @@ valid_addr = False # Get the IP Address of our server try: - addr_info = io.AddrInfo(hostname) + addr_info = io.AddrInfo(options.hostname) except: - print "ERROR: could not resolve hostname \"%s\"" % hostname + print "ERROR: could not resolve hostname \"%s\"" % options.hostname return for net_addr in addr_info: - net_addr.port = port + net_addr.port = options.port sock = ssl.SSLSocket(net_addr.family) # Set client SSL socket options sock.set_ssl_option(ssl.SSL_SECURITY, True) sock.set_ssl_option(ssl.SSL_HANDSHAKE_AS_CLIENT, True) - sock.set_hostname(hostname) + sock.set_hostname(options.hostname) # Provide a callback which notifies us when the SSL handshake is # complete @@ -135,7 +130,7 @@ continue if not valid_addr: - print "ERROR: could not connect to \"%s\"" % hostname + print "ERROR: could not connect to \"%s\"" % options.hostname return try: @@ -158,48 +153,31 @@ # ----------------------------------------------------------------------------- -usage_str = ''' --d --certdir certificate directory (default: %(certdir)s) --h --hostname host to connect to (default: %(hostname)s) --p --port host port (default: %(port)s) -''' % { - 'certdir' : certdir, - 'hostname' : hostname, - 'port' : port, - } +parser = argparse.ArgumentParser(description='certificate verification example', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) -def usage(): - print usage_str +parser.add_argument('-d', '--db-name', + help='NSS database name (e.g. "sql:pki")') -try: - opts, args = getopt.getopt(sys.argv[1:], "Hd:h:p:", - ["help", "certdir=", "hostname=", - "port=", - ]) -except getopt.GetoptError: - # print help information and exit: - usage() - sys.exit(2) - - -for o, a in opts: - if o in ("-d", "--certdir"): - certdir = a - if o in ("-h", "--hostname"): - hostname = a - if o in ("-p", "--port"): - port = int(a) - if o in ("-H", "--help"): - usage() - sys.exit() +parser.add_argument('-H', '--hostname', + help='host to connect to') + +parser.add_argument('-p', '--port', type=int, + help='host port') + +parser.set_defaults(db_name = 'sql:pki', + hostname = 'www.verisign.com', + port = 443, + ) + +options = parser.parse_args() # Perform basic configuration and setup try: - nss.nss_init(certdir) + nss.nss_init(options.db_name) ssl.set_domestic_policy() except Exception, e: print >>sys.stderr, e.strerror sys.exit(1) client() - diff -N -u -r python-nss-0.14.0/src/__init__.py python-nss-0.14.1/src/__init__.py --- python-nss-0.14.0/src/__init__.py 2013-04-24 16:30:26.000000000 -0400 +++ python-nss-0.14.1/src/__init__.py 2013-10-08 14:38:28.000000000 -0400 @@ -163,8 +163,8 @@ - Initialize NSS and indicate the certficate database (CertDB):: - certdir = './pki' - ssl.nssinit(certdir) + db_name = 'sql:pki' + ssl.nssinit(db_name) - If you are implementing an SSL server call config_secure_server() (see ssl_example.py):: @@ -244,7 +244,7 @@ future we can find a solution but the immediate goal of the NSS Python binding was to expose NSS through Python, not necessarily to solve the larger integration issue of Python run-time and NSPR - run-time. + run-time. - NSPR would like to hide the underlying platform socket (in the NSPR code this is called "osfd"). There are NSPR API's which @@ -312,5 +312,4 @@ To be added """ -__version__ = '0.14.0' - +__version__ = '0.14.1' diff -N -u -r python-nss-0.14.0/test/run_tests python-nss-0.14.1/test/run_tests --- python-nss-0.14.0/test/run_tests 2013-04-30 11:03:54.000000000 -0400 +++ python-nss-0.14.1/test/run_tests 2013-10-08 12:57:56.000000000 -0400 @@ -1,21 +1,13 @@ #!/usr/bin/python -import getopt -import sys +import argparse import os +import sys import unittest from util import get_build_dir #------------------------------------------------------------------------------- -prog_name = os.path.basename(sys.argv[0]) - -config = { - 'in_tree' : True, -} - -#------------------------------------------------------------------------------- - def run_tests(): import setup_certs @@ -23,9 +15,11 @@ import test_cipher import test_digest import test_pkcs12 + import test_misc + import test_ocsp import test_client_server - setup_certs.setup_certs() + setup_certs.setup_certs([]) loader = unittest.TestLoader() runner = unittest.TextTestRunner() @@ -43,62 +37,19 @@ #------------------------------------------------------------------------------- -class Usage(Exception): - def __init__(self, msg): - self.msg = msg - -def usage(): - 'Print command help.' - - return '''\ -%(prog_name)s [-i] - --h --help print help --i --installed runs the test using installed libraries - instead of "in tree" libraries - -Runs unit tests. -By default test is done "in tree". - -Examples: - -Run test using libraries built in this tree -%(prog_name)s - -Run post install test -%(prog_name)s -i -''' % {'prog_name' : prog_name, - } - -#------------------------------------------------------------------------------- +def main(): + parser = argparse.ArgumentParser(description='run the units (installed or in tree)') + parser.add_argument('-i', '--installed', action='store_false', dest='in_tree', + help='run tests using installed library') + parser.add_argument('-t', '--in-tree', action='store_true', dest='in_tree', + help='run tests using devel tree') -def main(argv=None): - if argv is None: - argv = sys.argv - - try: - try: - opts, args = getopt.getopt(argv[1:], 'hi', - ['help', 'installed',]) - except getopt.GetoptError, e: - raise Usage(e) - return 2 - - for o, a in opts: - if o in ('-h', '--help'): - print >>sys.stdout, usage() - return 0 - elif o in ('-i', '--installed'): - config['in_tree'] = False - else: - raise Usage("command argument '%s' not handled, internal error" % o) - except Usage, e: - print >>sys.stderr, e.msg - print >>sys.stderr, "for help use --help" - return 2 + parser.set_defaults(in_tree = False, + ) + options = parser.parse_args() - if config['in_tree']: + if options.in_tree: # Run the tests "in the tree" # Rather than testing with installed versions run the test # with the package built in this tree. diff -N -u -r python-nss-0.14.0/test/setup_certs.py python-nss-0.14.1/test/setup_certs.py --- python-nss-0.14.0/test/setup_certs.py 2013-04-18 12:28:05.000000000 -0400 +++ python-nss-0.14.1/test/setup_certs.py 2013-10-17 11:07:09.000000000 -0400 @@ -1,345 +1,506 @@ #!/usr/bin/python -import traceback -import getopt -import sys -import os -import errno +import argparse +import atexit import logging -import subprocess +import os import shutil -import shlex -import pty -import tty -import re -import time - -#------------------------------------------------------------------------------- - -__all__ = ["config", "setup_certs"] - -if __name__ == '__main__': - prog_name = os.path.basename(sys.argv[0]) -else: - prog_name = 'setup_certs' - -serial_number = 0 -hostname = os.uname()[1] -client_username = 'test_user' - -config = { - 'verbose' : False, - 'debug' : False, - 'logfile' : 'setup_certs.log', - 'log_level' : logging.WARN, - 'interactive' : sys.stdout.isatty(), - 'dbdir' : os.path.join(os.path.dirname(sys.argv[0]), 'pki'), - 'db_passwd' : 'db_passwd', - 'noise_file' : 'noise_file', - 'ca_subject' : 'CN=Test CA', - 'ca_nickname' : 'test_ca', - 'server_subject' : 'CN=%s' % hostname, - 'server_nickname' : 'test_server', - 'client_subject' : 'CN=%s' % client_username, - 'client_nickname' : client_username, -} +import subprocess +import sys +from string import Template +import tempfile #------------------------------------------------------------------------------- class CmdError(Exception): - def __init__(self, cmd, exit_code, msg): - self.cmd = cmd - self.exit_code = exit_code - self.msg = msg + def __init__(self, cmd_args, returncode, message=None, stdout=None, stderr=None): + self.cmd_args = cmd_args + self.returncode = returncode + if message is None: + self.message = 'Failed error=%s, ' % (returncode) + if stderr: + self.message += '"%s", ' % stderr + self.message += 'args=%s' % (cmd_args) + else: + self.message = message + self.stdout = stdout + self.stderr = stderr def __str__(self): - return "Command \"%s\"\nFailed with exit code = %s\nOutput was:\n%s\n" % \ - (self.cmd, self.exit_code, self.msg) + return self.message -#------------------------------------------------------------------------------- -def next_serial(): - global serial_number - serial_number += 1 - return serial_number +def run_cmd(cmd_args, input=None): + logging.debug(' '.join(cmd_args)) + try: + p = subprocess.Popen(cmd_args, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, stderr = p.communicate(input) + returncode = p.returncode + if returncode != 0: + raise CmdError(cmd_args, returncode, + 'failed %s' % (', '.join(cmd_args)), + stdout, stderr) + return stdout, stderr + except OSError as e: + raise CmdError(cmd_args, e.errno, stderr=str(e)) + +def exit_handler(options): + logging.debug('in exit handler') + + if options.passwd_filename is not None: + logging.debug('removing passwd_filename=%s', options.passwd_filename) + os.remove(options.passwd_filename) + + if options.noise_filename is not None: + logging.debug('removing noise_filename=%s', options.noise_filename) + os.remove(options.noise_filename) + +def write_serial(options, serial_number): + with open(options.serial_file, 'w') as f: + f.write('%x\n' % serial_number) + + +def read_serial(options): + if not os.path.exists(options.serial_file): + write_serial(options, options.serial_number) -def create_noise_file(): - """ - Generate a noise file to be used when creating a key - """ - if os.path.exists(config['noise_file']): - os.remove(config['noise_file']) - - f = open(config['noise_file'], "w") - f.write(os.urandom(40)) - f.close() + with open(options.serial_file) as f: + serial_number = int(f.readline(), 16) + return serial_number - return -def run_cmd(cmd, input=None): - logging.debug("running command: %s", cmd) +def init_noise_file(options): + '''Generate a noise file to be used when creating a key - if input is None: - stdin = None - else: - stdin = subprocess.PIPE + We create a temporary file on first use and continue to use + the same temporary file for the duration of this process. + Each time this function is called it writes new random data + into the file. + ''' + random_data = os.urandom(40) - p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = p.communicate(input) - status = p.returncode - if config['verbose']: - logging.debug("cmd status = %s", status) - logging.debug("cmd stdout = %s", stdout) - logging.debug("cmd stderr = %s", stderr) - return status, stdout, stderr - -def run_cmd_with_prompts(cmd, prompts): - logging.debug('running command: %s', cmd) - - argv = shlex.split(cmd) - - pid, master_fd = pty.fork() - if pid == 0: - os.execlp(argv[0], *argv) - - time.sleep(0.1) # FIXME: why is this necessary? - output = '' - search_position = 0 - cur_prompt = 0 - if cur_prompt < len(prompts): - prompt_re = re.compile(prompts[cur_prompt][0]) - response = prompts[cur_prompt][1] - cur_prompt += 1 + if options.noise_filename is None: + fd, options.noise_filename = tempfile.mkstemp() + os.write(fd, random_data) + os.close(fd) else: - prompt_re = None - response = None - - while True: - try: - new_data = os.read(master_fd, 1024) - except OSError, e: - if e.errno == errno.EIO: # process exited - break - else: - raise - if len(new_data) == 0: - break # EOF - output += new_data - logging.debug('output="%s"', output[search_position:]); - if prompt_re is not None: - logging.debug('search pattern = "%s"', prompt_re.pattern) - match = prompt_re.search(output, search_position) - if match: - search_position = match.end() - parsed = output[match.start() : match.end()] - logging.debug('found prompt: "%s"', parsed) - logging.debug('writing response: "%s"', response) - os.write(master_fd, response) - - if cur_prompt < len(prompts): - prompt_re = re.compile(prompts[cur_prompt][0]) - response = prompts[cur_prompt][1] - cur_prompt += 1 - else: - prompt_re = None - response = None - - - exit_value = os.waitpid(pid, 0)[1] - exit_signal = exit_value & 0xFF - exit_code = exit_value >> 8 - #logging.debug('output="%s"' % output) - logging.debug('cmd signal=%s, exit_code=%s' % (exit_signal, exit_code)) - - return exit_code, output + with open(options.noise_filename, 'w') as f: + f.write(random_data) + return +def create_passwd_file(options): + fd, options.passwd_filename = tempfile.mkstemp() + os.write(fd, options.db_passwd) + os.close(fd) -#------------------------------------------------------------------------------- -def setup_certs(): - print 'setting up certs ...' - - if os.path.exists(config['dbdir']): - shutil.rmtree(config['dbdir']) - os.makedirs(config['dbdir']) +def db_has_cert(options, nickname): + cmd_args = ['/usr/bin/certutil', + '-d', options.db_name, + '-L', + '-n', nickname] try: + run_cmd(cmd_args) + except CmdError as e: + if e.returncode == 255 and 'not found' in e.stderr: + return False + else: + raise + return True + +def format_cert(options, nickname): + cmd_args = ['/usr/bin/certutil', + '-L', # OPERATION: list + '-d', options.db_name, # NSS database + '-f', options.passwd_filename, # database password in file + '-n', nickname, # nickname of cert to list + ] - create_noise_file() - - # 1. Create the database - cmd = 'certutil -N -d %(dbdir)s' % config - exit_code, output = run_cmd_with_prompts(cmd, - [('Enter new password:\s*', config['db_passwd'] + '\n'), - ('Re-enter password:\s*', config['db_passwd'] + '\n')]) - if exit_code != 0: - raise CmdError(cmd, exit_code, output) - - # 2. Create a root CA certificate - config['serial_number'] = next_serial() - cmd = 'certutil -S -d %(dbdir)s -z %(noise_file)s -s "%(ca_subject)s" -n "%(ca_nickname)s" -x -t "CTu,C,C" -m %(serial_number)d' % config - exit_code, output = run_cmd_with_prompts(cmd, - [('Enter Password or Pin for "NSS Certificate DB":\s*', config['db_passwd'] + '\n')]) - if exit_code != 0: - raise CmdError(cmd, exit_code, output) - - # 3. Create a server certificate and sign it. - config['serial_number'] = next_serial() - cmd = 'certutil -S -d %(dbdir)s -z %(noise_file)s -c %(ca_nickname)s -s "%(server_subject)s" -n "%(server_nickname)s" -t "u,u,u" -m %(serial_number)d' % config - exit_code, output = run_cmd_with_prompts(cmd, - [('Enter Password or Pin for "NSS Certificate DB":\s*', config['db_passwd'] + '\n')]) - if exit_code != 0: - raise CmdError(cmd, exit_code, output) - - # 4. Create a client certificate and sign it. - config['serial_number'] = next_serial() - cmd = 'certutil -S -d %(dbdir)s -z %(noise_file)s -c %(ca_nickname)s -s "%(client_subject)s" -n "%(client_nickname)s" -t "u,u,u" -m %(serial_number)d' % config - exit_code, output = run_cmd_with_prompts(cmd, - [('Enter Password or Pin for "NSS Certificate DB":\s*', config['db_passwd'] + '\n')]) - if exit_code != 0: - raise CmdError(cmd, exit_code, output) - - # 5. Import public root CA's - cmd = 'modutil -dbdir %(dbdir)s -add ca_certs -libfile libnssckbi.so' % config - exit_code, stdout, stderr = run_cmd(cmd) - if exit_code != 0: - raise CmdError(cmd, exit_code, output) - - # 6. Create a sub CA certificate - config['serial_number'] = next_serial() - config['subca_subject'] = 'CN=subca' - config['subca_nickname'] = 'subca' - cmd = 'certutil -S -d %(dbdir)s -z %(noise_file)s -c %(ca_nickname)s -s "%(subca_subject)s" -n "%(subca_nickname)s" -t "CTu,C,C" -m %(serial_number)d' % config - exit_code, output = run_cmd_with_prompts(cmd, - [('Enter Password or Pin for "NSS Certificate DB":\s*', config['db_passwd'] + '\n')]) - if exit_code != 0: - raise CmdError(cmd, exit_code, output) - - # 7. Create a server certificate and sign it with the subca. - config['serial_number'] = next_serial() - config['server_subject'] = config['server_subject'] + "_" + config['subca_nickname'] - config['server_nickname'] = config['server_nickname'] + "_" + config['subca_nickname'] - cmd = 'certutil -S -d %(dbdir)s -z %(noise_file)s -c %(subca_nickname)s -s "%(server_subject)s" -n "%(server_nickname)s" -t "u,u,u" -m %(serial_number)d' % config - exit_code, output = run_cmd_with_prompts(cmd, - [('Enter Password or Pin for "NSS Certificate DB":\s*', config['db_passwd'] + '\n')]) - if exit_code != 0: - raise CmdError(cmd, exit_code, output) - - finally: - if os.path.exists(config['noise_file']): - os.remove(config['noise_file']) - - logging.info('certifcate database password="%(db_passwd)s"', config) - logging.info('CA nickname="%(ca_nickname)s", CA subject="%(ca_subject)s"', config) - logging.info('server nickname="%(server_nickname)s", server subject="%(server_subject)s"', config) - logging.info('client nickname="%(client_nickname)s", client subject="%(client_subject)s"', config) + stdout, stderr = run_cmd(cmd_args) + return stdout #------------------------------------------------------------------------------- -class Usage(Exception): - def __init__(self, msg): - self.msg = msg +def create_database(options): + if os.path.exists(options.db_dir) and not os.path.isdir(options.db_dir): + raise ValueError('db_dir "%s" exists but is not a directory' % options.db_dir) + + # Create resources + create_passwd_file(options) + + if options.clean: + logging.info('Creating clean database directory: "%s"', options.db_dir) + + if os.path.exists(options.db_dir): + shutil.rmtree(options.db_dir) + os.makedirs(options.db_dir) + + cmd_args = ['/usr/bin/certutil', + '-N', # OPERATION: create database + '-d', options.db_name, # NSS database + '-f', options.passwd_filename, # database password in file + ] -def usage(): - ''' - Print command help. - ''' + stdout, stderr = run_cmd(cmd_args) + else: + logging.info('Using existing database directory: "%s"', options.db_dir) + +def create_ca_cert(options): + serial_number = read_serial(options) + init_noise_file(options) + + logging.info('creating ca cert: subject="%s", nickname="%s"', + options.ca_subject, options.ca_nickname) + + cmd_args = ['/usr/bin/certutil', + '-S', # OPERATION: create signed cert + '-x', # self-sign the cert + '-d', options.db_name, # NSS database + '-f', options.passwd_filename, # database password in file + '-n', options.ca_nickname, # nickname of cert being created + '-s', options.ca_subject, # subject of cert being created + '-g', str(options.key_size), # keysize + '-t', 'CT,,CT', # trust + '-1', # add key usage extension + '-2', # add basic contraints extension + '-5', # add certificate type extension + '-m', str(serial_number), # cert serial number + '-v', str(options.valid_months), # validity in months + '-z', options.noise_filename, # noise file random seed + ] + + # Provide input for extension creation prompting + input = '' + + # >> Key Usage extension << + # 0 - Digital Signature + # 1 - Non-repudiation + # 2 - Key encipherment + # 3 - Data encipherment + # 4 - Key agreement + # 5 - Cert signing key + # 6 - CRL signing key + # Other to finish + input += '0\n1\n5\n100\n' + # Is this a critical extension [y/N]? + input += 'y\n' + + # >> Basic Constraints extension << + # Is this a CA certificate [y/N]? + input += 'y\n' + # Enter the path length constraint, enter to skip [<0 for unlimited path]: > 2 + input += '%d\n' % options.ca_path_len + # Is this a critical extension [y/N]? + input += 'y\n' + + # >> NS Cert Type extension << + # 0 - SSL Client + # 1 - SSL Server + # 2 - S/MIME + # 3 - Object Signing + # 4 - Reserved for future use + # 5 - SSL CA + # 6 - S/MIME CA + # 7 - Object Signing CA + # Other to finish + input += '5\n6\n7\n100\n' + # Is this a critical extension [y/N]? + input += 'n\n' + + stdout, stderr = run_cmd(cmd_args, input) + write_serial(options, serial_number + 1) + + return options.ca_nickname + +def create_server_cert(options): + serial_number = read_serial(options) + init_noise_file(options) + + logging.info('creating server cert: subject="%s", nickname="%s"', + options.server_subject, options.server_nickname) + + cmd_args = ['/usr/bin/certutil', + '-S', # OPERATION: create signed cert + '-d', options.db_name, # NSS database + '-f', options.passwd_filename, # database password in file + '-c', options.ca_nickname, # nickname of CA used to sign this cert + '-n', options.server_nickname, # nickname of cert being created + '-s', options.server_subject, # subject of cert being created + '-g', str(options.key_size), # keysize + '-t', 'u,u,u', # trust + '-5', # add certificate type extensionn + '-m', str(serial_number), # cert serial number + '-v', str(options.valid_months), # validity in months + '-z', options.noise_filename, # noise file random seed + ] + + # Provide input for extension creation prompting + input = '' + + # >> NS Cert Type extension << + # 0 - SSL Client + # 1 - SSL Server + # 2 - S/MIME + # 3 - Object Signing + # 4 - Reserved for future use + # 5 - SSL CA + # 6 - S/MIME CA + # 7 - Object Signing CA + # Other to finish + input += '1\n100\n' + # Is this a critical extension [y/N]? + input += 'n\n' + + stdout, stderr = run_cmd(cmd_args, input) + write_serial(options, serial_number + 1) + + return options.server_nickname + +def create_client_cert(options): + serial_number = read_serial(options) + init_noise_file(options) + + logging.info('creating client cert: subject="%s", nickname="%s"', + options.client_subject, options.client_nickname) + + cmd_args = ['/usr/bin/certutil', + '-S', # OPERATION: create signed cert + '-d', options.db_name, # NSS database + '-f', options.passwd_filename, # database password in file + '-c', options.ca_nickname, # nickname of CA used to sign this cert + '-n', options.client_nickname, # nickname of cert being created + '-s', options.client_subject, # subject of cert being created + '-g', str(options.key_size), # keysize + '-t', 'u,u,u', # trust + '-5', # add certificate type extensionn + '-m', str(serial_number), # cert serial number + '-v', str(options.valid_months), # validity in months + '-z', options.noise_filename, # noise file random seed + ] + + # Provide input for extension creation prompting + input = '' + + # >> NS Cert Type extension << + # 0 - SSL Client + # 1 - SSL Server + # 2 - S/MIME + # 3 - Object Signing + # 4 - Reserved for future use + # 5 - SSL CA + # 6 - S/MIME CA + # 7 - Object Signing CA + # Other to finish + input += '0\n100\n' + # Is this a critical extension [y/N]? + input += 'n\n' + + stdout, stderr = run_cmd(cmd_args, input) + write_serial(options, serial_number + 1) + + return options.client_nickname + +def add_trusted_certs(options): + name = 'ca_certs' + module = 'libnssckbi.so' + logging.info('adding system trusted certs: name="%s" module="%s"', + name, module) + + cmd_args = ['/usr/bin/modutil', + '-dbdir', options.db_name, # NSS database + '-add', name, # module name + '-libfile', module, # module + ] - return '''\ --h --help print help --l --log-level level set the logging level, may be one of: - debug, info, warn, error, critical --L --logfile filename log to this file, empty string disables logging to a file --v --verbose be chatty --D --debug show run information --w --password set the certificate database password --d --dbdir set the datbase directory --s --server-subject set the server's subject - -Examples: - -%(prog_name)s -m 10 -''' % {'prog_name' : prog_name, - } + run_cmd(cmd_args) + return name #------------------------------------------------------------------------------- -def main(argv=None): - if argv is None: - argv = sys.argv +def setup_certs(args): - try: - try: - opts, args = getopt.getopt(argv[1:], 'hl:L:vDw:d:s:', - ['help', 'logfile=', 'verbose', 'debug', - 'password', 'dbdir', 'server-subject']) - except getopt.GetoptError, e: - raise Usage(e) - return 2 - - for o, a in opts: - if o in ('-h', '--help'): - print >>sys.stdout, usage() - return 0 - elif o in ('-L', '--logfile'): - if not a: - config['logfile'] = None - else: - config['logfile'] = a - elif o in ('-l', '--log-level'): - if a.upper() in logging._levelNames: - config['log_level'] = logging._levelNames[a.upper()] - else: - print >>sys.stderr, "ERROR: unknown log-level '%s'" % a - elif o in ('-v', '--verbose'): - config['verbose'] = True - elif o in ('-D', '--debug'): - config['debug'] = True - elif o in ('-w', '--password'): - config['db_passwd'] = a - elif o in ('-d', '--dbdir'): - config['dbdir'] = a - elif o in ('-s', '--server-subject'): - config['server_subject'] = 'CN=%s' % a - else: - raise Usage("command argument '%s' not handled, internal error" % o) - except Usage, e: - print >>sys.stderr, e.msg - print >>sys.stderr, "for help use --help" - return 2 - - if config['verbose']: - config['log_level'] = logging.INFO - if config['debug']: - config['log_level'] = logging.DEBUG + # --- cmd --- + parser = argparse.ArgumentParser(description='create certs for testing', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + parser.add_argument('--verbose', action='store_true', + help='provide info level messages') + + parser.add_argument('--debug', action='store_true', + help='provide debug level messages') + + parser.add_argument('--quiet', action='store_true', + help='do not display any messages') + + parser.add_argument('--show-certs', action='store_true', + help='show the certificate details') + + parser.add_argument('--no-clean', action='store_false', dest='clean', + help='do not remove existing db_dir') + + parser.add_argument('--no-trusted-certs', dest='add_trusted_certs', action='store_false', + help='do not add trusted certs') + + parser.add_argument('--hostname', + help='hostname used in cert subjects') + + parser.add_argument('--db-type', + choices=['sql', ''], + help='NSS database type') + + parser.add_argument('--db-dir', + help='NSS database directory') + + parser.add_argument('--db-passwd', + help='NSS database password') + + parser.add_argument('--ca-subject', + help='CA certificate subject') + + parser.add_argument('--ca-nickname', + help='CA certificate nickname') + + parser.add_argument('--server-subject', + help='server certificate subject') + + parser.add_argument('--server-nickname', + help='server certificate nickname') + + parser.add_argument('--client-username', + help='client user name, used in client cert subject') + + parser.add_argument('--client-subject', + help='client certificate subject') + + parser.add_argument('--client-nickname', + help='client certificate nickname') + + parser.add_argument('--serial-number', type=int, + help='starting serial number for certificates') + + parser.add_argument('--valid-months', dest='valid_months', type=int, + help='validity period in months') + parser.add_argument('--path-len', dest='ca_path_len', type=int, + help='basic constraints path length') + parser.add_argument('--key-type', dest='key_type', + help='key type, either rsa or dsa') + parser.add_argument('--key-size', dest='key_size', + help='number of bits in key (must be multiple of 8)') + parser.add_argument('--serial-file', dest='serial_file', + help='name of file used to track next serial number') + + parser.set_defaults(verbose = False, + debug = False, + quiet = False, + show_certs = False, + clean = True, + add_trusted_certs = True, + hostname = os.uname()[1], + db_type = 'sql', + db_dir = 'pki', + db_passwd = 'db_passwd', + ca_subject = 'CN=Test CA', + ca_nickname = 'test_ca', + server_subject = 'CN=${hostname}', + server_nickname = 'test_server', + client_username = 'test_user', + client_subject = 'CN=${client_username}', + client_nickname = '${client_username}', + serial_number = 1, + key_type = 'rsa', + key_size = 1024, + valid_months = 12, + ca_path_len = 2, + serial_file = '${db_dir}/serial', + ) + + + options = parser.parse_args(args) + + # Do substitutions on option values. + # This is ugly because argparse does not expose an API which permits iterating over + # the contents of options nor a way to get the options as a dict, ugh :-( + # So we access options.__dict__ directly. + for key in options.__dict__.keys(): + # Assume options never begin with underscore + if key.startswith('_'): + continue + value = getattr(options, key) + # Can't substitue on non-string values + if not isinstance(value, basestring): + continue + # Don't bother trying to substitute if $ substitution character isn't present + if '$' not in value: + continue + setattr(options, key, Template(value).substitute(options.__dict__)) + + # Set up logging + log_level = logging.INFO + if options.quiet: + log_level = logging.ERROR + if options.verbose: + log_level = logging.INFO + if options.debug: + log_level = logging.DEBUG # Initialize logging - logging.basicConfig(level=config['log_level'], - format='%(asctime)s %(levelname)-8s %(message)s', - datefmt='%m-%d %H:%M', - filename=config['logfile'], - filemode='a') - - if config['interactive']: - # Create a seperate logger for the console - console_logger = logging.StreamHandler() - console_logger.setLevel(config['log_level']) - # set a format which is simpler for console use - formatter = logging.Formatter('%(message)s') - console_logger.setFormatter(formatter) - # add the handler to the root logger - logging.getLogger('').addHandler(console_logger) + logging.basicConfig(level=log_level, format='%(levelname)s: %(message)s') + logger = logging.getLogger() + + # Synthesize some useful options derived from specified options + if options.db_type == '': + options.db_name = options.db_dir + else: + options.db_name = '%s:%s' % (options.db_type, options.db_dir) + options.passwd_filename = None + options.noise_filename = None + + # Set function to clean up on exit, bind fuction with options + def exit_handler_with_options(): + exit_handler(options) + atexit.register(exit_handler_with_options) + + cert_nicknames = [] try: - setup_certs() - except Exception, e: - logging.error(traceback.format_exc()) - logging.error(str(e)) + create_database(options) + cert_nicknames.append(create_ca_cert(options)) + cert_nicknames.append(create_server_cert(options)) + cert_nicknames.append(create_client_cert(options)) + if options.add_trusted_certs: + add_trusted_certs(options) + except CmdError as e: + logging.error(e.message) + logging.error(e.stderr) return 1 + if options.show_certs: + if logger.getEffectiveLevel() > logging.INFO: + logger.setLevel(logging.INFO) + for nickname in cert_nicknames: + logging.info('Certificate nickname "%s"\n%s', + nickname, format_cert(options, nickname)) + + logging.info('---------- Summary ----------') + logging.info('NSS database name="%s", password="%s"', + options.db_name, options.db_passwd) + logging.info('CA nickname="%s", CA subject="%s"', + options.ca_nickname, options.ca_subject) + logging.info('server nickname="%s", server subject="%s"', + options.server_nickname, options.server_subject) + logging.info('client nickname="%s", client subject="%s"', + options.client_nickname, options.client_subject) + return 0 #------------------------------------------------------------------------------- +def main(): + return setup_certs(None) + if __name__ == '__main__': sys.exit(main()) diff -N -u -r python-nss-0.14.0/test/test_client_server.py python-nss-0.14.1/test/test_client_server.py --- python-nss-0.14.0/test/test_client_server.py 2013-04-30 13:00:23.000000000 -0400 +++ python-nss-0.14.1/test/test_client_server.py 2013-10-08 09:35:53.000000000 -0400 @@ -25,13 +25,12 @@ password = 'db_passwd' use_ssl = True client_cert_action = NO_CLIENT_CERT -certdir = os.path.join(os.path.dirname(sys.argv[0]), 'pki') +db_name = 'sql:pki' hostname = os.uname()[1] server_nickname = 'test_server' client_nickname = 'test_user' port = 1234 timeout_secs = 10 -family = io.PR_AF_INET sleep_time = 5 @@ -142,7 +141,6 @@ if info: print "client: using SSL" ssl.set_domestic_policy() - valid_addr = False # Get the IP Address of our server try: addr_info = io.AddrInfo(hostname) @@ -151,8 +149,6 @@ return for net_addr in addr_info: - if family != io.PR_AF_UNSPEC: - if net_addr.family != family: continue net_addr.port = port if use_ssl: @@ -180,26 +176,21 @@ if verbose: print "client trying connection to: %s" % (net_addr) sock.connect(net_addr, timeout=io.seconds_to_interval(timeout_secs)) if verbose: print "client connected to: %s" % (net_addr) - valid_addr = True break except Exception, e: sock.close() print >>sys.stderr, "client: connection to: %s failed (%s)" % (net_addr, e) - if not valid_addr: - print >>sys.stderr, "Could not establish valid address for \"%s\" in family %s" % \ - (hostname, io.addr_family_name(family)) - return - # Talk to the server try: if info: print "client: sending \"%s\"" % (request) - sock.send(request) - buf = sock.recv(1024) + sock.send(request + '\n') # newline is protocol record separator + buf = sock.readline() if not buf: print >>sys.stderr, "client: lost connection" sock.close() return + buf = buf.rstrip() # remove newline record separator if info: print "client: received \"%s\"" % (buf) except Exception, e: print >>sys.stderr, "client: %s" % e @@ -228,15 +219,11 @@ # ----------------------------------------------------------------------------- def server(): - global family - if verbose: print "starting server:" # Initialize # Setup an IP Address to listen on any of our interfaces - if family == io.PR_AF_UNSPEC: - family = io.PR_AF_INET - net_addr = io.NetworkAddress(io.PR_IpAddrAny, port, family) + net_addr = io.NetworkAddress(io.PR_IpAddrAny, port) if use_ssl: if info: print "server: using SSL" @@ -290,15 +277,16 @@ while True: try: # Handle the client connection - buf = client_sock.recv(1024) + buf = client_sock.readline() # newline is protocol record separator if not buf: print >>sys.stderr, "server: lost lost connection to %s" % (client_addr) break + buf = buf.rstrip() # remove newline record separator if info: print "server: received \"%s\"" % (buf) - reply = "{%s}" % buf # echo + reply = "{%s}" % buf # echo embedded inside braces if info: print "server: sending \"%s\"" % (reply) - client_sock.send(reply) # echo + client_sock.send(reply + '\n') # send echo with record separator time.sleep(sleep_time) client_sock.shutdown() @@ -320,7 +308,7 @@ def run_server(): pid = os.fork() if pid == 0: - nss.nss_init(certdir) + nss.nss_init(db_name) server() nss.nss_shutdown() time.sleep(sleep_time) @@ -348,7 +336,7 @@ def test_ssl(self): request = "foo" - nss.nss_init(certdir) + nss.nss_init(db_name) reply = client(request) nss.nss_shutdown() self.assertEqual("{%s}" % request, reply) diff -N -u -r python-nss-0.14.0/test/test_ocsp.py python-nss-0.14.1/test/test_ocsp.py --- python-nss-0.14.0/test/test_ocsp.py 2013-04-24 12:36:07.000000000 -0400 +++ python-nss-0.14.1/test/test_ocsp.py 2013-10-08 09:09:18.000000000 -0400 @@ -7,7 +7,7 @@ import nss.nss as nss from nss.error import NSPRError -certdir = 'pki' +db_name = 'sql:pki' #------------------------------------------------------------------------------- @@ -16,7 +16,7 @@ class TestAPI(unittest.TestCase): def setUp(self): - nss.nss_init_read_write(certdir) + nss.nss_init_read_write(db_name) self.certdb = nss.get_default_certdb() def tearDown(self): diff -N -u -r python-nss-0.14.0/test/test_pkcs12.py python-nss-0.14.1/test/test_pkcs12.py --- python-nss-0.14.0/test/test_pkcs12.py 2013-04-15 13:05:46.000000000 -0400 +++ python-nss-0.14.1/test/test_pkcs12.py 2013-10-17 10:29:09.000000000 -0400 @@ -2,6 +2,7 @@ import sys import os +import re import subprocess import shlex import unittest @@ -13,30 +14,55 @@ #------------------------------------------------------------------------------- verbose = False -certdir = 'pki' +db_name = 'sql:pki' db_passwd = 'db_passwd' -pkcs12_file_password = 'pk12_passwd' +pk12_passwd = 'pk12_passwd' -read_nickname = 'test_user' -read_pkcs12_file = '%s.p12' % read_nickname - -write_export_file = False -export_nickname = 'test_server' +cert_nickname = 'test_user' +pk12_filename = '%s.p12' % cert_nickname +exported_pk12_filename = 'exported_%s' % pk12_filename #------------------------------------------------------------------------------- -def run_cmd(cmd): - if verbose: print "running command: %s" % cmd - - args = shlex.split(cmd) - subprocess.check_call(args, stdout=subprocess.PIPE) +class CmdError(Exception): + def __init__(self, cmd_args, returncode, message=None, stdout=None, stderr=None): + self.cmd_args = cmd_args + self.returncode = returncode + if message is None: + self.message = 'Failed error=%s, ' % (returncode) + if stderr: + self.message += '"%s", ' % stderr + self.message += 'args=%s' % (cmd_args) + else: + self.message = message + self.stdout = stdout + self.stderr = stderr + + def __str__(self): + return self.message + + +def run_cmd(cmd_args, input=None): + try: + p = subprocess.Popen(cmd_args, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, stderr = p.communicate(input) + returncode = p.returncode + if returncode != 0: + raise CmdError(cmd_args, returncode, + 'failed %s' % (', '.join(cmd_args)), + stdout, stderr) + return stdout, stderr + except OSError as e: + raise CmdError(cmd_args, e.errno, stderr=str(e)) #------------------------------------------------------------------------------- def password_callback(slot, retry): return db_passwd -#------------------------------------------------------------------------------- def nickname_collision_callback(old_nickname, cert): cancel = False @@ -44,6 +70,51 @@ return new_nickname, cancel +def get_cert_der_from_db(nickname): + cmd_args = ['/usr/bin/certutil', + '-d', db_name, + '-L', + '-n', nickname] + + try: + stdout, stderr = run_cmd(cmd_args) + except CmdError as e: + if e.returncode == 255 and 'not found' in e.stderr: + return None + else: + raise + return stdout + +def delete_cert_from_db(nickname): + cmd_args = ['/usr/bin/certutil', + '-d', db_name, + '-D', + '-n', nickname] + + run_cmd(cmd_args) + +def create_pk12(nickname, filename): + cmd_args = ['/usr/bin/pk12util', + '-o', filename, + '-n', nickname, + '-d', db_name, + '-K', db_passwd, + '-W', pk12_passwd] + run_cmd(cmd_args) + +def list_pk12(filename): + cmd_args = ['/usr/bin/pk12util', + '-l', filename, + '-W', pk12_passwd] + stdout, stderr = run_cmd(cmd_args) + return stdout + +def strip_key_from_pk12_listing(text): + match = re.search(r'^Certificate:$', text, re.MULTILINE) + if not match: + raise ValueError('Could not file Key section in pk12 listing') + return text[match.start(0):] + #------------------------------------------------------------------------------- def load_tests(loader, tests, pattern): @@ -59,22 +130,23 @@ class TestPKCS12Decoder(unittest.TestCase): def setUp(self): - nss.nss_init_read_write(certdir) + nss.nss_init_read_write(db_name) nss.set_password_callback(password_callback) nss.pkcs12_set_nickname_collision_callback(nickname_collision_callback) nss.pkcs12_enable_all_ciphers() + self.cert_der = get_cert_der_from_db(cert_nickname) + if self.cert_der is None: + raise ValueError('cert with nickname "%s" not in database "%s"' % (cert_nickname, db_name)) def tearDown(self): nss.nss_shutdown() def test_read(self): if verbose: print "test_read" - cmd='pk12util -o %s -n %s -d pki -K %s -W %s' % \ - (read_pkcs12_file, read_nickname, db_passwd, pkcs12_file_password) - run_cmd(cmd) + create_pk12(cert_nickname, pk12_filename) slot = nss.get_internal_key_slot() - pkcs12 = nss.PKCS12Decoder(read_pkcs12_file, pkcs12_file_password, slot) + pkcs12 = nss.PKCS12Decoder(pk12_filename, pk12_passwd, slot) self.assertEqual(len(pkcs12), 3) cert_bag_count = 0 @@ -102,33 +174,43 @@ def test_import(self): if verbose: print "test_import" - cmd='certutil -d pki -D -n %s' % (read_nickname) - run_cmd(cmd) + delete_cert_from_db(cert_nickname) + self.assertEqual(get_cert_der_from_db(cert_nickname), None) slot = nss.get_internal_key_slot() - pkcs12 = nss.PKCS12Decoder(read_pkcs12_file, pkcs12_file_password, slot) + pkcs12 = nss.PKCS12Decoder(pk12_filename, pk12_passwd, slot) slot.authenticate() pkcs12.database_import() + cert_der = get_cert_der_from_db(cert_nickname) + self.assertEqual(cert_der, self.cert_der) #------------------------------------------------------------------------------- class TestPKCS12Export(unittest.TestCase): def setUp(self): - nss.nss_init(certdir) + nss.nss_init(db_name) nss.set_password_callback(password_callback) nss.pkcs12_enable_all_ciphers() + self.cert_der = get_cert_der_from_db(cert_nickname) + if self.cert_der is None: + raise ValueError('cert with nickname "%s" not in database "%s"' % (cert_nickname, db_name)) def tearDown(self): nss.nss_shutdown() def test_export(self): if verbose: print "test_export" - pkcs12_data = nss.pkcs12_export(export_nickname, pkcs12_file_password) - if write_export_file: - p12_file_path = os.path.join(os.path.dirname(sys.argv[0]), "%s.p12" % export_nickname) - f = open(p12_file_path, 'w') + pkcs12_data = nss.pkcs12_export(cert_nickname, pk12_passwd) + with open(exported_pk12_filename, 'w') as f: f.write(pkcs12_data) - f.close() + + pk12_listing = list_pk12(pk12_filename) + pk12_listing = strip_key_from_pk12_listing(pk12_listing) + + exported_pk12_listing = list_pk12(exported_pk12_filename) + exported_pk12_listing = strip_key_from_pk12_listing(exported_pk12_listing) + + self.assertEqual(pk12_listing, exported_pk12_listing) if __name__ == '__main__': unittest.main() diff -N -u -r python-nss-0.14.0/test/util.py python-nss-0.14.1/test/util.py --- python-nss-0.14.0/test/util.py 1969-12-31 19:00:00.000000000 -0500 +++ python-nss-0.14.1/test/util.py 2013-06-13 09:58:32.000000000 -0400 @@ -0,0 +1,40 @@ +import sys +import os +from distutils.util import get_platform + +def get_build_dir(): + ''' + Walk from the current directory up until a directory is found + which contains a regular file called "setup.py" and a directory + called "build". If found return the fully qualified path to + the build directory's platform specific directory, this is where + the architecture specific build produced by setup.py is located. + + There is no API in distutils to return the platform specific + directory so we use as much as distutils exposes, the rest was + determined by looking at the source code for distutils. + + If the build directory cannont be found in the tree None is returned. + ''' + cwd = os.getcwd() + path_components = cwd.split('/') + while (len(path_components)): + path = os.path.join('/', *path_components) + setup_path = os.path.join(path, 'setup.py') + build_path = os.path.join(path, 'build') + # Does this directory contain the file "setup.py" and the directory "build"? + if os.path.exists(setup_path) and os.path.exists(build_path) and \ + os.path.isfile(setup_path) and os.path.isdir(build_path): + # Found, return the path contentated with the architecture + # specific build directory + platform_specifier = "lib.%s-%s" % (get_platform(), sys.version[0:3]) + return os.path.join(build_path, platform_specifier) + + # Not found, ascend to parent directory and try again + path_components.pop() + + # Failed to find the build directory + return None + +def insert_build_dir_into_path(): + sys.path.insert(0,get_build_dir())