[Pki-devel] [PATCH] 658 Added pki-server subsystem-cert-export command.

Endi Sukma Dewata edewata at redhat.com
Thu Nov 12 16:18:38 UTC 2015


A new command has been added to export a system certificate, the
CSR, and the key. This command can be used to migrate a system
certificate into another instance.

https://fedorahosted.org/pki/ticket/456

-- 
Endi S. Dewata
-------------- next part --------------
From 6dbe3082fa7c7c2e7b1ef020c51c08568ff46547 Mon Sep 17 00:00:00 2001
From: "Endi S. Dewata" <edewata at redhat.com>
Date: Thu, 12 Nov 2015 00:23:26 +0100
Subject: [PATCH] Added pki-server subsystem-cert-export command.

A new command has been added to export a system certificate, the
CSR, and the key. This command can be used to migrate a system
certificate into another instance.

https://fedorahosted.org/pki/ticket/456
---
 base/common/python/pki/nss.py                  | 336 +++++++++++++++++++++++++
 base/server/python/pki/server/__init__.py      |   6 +
 base/server/python/pki/server/cli/subsystem.py | 126 ++++++++++
 3 files changed, 468 insertions(+)
 create mode 100644 base/common/python/pki/nss.py

diff --git a/base/common/python/pki/nss.py b/base/common/python/pki/nss.py
new file mode 100644
index 0000000000000000000000000000000000000000..f36b771f85eb45641022d6033c23a88aca50757a
--- /dev/null
+++ b/base/common/python/pki/nss.py
@@ -0,0 +1,336 @@
+#!/usr/bin/python
+# Authors:
+#     Endi S. Dewata <edewata at redhat.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+# Copyright (C) 2015 Red Hat, Inc.
+# All rights reserved.
+#
+
+import base64
+import os
+import shutil
+import subprocess
+import tempfile
+
+
+CSR_HEADER = '-----BEGIN NEW CERTIFICATE REQUEST-----'
+CSR_FOOTER = '-----END NEW CERTIFICATE REQUEST-----'
+
+CERT_HEADER = '-----BEGIN CERTIFICATE-----'
+CERT_FOOTER = '-----END CERTIFICATE-----'
+
+
+def convert_data(data, input_format, output_format, header=None, footer=None):
+
+    if input_format == 'base64' and output_format == 'pem':
+
+        # split a single line into multiple lines
+        lines = [data[i:i+64] for i in range(0, len(data), 64)]
+        return '%s\n%s\n%s\n' % (header, '\n'.join(lines), footer)
+
+    if input_format == 'pem' and output_format == 'base64':
+
+        # join multiple lines into a single line
+        lines = []
+        for line in data.splitlines():
+            line = line.rstrip('\r\n')
+            if line == header:
+                continue
+            if line == footer:
+                continue
+            lines.append(line)
+
+        return ''.join(lines)
+
+    raise Exception('Unable to convert data from %s to %s' % (input_format, output_format))
+
+def convert_csr(csr_data, input_format, output_format):
+
+    return convert_data(csr_data, input_format, output_format, CSR_HEADER, CSR_FOOTER)
+
+def convert_cert(cert_data, input_format, output_format):
+
+    return convert_data(cert_data, input_format, output_format, CERT_HEADER, CERT_FOOTER)
+
+
+class NSSDatabase(object):
+
+    def __init__(self, directory, password=None, password_file=None):
+        self.directory = directory
+
+        self.tmpdir = tempfile.mkdtemp()
+
+        if password:
+            self.password_file = os.path.join(self.tmpdir, 'password.txt')
+            with open(self.password_file, 'w') as f:
+                f.write(password)
+
+        elif password_file:
+            self.password_file = password_file
+
+        else:
+            raise Exception('Missing NSS database password')
+
+    def close(self):
+        shutil.rmtree(self.tmpdir)
+
+    def add_cert(self,
+        nickname, cert_file,
+        trust_attributes='u,u,u'):
+
+        subprocess.check_call([
+            'certutil',
+            '-A',
+            '-d', self.directory,
+            '-n', nickname,
+            '-i', cert_file,
+            '-t', trust_attributes
+        ])
+
+    def modify_cert(self,
+        nickname,
+        trust_attributes='u,u,u'):
+
+        subprocess.check_call([
+            'certutil',
+            '-M',
+            '-d', self.directory,
+            '-n', nickname,
+            '-t', trust_attributes
+        ])
+
+    def create_noise(self, noise_file, size=2048):
+
+        subprocess.check_call([
+            'openssl',
+            'rand',
+            '-out', noise_file,
+            str(size)
+        ])
+
+    def create_request(self,
+        subject_dn,
+        noise_file,
+        request_file):
+
+        tmpdir = tempfile.mkdtemp()
+
+        try:
+            binary_request_file = os.path.join(tmpdir, 'request.bin')
+            b64_request_file = os.path.join(tmpdir, 'request.b64')
+
+            # generate binary request
+            subprocess.check_call([
+                'certutil',
+                '-R',
+                '-d', self.directory,
+                '-f', self.password_file,
+                '-s', subject_dn,
+                '-z', noise_file,
+                '-o', binary_request_file
+            ])
+
+            # encode binary request in base-64
+            subprocess.check_call([
+                'BtoA', binary_request_file, b64_request_file])
+
+            # read base-64 request
+            with open(b64_request_file, 'r') as f:
+                b64_request = f.read()
+
+            # add header and footer
+            with open(request_file, 'w') as f:
+                f.write('-----BEGIN NEW CERTIFICATE REQUEST-----\n')
+                f.write(b64_request)
+                f.write('-----END NEW CERTIFICATE REQUEST-----\n')
+
+        finally:
+            shutil.rmtree(tmpdir)
+
+    def create_self_signed_ca_cert(self,
+        subject_dn,
+        request_file,
+        cert_file,
+        serial='1',
+        validity=240):
+
+        p = subprocess.Popen([
+            'certutil',
+            '-C',
+            '-x',
+            '-d', self.directory,
+            '-f', self.password_file,
+            '-c', subject_dn,
+            '-a',
+            '-i', request_file,
+            '-o', cert_file,
+            '-m', serial,
+            '-v', str(validity),
+            '--keyUsage', 'digitalSignature,nonRepudiation,certSigning,crlSigning,critical',
+            '-2',
+            '-3',
+            '--extSKID',
+            '--extAIA'
+        ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+
+        keystroke = ''
+
+        # Is this a CA certificate [y/N]?
+        keystroke += 'y\n'
+
+        # Enter the path length constraint, enter to skip [<0 for unlimited path]:
+        keystroke += '\n'
+
+        # Is this a critical extension [y/N]?
+        keystroke += 'y\n'
+
+        # Enter value for the authKeyID extension [y/N]?
+        keystroke += 'y\n'
+
+        # TODO: generate SHA1 ID (see APolicyRule.formSHA1KeyId())
+        # Enter value for the key identifier fields,enter to omit:
+        keystroke += '2d:7e:83:37:75:5a:fd:0e:8d:52:a3:70:16:93:36:b8:4a:d6:84:9f\n'
+
+        # Select one of the following general name type:
+        keystroke += '0\n'
+
+        # Enter value for the authCertSerial field, enter to omit:
+        keystroke += '\n'
+
+        # Is this a critical extension [y/N]?
+        keystroke += '\n'
+
+        # TODO: generate SHA1 ID (see APolicyRule.formSHA1KeyId())
+        # Adding Subject Key ID extension.
+        # Enter value for the key identifier fields,enter to omit:
+        keystroke += '2d:7e:83:37:75:5a:fd:0e:8d:52:a3:70:16:93:36:b8:4a:d6:84:9f\n'
+
+        # Is this a critical extension [y/N]?
+        keystroke += '\n'
+
+        # Enter access method type for Authority Information Access extension:
+        keystroke += '2\n'
+
+        # Select one of the following general name type:
+        keystroke += '7\n'
+
+        # TODO: replace with actual hostname name and port number
+        # Enter data:
+        keystroke += 'http://server.example.com:8080/ca/ocsp\n'
+
+        # Select one of the following general name type:
+        keystroke += '0\n'
+
+        # Add another location to the Authority Information Access extension [y/N]
+        keystroke += '\n'
+
+        # Is this a critical extension [y/N]?
+        keystroke += '\n'
+
+        p.communicate(keystroke)
+
+        rc = p.wait()
+
+        if rc:
+            raise Exception('Failed to generate self-signed CA certificate. RC: %d' + rc)
+
+    def get_cert(self, nickname, output_format='pem'):
+
+        if output_format == 'pem':
+            output_format_option = '-a'
+
+        elif output_format == 'base64':
+            output_format_option = '-r'
+
+        else:
+            raise Exception('Unsupported output format: %s' % output_format)
+
+        cert_data = subprocess.check_output([
+            'certutil',
+            '-L',
+            '-d', self.directory,
+            '-n', nickname,
+            output_format_option
+        ])
+
+        if output_format == 'base64':
+            cert_data = base64.b64encode(cert_data)
+
+        return cert_data
+
+    def remove_cert(self, nickname):
+
+        subprocess.check_call([
+            'certutil',
+            '-D',
+            '-d', self.directory,
+            '-n', nickname
+        ])
+
+    def import_pkcs12(self, pkcs12_file, pkcs12_password=None, pkcs12_password_file=None):
+
+        tmpdir = tempfile.mkdtemp()
+
+        try:
+            if pkcs12_password:
+                password_file = os.path.join(tmpdir, 'password.txt')
+                with open(password_file, 'w') as f:
+                    f.write(pkcs12_password)
+
+            elif pkcs12_password_file:
+                password_file = pkcs12_password_file
+
+            else:
+                raise Exception('Missing PKCS #12 password')
+
+            subprocess.check_call([
+                'pk12util',
+                '-d', self.directory,
+                '-k', self.password_file,
+                '-i', pkcs12_file,
+                '-w', password_file
+            ])
+
+        finally:
+            shutil.rmtree(tmpdir)
+
+    def export_pkcs12(self, pkcs12_file, nickname, pkcs12_password=None, pkcs12_password_file=None):
+
+        tmpdir = tempfile.mkdtemp()
+
+        try:
+            if pkcs12_password:
+                password_file = os.path.join(tmpdir, 'password.txt')
+                with open(password_file, 'w') as f:
+                    f.write(pkcs12_password)
+
+            elif pkcs12_password_file:
+                password_file = pkcs12_password_file
+
+            else:
+                raise Exception('Missing PKCS #12 password')
+
+            subprocess.check_call([
+                'pk12util',
+                '-d', self.directory,
+                '-k', self.password_file,
+                '-o', pkcs12_file,
+                '-w', password_file,
+                '-n', nickname
+            ])
+
+        finally:
+            shutil.rmtree(tmpdir)
diff --git a/base/server/python/pki/server/__init__.py b/base/server/python/pki/server/__init__.py
index 0d522084c0bf210e93599ce09c2d23d0214c4aa7..d55a3691d180ede7dd1731b7490957c816bd8a3b 100644
--- a/base/server/python/pki/server/__init__.py
+++ b/base/server/python/pki/server/__init__.py
@@ -34,6 +34,7 @@ import subprocess
 import tempfile
 
 import pki
+import pki.nss
 
 INSTANCE_BASE_DIR = '/var/lib/pki'
 REGISTRY_DIR = '/etc/sysconfig/pki'
@@ -327,6 +328,11 @@ class PKIInstance(object):
 
         return password
 
+    def open_nssdb(self):
+        return pki.nss.NSSDatabase(
+            directory=self.nssdb_dir,
+            password=self.get_password('internal'))
+
     def get_subsystem(self, name):
         for subsystem in self.subsystems:
             if name == subsystem.name:
diff --git a/base/server/python/pki/server/cli/subsystem.py b/base/server/python/pki/server/cli/subsystem.py
index 3b9f9860f3b8b8cc9a3722b019c6d93181da6469..91a50cc38de01c98f43bb22935171f7317dbe886 100644
--- a/base/server/python/pki/server/cli/subsystem.py
+++ b/base/server/python/pki/server/cli/subsystem.py
@@ -23,11 +23,13 @@ from __future__ import absolute_import
 from __future__ import print_function
 import base64
 import getopt
+import getpass
 import nss.nss as nss
 import string
 import sys
 
 import pki.cli
+import pki.nss
 import pki.server
 
 
@@ -296,6 +298,7 @@ class SubsystemCertCLI(pki.cli.CLI):
 
         self.add_module(SubsystemCertFindCLI())
         self.add_module(SubsystemCertShowCLI())
+        self.add_module(SubsystemCertExportCLI())
         self.add_module(SubsystemCertUpdateCLI())
 
     @staticmethod
@@ -440,6 +443,129 @@ class SubsystemCertShowCLI(pki.cli.CLI):
         SubsystemCertCLI.print_subsystem_cert(subsystem_cert)
 
 
+class SubsystemCertExportCLI(pki.cli.CLI):
+
+    def __init__(self):
+        super(SubsystemCertExportCLI, self).__init__(
+            'export', 'Export subsystem certificate')
+
+    def usage(self):
+        print('Usage: pki-server subsystem-cert-export [OPTIONS] <subsystem ID> <cert ID>')
+        print()
+        print('  -i, --instance <instance ID>       Instance ID (default: pki-tomcat).')
+        print('      --cert-file <path>             PEM file to store the certificate.')
+        print('      --csr-file <path>              PEM file to store the CSR.')
+        print('      --pkcs12-file <path>           PKCS #12 file to store the certificate and key.')
+        print('      --pkcs12-password <password>   Password for the PKCS #12 file.')
+        print('      --pkcs12-password-file <path>  File containing the password for the PKCS #12 file.')
+        print('  -v, --verbose                      Run in verbose mode.')
+        print('      --help                         Show help message.')
+        print()
+
+    def execute(self, argv):
+
+        try:
+            opts, args = getopt.gnu_getopt(argv, 'i:v', [
+                'instance=', 'cert-file=', 'csr-file=',
+                'pkcs12-file=', 'pkcs12-password=', 'pkcs12-password-file=',
+                'verbose', 'help'])
+
+        except getopt.GetoptError as e:
+            print('ERROR: ' + str(e))
+            self.usage()
+            sys.exit(1)
+
+        if len(args) < 1:
+            print('ERROR: missing subsystem ID')
+            self.usage()
+            sys.exit(1)
+
+        if len(args) < 2:
+            print('ERROR: missing cert ID')
+            self.usage()
+            sys.exit(1)
+
+        subsystem_name = args[0]
+        cert_id = args[1]
+        instance_name = 'pki-tomcat'
+        cert_file = None
+        csr_file = None
+        pkcs12_file = None
+        pkcs12_password = None
+        pkcs12_password_file = None
+
+        for o, a in opts:
+            if o in ('-i', '--instance'):
+                instance_name = a
+
+            elif o == '--cert-file':
+                cert_file = a
+
+            elif o == '--csr-file':
+                csr_file = a
+
+            elif o == '--pkcs12-file':
+                pkcs12_file = a
+
+            elif o == '--pkcs12-password':
+                pkcs12_password = a
+
+            elif o == '--pkcs12-password-file':
+                pkcs12_password_file = a
+
+            elif o in ('-v', '--verbose'):
+                self.set_verbose(True)
+
+            elif o == '--help':
+                self.print_help()
+                sys.exit()
+
+            else:
+                print('ERROR: unknown option ' + o)
+                self.usage()
+                sys.exit(1)
+
+        if not cert_file and not csr_file and not pkcs12_file:
+            print('ERROR: missing output file')
+            self.usage()
+            sys.exit(1)
+
+        instance = pki.server.PKIInstance(instance_name)
+        instance.load()
+
+        subsystem = instance.get_subsystem(subsystem_name)
+        subsystem_cert = subsystem.get_subsystem_cert(cert_id)
+
+        if cert_file:
+
+            cert_data = pki.nss.convert_cert(subsystem_cert['data'], 'base64', 'pem')
+            with open(cert_file, 'w') as f:
+                f.write(cert_data)
+
+        if csr_file:
+
+            csr_data = pki.nss.convert_csr(subsystem_cert['request'], 'base64', 'pem')
+            with open(csr_file, 'w') as f:
+                f.write(csr_data)
+
+        if pkcs12_file:
+
+            if not pkcs12_password and not pkcs12_password_file:
+                pkcs12_password = getpass.getpass(prompt='Enter password for PKCS #12 file: ')
+
+            nssdb = instance.open_nssdb()
+            try:
+                nssdb.export_pkcs12(
+                    pkcs12_file=pkcs12_file,
+                    nickname=subsystem_cert['nickname'],
+                    pkcs12_password=pkcs12_password,
+                    pkcs12_password_file=pkcs12_password_file)
+            finally:
+                nssdb.close()
+
+        self.print_message('Exported %s certificate' % cert_id)
+
+
 class SubsystemCertUpdateCLI(pki.cli.CLI):
 
     def __init__(self):
-- 
2.4.3



More information about the Pki-devel mailing list