[Erp5-report] r42381 luke - /erp5/trunk/products/Vifib/Tool/CertificateAuthorityTool.py

nobody at svn.erp5.org nobody at svn.erp5.org
Mon Jan 17 15:45:01 CET 2011


Author: luke
Date: Mon Jan 17 15:45:01 2011
New Revision: 42381

URL: http://svn.erp5.org?rev=42381&view=rev
Log:
 - simplify usage of subprocess
 - kill the subprocess only when its returncode is None
 - do not hide exceptions at runtime: CertificateGenerationError is
   dropped, as a plain ValueError is acceptable for the sake of
   simplicity
 - when revoking, generate hashed symlinks to the newest CRL

Modified:
    erp5/trunk/products/Vifib/Tool/CertificateAuthorityTool.py

Modified: erp5/trunk/products/Vifib/Tool/CertificateAuthorityTool.py
URL: http://svn.erp5.org/erp5/trunk/products/Vifib/Tool/CertificateAuthorityTool.py?rev=42381&r1=42380&r2=42381&view=diff
==============================================================================
--- erp5/trunk/products/Vifib/Tool/CertificateAuthorityTool.py [utf8] (original)
+++ erp5/trunk/products/Vifib/Tool/CertificateAuthorityTool.py [utf8] Mon Jan 17 15:45:01 2011
@@ -32,15 +32,22 @@ from Products.ERP5Type.Globals import In
 from Products.ERP5Type.Tool.BaseTool import BaseTool
 from Products.ERP5Type import Permissions
 from Products.PageTemplates.PageTemplateFile import PageTemplateFile
-from zLOG import LOG, INFO, ERROR
+from zLOG import LOG, INFO
 
 import os
 import subprocess
 import base64
 
-class CertificateGenerationError(Exception):
-  """Exception raised when certificate authority failed to work"""
-  pass
+def popenCommunicate(command_list, input=None, **kwargs):
+  kwargs.update(stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+  popen = subprocess.Popen(command_list, **kwargs)
+  result = popen.communicate(input)[0]
+  if popen.returncode is None:
+    popen.kill()
+  if popen.returncode != 0:
+    raise ValueError('Issue during calling %r, result was: %r' % (command_list,
+      result))
+  return result
 
 class CertificateAuthorityBusy(Exception):
   """Exception raised when certificate authority is busy"""
@@ -183,23 +190,11 @@ class CertificateAuthorityTool(BaseTool)
       csr = os.path.join(self.certificate_authority_path, new_id + '.csr')
       cert = os.path.join(self.certificate_authority_path, 'certs', new_id + '.crt')
       try:
-        keygen = subprocess.Popen([self.openssl_binary, 'req', '-nodes', '-config',
+        popenCommunicate([self.openssl_binary, 'req', '-nodes', '-config',
           self.openssl_config, '-new', '-keyout', key, '-out', csr, '-days',
-          '3650'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
-          stdin=subprocess.PIPE)
-        result = keygen.communicate('%s\n' % cn)[0]
-        if keygen.returncode is None or keygen.returncode != 0:
-          LOG('CertificateAuthorityTool', ERROR, 'Issue during key generation, result was:%r' % result)
-          keygen.kill()
-          raise CertificateGenerationError
-        keysign = subprocess.Popen([self.openssl_binary, 'ca', '-batch', '-config',
-          self.openssl_config, '-out', cert, '-infiles', csr], stdout=subprocess.PIPE,
-          stderr=subprocess.STDOUT)
-        result = keysign.communicate()[0]
-        if keysign.returncode is None or keysign.returncode != 0:
-          LOG('CertificateAuthorityTool', ERROR, 'Issue during key signing, result was:%r' % result)
-          keygen.kill()
-          raise CertificateGenerationError
+          '3650'], '%s\n' % cn, stdin=subprocess.PIPE)
+        popenCommunicate([self.openssl_binary, 'ca', '-batch', '-config',
+          self.openssl_config, '-out', cert, '-infiles', csr])
         os.unlink(csr)
         return dict(
           key=open(key).read(),
@@ -224,27 +219,20 @@ class CertificateAuthorityTool(BaseTool)
     self._lockCertificateAuthority()
     try:
       new_id = open(self.crl, 'r').read().strip().lower()
-      crl = os.path.join(self.certificate_authority_path, 'crl', new_id + '.crl')
+      crl_path = os.path.join(self.certificate_authority_path, 'crl')
+      crl = os.path.join(crl_path, new_id + '.crl')
       cert = os.path.join(self.certificate_authority_path, 'certs', serial + '.crt')
       if not os.path.exists(cert):
         raise ValueError('Certificate with serial %r does not exists' % serial)
       try:
-        crl_update = subprocess.Popen([self.openssl_binary, 'ca', '-config',
-          self.openssl_config, '-revoke', cert], stdout=subprocess.PIPE,
-          stderr=subprocess.STDOUT)
-        result = crl_update.communicate()[0]
-        if crl_update.returncode is None or crl_update.returncode != 0:
-          LOG('CertificateAuthorityTool', ERROR, 'Issue during CRL update, result was:%r' % result)
-          crl_update.kill()
-          raise CertificateGenerationError
-        crl_gen = subprocess.Popen([self.openssl_binary, 'ca', '-config',
-          self.openssl_config, '-gencrl', '-out', crl], stdout=subprocess.PIPE,
-          stderr=subprocess.STDOUT)
-        result = crl_gen.communicate()[0]
-        if crl_gen.returncode is None or crl_gen.returncode != 0:
-          LOG('CertificateAuthorityTool', ERROR, 'Issue during CRL generation, result was:%r' % result)
-          crl_gen.kill()
-          raise CertificateGenerationError
+        popenCommunicate([self.openssl_binary, 'ca', '-config',
+          self.openssl_config, '-revoke', cert])
+        popenCommunicate([self.openssl_binary, 'ca', '-config',
+          self.openssl_config, '-gencrl', '-out', crl])
+        hash = popenCommunicate([self.openssl_binary, 'crl', '-noout',
+          '-hash', '-in', crl]).strip()
+        previous_id = int(len([q for q in os.listdir(crl_path) if hash in q]))
+        os.symlink(crl, os.path.join(crl_path, '%s.%s' % (hash, previous_id)))
         return dict(crl=open(crl).read())
       except:
         try:



More information about the Erp5-report mailing list