[Erp5-report] r46025 arnaud.fontaine - /erp5/trunk/utils/erp5.utils.benchmark/src/erp5/util...

nobody at svn.erp5.org nobody at svn.erp5.org
Wed Aug 31 13:45:14 CEST 2011


Author: arnaud.fontaine
Date: Wed Aug 31 13:45:14 2011
New Revision: 46025

URL: http://svn.erp5.org?rev=46025&view=rev
Log:
multiprocessing does not cope well with terminate() when it is not followed
straight away by join(), as it needs to send, for example, the exit status
over a pipe, and the SIGCHLD signal was also interrupting this pipe.

Moreover, there was a race condition in the previous code which, in some
cases, left some children behind because some SIGCHLD signals got lost.

Also, ignore further SIGTERM signals once one has been received, to avoid
receiving a SIGTERM before the result has been put into the queue, which
would interrupt the multiprocessing magic.

Modified:
    erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/performance_tester.py
    erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/process.py
    erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/result.py

Modified: erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/performance_tester.py
URL: http://svn.erp5.org/erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/performance_tester.py?rev=46025&r1=46024&r2=46025&view=diff
==============================================================================
--- erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/performance_tester.py [utf8] (original)
+++ erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/performance_tester.py [utf8] Wed Aug 31 13:45:14 2011
@@ -50,8 +50,6 @@ class PerformanceTester(object):
     else:
       self._argument_namespace = namespace
 
-    self._process_terminated_counter = 0
-
   @staticmethod
   def _add_parser_arguments(parser):
     # Optional arguments
@@ -195,9 +193,6 @@ class PerformanceTester(object):
     ERP5BenchmarkResult.closeResultDocument(self._argument_namespace.erp5_publish_url,
                                             error_message_set)
 
-  def _child_terminated_handler(self, *args, **kwargs):
-    self._process_terminated_counter += 1
-
   def _run_constant(self, nb_users):
     process_list = []
     exit_msg_queue = multiprocessing.Queue(nb_users)
@@ -211,32 +206,24 @@ class PerformanceTester(object):
 
       process_list.append(process)
 
-    signal.signal(signal.SIGCHLD, self._child_terminated_handler)
-
     for process in process_list:
       process.start()
 
     error_message_set = set()
-    interrupted_counter = 0
-    while self._process_terminated_counter != len(process_list):
+    process_terminated_counter = 0
+
+    # Ensure that SIGTERM signal (sent by terminate()) is not sent twice
+    do_exit = False
+
+    while process_terminated_counter != len(process_list):
       try:
         error_message = exit_msg_queue.get()
 
       except KeyboardInterrupt, e:
-        if interrupted_counter == 0:
-          print >>sys.stderr, "\nInterrupted by user, stopping gracefully " \
-              "unless interrupted %d times" % MAXIMUM_KEYBOARD_INTERRUPT
-
-        interrupted_counter += 1
+        print >>sys.stderr, "\nInterrupted by user, stopping gracefully..."
+        do_exit = True
 
-        for process in process_list:
-          if (process.is_alive() and
-              (not getattr(process, '_stopping', False) or
-               interrupted_counter == MAXIMUM_KEYBOARD_INTERRUPT)):
-            process._stopping = True
-            process.terminate()
-
-      # An IOError may be raised when receiving a SIGCHLD which interrupts the
+      # An IOError may be raised  when receiving a SIGINT which interrupts the
       # blocking system call above and the system call should not be restarted
       # (using siginterrupt), otherwise the  process will stall forever as its
       # child has already exited
@@ -247,15 +234,18 @@ class PerformanceTester(object):
       else:
         if error_message is not None:
           error_message_set.add(error_message)
+          do_exit = True
 
-          # In case of error, kill  the other children because they are likely
-          # failing  as well (especially  because a  process only  exits after
-          # encountering 10 errors)
-          if interrupted_counter == 0:
-            for process in process_list:
-              if process.is_alive() and not getattr(process, '_stopping', False):
-                process._stopping = True
-                process.terminate()
+        process_terminated_counter += 1
+
+      # In case of  error or SIGINT, kill the other  children because they are
+      # likely failing as well (especially  because a process only exits after
+      # encountering 10 errors)
+      if do_exit:
+        for process in process_list:
+          if process.is_alive():
+            process.terminate()
+            process.join()
 
     if error_message_set:
       return (error_message_set, 1)

Modified: erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/process.py
URL: http://svn.erp5.org/erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/process.py?rev=46025&r1=46024&r2=46025&view=diff
==============================================================================
--- erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/process.py [utf8] (original)
+++ erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/process.py [utf8] Wed Aug 31 13:45:14 2011
@@ -56,6 +56,7 @@ class BenchmarkProcess(multiprocessing.P
     super(BenchmarkProcess, self).__init__(*args, **kwargs)
 
   def stopGracefully(self, *args, **kwargs):
+    signal.signal(signal.SIGTERM, signal.SIG_IGN)
     raise StopIteration("Interrupted by user or because of an error from "
                         "another process, flushing remaining results...")
 

Modified: erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/result.py
URL: http://svn.erp5.org/erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/result.py?rev=46025&r1=46024&r2=46025&view=diff
==============================================================================
--- erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/result.py [utf8] (original)
+++ erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/result.py [utf8] Wed Aug 31 13:45:14 2011
@@ -31,6 +31,7 @@ import math
 import os
 import csv
 import logging
+import signal
 
 class BenchmarkResultStatistic(object):
   def __init__(self, suite, label):
@@ -163,6 +164,7 @@ class BenchmarkResult(object):
 
   @abc.abstractmethod
   def __exit__(self, exc_type, exc_value, traceback):
+    signal.signal(signal.SIGTERM, signal.SIG_IGN)
     self.flush(partial=False)
     return True
 



More information about the Erp5-report mailing list