[Erp5-report] r41845 jm - in /erp5/trunk: products/ERP5/bin/ tests/

nobody at svn.erp5.org nobody at svn.erp5.org
Tue Dec 28 22:08:06 CET 2010


Author: jm
Date: Tue Dec 28 22:08:06 2010
New Revision: 41845

URL: http://svn.erp5.org?rev=41845&view=rev
Log:
Define test suites for ERP5 project in /tests

Added:
    erp5/trunk/products/ERP5/bin/run_test_suite   (with props)
    erp5/trunk/tests/
    erp5/trunk/tests/__init__.py

Added: erp5/trunk/products/ERP5/bin/run_test_suite
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/bin/run_test_suite?rev=41845&view=auto
==============================================================================
--- erp5/trunk/products/ERP5/bin/run_test_suite (added)
+++ erp5/trunk/products/ERP5/bin/run_test_suite [utf8] Tue Dec 28 22:08:06 2010
@@ -0,0 +1,392 @@
+#!/usr/bin/python
+
+import atexit, errno, imp, os, pprint, random, re, socket, shlex, shutil
+import signal, string, subprocess, sys, threading, time, urlparse, xmlrpclib
+
+SVN_UP_REV=re.compile(r'^(?:At|Updated to) revision (\d+).$')
+SVN_CHANGED_REV=re.compile(r'^Last Changed Rev.*:\s*(\d+)', re.MULTILINE)
+
+def killallIfParentDies():
+  os.setsid()
+  atexit.register(lambda: os.kill(0, 9))
+  from ctypes import cdll
+  libc = cdll.LoadLibrary('libc.so.6')
+  def PR_SET_PDEATHSIG(sig):
+    if libc.prctl(1, sig):
+      raise OSError
+  PR_SET_PDEATHSIG(signal.SIGINT)
+
+_format_command_search = re.compile("[[\\s $({?*\\`#~';<>&|]").search
+_format_command_escape = lambda s: "'%s'" % r"'\''".join(s.split("'"))
+def format_command(*args, **kw):
+  cmdline = []
+  for k, v in sorted(kw.items()):
+    if _format_command_search(v):
+      v = _format_command_escape(v)
+    cmdline.append('%s=%s' % (k, v))
+  for v in args:
+    if _format_command_search(v):
+      v = _format_command_escape(v)
+    cmdline.append(v)
+  return ' '.join(cmdline)
+
+def subprocess_capture(p):
+  def readerthread(input, output, buffer):
+    while True:
+      data = input.readline()
+      if not data:
+        break
+      output.write(data)
+      buffer.append(data)
+  if p.stdout:
+    stdout = []
+    stdout_thread = threading.Thread(target=readerthread,
+                                     args=(p.stdout, sys.stdout, stdout))
+    stdout_thread.setDaemon(True)
+    stdout_thread.start()
+  if p.stderr:
+    stderr = []
+    stderr_thread = threading.Thread(target=readerthread,
+                                     args=(p.stderr, sys.stderr, stderr))
+    stderr_thread.setDaemon(True)
+    stderr_thread.start()
+  if p.stdout:
+    stdout_thread.join()
+  if p.stderr:
+    stderr_thread.join()
+  p.wait()
+  return (p.stdout and ''.join(stdout),
+          p.stderr and ''.join(stderr))
+
+
+class SubprocessError(EnvironmentError):
+  def __init__(self, status_dict):
+    self.status_dict = status_dict
+  def __getattr__(self, name):
+    return self.status_dict[name]
+  def __str__(self):
+    return 'Error %i' % self.status_code
+
+
+class Updater(object):
+
+  realtime_output = True
+  stdin = file(os.devnull)
+
+  def __init__(self, revision=None):
+    self.revision = revision
+    self._path_list = []
+
+  def deletePycFiles(self, path):
+    """Delete *.pyc files so that deleted/moved files can not be imported"""
+    for path, dir_list, file_list in os.walk(path):
+      for file in file_list:
+        if file[-4:] in ('.pyc', '.pyo'):
+          # allow several processes clean the same folder at the same time
+          try:
+            os.remove(os.path.join(path, file))
+          except OSError, e:
+            if e.errno != errno.ENOENT:
+              raise
+
+  def spawn(self, *args, **kw):
+    env = kw and dict(os.environ, **kw) or None
+    command = format_command(*args, **kw)
+    print '\n$ ' + command
+    sys.stdout.flush()
+    p = subprocess.Popen(args, stdin=self.stdin, stdout=subprocess.PIPE,
+                         stderr=subprocess.PIPE, env=env)
+    if self.realtime_output:
+      stdout, stderr = subprocess_capture(p)
+    else:
+      stdout, stderr = p.communicate()
+      sys.stdout.write(stdout)
+      sys.stderr.write(stderr)
+    result = dict(status_code=p.returncode, command=command,
+                  stdout=stdout, stderr=stderr)
+    if p.returncode:
+      raise SubprocessError(result)
+    return result
+
+  def _git(self, *args, **kw):
+    return self.spawn('git', *args, **kw)['stdout'].strip()
+
+  def getRevision(self):
+    if os.path.isdir('.git'):
+      h = self._git('log', '-1', '--format=%H', *self._path_list)
+      return str(int(self._git('svn', 'find-rev', h)))
+    if os.path.isdir('.svn'):
+      stdout = self.spawn('svn', 'info', *self._path_list)['stdout']
+      return str(max(map(int, SVN_CHANGED_REV.findall(stdout))))
+    raise NotImplementedError
+
+  def checkout(self, *path_list):
+    revision = self.revision
+    if os.path.isdir('.git'):
+      # edit .git/info/sparse-checkout if you want sparse checkout
+      if revision:
+        h = self._git('svn', 'find-rev', 'r%s' % revision)
+        assert h
+        if h != self._git('rev-parse', 'HEAD'):
+          self.deletePycFiles('.')
+          self._git('reset', '--merge', h)
+      else:
+        self.deletePycFiles('.')
+        self._git('svn', 'rebase')
+        self.revision = str(int(self._git('svn', 'find-rev', 'HEAD')))
+    elif os.path.isdir('.svn'):
+      # following code allows sparse checkout
+      def svn_mkdirs(path):
+        path = os.path.dirname(path)
+        if path and not os.path.isdir(path):
+          svn_mkdirs(path)
+          self.spawn(*(args + ['--depth=empty', path]))
+      for path in path_list or ('.',):
+        args = ['svn', 'up', '--force', '--non-interactive']
+        if revision:
+          args.append('-r%s' % revision)
+        svn_mkdirs(path)
+        args += '--depth=infinity', path
+        self.deletePycFiles(path)
+        try:
+          status_dict = self.spawn(*args)
+        except SubprocessError, e:
+          if 'cleanup' not in e.stderr:
+            raise
+          self.spawn('svn', 'cleanup', path)
+          status_dict = self.spawn(*args)
+        if not revision:
+          self.revision = revision = SVN_UP_REV.findall(
+            status_dict['stdout'].splitlines()[-1])[0]
+    else:
+      raise NotImplementedError
+    self._path_list += path_list
+
+
+class TestSuite(Updater):
+
+  mysql_db_count = 1
+  allow_restart = False
+
+  def __init__(self, max_instance_count, **kw):
+    self.__dict__.update(kw)
+    self._path_list = ['tests']
+    pool = threading.Semaphore(max_instance_count)
+    self.acquire = pool.acquire
+    self.release = pool.release
+    self._instance = threading.local()
+    self._pool = max_instance_count == 1 and [None] or \
+                 range(1, max_instance_count + 1)
+    self._ready = set()
+    self.running = {}
+    if max_instance_count != 1:
+      self.realtime_output = False
+    elif os.isatty(1):
+      self.realtime_output = True
+
+  instance = property(lambda self: self._instance.id)
+
+  def start(self, test, on_stop=None):
+    assert test not in self.running
+    self.running[test] = instance = self._pool.pop(0)
+    def run():
+      self._instance.id = instance
+      if instance not in self._ready:
+        self._ready.add(instance)
+        self.setup()
+      status_dict = self.run(test)
+      if on_stop is not None:
+        on_stop(status_dict)
+      self._pool.append(self.running.pop(test))
+      self.release()
+    threading.Thread(target=run).start()
+
+  def update(self):
+    self.checkout() # by default, update everything
+
+  def setup(self):
+    pass
+
+  def run(self, test):
+    raise NotImplementedError
+
+  def getTestList(self):
+    raise NotImplementedError
+
+
+class ERP5TypeTestSuite(TestSuite):
+
+  RUN_RE = re.compile(
+    r'Ran (?P<all_tests>\d+) tests? in (?P<seconds>\d+\.\d+)s',
+    re.DOTALL)
+
+  STATUS_RE = re.compile(r"""
+    (OK|FAILED)\s+\(
+      (failures=(?P<failures>\d+),?\s*)?
+      (errors=(?P<errors>\d+),?\s*)?
+      (skipped=(?P<skips>\d+),?\s*)?
+      (expected\s+failures=(?P<expected_failures>\d+),?\s*)?
+      (unexpected\s+successes=(?P<unexpected_successes>\d+),?\s*)?
+    \)
+    """, re.DOTALL | re.VERBOSE)
+
+  def setup(self):
+    instance_home = self.instance and 'unit_test.%u' % self.instance \
+                                   or 'unit_test'
+    tests = os.path.join(instance_home, 'tests')
+    if os.path.exists(tests):
+      shutil.rmtree(instance_home + '.previous', True)
+      shutil.move(tests, instance_home + '.previous')
+
+  def run(self, test):
+    return self.runUnitTest(test)
+
+  def runUnitTest(self, *args, **kw):
+    if self.instance:
+      args = ('--instance_home=unit_test.%u' % self.instance,) + args
+    mysql_db_list = [string.Template(x).substitute(I=self.instance or '')
+                      for x in self.mysql_db_list]
+    if len(mysql_db_list) > 1:
+      kw['extra_sql_connection_string_list'] = ','.join(mysql_db_list[1:])
+    try:
+      runUnitTest = os.environ.get('RUN_UNIT_TEST',
+                                   'Products/ERP5Type/tests/runUnitTest.py')
+      args = tuple(shlex.split(runUnitTest)) \
+           + ('--verbose', '--erp5_sql_connection_string=' + mysql_db_list[0]) \
+           + args
+      status_dict = self.spawn(*args, **kw)
+    except SubprocessError, e:
+      status_dict = e.status_dict
+    test_log = status_dict['stderr']
+    search = self.RUN_RE.search(test_log)
+    if search:
+      groupdict = search.groupdict()
+      status_dict.update(duration=float(groupdict['seconds']),
+                         test_count=int(groupdict['all_tests']))
+    search = self.STATUS_RE.search(test_log)
+    if search:
+      groupdict = search.groupdict()
+      status_dict.update(error_count=int(groupdict['errors'] or 0),
+                         failure_count=int(groupdict['failures'] or 0),
+                         skip_count=int(groupdict['skips'] or 0)
+                                   +int(groupdict['expected_failures'] or 0)
+                                   +int(groupdict['unexpected_successes'] or 0))
+    return status_dict
+
+
+#class LoadSaveExample(ERP5TypeTestSuite):
+#  def getTestList(self):
+#    return [test_path.split(os.sep)[-1][:-3]
+#            for test_path in glob.glob('tests/test*.py')]
+#
+#  def setup(self):
+#    TestSuite.setup(self)
+#    return self.runUnitTest(self, '--save', 'testFoo')
+#
+#  def run(self, test):
+#    return self.runUnitTest(self, '--load', test)
+
+
+sys.modules['test_suite'] = module = imp.new_module('test_suite')
+for var in TestSuite, ERP5TypeTestSuite:
+  setattr(module, var.__name__, var)
+
+
+def safeRpcCall(function, *args):
+  retry = 64
+  while True:
+    try:
+      return function(*args)
+    except (socket.error, xmlrpclib.ProtocolError), e:
+      print >>sys.stderr, e
+      pprint.pprint(args, file(function._Method__name, 'w'))
+      time.sleep(retry)
+      retry += retry >> 1
+
+def getOptionParser():
+  from optparse import OptionParser
+  parser = OptionParser(usage="%prog [options] <SUITE>[=<MAX_INSTANCES>]")
+  _ = parser.add_option
+  _("--master", help="URL of ERP5 instance, used as master node")
+  _("--mysql_db_list", help="comma-separated list of connection strings")
+  return parser
+
+def main():
+  os.environ['LC_ALL'] = 'C'
+
+  parser = getOptionParser()
+  options, args = parser.parse_args()
+  try:
+    name, = args
+    if '=' in name:
+      name, max_instance_count = name.split('=')
+      max_instance_count = int(max_instance_count)
+    else:
+      max_instance_count = 1
+  except ValueError:
+    parser.error("invalid arguments")
+  db_list = options.mysql_db_list
+  if db_list:
+    db_list = db_list.split(',')
+    multi = max_instance_count != 1
+    try:
+      for db in db_list:
+        if db == string.Template(db).substitute(I=1) and multi:
+          raise KeyError
+    except KeyError:
+      parser.error("invalid value for --mysql_db_list")
+  else:
+    db_list = (max_instance_count == 1 and 'test test' or 'test$I test'),
+
+  def makeSuite(revision=None):
+    updater = Updater(revision)
+    updater.checkout('tests')
+    tests = imp.load_module('tests', *imp.find_module('tests', ['.']))
+    try:
+      suite_class = getattr(tests, name)
+    except AttributeError:
+      parser.error("unknown test suite")
+    if len(db_list) < suite_class.mysql_db_count:
+      parser.error("%r suite needs %u DB (only %u given)" %
+        (name, suite_class.mysql_db_count, len(db_list)))
+    suite = suite_class(revision=updater.revision,
+                        max_instance_count=max_instance_count,
+                        mysql_db_list=db_list[:suite_class.mysql_db_count])
+    suite.update()
+    return suite
+
+  portal_url = options.master
+  if portal_url[-1] != '/':
+    portal_url += '/'
+  portal = xmlrpclib.ServerProxy(portal_url, allow_none=1)
+  master = portal.portal_task_distribution
+  assert master.getProtocolRevision() == 1
+
+  suite = makeSuite()
+  revision = suite.getRevision()
+  test_result = safeRpcCall(master.createTestResult,
+    name, revision, suite.getTestList(), suite.allow_restart)
+  if test_result:
+    test_result_path, test_revision = test_result
+    url_parts = list(urlparse.urlparse(portal_url + test_result_path))
+    url_parts[1] = url_parts[1].split('@')[-1]
+    print 'ERP5_TEST_URL %s OK' % urlparse.urlunparse(url_parts) # for buildbot
+    while suite.acquire():
+      test = safeRpcCall(master.startUnitTest, test_result_path,
+                         suite.running.keys())
+      if test:
+        if revision != test_revision:
+          suite = makeSuite(test_revision)
+          revision = test_revision
+          suite.acquire()
+        suite.start(test[1], lambda status_dict, __test_path=test[0]:
+          safeRpcCall(master.stopUnitTest, __test_path, status_dict))
+      elif not suite.running:
+        break
+      # We are finishing the suite. Let's disable idle nodes.
+
+
+if __name__ == '__main__':
+  if not os.isatty(0):
+    killallIfParentDies()
+  sys.exit(main())

Propchange: erp5/trunk/products/ERP5/bin/run_test_suite
------------------------------------------------------------------------------
    svn:executable = *

Added: erp5/trunk/tests/__init__.py
URL: http://svn.erp5.org/erp5/trunk/tests/__init__.py?rev=41845&view=auto
==============================================================================
--- erp5/trunk/tests/__init__.py (added)
+++ erp5/trunk/tests/__init__.py [utf8] Tue Dec 28 22:08:06 2010
@@ -0,0 +1,84 @@
+import glob, os, subprocess
+# test_suite is provided by 'run_test_suite'
+from test_suite import ERP5TypeTestSuite
+
+class _ERP5(ERP5TypeTestSuite):
+  realtime_output = False
+  enabled_product_list = ('CMFActivity', 'CMFCategory', 'ERP5', 'ERP5Catalog',
+                          'ERP5eGovSecurity', 'ERP5Form', 'ERP5Legacy',
+                          'ERP5OOo', 'ERP5PropertySheetLegacy', 'ERP5Security',
+                          'ERP5Subversion', 'ERP5SyncML', 'ERP5Type',
+                          'ERP5Wizard', 'Formulator', 'HBTreeFolder2',
+                          'MailTemplates', 'PortalTransforms', 'TimerService',
+                          'ZLDAPConnection', 'ZLDAPMethods', 'ZMySQLDA',
+                          'ZMySQLDDA', 'ZSQLCatalog')
+
+  def enableProducts(self):
+    product_set = set(self.enabled_product_list)
+    try:
+      dir_set = set(os.walk('Products').next()[1])
+      for product in dir_set - product_set:
+        os.unlink(os.path.join('Products', product))
+      product_set -= dir_set
+    except StopIteration:
+      os.mkdir('Products')
+    for product in product_set:
+      os.symlink(os.path.join('..', 'products', product),
+                 os.path.join('Products', product))
+
+  def update(self, working_copy_list=None):
+    self.checkout('products', 'bt5')
+    self.enableProducts()
+
+
+class PERF(_ERP5):
+  allow_restart = True
+
+  def getTestList(self):
+    return ('testPerformance',) * 3
+
+  def update(self):
+    self.checkout('products', 'bt5/erp5_base', 'bt5/erp5_ui_test')
+    self.enableProducts()
+
+class ERP5(_ERP5):
+  mysql_db_count = 3
+
+  def getTestList(self):
+    test_list = []
+    for test_path in glob.glob('Products/*/tests/test*.py') + \
+                     glob.glob('bt5/*/TestTemplateItem/test*.py'):
+      test_case = test_path.split(os.sep)[-1][:-3] # remove .py
+      product = test_path.split(os.sep)[-3]
+      # don't test 3rd party products
+      if product in ('PortalTransforms', 'MailTemplates'):
+        continue
+      # skip some tests
+      if test_case.startswith('testLive') or test_case.startswith('testVifib') \
+         or test_case in ('testPerformance', 'testSimulationPerformance'):
+        continue
+      test_list.append(test_case)
+    return test_list
+
+  def run(self, test):
+    if test in ('testConflictResolution', 'testInvalidationBug'):
+      status_dict = self.runUnitTest('--save', test)
+      if not status_dict['status_code']:
+        status_dict = self.runUnitTest('--load', '--activity_node=2', test)
+      return status_dict
+    return super(ERP5, self).run(test)
+
+class ERP5_simulation(_ERP5):
+
+  def getTestList(self):
+    p = subprocess.Popen(('grep', '-lr', '--include=test*.py',
+                          '-e', '@newSimulationExpectedFailure',
+                          '-e', 'erp5_report_new_simulation_failures',
+                          'Products/ERP5/tests'),
+                         stdout=subprocess.PIPE)
+    return sorted(os.path.basename(x)[:-3]
+                  for x in p.communicate()[0].splitlines())
+
+  def runUnitTest(self, *args, **kw):
+    return super(ERP5_simulation, self).runUnitTest(
+      erp5_report_new_simulation_failures='1', *args, **kw)



More information about the Erp5-report mailing list