[Erp5-report] r32089 jm - in /erp5/trunk/products/CMFActivity: ./ Activity/ skins/activity/...

nobody at svn.erp5.org nobody at svn.erp5.org
Fri Jan 29 14:58:56 CET 2010


Author: jm
Date: Fri Jan 29 14:58:55 2010
New Revision: 32089

URL: http://svn.erp5.org?rev=32089&view=rev
Log:
CMFActivity: automatically reselect messages in 'processing=1' state

When a node is looking for activities to process, there's no point in testing
the 'processing' column. This fixes cases that leave activities at
'processing=1' forever (unless, of course, a cluster is killed and restarted
with fewer nodes). Also remove the now-useless cleanup at startup.

One known case that leaves such activities is when the first commit of
dequeueMessage raises an exception, which can happen, for example, during
migration of portal_types.

It seems the 'processing' column is now useless for CMFActivity.

Acked-by: Vincent Pelletier

Removed:
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_clearProcessingFlag.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_clearProcessingFlag.zsql
Modified:
    erp5/trunk/products/CMFActivity/Activity/SQLDict.py
    erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
    erp5/trunk/products/CMFActivity/ActivityTool.py
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql
    erp5/trunk/products/CMFActivity/tests/testCMFActivity.py

Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=32089&r1=32088&r2=32089&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] Fri Jan 29 14:58:55 2010
@@ -163,8 +163,10 @@
         This number is guaranted not to be exceeded.
         If None (or not given) no limit apply.
     """
-    result = activity_tool.SQLDict_selectReservedMessageList(processing_node=processing_node, count=limit)
-    if len(result) == 0:
+    result = not group_method_id and \
+      activity_tool.SQLDict_selectReservedMessageList(
+        processing_node=processing_node, count=limit)
+    if not result:
       activity_tool.SQLDict_reserveMessageList(count=limit, processing_node=processing_node, to_date=date, group_method_id=group_method_id)
       result = activity_tool.SQLDict_selectReservedMessageList(processing_node=processing_node, count=limit)
     return result
@@ -284,6 +286,8 @@
             path_and_method_id_dict = {}
             unreserve_uid_list = []
             for line in result:
+              if line.uid == uid:
+                continue
               # All fetched lines have the same group_method_id and
               # processing_node.
               # Their dates are lower-than or equal-to now_date.
@@ -468,6 +472,8 @@
           LOG('SQLDict', PANIC,
               'abort failed, thus some objects may be modified accidentally')
           raise
+        # XXX Is it still useful to free messages now that this node is able
+        #     to reselect them ?
         to_free_uid_list = [x[0] for x in message_uid_priority_list]
         try:
           makeMessageListAvailable(to_free_uid_list, uid_to_duplicate_uid_list_dict)

Modified: erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLQueue.py?rev=32089&r1=32088&r2=32089&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLQueue.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLQueue.py [utf8] Fri Jan 29 14:58:55 2010
@@ -332,6 +332,8 @@
           # It is possible that the message is executed but the commit
           # of the transaction fails
           value[1].setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
+          # XXX Is it still useful to free message now that this node is able
+          #     to reselect it ?
           try:
             makeMessageListAvailable([value[0]])
           except:

Modified: erp5/trunk/products/CMFActivity/ActivityTool.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActivityTool.py?rev=32089&r1=32088&r2=32089&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActivityTool.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/ActivityTool.py [utf8] Fri Jan 29 14:58:55 2010
@@ -93,7 +93,6 @@
 tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent tic() calls
 timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy
 is_running_lock = threading.Lock()
-first_run = True
 currentNode = None
 ROLE_IDLE = 0
 ROLE_PROCESSING = 1
@@ -918,7 +917,7 @@
         Starts again an activity
         processing_node starts from 1 (there is not node 0)
       """
-      global active_threads, first_run
+      global active_threads
 
       # return if the number of threads is too high
       # else, increase the number of active_threads and continue
@@ -936,15 +935,6 @@
         self.initialize()
 
       inner_self = aq_inner(self)
-
-      # If this is the first tic after zope is started, reset the processing
-      # flag for activities of this node
-      if first_run:
-        inner_self.SQLDict_clearProcessingFlag(
-                                processing_node=processing_node)
-        inner_self.SQLQueue_clearProcessingFlag(
-                                processing_node=processing_node)
-        first_run = False
 
       try:
         #Sort activity list by priority

Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_clearProcessingFlag.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_clearProcessingFlag.zsql?rev=32088&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_clearProcessingFlag.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_clearProcessingFlag.zsql (removed)
@@ -1,17 +1,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:1000
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>processing_node</params>
-UPDATE
-  message
-SET
-  processing=0,
-  processing_node=0
-WHERE
-  processing_node=<dtml-sqlvar processing_node type="int">

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql?rev=32089&r1=32088&r2=32089&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql [utf8] Fri Jan 29 14:58:55 2010
@@ -15,7 +15,6 @@
   message
 WHERE
   processing_node = <dtml-sqlvar processing_node type="int">
-  AND processing = 0
 <dtml-if expr="count is not None">
   LIMIT <dtml-sqlvar count type="int">
 </dtml-if>

Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_clearProcessingFlag.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_clearProcessingFlag.zsql?rev=32088&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_clearProcessingFlag.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_clearProcessingFlag.zsql (removed)
@@ -1,17 +1,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:1000
-max_cache:100
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>processing_node</params>
-UPDATE
-  message_queue
-SET
-  processing=0,
-  processing_node=0
-WHERE
-  processing_node=<dtml-sqlvar processing_node type="int">

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql?rev=32089&r1=32088&r2=32089&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql [utf8] Fri Jan 29 14:58:55 2010
@@ -15,7 +15,6 @@
   message_queue
 WHERE
   processing_node = <dtml-sqlvar processing_node type="int">
-  AND processing = 0
 <dtml-if expr="count is not None">
   LIMIT <dtml-sqlvar count type="int">
 </dtml-if>

Modified: erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/tests/testCMFActivity.py?rev=32089&r1=32088&r2=32089&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] Fri Jan 29 14:58:55 2010
@@ -3515,6 +3515,33 @@
       activity_tool.SQLDict_delMessage(uid=[message.uid for message in result])
       get_transaction().commit()
 
+  def test_116_RaiseInCommitBeforeMessageExecution(self):
+    """
+      Test behaviour of CMFActivity when the commit just before message
+      execution fails. In particular, CMFActivity should restart the
+      activities it selected (processing=1) instead of ignoring them forever.
+    """
+    processed = []
+    activity_tool = self.portal.portal_activities
+    activity_tool.__class__.doSomething = processed.append
+    try:
+      for activity in 'SQLDict', 'SQLQueue':
+        activity_tool.activate(activity=activity).doSomething(activity)
+        get_transaction().commit()
+        activity_tool.distribute()
+        # Make first commit in dequeueMessage raise
+        registerFailingTransactionManager()
+        self.assertRaises(CommitFailed, activity_tool.tic)
+        # Normally, the request stops here and Zope aborts the transaction
+        get_transaction().abort()
+        self.assertEqual(processed, [])
+        # Activity is already in 'processing=1' state. Check tic reselects it.
+        activity_tool.tic()
+        self.assertEqual(processed, [activity])
+        del processed[:]
+    finally:
+      del activity_tool.__class__.doSomething
+
 def test_suite():
   suite = unittest.TestSuite()
   suite.addTest(unittest.makeSuite(TestCMFActivity))




More information about the Erp5-report mailing list