[Erp5-report] r42381 luke - /erp5/trunk/products/Vifib/Tool/CertificateAuthorityTool.py
nobody at svn.erp5.org
nobody at svn.erp5.org
Mon Jan 17 15:45:01 CET 2011
Author: luke
Date: Mon Jan 17 15:45:01 2011
New Revision: 42381
URL: http://svn.erp5.org?rev=42381&view=rev
Log:
- simplify usage of subprocess
- kill only when returncode is none
- do not hide exceptions at runtime: drop CertificateGenerationError,
since raising a plain ValueError is acceptable for the sake of
simplicity
- when revoking, generate hashed symlinks to the newest CRL
Modified:
erp5/trunk/products/Vifib/Tool/CertificateAuthorityTool.py
Modified: erp5/trunk/products/Vifib/Tool/CertificateAuthorityTool.py
URL: http://svn.erp5.org/erp5/trunk/products/Vifib/Tool/CertificateAuthorityTool.py?rev=42381&r1=42380&r2=42381&view=diff
==============================================================================
--- erp5/trunk/products/Vifib/Tool/CertificateAuthorityTool.py [utf8] (original)
+++ erp5/trunk/products/Vifib/Tool/CertificateAuthorityTool.py [utf8] Mon Jan 17 15:45:01 2011
@@ -32,15 +32,22 @@ from Products.ERP5Type.Globals import In
from Products.ERP5Type.Tool.BaseTool import BaseTool
from Products.ERP5Type import Permissions
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
-from zLOG import LOG, INFO, ERROR
+from zLOG import LOG, INFO
import os
import subprocess
import base64
-class CertificateGenerationError(Exception):
- """Exception raised when certificate authority failed to work"""
- pass
+def popenCommunicate(command_list, input=None, **kwargs):
+ kwargs.update(stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ popen = subprocess.Popen(command_list, **kwargs)
+ result = popen.communicate(input)[0]
+ if popen.returncode is None:
+ popen.kill()
+ if popen.returncode != 0:
+ raise ValueError('Issue during calling %r, result was: %r' % (command_list,
+ result))
+ return result
class CertificateAuthorityBusy(Exception):
"""Exception raised when certificate authority is busy"""
@@ -183,23 +190,11 @@ class CertificateAuthorityTool(BaseTool)
csr = os.path.join(self.certificate_authority_path, new_id + '.csr')
cert = os.path.join(self.certificate_authority_path, 'certs', new_id + '.crt')
try:
- keygen = subprocess.Popen([self.openssl_binary, 'req', '-nodes', '-config',
+ popenCommunicate([self.openssl_binary, 'req', '-nodes', '-config',
self.openssl_config, '-new', '-keyout', key, '-out', csr, '-days',
- '3650'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
- stdin=subprocess.PIPE)
- result = keygen.communicate('%s\n' % cn)[0]
- if keygen.returncode is None or keygen.returncode != 0:
- LOG('CertificateAuthorityTool', ERROR, 'Issue during key generation, result was:%r' % result)
- keygen.kill()
- raise CertificateGenerationError
- keysign = subprocess.Popen([self.openssl_binary, 'ca', '-batch', '-config',
- self.openssl_config, '-out', cert, '-infiles', csr], stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- result = keysign.communicate()[0]
- if keysign.returncode is None or keysign.returncode != 0:
- LOG('CertificateAuthorityTool', ERROR, 'Issue during key signing, result was:%r' % result)
- keygen.kill()
- raise CertificateGenerationError
+ '3650'], '%s\n' % cn, stdin=subprocess.PIPE)
+ popenCommunicate([self.openssl_binary, 'ca', '-batch', '-config',
+ self.openssl_config, '-out', cert, '-infiles', csr])
os.unlink(csr)
return dict(
key=open(key).read(),
@@ -224,27 +219,20 @@ class CertificateAuthorityTool(BaseTool)
self._lockCertificateAuthority()
try:
new_id = open(self.crl, 'r').read().strip().lower()
- crl = os.path.join(self.certificate_authority_path, 'crl', new_id + '.crl')
+ crl_path = os.path.join(self.certificate_authority_path, 'crl')
+ crl = os.path.join(crl_path, new_id + '.crl')
cert = os.path.join(self.certificate_authority_path, 'certs', serial + '.crt')
if not os.path.exists(cert):
raise ValueError('Certificate with serial %r does not exists' % serial)
try:
- crl_update = subprocess.Popen([self.openssl_binary, 'ca', '-config',
- self.openssl_config, '-revoke', cert], stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- result = crl_update.communicate()[0]
- if crl_update.returncode is None or crl_update.returncode != 0:
- LOG('CertificateAuthorityTool', ERROR, 'Issue during CRL update, result was:%r' % result)
- crl_update.kill()
- raise CertificateGenerationError
- crl_gen = subprocess.Popen([self.openssl_binary, 'ca', '-config',
- self.openssl_config, '-gencrl', '-out', crl], stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- result = crl_gen.communicate()[0]
- if crl_gen.returncode is None or crl_gen.returncode != 0:
- LOG('CertificateAuthorityTool', ERROR, 'Issue during CRL generation, result was:%r' % result)
- crl_gen.kill()
- raise CertificateGenerationError
+ popenCommunicate([self.openssl_binary, 'ca', '-config',
+ self.openssl_config, '-revoke', cert])
+ popenCommunicate([self.openssl_binary, 'ca', '-config',
+ self.openssl_config, '-gencrl', '-out', crl])
+ hash = popenCommunicate([self.openssl_binary, 'crl', '-noout',
+ '-hash', '-in', crl]).strip()
+ previous_id = int(len([q for q in os.listdir(crl_path) if hash in q]))
+ os.symlink(crl, os.path.join(crl_path, '%s.%s' % (hash, previous_id)))
return dict(crl=open(crl).read())
except:
try:
More information about the Erp5-report
mailing list