[Erp5-report] r24988 - in /erp5/trunk/products/CMFActivity: ./ tests/

nobody at svn.erp5.org nobody at svn.erp5.org
Fri Dec 26 16:43:31 CET 2008


Author: vincent
Date: Fri Dec 26 16:43:31 2008
New Revision: 24988

URL: http://svn.erp5.org?rev=24988&view=rev
Log:
Fix process_shutdown and test it.
  Add missing "global" declarations.
  Simplify: there is no need for both a global variable and a lock to just carry a boolean with atomic access.
  Add the possibility to put activity tool back in a working state after process_shutdown has been called.

Modified:
    erp5/trunk/products/CMFActivity/ActivityTool.py
    erp5/trunk/products/CMFActivity/tests/testCMFActivity.py

Modified: erp5/trunk/products/CMFActivity/ActivityTool.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActivityTool.py?rev=24988&r1=24987&r2=24988&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActivityTool.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/ActivityTool.py [utf8] Fri Dec 26 16:43:31 2008
@@ -432,12 +432,17 @@
     return '<%s at 0x%x to %r>' % (self.__class__.__name__, id(self),
                                    self.__dict__['__passive_self'])
 
-# Set to False when shutting down. Access outside of process_shutdown must
-# be done under the protection of is_running_lock lock.
-is_running = True
 # True when activities cannot be executing any more.
 has_processed_shutdown = False
 
+def cancelProcessShutdown():
+  """
+    This method reverts the effect of calling "process_shutdown" on activity
+    tool.
+  """
+  global has_processed_shutdown
+  is_running_lock.release()
+  has_processed_shutdown = False
 
 class ActivityTool (Folder, UniqueObject):
     """
@@ -785,13 +790,12 @@
           Prevent shutdown from happening while an activity queue is
           processing a batch.
         """
-        is_running = False
+        global has_processed_shutdown
         if phase == 3 and not has_processed_shutdown:
           has_processed_shutdown = True
           LOG('CMFActivity', INFO, "Shutdown: Waiting for activities to finish.")
           is_running_lock.acquire()
           LOG('CMFActivity', INFO, "Shutdown: Activities finished.")
-          is_running_lock.release()
 
     def process_timer(self, tick, interval, prev="", next=""):
         """
@@ -907,13 +911,13 @@
         while has_awake_activity:
           has_awake_activity = 0
           for activity in activity_list:
-            is_running_lock.acquire()
-            try:
-              if is_running:
+            acquired = is_running_lock.acquire(0)
+            if acquired:
+              try:
                 activity.tic(inner_self, processing_node) # Transaction processing is the responsability of the activity
                 has_awake_activity = has_awake_activity or activity.isAwake(inner_self, processing_node)
-            finally:
-              is_running_lock.release()
+              finally:
+                is_running_lock.release()
       finally:
         # decrease the number of active_threads
         tic_lock.acquire()

Modified: erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/tests/testCMFActivity.py?rev=24988&r1=24987&r2=24988&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] Fri Dec 26 16:43:31 2008
@@ -44,6 +44,7 @@
 from Products.CMFActivity.ActivityTool import Message
 import random
 from Products.CMFActivity.ActivityRuntimeEnvironment import setActivityRuntimeValue, clearActivityRuntimeEnvironment
+import threading
 
 try:
   from transaction import get as get_transaction
@@ -3074,6 +3075,124 @@
       delattr(Organisation, 'firstTest')
       delattr(Organisation, 'secondTest')
 
+  def test_115_checkProcessShutdown(self, quiet=0, run=run_all_test):
+    if not run: return
+    if not quiet:
+      message = '\nCheck that no activity is executed after process_shutdown has been called'
+      ZopeTestCase._print(message)
+      LOG('Testing... ',0,message)
+    # Thread execution plan for this test:
+    # main                             ActivityThread           ProcessShutdownThread
+    # start ActivityThread             None                     None
+    # wait for rendez_vous_lock        (run)                    None
+    # wait for rendez_vous_lock        release rendez_vous_lock None
+    # start ProcessShutdownThread      wait for activity_lock   None
+    # release activity_lock            wait for activity_lock   internal wait
+    # wait for activity_thread         (finish)                 internal wait
+    # wait for process_shutdown_thread None                     (finish)
+    # 
+    # This test only checks that:
+    # - activity tool can exit between 2 processable activity batches
+    # - activity tool won't process activities after process_shutdown was called
+    # - process_shutdown returns before Activity.tic()
+    #   This is not perfect though, since it would require access to
+    #   the waiting queue of CMFActivity's internal lock (is_running_lock) to
+    #   make sure that it's what is preventing process_shutdown from returning.
+    portal = self.getPortalObject()
+    activity_tool = self.getActivityTool()
+    organisation = portal.organisation_module.newContent(portal_type='Organisation')
+    get_transaction().commit()
+    self.tic()
+    activity_lock = threading.Lock()
+    activity_lock.acquire()
+    rendez_vous_lock = threading.Lock()
+    rendez_vous_lock.acquire()
+    def waitingActivity(context):
+      # Inform test that we arrived at rendez-vous.
+      rendez_vous_lock.release()
+      # When this lock is available, it means test has called process_shutdown.
+      activity_lock.acquire()
+      activity_lock.release()
+    from Products.CMFActivity.Activity.Queue import Queue
+    original_queue_tic = Queue.tic
+    queue_tic_test_dict = {}
+    def Queue_tic(self, activity_tool, processing_node):
+      result = original_queue_tic(self, activity_tool, processing_node)
+      queue_tic_test_dict['isAlive'] = process_shutdown_thread.isAlive()
+      # This is a one-shot method, revert after execution
+      Queue.tic = original_queue_tic
+      return result
+    Queue.tic = Queue_tic
+    Organisation.waitingActivity = waitingActivity
+    try:
+      # Use SQLDict with no group method so that both activities won't be
+      # executed in the same batch, giving activity tool a chance to check
+      # if execution should stop processing activities.
+      organisation.activate(activity='SQLDict', priority=1).waitingActivity()
+      organisation.activate(activity='SQLDict', priority=2).getTitle()
+      get_transaction().commit()
+      self.assertEqual(len(activity_tool.getMessageList()), 2)
+      activity_tool.distribute()
+      get_transaction().commit()
+
+      # Start a tic in another thread, so they can meet at rendez-vous.
+      class ActivityThread(threading.Thread):
+        def run(self):
+          # Call changeskin, since skin selection depend on thread id, and we
+          # are in a new thread.
+          activity_tool.changeSkin(None)
+          activity_tool.tic()
+      activity_thread = ActivityThread()
+      # Do not try to outlive main thread.
+      activity_thread.setDaemon(True)
+      # Call process_shutdown in yet another thread because it will wait for
+      # running activity to complete before returning, and we need to unlock
+      # activity *after* calling process_shutdown to make sure the next
+      # activity won't be executed.
+      class ProcessShutdownThread(threading.Thread):
+        def run(self):
+          activity_tool.process_shutdown(3, 0)
+      process_shutdown_thread = ProcessShutdownThread()
+      # Do not try to outlive main thread.
+      process_shutdown_thread.setDaemon(True)
+
+      activity_thread.start()
+      # Wait at rendez-vous for activity to arrive.
+      arrived = False
+      while (not arrived) and activity_thread.isAlive():
+        arrived = rendez_vous_lock.acquire(1)
+      if not arrived:
+        raise Exception, 'Something wrong happened in activity thread.'
+      # Initiate shutdown
+      process_shutdown_thread.start()
+      try:
+        # Let waiting activity finish and wait for thread exit
+        activity_lock.release()
+        activity_thread.join()
+        process_shutdown_thread.join()
+        # Check that there is still one activity pending
+        message_list = activity_tool.getMessageList()
+        self.assertEqual(len(message_list), 1)
+        self.assertEqual(message_list[0].method_id, 'getTitle')
+        # Check that process_shutdown_thread was still running when Queue_tic returned.
+        self.assertTrue(queue_tic_test_dict.get('isAlive'), repr(queue_tic_test_dict))
+        # Call tic in foreground. This must not lead to activity execution.
+        activity_tool.tic()
+        self.assertEqual(len(activity_tool.getMessageList()), 1)
+      finally:
+        # Put activity tool back in a working state
+        from Products.CMFActivity.ActivityTool import cancelProcessShutdown
+        try:
+          cancelProcessShutdown()
+        except:
+          # If something failed in process_shutdown, shutdown lock might not
+          # be taken in CMFActivity, leading to a new exception here hiding
+          # test error.
+          pass
+    finally:
+      delattr(Organisation, 'waitingActivity')
+      Queue.tic = original_queue_tic
+
 def test_suite():
   suite = unittest.TestSuite()
   suite.addTest(unittest.makeSuite(TestCMFActivity))




More information about the Erp5-report mailing list