[Pki-devel] [PATCH] 669 Merged pki.nss into pki.crypto.
Endi Sukma Dewata
edewata at redhat.com
Fri Jan 22 15:57:06 UTC 2016
The pki.nss module has been merged into pki.crypto to prevent
conflicts with the nss module.
https://fedorahosted.org/pki/ticket/456
--
Endi S. Dewata
-------------- next part --------------
From a0200070efd13fd0c68909b22f3d507b21131aff Mon Sep 17 00:00:00 2001
From: "Endi S. Dewata" <edewata at redhat.com>
Date: Fri, 22 Jan 2016 00:47:28 +0100
Subject: [PATCH] Merged pki.nss into pki.crypto.
The pki.nss module has been merged into pki.crypto to prevent
conflicts with the nss module.
https://fedorahosted.org/pki/ticket/456
---
base/common/python/pki/crypto.py | 512 ++++++++++++++++++++
base/common/python/pki/nss.py | 532 ---------------------
base/server/python/pki/server/__init__.py | 4 +-
base/server/python/pki/server/cli/subsystem.py | 6 +-
.../server/deployment/scriptlets/configuration.py | 6 +-
5 files changed, 520 insertions(+), 540 deletions(-)
delete mode 100644 base/common/python/pki/nss.py
diff --git a/base/common/python/pki/crypto.py b/base/common/python/pki/crypto.py
index 60e83c924a6986152ba732cc63b668a906350228..44347031e9df62538da48a59a0315741a911690f 100644
--- a/base/common/python/pki/crypto.py
+++ b/base/common/python/pki/crypto.py
@@ -22,6 +22,7 @@ Module containing crypto classes.
"""
from __future__ import absolute_import
import abc
+import base64
import nss.nss as nss
import os
import six
@@ -30,6 +31,81 @@ import subprocess
import tempfile
+CSR_HEADER = '-----BEGIN NEW CERTIFICATE REQUEST-----'
+CSR_FOOTER = '-----END NEW CERTIFICATE REQUEST-----'
+
+CERT_HEADER = '-----BEGIN CERTIFICATE-----'
+CERT_FOOTER = '-----END CERTIFICATE-----'
+
+PKCS7_HEADER = '-----BEGIN PKCS7-----'
+PKCS7_FOOTER = '-----END PKCS7-----'
+
+
+def convert_data(data, input_format, output_format, header=None, footer=None):
+
+ if input_format == output_format:
+ return data
+
+ if input_format == 'base64' and output_format == 'pem':
+
+ # join base-64 data into a single line
+ data = data.replace('\r', '').replace('\n', '')
+
+ # re-split the line into fixed-length lines
+ lines = [data[i:i+64] for i in range(0, len(data), 64)]
+
+ # add header and footer
+ return '%s\n%s\n%s\n' % (header, '\n'.join(lines), footer)
+
+ if input_format == 'pem' and output_format == 'base64':
+
+ # join multiple lines into a single line
+ lines = []
+ for line in data.splitlines():
+ line = line.rstrip('\r\n')
+ if line == header:
+ continue
+ if line == footer:
+ continue
+ lines.append(line)
+
+ return ''.join(lines)
+
+ raise Exception('Unable to convert data from %s to %s' % (input_format, output_format))
+
+
+def convert_csr(csr_data, input_format, output_format):
+
+ return convert_data(csr_data, input_format, output_format, CSR_HEADER, CSR_FOOTER)
+
+
+def convert_cert(cert_data, input_format, output_format):
+
+ return convert_data(cert_data, input_format, output_format, CERT_HEADER, CERT_FOOTER)
+
+
+def convert_pkcs7(pkcs7_data, input_format, output_format):
+
+ return convert_data(pkcs7_data, input_format, output_format, PKCS7_HEADER, PKCS7_FOOTER)
+
+
+def get_file_type(filename):
+
+ with open(filename, 'r') as f:
+ data = f.read()
+
+ if data.startswith(CSR_HEADER):
+ return 'csr'
+
+ if data.startswith(CERT_HEADER):
+ return 'cert'
+
+ if data.startswith(PKCS7_HEADER):
+ return 'pkcs7'
+
+ return None
+
+
class CryptoProvider(six.with_metaclass(abc.ABCMeta, object)):
"""
Abstract class containing methods to do cryptographic operations.
@@ -287,3 +363,439 @@ class NSSCryptoProvider(CryptoProvider):
Searches NSS database and returns SecItem object for this certificate.
"""
return nss.find_cert_from_nickname(cert_nick)
+
+
+class NSSDatabase(object):
+
+ def __init__(self, directory, token='internal', password=None, password_file=None):
+ self.directory = directory
+ self.token = token
+
+ self.tmpdir = tempfile.mkdtemp()
+
+ if password:
+ self.password_file = os.path.join(self.tmpdir, 'password.txt')
+ with open(self.password_file, 'w') as f:
+ f.write(password)
+
+ elif password_file:
+ self.password_file = password_file
+
+ else:
+ raise Exception('Missing NSS database password')
+
+ def close(self):
+ shutil.rmtree(self.tmpdir)
+
+ def add_cert(self,
+ nickname,
+ cert_file,
+ trust_attributes=',,'):
+
+ cmd = [
+ 'certutil',
+ '-A',
+ '-d', self.directory,
+ '-h', self.token,
+ '-f', self.password_file,
+ '-n', nickname,
+ '-i', cert_file,
+ '-t', trust_attributes
+ ]
+
+ subprocess.check_call(cmd)
+
+ def modify_cert(self,
+ nickname,
+ trust_attributes):
+
+ cmd = [
+ 'certutil',
+ '-M',
+ '-d', self.directory,
+ '-h', self.token,
+ '-f', self.password_file,
+ '-n', nickname,
+ '-t', trust_attributes
+ ]
+
+ subprocess.check_call(cmd)
+
+ def create_noise(self, noise_file, size=2048):
+
+ subprocess.check_call([
+ 'openssl',
+ 'rand',
+ '-out', noise_file,
+ str(size)
+ ])
+
+ def create_request(self,
+ subject_dn,
+ request_file,
+ noise_file=None,
+ key_type=None,
+ key_size=None,
+ curve=None,
+ hash_alg=None):
+
+ tmpdir = tempfile.mkdtemp()
+
+ try:
+ if not noise_file:
+ noise_file = os.path.join(tmpdir, 'noise.bin')
+ if key_size:
+ size = key_size
+ else:
+ size = 2048
+ self.create_noise(
+ noise_file=noise_file,
+ size=size)
+
+ binary_request_file = os.path.join(tmpdir, 'request.bin')
+
+ cmd = [
+ 'certutil',
+ '-R',
+ '-d', self.directory,
+ '-h', self.token,
+ '-f', self.password_file,
+ '-s', subject_dn,
+ '-o', binary_request_file,
+ '-z', noise_file
+ ]
+
+ if key_type:
+ cmd.extend(['-k', key_type])
+
+ if key_size:
+ cmd.extend(['-g', str(key_size)])
+
+ if curve:
+ cmd.extend(['-q', curve])
+
+ if hash_alg:
+ cmd.extend(['-Z', hash_alg])
+
+ # generate binary request
+ subprocess.check_call(cmd)
+
+ # encode binary request in base-64
+ b64_request_file = os.path.join(tmpdir, 'request.b64')
+ subprocess.check_call([
+ 'BtoA', binary_request_file, b64_request_file])
+
+ # read base-64 request
+ with open(b64_request_file, 'r') as f:
+ b64_request = f.read()
+
+ # add header and footer
+ with open(request_file, 'w') as f:
+ f.write('-----BEGIN NEW CERTIFICATE REQUEST-----\n')
+ f.write(b64_request)
+ f.write('-----END NEW CERTIFICATE REQUEST-----\n')
+
+ finally:
+ shutil.rmtree(tmpdir)
+
+ def create_self_signed_ca_cert(self,
+ subject_dn,
+ request_file,
+ cert_file,
+ serial='1',
+ validity=240):
+
+ cmd = [
+ 'certutil',
+ '-C',
+ '-x',
+ '-d', self.directory,
+ '-h', self.token,
+ '-f', self.password_file,
+ '-c', subject_dn,
+ '-a',
+ '-i', request_file,
+ '-o', cert_file,
+ '-m', serial,
+ '-v', str(validity),
+ '--keyUsage', 'digitalSignature,nonRepudiation,certSigning,crlSigning,critical',
+ '-2',
+ '-3',
+ '--extSKID',
+ '--extAIA'
+ ]
+
+ p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+
+ keystroke = ''
+
+ # Is this a CA certificate [y/N]?
+ keystroke += 'y\n'
+
+ # Enter the path length constraint, enter to skip [<0 for unlimited path]:
+ keystroke += '\n'
+
+ # Is this a critical extension [y/N]?
+ keystroke += 'y\n'
+
+ # Enter value for the authKeyID extension [y/N]?
+ keystroke += 'y\n'
+
+ # TODO: generate SHA1 ID (see APolicyRule.formSHA1KeyId())
+ # Enter value for the key identifier fields,enter to omit:
+ keystroke += '2d:7e:83:37:75:5a:fd:0e:8d:52:a3:70:16:93:36:b8:4a:d6:84:9f\n'
+
+ # Select one of the following general name type:
+ keystroke += '0\n'
+
+ # Enter value for the authCertSerial field, enter to omit:
+ keystroke += '\n'
+
+ # Is this a critical extension [y/N]?
+ keystroke += '\n'
+
+ # TODO: generate SHA1 ID (see APolicyRule.formSHA1KeyId())
+ # Adding Subject Key ID extension.
+ # Enter value for the key identifier fields,enter to omit:
+ keystroke += '2d:7e:83:37:75:5a:fd:0e:8d:52:a3:70:16:93:36:b8:4a:d6:84:9f\n'
+
+ # Is this a critical extension [y/N]?
+ keystroke += '\n'
+
+ # Enter access method type for Authority Information Access extension:
+ keystroke += '2\n'
+
+ # Select one of the following general name type:
+ keystroke += '7\n'
+
+ # TODO: replace with actual hostname name and port number
+ # Enter data:
+ keystroke += 'http://server.example.com:8080/ca/ocsp\n'
+
+ # Select one of the following general name type:
+ keystroke += '0\n'
+
+ # Add another location to the Authority Information Access extension [y/N]
+ keystroke += '\n'
+
+ # Is this a critical extension [y/N]?
+ keystroke += '\n'
+
+ p.communicate(keystroke)
+
+ rc = p.wait()
+
+ if rc:
+ raise Exception('Failed to generate self-signed CA certificate. RC: %d' % rc)
+
+ def get_cert(self, nickname, output_format='pem'):
+
+ if output_format == 'pem':
+ output_format_option = '-a'
+
+ elif output_format == 'base64':
+ output_format_option = '-r'
+
+ else:
+ raise Exception('Unsupported output format: %s' % output_format)
+
+ cmd = [
+ 'certutil',
+ '-L',
+ '-d', self.directory,
+ '-h', self.token,
+ '-f', self.password_file,
+ '-n', nickname,
+ output_format_option
+ ]
+
+ cert_data = subprocess.check_output(cmd)
+
+ if output_format == 'base64':
+ cert_data = base64.b64encode(cert_data)
+
+ return cert_data
+
+ def remove_cert(self, nickname):
+
+ cmd = [
+ 'certutil',
+ '-D',
+ '-d', self.directory,
+ '-h', self.token,
+ '-f', self.password_file,
+ '-n', nickname
+ ]
+
+ subprocess.check_call(cmd)
+
+ def import_cert_chain(self, nickname, cert_chain_file, trust_attributes=None):
+
+ tmpdir = tempfile.mkdtemp()
+
+ try:
+ file_type = get_file_type(cert_chain_file)
+
+ if file_type == 'cert': # import single PEM cert
+ self.add_cert(
+ nickname=nickname,
+ cert_file=cert_chain_file,
+ trust_attributes=trust_attributes)
+ return self.get_cert(
+ nickname=nickname,
+ output_format='base64')
+
+ elif file_type == 'pkcs7': # import PKCS #7 cert chain
+ return self.import_pkcs7(
+ pkcs7_file=cert_chain_file,
+ nickname=nickname,
+ trust_attributes=trust_attributes,
+ output_format='base64')
+
+ else: # import PKCS #7 data without header/footer
+ with open(cert_chain_file, 'r') as f:
+ base64_data = f.read()
+ pkcs7_data = convert_pkcs7(base64_data, 'base64', 'pem')
+
+ tmp_cert_chain_file = os.path.join(tmpdir, 'cert_chain.p7b')
+ with open(tmp_cert_chain_file, 'w') as f:
+ f.write(pkcs7_data)
+
+ self.import_pkcs7(
+ pkcs7_file=tmp_cert_chain_file,
+ nickname=nickname,
+ trust_attributes=trust_attributes)
+
+ return base64_data
+
+ finally:
+ shutil.rmtree(tmpdir)
+
+ def import_pkcs7(self, pkcs7_file, nickname, trust_attributes=None, output_format='pem'):
+
+ tmpdir = tempfile.mkdtemp()
+
+ try:
+ # export certs from PKCS #7 into PEM output
+ output = subprocess.check_output([
+ 'openssl',
+ 'pkcs7',
+ '-print_certs',
+ '-in', pkcs7_file
+ ])
+
+ # parse PEM output into separate PEM certificates
+ certs = []
+ lines = []
+ state = 'header'
+
+ for line in output.splitlines():
+
+ if state == 'header':
+ if line != CERT_HEADER:
+ # ignore header lines
+ pass
+ else:
+ # save cert header
+ lines.append(line)
+ state = 'body'
+
+ elif state == 'body':
+ if line != CERT_FOOTER:
+ # save cert body
+ lines.append(line)
+ else:
+ # save cert footer
+ lines.append(line)
+
+ # construct PEM cert
+ cert = '\n'.join(lines)
+ certs.append(cert)
+ lines = []
+ state = 'header'
+
+ # import PEM certs into NSS database
+ counter = 1
+ for cert in certs:
+
+ cert_file = os.path.join(tmpdir, 'cert%d.pem' % counter)
+ with open(cert_file, 'w') as f:
+ f.write(cert)
+
+ if counter == 1:
+ n = nickname
+ else:
+ n = '%s #%d' % (nickname, counter)
+
+ self.add_cert(n, cert_file, trust_attributes)
+
+ counter += 1
+
+ # convert PKCS #7 data to the requested format
+ with open(pkcs7_file, 'r') as f:
+ data = f.read()
+
+ return convert_pkcs7(data, 'pem', output_format)
+
+ finally:
+ shutil.rmtree(tmpdir)
+
+ def import_pkcs12(self, pkcs12_file, pkcs12_password=None, pkcs12_password_file=None):
+
+ tmpdir = tempfile.mkdtemp()
+
+ try:
+ if pkcs12_password:
+ password_file = os.path.join(tmpdir, 'password.txt')
+ with open(password_file, 'w') as f:
+ f.write(pkcs12_password)
+
+ elif pkcs12_password_file:
+ password_file = pkcs12_password_file
+
+ else:
+ raise Exception('Missing PKCS #12 password')
+
+ cmd = [
+ 'pk12util',
+ '-d', self.directory,
+ '-h', self.token,
+ '-k', self.password_file,
+ '-i', pkcs12_file,
+ '-w', password_file
+ ]
+
+ subprocess.check_call(cmd)
+
+ finally:
+ shutil.rmtree(tmpdir)
+
+ def export_pkcs12(self, pkcs12_file, nickname, pkcs12_password=None, pkcs12_password_file=None):
+
+ tmpdir = tempfile.mkdtemp()
+
+ try:
+ if pkcs12_password:
+ password_file = os.path.join(tmpdir, 'password.txt')
+ with open(password_file, 'w') as f:
+ f.write(pkcs12_password)
+
+ elif pkcs12_password_file:
+ password_file = pkcs12_password_file
+
+ else:
+ raise Exception('Missing PKCS #12 password')
+
+ cmd = [
+ 'pk12util',
+ '-d', self.directory,
+ '-k', self.password_file,
+ '-o', pkcs12_file,
+ '-w', password_file,
+ '-n', nickname
+ ]
+
+ subprocess.check_call(cmd)
+
+ finally:
+ shutil.rmtree(tmpdir)
diff --git a/base/common/python/pki/nss.py b/base/common/python/pki/nss.py
deleted file mode 100644
index 44e286853c252675bff9e25c32377aaa86f8cf18..0000000000000000000000000000000000000000
--- a/base/common/python/pki/nss.py
+++ /dev/null
@@ -1,532 +0,0 @@
-# Authors:
-# Endi S. Dewata <edewata at redhat.com>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; version 2 of the License.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-#
-# Copyright (C) 2015 Red Hat, Inc.
-# All rights reserved.
-#
-
-import base64
-import os
-import shutil
-import subprocess
-import tempfile
-
-
-CSR_HEADER = '-----BEGIN NEW CERTIFICATE REQUEST-----'
-CSR_FOOTER = '-----END NEW CERTIFICATE REQUEST-----'
-
-CERT_HEADER = '-----BEGIN CERTIFICATE-----'
-CERT_FOOTER = '-----END CERTIFICATE-----'
-
-PKCS7_HEADER = '-----BEGIN PKCS7-----'
-PKCS7_FOOTER = '-----END PKCS7-----'
-
-
-def convert_data(data, input_format, output_format, header=None, footer=None):
-
- if input_format == output_format:
- return data
-
- if input_format == 'base64' and output_format == 'pem':
-
- # join base-64 data into a single line
- data = data.replace('\r', '').replace('\n', '')
-
- # re-split the line into fixed-length lines
- lines = [data[i:i+64] for i in range(0, len(data), 64)]
-
- # add header and footer
- return '%s\n%s\n%s\n' % (header, '\n'.join(lines), footer)
-
- if input_format == 'pem' and output_format == 'base64':
-
- # join multiple lines into a single line
- lines = []
- for line in data.splitlines():
- line = line.rstrip('\r\n')
- if line == header:
- continue
- if line == footer:
- continue
- lines.append(line)
-
- return ''.join(lines)
-
- raise Exception('Unable to convert data from %s to %s' % (input_format, output_format))
-
-def convert_csr(csr_data, input_format, output_format):
-
- return convert_data(csr_data, input_format, output_format, CSR_HEADER, CSR_FOOTER)
-
-def convert_cert(cert_data, input_format, output_format):
-
- return convert_data(cert_data, input_format, output_format, CERT_HEADER, CERT_FOOTER)
-
-def convert_pkcs7(pkcs7_data, input_format, output_format):
-
- return convert_data(pkcs7_data, input_format, output_format, PKCS7_HEADER, PKCS7_FOOTER)
-
-def get_file_type(filename):
-
- with open(filename, 'r') as f:
- data = f.read()
-
- if data.startswith(CSR_HEADER):
- return 'csr'
-
- if data.startswith(CERT_HEADER):
- return 'cert'
-
- if data.startswith(PKCS7_HEADER):
- return 'pkcs7'
-
- return None
-
-
-class NSSDatabase(object):
-
- def __init__(self, directory, token='internal', password=None, password_file=None):
- self.directory = directory
- self.token = token
-
- self.tmpdir = tempfile.mkdtemp()
-
- if password:
- self.password_file = os.path.join(self.tmpdir, 'password.txt')
- with open(self.password_file, 'w') as f:
- f.write(password)
-
- elif password_file:
- self.password_file = password_file
-
- else:
- raise Exception('Missing NSS database password')
-
- def close(self):
- shutil.rmtree(self.tmpdir)
-
- def add_cert(self,
- nickname,
- cert_file,
- trust_attributes=',,'):
-
- cmd = [
- 'certutil',
- '-A',
- '-d', self.directory,
- '-h', self.token,
- '-f', self.password_file,
- '-n', nickname,
- '-i', cert_file,
- '-t', trust_attributes
- ]
-
- subprocess.check_call(cmd)
-
- def modify_cert(self,
- nickname,
- trust_attributes):
-
- cmd = [
- 'certutil',
- '-M',
- '-d', self.directory,
- '-h', self.token,
- '-f', self.password_file,
- '-n', nickname,
- '-t', trust_attributes
- ]
-
- subprocess.check_call(cmd)
-
- def create_noise(self, noise_file, size=2048):
-
- subprocess.check_call([
- 'openssl',
- 'rand',
- '-out', noise_file,
- str(size)
- ])
-
- def create_request(self,
- subject_dn,
- request_file,
- noise_file=None,
- key_type=None,
- key_size=None,
- curve=None,
- hash_alg=None):
-
- tmpdir = tempfile.mkdtemp()
-
- try:
- if not noise_file:
- noise_file = os.path.join(tmpdir, 'noise.bin')
- if key_size:
- size = key_size
- else:
- size = 2048
- self.create_noise(
- noise_file=noise_file,
- size=size)
-
- binary_request_file = os.path.join(tmpdir, 'request.bin')
-
- cmd = [
- 'certutil',
- '-R',
- '-d', self.directory,
- '-h', self.token,
- '-f', self.password_file,
- '-s', subject_dn,
- '-o', binary_request_file,
- '-z', noise_file
- ]
-
- if key_type:
- cmd.extend(['-k', key_type])
-
- if key_size:
- cmd.extend(['-g', str(key_size)])
-
- if curve:
- cmd.extend(['-q', curve])
-
- if hash_alg:
- cmd.extend(['-Z', hash_alg])
-
- # generate binary request
- subprocess.check_call(cmd)
-
- # encode binary request in base-64
- b64_request_file = os.path.join(tmpdir, 'request.b64')
- subprocess.check_call([
- 'BtoA', binary_request_file, b64_request_file])
-
- # read base-64 request
- with open(b64_request_file, 'r') as f:
- b64_request = f.read()
-
- # add header and footer
- with open(request_file, 'w') as f:
- f.write('-----BEGIN NEW CERTIFICATE REQUEST-----\n')
- f.write(b64_request)
- f.write('-----END NEW CERTIFICATE REQUEST-----\n')
-
- finally:
- shutil.rmtree(tmpdir)
-
- def create_self_signed_ca_cert(self,
- subject_dn,
- request_file,
- cert_file,
- serial='1',
- validity=240):
-
- cmd = [
- 'certutil',
- '-C',
- '-x',
- '-d', self.directory,
- '-h', self.token,
- '-f', self.password_file,
- '-c', subject_dn,
- '-a',
- '-i', request_file,
- '-o', cert_file,
- '-m', serial,
- '-v', str(validity),
- '--keyUsage', 'digitalSignature,nonRepudiation,certSigning,crlSigning,critical',
- '-2',
- '-3',
- '--extSKID',
- '--extAIA'
- ]
-
- p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
-
- keystroke = ''
-
- # Is this a CA certificate [y/N]?
- keystroke += 'y\n'
-
- # Enter the path length constraint, enter to skip [<0 for unlimited path]:
- keystroke += '\n'
-
- # Is this a critical extension [y/N]?
- keystroke += 'y\n'
-
- # Enter value for the authKeyID extension [y/N]?
- keystroke += 'y\n'
-
- # TODO: generate SHA1 ID (see APolicyRule.formSHA1KeyId())
- # Enter value for the key identifier fields,enter to omit:
- keystroke += '2d:7e:83:37:75:5a:fd:0e:8d:52:a3:70:16:93:36:b8:4a:d6:84:9f\n'
-
- # Select one of the following general name type:
- keystroke += '0\n'
-
- # Enter value for the authCertSerial field, enter to omit:
- keystroke += '\n'
-
- # Is this a critical extension [y/N]?
- keystroke += '\n'
-
- # TODO: generate SHA1 ID (see APolicyRule.formSHA1KeyId())
- # Adding Subject Key ID extension.
- # Enter value for the key identifier fields,enter to omit:
- keystroke += '2d:7e:83:37:75:5a:fd:0e:8d:52:a3:70:16:93:36:b8:4a:d6:84:9f\n'
-
- # Is this a critical extension [y/N]?
- keystroke += '\n'
-
- # Enter access method type for Authority Information Access extension:
- keystroke += '2\n'
-
- # Select one of the following general name type:
- keystroke += '7\n'
-
- # TODO: replace with actual hostname name and port number
- # Enter data:
- keystroke += 'http://server.example.com:8080/ca/ocsp\n'
-
- # Select one of the following general name type:
- keystroke += '0\n'
-
- # Add another location to the Authority Information Access extension [y/N]
- keystroke += '\n'
-
- # Is this a critical extension [y/N]?
- keystroke += '\n'
-
- p.communicate(keystroke)
-
- rc = p.wait()
-
- if rc:
- raise Exception('Failed to generate self-signed CA certificate. RC: %d' % rc)
-
- def get_cert(self, nickname, output_format='pem'):
-
- if output_format == 'pem':
- output_format_option = '-a'
-
- elif output_format == 'base64':
- output_format_option = '-r'
-
- else:
- raise Exception('Unsupported output format: %s' % output_format)
-
- cmd = [
- 'certutil',
- '-L',
- '-d', self.directory,
- '-h', self.token,
- '-f', self.password_file,
- '-n', nickname,
- output_format_option
- ]
-
- cert_data = subprocess.check_output(cmd)
-
- if output_format == 'base64':
- cert_data = base64.b64encode(cert_data)
-
- return cert_data
-
- def remove_cert(self, nickname):
-
- cmd = [
- 'certutil',
- '-D',
- '-d', self.directory,
- '-h', self.token,
- '-f', self.password_file,
- '-n', nickname
- ]
-
- subprocess.check_call(cmd)
-
- def import_cert_chain(self, nickname, cert_chain_file, trust_attributes=None):
-
- tmpdir = tempfile.mkdtemp()
-
- try:
- file_type = get_file_type(cert_chain_file)
-
- if file_type == 'cert': # import single PEM cert
- self.add_cert(
- nickname=nickname,
- cert_file=cert_chain_file,
- trust_attributes=trust_attributes)
- return self.get_cert(
- nickname=nickname,
- output_format='base64')
-
- elif file_type == 'pkcs7': # import PKCS #7 cert chain
- return self.import_pkcs7(
- pkcs7_file=cert_chain_file,
- nickname=nickname,
- trust_attributes=trust_attributes,
- output_format='base64')
-
- else: # import PKCS #7 data without header/footer
- with open(cert_chain_file, 'r') as f:
- base64_data = f.read()
- pkcs7_data = convert_pkcs7(base64_data, 'base64', 'pem')
-
- tmp_cert_chain_file = os.path.join(tmpdir, 'cert_chain.p7b')
- with open(tmp_cert_chain_file, 'w') as f:
- f.write(pkcs7_data)
-
- self.import_pkcs7(
- pkcs7_file=tmp_cert_chain_file,
- nickname=nickname,
- trust_attributes=trust_attributes)
-
- return base64_data
-
- finally:
- shutil.rmtree(tmpdir)
-
- def import_pkcs7(self, pkcs7_file, nickname, trust_attributes=None, output_format='pem'):
-
- tmpdir = tempfile.mkdtemp()
-
- try:
- # export certs from PKCS #7 into PEM output
- output = subprocess.check_output([
- 'openssl',
- 'pkcs7',
- '-print_certs',
- '-in', pkcs7_file
- ])
-
- # parse PEM output into separate PEM certificates
- certs = []
- lines = []
- state = 'header'
-
- for line in output.splitlines():
-
- if state == 'header':
- if line != CERT_HEADER:
- # ignore header lines
- pass
- else:
- # save cert header
- lines.append(line)
- state = 'body'
-
- elif state == 'body':
- if line != CERT_FOOTER:
- # save cert body
- lines.append(line)
- else:
- # save cert footer
- lines.append(line)
-
- # construct PEM cert
- cert = '\n'.join(lines)
- certs.append(cert)
- lines = []
- state = 'header'
-
- # import PEM certs into NSS database
- counter = 1
- for cert in certs:
-
- cert_file = os.path.join(tmpdir, 'cert%d.pem' % counter)
- with open(cert_file, 'w') as f:
- f.write(cert)
-
- if counter == 1:
- n = nickname
- else:
- n = '%s #%d' % (nickname, counter)
-
- self.add_cert(n, cert_file, trust_attributes)
-
- counter += 1
-
- # convert PKCS #7 data to the requested format
- with open(pkcs7_file, 'r') as f:
- data = f.read()
-
- return convert_pkcs7(data, 'pem', output_format)
-
- finally:
- shutil.rmtree(tmpdir)
-
- def import_pkcs12(self, pkcs12_file, pkcs12_password=None, pkcs12_password_file=None):
-
- tmpdir = tempfile.mkdtemp()
-
- try:
- if pkcs12_password:
- password_file = os.path.join(tmpdir, 'password.txt')
- with open(password_file, 'w') as f:
- f.write(pkcs12_password)
-
- elif pkcs12_password_file:
- password_file = pkcs12_password_file
-
- else:
- raise Exception('Missing PKCS #12 password')
-
- cmd = [
- 'pk12util',
- '-d', self.directory,
- '-h', self.token,
- '-k', self.password_file,
- '-i', pkcs12_file,
- '-w', password_file
- ]
-
- subprocess.check_call(cmd)
-
- finally:
- shutil.rmtree(tmpdir)
-
- def export_pkcs12(self, pkcs12_file, nickname, pkcs12_password=None, pkcs12_password_file=None):
-
- tmpdir = tempfile.mkdtemp()
-
- try:
- if pkcs12_password:
- password_file = os.path.join(tmpdir, 'password.txt')
- with open(password_file, 'w') as f:
- f.write(pkcs12_password)
-
- elif pkcs12_password_file:
- password_file = pkcs12_password_file
-
- else:
- raise Exception('Missing PKCS #12 password')
-
- cmd = [
- 'pk12util',
- '-d', self.directory,
- '-k', self.password_file,
- '-o', pkcs12_file,
- '-w', password_file,
- '-n', nickname
- ]
-
- subprocess.check_call(cmd)
-
- finally:
- shutil.rmtree(tmpdir)
diff --git a/base/server/python/pki/server/__init__.py b/base/server/python/pki/server/__init__.py
index 98b7d97bfaf237d97bda1246ac9a26fde1c70402..0bbf69eb8c37b78653df61b3711420e927feb19d 100644
--- a/base/server/python/pki/server/__init__.py
+++ b/base/server/python/pki/server/__init__.py
@@ -33,7 +33,7 @@ import subprocess
import tempfile
import pki
-import pki.nss
+import pki.crypto
INSTANCE_BASE_DIR = '/var/lib/pki'
REGISTRY_DIR = '/etc/sysconfig/pki'
@@ -328,7 +328,7 @@ class PKIInstance(object):
return password
def open_nssdb(self, token='internal'):
- return pki.nss.NSSDatabase(
+ return pki.crypto.NSSDatabase(
directory=self.nssdb_dir,
token=token,
password=self.get_password(token))
diff --git a/base/server/python/pki/server/cli/subsystem.py b/base/server/python/pki/server/cli/subsystem.py
index d033142f5be2a221a30b9b750da80a3143b79574..b915017cbf6003502559581ee393e6953d41c36f 100644
--- a/base/server/python/pki/server/cli/subsystem.py
+++ b/base/server/python/pki/server/cli/subsystem.py
@@ -28,7 +28,7 @@ import string
import sys
import pki.cli
-import pki.nss
+import pki.crypto
import pki.server
@@ -537,13 +537,13 @@ class SubsystemCertExportCLI(pki.cli.CLI):
if cert_file:
- cert_data = pki.nss.convert_cert(subsystem_cert['data'], 'base64', 'pem')
+ cert_data = pki.crypto.convert_cert(subsystem_cert['data'], 'base64', 'pem')
with open(cert_file, 'w') as f:
f.write(cert_data)
if csr_file:
- csr_data = pki.nss.convert_csr(subsystem_cert['request'], 'base64', 'pem')
+ csr_data = pki.crypto.convert_csr(subsystem_cert['request'], 'base64', 'pem')
with open(csr_file, 'w') as f:
f.write(csr_data)
diff --git a/base/server/python/pki/server/deployment/scriptlets/configuration.py b/base/server/python/pki/server/deployment/scriptlets/configuration.py
index a5ab3f88b6c74de7acf8ca6224c87a71e1211c08..f466df93beb0349966167f432df12d6605ac89d9 100644
--- a/base/server/python/pki/server/deployment/scriptlets/configuration.py
+++ b/base/server/python/pki/server/deployment/scriptlets/configuration.py
@@ -27,8 +27,8 @@ from .. import pkiconfig as config
from .. import pkimessages as log
from .. import pkiscriptlet
+import pki.crypto
import pki.encoder
-import pki.nss
import pki.server
import pki.system
import pki.util
@@ -138,7 +138,7 @@ class PkiScriptlet(pkiscriptlet.AbstractBasePkiScriptlet):
hash_alg=hash_alg)
with open(external_csr_path) as f:
signing_csr = f.read()
- signing_csr = pki.nss.convert_csr(signing_csr, 'pem', 'base64')
+ signing_csr = pki.crypto.convert_csr(signing_csr, 'pem', 'base64')
subsystem.config['ca.signing.certreq'] = signing_csr
# This is needed by IPA to detect step 1 completion.
@@ -153,7 +153,7 @@ class PkiScriptlet(pkiscriptlet.AbstractBasePkiScriptlet):
if external_csr_path:
with open(external_csr_path) as f:
signing_csr = f.read()
- signing_csr = pki.nss.convert_csr(signing_csr, 'pem', 'base64')
+ signing_csr = pki.crypto.convert_csr(signing_csr, 'pem', 'base64')
subsystem.config['ca.signing.certreq'] = signing_csr
# If specified, import external CA cert into NSS database.
--
2.4.3
More information about the Pki-devel
mailing list