[Erp5-report] r46025 arnaud.fontaine - /erp5/trunk/utils/erp5.utils.benchmark/src/erp5/util...
nobody at svn.erp5.org
nobody at svn.erp5.org
Wed Aug 31 13:45:14 CEST 2011
Author: arnaud.fontaine
Date: Wed Aug 31 13:45:14 2011
New Revision: 46025
URL: http://svn.erp5.org?rev=46025&view=rev
Log:
multiprocessing does not cope well with terminate() without join() straight
away as it needs to send for example the exit status over a pipe and also the
SIGCHLD was interrupting this pipe.
Moreover, there was a race condition in the previous code leaving some
children left in some cases because some SIGCHLD got lost.
Also, ignore further SIGTERM once one has been received to avoid receiving a
SIGTERM before putting the result to the queue and interrupting
multiprocessing magic.
Modified:
erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/performance_tester.py
erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/process.py
erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/result.py
Modified: erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/performance_tester.py
URL: http://svn.erp5.org/erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/performance_tester.py?rev=46025&r1=46024&r2=46025&view=diff
==============================================================================
--- erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/performance_tester.py [utf8] (original)
+++ erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/performance_tester.py [utf8] Wed Aug 31 13:45:14 2011
@@ -50,8 +50,6 @@ class PerformanceTester(object):
else:
self._argument_namespace = namespace
- self._process_terminated_counter = 0
-
@staticmethod
def _add_parser_arguments(parser):
# Optional arguments
@@ -195,9 +193,6 @@ class PerformanceTester(object):
ERP5BenchmarkResult.closeResultDocument(self._argument_namespace.erp5_publish_url,
error_message_set)
- def _child_terminated_handler(self, *args, **kwargs):
- self._process_terminated_counter += 1
-
def _run_constant(self, nb_users):
process_list = []
exit_msg_queue = multiprocessing.Queue(nb_users)
@@ -211,32 +206,24 @@ class PerformanceTester(object):
process_list.append(process)
- signal.signal(signal.SIGCHLD, self._child_terminated_handler)
-
for process in process_list:
process.start()
error_message_set = set()
- interrupted_counter = 0
- while self._process_terminated_counter != len(process_list):
+ process_terminated_counter = 0
+
+ # Ensure that SIGTERM signal (sent by terminate()) is not sent twice
+ do_exit = False
+
+ while process_terminated_counter != len(process_list):
try:
error_message = exit_msg_queue.get()
except KeyboardInterrupt, e:
- if interrupted_counter == 0:
- print >>sys.stderr, "\nInterrupted by user, stopping gracefully " \
- "unless interrupted %d times" % MAXIMUM_KEYBOARD_INTERRUPT
-
- interrupted_counter += 1
+ print >>sys.stderr, "\nInterrupted by user, stopping gracefully..."
+ do_exit = True
- for process in process_list:
- if (process.is_alive() and
- (not getattr(process, '_stopping', False) or
- interrupted_counter == MAXIMUM_KEYBOARD_INTERRUPT)):
- process._stopping = True
- process.terminate()
-
- # An IOError may be raised when receiving a SIGCHLD which interrupts the
+ # An IOError may be raised when receiving a SIGINT which interrupts the
# blocking system call above and the system call should not be restarted
# (using siginterrupt), otherwise the process will stall forever as its
# child has already exited
@@ -247,15 +234,18 @@ class PerformanceTester(object):
else:
if error_message is not None:
error_message_set.add(error_message)
+ do_exit = True
- # In case of error, kill the other children because they are likely
- # failing as well (especially because a process only exits after
- # encountering 10 errors)
- if interrupted_counter == 0:
- for process in process_list:
- if process.is_alive() and not getattr(process, '_stopping', False):
- process._stopping = True
- process.terminate()
+ process_terminated_counter += 1
+
+ # In case of error or SIGINT, kill the other children because they are
+ # likely failing as well (especially because a process only exits after
+ # encountering 10 errors)
+ if do_exit:
+ for process in process_list:
+ if process.is_alive():
+ process.terminate()
+ process.join()
if error_message_set:
return (error_message_set, 1)
Modified: erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/process.py
URL: http://svn.erp5.org/erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/process.py?rev=46025&r1=46024&r2=46025&view=diff
==============================================================================
--- erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/process.py [utf8] (original)
+++ erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/process.py [utf8] Wed Aug 31 13:45:14 2011
@@ -56,6 +56,7 @@ class BenchmarkProcess(multiprocessing.P
super(BenchmarkProcess, self).__init__(*args, **kwargs)
def stopGracefully(self, *args, **kwargs):
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
raise StopIteration("Interrupted by user or because of an error from "
"another process, flushing remaining results...")
Modified: erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/result.py
URL: http://svn.erp5.org/erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/result.py?rev=46025&r1=46024&r2=46025&view=diff
==============================================================================
--- erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/result.py [utf8] (original)
+++ erp5/trunk/utils/erp5.utils.benchmark/src/erp5/utils/benchmark/result.py [utf8] Wed Aug 31 13:45:14 2011
@@ -31,6 +31,7 @@ import math
import os
import csv
import logging
+import signal
class BenchmarkResultStatistic(object):
def __init__(self, suite, label):
@@ -163,6 +164,7 @@ class BenchmarkResult(object):
@abc.abstractmethod
def __exit__(self, exc_type, exc_value, traceback):
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
self.flush(partial=False)
return True
More information about the Erp5-report
mailing list