[Erp5-report] r24988 - in /erp5/trunk/products/CMFActivity: ./ tests/
nobody at svn.erp5.org
nobody at svn.erp5.org
Fri Dec 26 16:43:31 CET 2008
Author: vincent
Date: Fri Dec 26 16:43:31 2008
New Revision: 24988
URL: http://svn.erp5.org?rev=24988&view=rev
Log:
Fix process_shutdown and test it.
Add missing "global" declarations.
Simplify: there is no need for both a global variable and a lock to just carry a boolean with atomic access.
Add the possibility to put activity tool back in a working state after process_shutdown has been called.
Modified:
erp5/trunk/products/CMFActivity/ActivityTool.py
erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
Modified: erp5/trunk/products/CMFActivity/ActivityTool.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActivityTool.py?rev=24988&r1=24987&r2=24988&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActivityTool.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/ActivityTool.py [utf8] Fri Dec 26 16:43:31 2008
@@ -432,12 +432,17 @@
return '<%s at 0x%x to %r>' % (self.__class__.__name__, id(self),
self.__dict__['__passive_self'])
-# Set to False when shutting down. Access outside of process_shutdown must
-# be done under the protection of is_running_lock lock.
-is_running = True
# True when activities cannot be executing any more.
has_processed_shutdown = False
+def cancelProcessShutdown():
+ """
+ This method reverts the effect of calling "process_shutdown" on activity
+ tool.
+ """
+ global has_processed_shutdown
+ is_running_lock.release()
+ has_processed_shutdown = False
class ActivityTool (Folder, UniqueObject):
"""
@@ -785,13 +790,12 @@
Prevent shutdown from happening while an activity queue is
processing a batch.
"""
- is_running = False
+ global has_processed_shutdown
if phase == 3 and not has_processed_shutdown:
has_processed_shutdown = True
LOG('CMFActivity', INFO, "Shutdown: Waiting for activities to finish.")
is_running_lock.acquire()
LOG('CMFActivity', INFO, "Shutdown: Activities finished.")
- is_running_lock.release()
def process_timer(self, tick, interval, prev="", next=""):
"""
@@ -907,13 +911,13 @@
while has_awake_activity:
has_awake_activity = 0
for activity in activity_list:
- is_running_lock.acquire()
- try:
- if is_running:
+ acquired = is_running_lock.acquire(0)
+ if acquired:
+ try:
activity.tic(inner_self, processing_node) # Transaction processing is the responsability of the activity
has_awake_activity = has_awake_activity or activity.isAwake(inner_self, processing_node)
- finally:
- is_running_lock.release()
+ finally:
+ is_running_lock.release()
finally:
# decrease the number of active_threads
tic_lock.acquire()
Modified: erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/tests/testCMFActivity.py?rev=24988&r1=24987&r2=24988&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] Fri Dec 26 16:43:31 2008
@@ -44,6 +44,7 @@
from Products.CMFActivity.ActivityTool import Message
import random
from Products.CMFActivity.ActivityRuntimeEnvironment import setActivityRuntimeValue, clearActivityRuntimeEnvironment
+import threading
try:
from transaction import get as get_transaction
@@ -3074,6 +3075,124 @@
delattr(Organisation, 'firstTest')
delattr(Organisation, 'secondTest')
+ def test_115_checkProcessShutdown(self, quiet=0, run=run_all_test):
+ if not run: return
+ if not quiet:
+ message = '\nCheck that no activity is executed after process_shutdown has been called'
+ ZopeTestCase._print(message)
+ LOG('Testing... ',0,message)
+ # Thread execution plan for this test:
+ # main ActivityThread ProcessShutdownThread
+ # start ActivityThread None None
+ # wait for rendez_vous_lock (run) None
+ # wait for rendez_vous_lock release rendez_vous_lock None
+ # start ProcessShutdownThread wait for activity_lock None
+ # release activity_lock wait for activity_lock internal wait
+ # wait for activity_thread (finish) internal wait
+ # wait for process_shutdown_thread None (finish)
+ #
+ # This test only checks that:
+ # - activity tool can exit between 2 processable activity batches
+ # - activity tool won't process activities after process_shutdown was called
+ # - process_shutdown returns before Activity.tic()
+ # This is not perect though, since it would require to have access to
+ # the waiting queue of CMFActivity's internal lock (is_running_lock) to
+ # make sure that it's what is preventing process_shutdown from returning.
+ portal = self.getPortalObject()
+ activity_tool = self.getActivityTool()
+ organisation = portal.organisation_module.newContent(portal_type='Organisation')
+ get_transaction().commit()
+ self.tic()
+ activity_lock = threading.Lock()
+ activity_lock.acquire()
+ rendez_vous_lock = threading.Lock()
+ rendez_vous_lock.acquire()
+ def waitingActivity(context):
+ # Inform test that we arrived at rendez-vous.
+ rendez_vous_lock.release()
+ # When this lock is available, it means test has called process_shutdown.
+ activity_lock.acquire()
+ activity_lock.release()
+ from Products.CMFActivity.Activity.Queue import Queue
+ original_queue_tic = Queue.tic
+ queue_tic_test_dict = {}
+ def Queue_tic(self, activity_tool, processing_node):
+ result = original_queue_tic(self, activity_tool, processing_node)
+ queue_tic_test_dict['isAlive'] = process_shutdown_thread.isAlive()
+ # This is a one-shot method, revert after execution
+ Queue.tic = original_queue_tic
+ return result
+ Queue.tic = Queue_tic
+ Organisation.waitingActivity = waitingActivity
+ try:
+ # Use SQLDict with no gorup method so that both activities won't be
+ # executed in the same batch, letting activity tool a chance to check
+ # if execution should stop processing activities.
+ organisation.activate(activity='SQLDict', priority=1).waitingActivity()
+ organisation.activate(activity='SQLDict', priority=2).getTitle()
+ get_transaction().commit()
+ self.assertEqual(len(activity_tool.getMessageList()), 2)
+ activity_tool.distribute()
+ get_transaction().commit()
+
+ # Start a tic in another thread, so they can meet at rendez-vous.
+ class ActivityThread(threading.Thread):
+ def run(self):
+ # Call changeskin, since skin selection depend on thread id, and we
+ # are in a new thread.
+ activity_tool.changeSkin(None)
+ activity_tool.tic()
+ activity_thread = ActivityThread()
+ # Do not try to outlive main thread.
+ activity_thread.setDaemon(True)
+ # Call process_shutdown in yet another thread because it will wait for
+ # running activity to complete before returning, and we need to unlock
+ # activity *after* calling process_shutdown to make sure the next
+ # activity won't be executed.
+ class ProcessShutdownThread(threading.Thread):
+ def run(self):
+ activity_tool.process_shutdown(3, 0)
+ process_shutdown_thread = ProcessShutdownThread()
+ # Do not try to outlive main thread.
+ process_shutdown_thread.setDaemon(True)
+
+ activity_thread.start()
+ # Wait at rendez-vous for activity to arrive.
+ arrived = False
+ while (not arrived) and activity_thread.isAlive():
+ arrived = rendez_vous_lock.acquire(1)
+ if not arrived:
+ raise Exception, 'Something wrong happened in activity thread.'
+ # Initiate shutdown
+ process_shutdown_thread.start()
+ try:
+ # Let waiting activity finish and wait for thread exit
+ activity_lock.release()
+ activity_thread.join()
+ process_shutdown_thread.join()
+ # Check that there is still one activity pending
+ message_list = activity_tool.getMessageList()
+ self.assertEqual(len(message_list), 1)
+ self.assertEqual(message_list[0].method_id, 'getTitle')
+ # Check that process_shutdown_thread was still runing when Queue_tic returned.
+ self.assertTrue(queue_tic_test_dict.get('isAlive'), repr(queue_tic_test_dict))
+ # Call tic in foreground. This must not lead to activity execution.
+ activity_tool.tic()
+ self.assertEqual(len(activity_tool.getMessageList()), 1)
+ finally:
+ # Put activity tool back in a working state
+ from Products.CMFActivity.ActivityTool import cancelProcessShutdown
+ try:
+ cancelProcessShutdown()
+ except:
+ # If something failed in process_shutdown, shutdown lock might not
+ # be taken in CMFActivity, leading to a new esception here hiding
+ # test error.
+ pass
+ finally:
+ delattr(Organisation, 'waitingActivity')
+ Queue.tic = original_queue_tic
+
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestCMFActivity))
More information about the Erp5-report
mailing list