[Erp5-report] r37609 rafael - in /erp5/tags/version-5.4.6: ./ bt5/ products/ products/CMFAc...

nobody at svn.erp5.org nobody at svn.erp5.org
Fri Aug 6 18:36:16 CEST 2010


Author: rafael
Date: Fri Aug  6 18:36:16 2010
New Revision: 37609

URL: http://svn.erp5.org?rev=37609&view=rev
Log:

Release version 5.4.6. 

Tags correspont to revision 37390 and includes few changes on CMFActivity, ERP5Catalog and ERP5Type for improve unindex/uncatalog performance. 

Added:
    erp5/tags/version-5.4.6/
    erp5/tags/version-5.4.6/bt5/   (props changed)
      - copied from r37390, erp5/trunk/bt5/
    erp5/tags/version-5.4.6/products/
      - copied from r37390, erp5/trunk/products/
    erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql
    erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql
    erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql
    erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql
Removed:
    erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql
    erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql
    erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql
    erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql
    erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql
    erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql
    erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql
Modified:
    erp5/tags/version-5.4.6/bt5/EXTERNALS.TXT
    erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLBase.py
    erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLDict.py
    erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLQueue.py
    erp5/tags/version-5.4.6/products/CMFActivity/ActivityTool.py
    erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
    erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql
    erp5/tags/version-5.4.6/products/CMFActivity/tests/testCMFActivity.py
    erp5/tags/version-5.4.6/products/ERP5Catalog/CatalogTool.py
    erp5/tags/version-5.4.6/products/ERP5Type/CopySupport.py
    erp5/tags/version-5.4.6/products/ERP5Type/tests/testCopySupport.py

Propchange: erp5/tags/version-5.4.6/bt5/
------------------------------------------------------------------------------
--- svn:externals (added)
+++ svn:externals Fri Aug  6 18:36:16 2010
@@ -0,0 +1,12 @@
+#
+# Used for maintenance of external resources in this svn bundle.  Edit
+# this file as appropriate and then run the following command from within
+# the checkout directory where this file lives on your local machine:
+#
+# svn propset svn:externals -F ./EXTERNALS.TXT .
+#
+
+erp5_accounting_l10n_fr_m9 -r 335 svn://svn.adullact.net/svnroot/erp5-m9/trunk/bt5/erp5_accounting_l10n_fr_m9
+# developers should use
+# svn+ssh://(developername)@svn.adullact.net/svnroot/erp5-m9/trunk/bt5/erp5_accounting_l10n_fr_m9
+

Modified: erp5/tags/version-5.4.6/bt5/EXTERNALS.TXT
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/bt5/EXTERNALS.TXT?rev=37609&r1=37390&r2=37609&view=diff
==============================================================================
--- erp5/tags/version-5.4.6/bt5/EXTERNALS.TXT [utf8] (original)
+++ erp5/tags/version-5.4.6/bt5/EXTERNALS.TXT [utf8] Fri Aug  6 18:36:16 2010
@@ -6,7 +6,7 @@
 # svn propset svn:externals -F ./EXTERNALS.TXT .
 #
 
-erp5_accounting_l10n_fr_m9 svn://svn.adullact.net/svnroot/erp5-m9/trunk/bt5/erp5_accounting_l10n_fr_m9
+erp5_accounting_l10n_fr_m9 -r 335 svn://svn.adullact.net/svnroot/erp5-m9/trunk/bt5/erp5_accounting_l10n_fr_m9
 # developers should use
 # svn+ssh://(developername)@svn.adullact.net/svnroot/erp5-m9/trunk/bt5/erp5_accounting_l10n_fr_m9
 

Modified: erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLBase.py
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLBase.py?rev=37609&r1=37390&r2=37609&view=diff
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLBase.py [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLBase.py [utf8] Fri Aug  6 18:36:16 2010
@@ -27,14 +27,20 @@
 ##############################################################################
 
 import sys
+import transaction
 from zLOG import LOG, TRACE, INFO, WARNING, ERROR
 from ZODB.POSException import ConflictError
 from Products.CMFActivity.ActivityTool import (
   MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED)
 from Products.CMFActivity.ActiveObject import (
   INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE)
+from Products.CMFActivity.ActivityRuntimeEnvironment import (
+  ActivityRuntimeEnvironment, getTransactionalVariable)
 from Queue import VALIDATION_ERROR_DELAY
 
+# Stop electing more messages for processing if more than this number of
+# objects are impacted by elected messages.
+MAX_GROUPED_OBJECTS = 100
 
 def sort_message_key(message):
   # same sort key as in SQL{Dict,Queue}_readMessageList
@@ -129,6 +135,256 @@ class SQLBase:
     LOG(self.__class__.__name__, severity, summary,
         error=severity>INFO and sys.exc_info() or None)
 
+  def getReservedMessageList(self, activity_tool, date, processing_node,
+                             limit=None, group_method_id=None):
+    """
+      Get and reserve a list of messages.
+      limit
+        Maximum number of messages to fetch.
+        This number is not garanted to be reached, because of:
+         - not enough messages being pending execution
+         - race condition (other nodes reserving the same messages at the same
+           time)
+        This number is guaranted not to be exceeded.
+        If None (or not given) no limit apply.
+    """
+    select = activity_tool.SQLBase_selectReservedMessageList
+    result = not group_method_id and select(table=self.sql_table, count=limit,
+                                            processing_node=processing_node)
+    if not result:
+      activity_tool.SQLBase_reserveMessageList(table=self.sql_table,
+        count=limit, processing_node=processing_node, to_date=date,
+        group_method_id=group_method_id)
+      result = select(table=self.sql_table,
+                      processing_node=processing_node, count=limit)
+    return result
+
+  def makeMessageListAvailable(self, activity_tool, uid_list):
+    """
+      Put messages back in processing_node=0 .
+    """
+    if len(uid_list):
+      activity_tool.SQLBase_makeMessageListAvailable(table=self.sql_table,
+                                                     uid=uid_list)
+
+  def getProcessableMessageList(self, activity_tool, processing_node):
+    """
+      Always true:
+        For each reserved message, delete redundant messages when it gets
+        reserved (definitely lost, but they are expandable since redundant).
+
+      - reserve a message
+      - set reserved message to processing=1 state
+      - if this message has a group_method_id:
+        - reserve a bunch of BUNDLE_MESSAGE_COUNT messages
+        - untill number of impacted objects goes over MAX_GROUPED_OBJECTS
+          - get one message from the reserved bunch (this messages will be
+            "needed")
+          - increase the number of impacted object
+        - set "needed" reserved messages to processing=1 state
+        - unreserve "unneeded" messages
+      - return still-reserved message list and a group_method_id
+
+      If any error happens in above described process, try to unreserve all
+      messages already reserved in that process.
+      If it fails, complain loudly that some messages might still be in an
+      unclean state.
+
+      Returned values:
+        4-tuple:
+          - list of messages
+          - impacted object count
+          - group_method_id
+          - uid_to_duplicate_uid_list_dict
+    """
+    def getReservedMessageList(limit, group_method_id=None):
+      line_list = self.getReservedMessageList(activity_tool=activity_tool,
+                                              date=now_date,
+                                              processing_node=processing_node,
+                                              limit=limit,
+                                              group_method_id=group_method_id)
+      if len(line_list):
+        self._log(TRACE, 'Reserved messages: %r' % [x.uid for x in line_list])
+      return line_list
+    def getDuplicateMessageUidList(line):
+      uid_list = self.getDuplicateMessageUidList(activity_tool=activity_tool,
+        line=line, processing_node=processing_node)
+      if len(uid_list):
+        self._log(TRACE, 'Reserved duplicate messages: %r' % (uid_list, ))
+      return uid_list
+    BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number
+    now_date = self.getNow(activity_tool)
+    message_list = []
+    count = 0
+    group_method_id = None
+    try:
+      result = getReservedMessageList(limit=1)
+      uid_to_duplicate_uid_list_dict = {}
+      if len(result) > 0:
+        line = result[0]
+        uid = line.uid
+        m = self.loadMessage(line.message, uid=uid, line=line)
+        message_list.append(m)
+        group_method_id = line.group_method_id
+        activity_tool.SQLBase_processMessage(table=self.sql_table, uid=[uid])
+        uid_to_duplicate_uid_list_dict.setdefault(uid, []) \
+          .extend(getDuplicateMessageUidList(line))
+        if group_method_id not in (None, '', '\0'):
+          # Count the number of objects to prevent too many objects.
+          count += len(m.getObjectList(activity_tool))
+          if count < MAX_GROUPED_OBJECTS:
+            # Retrieve objects which have the same group method.
+            result = getReservedMessageList(limit=BUNDLE_MESSAGE_COUNT,
+                                            group_method_id=group_method_id)
+            path_and_method_id_dict = {}
+            unreserve_uid_list = []
+            for line in result:
+              if line.uid == uid:
+                continue
+              # All fetched lines have the same group_method_id and
+              # processing_node.
+              # Their dates are lower-than or equal-to now_date.
+              # We read each line once so lines have distinct uids.
+              # So what remains to be filtered on are path, method_id and
+              # order_validation_text.
+              try:
+                key = line.path, line.method_id, line.order_validation_text
+              except AttributeError:
+                pass # message_queue does not have 'order_validation_text'
+              else:
+                original_uid = path_and_method_id_dict.get(key)
+                if original_uid is not None:
+                  uid_to_duplicate_uid_list_dict.setdefault(original_uid, []) \
+                  .append(line.uid)
+                  continue
+                path_and_method_id_dict[key] = line.uid
+                uid_to_duplicate_uid_list_dict.setdefault(line.uid, []) \
+                .extend(getDuplicateMessageUidList(line))
+              if count < MAX_GROUPED_OBJECTS:
+                m = self.loadMessage(line.message, uid=line.uid, line=line)
+                count += len(m.getObjectList(activity_tool))
+                message_list.append(m)
+              else:
+                unreserve_uid_list.append(line.uid)
+            activity_tool.SQLBase_processMessage(table=self.sql_table,
+              uid=[m.uid for m in message_list])
+            # Unreserve extra messages as soon as possible.
+            self.makeMessageListAvailable(activity_tool=activity_tool,
+                                          uid_list=unreserve_uid_list)
+      return (message_list, count, group_method_id,
+              uid_to_duplicate_uid_list_dict)
+    except:
+      self._log(WARNING, 'Exception while reserving messages.')
+      if len(message_list):
+        to_free_uid_list = [m.uid for m in message_list]
+        try:
+          self.makeMessageListAvailable(activity_tool=activity_tool,
+                                        uid_list=to_free_uid_list)
+        except:
+          self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list)
+        else:
+          if len(to_free_uid_list):
+            self._log(TRACE, 'Freed messages %r' % to_free_uid_list)
+      else:
+        self._log(TRACE, '(no message was reserved)')
+      return [], 0, None, {}
+
+  # Queue semantic
+  def dequeueMessage(self, activity_tool, processing_node):
+    def makeMessageListAvailable(uid_list, uid_to_duplicate_uid_list_dict):
+      final_uid_list = []
+      for uid in uid_list:
+        final_uid_list.append(uid)
+        final_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
+      self.makeMessageListAvailable(activity_tool=activity_tool,
+                                    uid_list=final_uid_list)
+    message_list, count, group_method_id, uid_to_duplicate_uid_list_dict = \
+      self.getProcessableMessageList(activity_tool, processing_node)
+    if message_list:
+      # Remove group_id parameter from group_method_id
+      if group_method_id is not None:
+        group_method_id = group_method_id.split('\0')[0]
+      if group_method_id not in (None, ""):
+        method  = activity_tool.invokeGroup
+        args = (group_method_id, message_list, self.__class__.__name__)
+        activity_runtime_environment = ActivityRuntimeEnvironment(None)
+      else:
+        method = activity_tool.invoke
+        message = message_list[0]
+        args = (message, )
+        activity_runtime_environment = ActivityRuntimeEnvironment(message)
+      # Commit right before executing messages.
+      # As MySQL transaction does not start exactly at the same time as ZODB
+      # transactions but a bit later, messages available might be called
+      # on objects which are not available - or available in an old
+      # version - to ZODB connector.
+      # So all connectors must be committed now that we have selected
+      # everything needed from MySQL to get a fresh view of ZODB objects.
+      transaction.commit()
+      tv = getTransactionalVariable(None)
+      tv['activity_runtime_environment'] = activity_runtime_environment
+      # Try to invoke
+      try:
+        method(*args)
+      except:
+        self._log(WARNING,
+          'Exception raised when invoking messages (uid, path, method_id) %r'
+          % [(m.uid, m.object_path, m.method_id) for m in message_list])
+        try:
+          transaction.abort()
+        except:
+          # Unfortunately, database adapters may raise an exception against
+          # abort.
+          self._log(PANIC,
+              'abort failed, thus some objects may be modified accidentally')
+          raise
+        # XXX Is it still useful to free messages now that this node is able
+        #     to reselect them ?
+        to_free_uid_list = [x.uid for x in message_list]
+        try:
+          makeMessageListAvailable(to_free_uid_list,
+                                   uid_to_duplicate_uid_list_dict)
+        except:
+          self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list)
+        else:
+          self._log(TRACE, 'Freed messages %r' % to_free_uid_list)
+      # Abort if something failed.
+      if [m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]:
+        endTransaction = transaction.abort
+      else:
+        endTransaction = transaction.commit
+      try:
+        endTransaction()
+      except:
+        self._log(WARNING,
+          'Failed to end transaction for messages (uid, path, method_id) %r'
+          % [(m.uid, m.object_path, m.method_id) for m in message_list])
+        if endTransaction == transaction.abort:
+          self._log(PANIC, 'Failed to abort executed messages.'
+            ' Some objects may be modified accidentally.')
+        else:
+          try:
+            transaction.abort()
+          except:
+            self._log(PANIC, 'Failed to abort executed messages which also'
+              ' failed to commit. Some objects may be modified accidentally.')
+            raise
+        exc_info = sys.exc_info()
+        for m in message_list:
+          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
+        try:
+          makeMessageListAvailable([x.uid for x in message_list],
+                                   uid_to_duplicate_uid_list_dict)
+        except:
+          self._log(ERROR, 'Failed to free remaining messages: %r'
+                           % (message_list, ))
+        else:
+          self._log(TRACE, 'Freed messages %r' % (message_list, ))
+      self.finalizeMessageExecution(activity_tool, message_list,
+                                    uid_to_duplicate_uid_list_dict)
+    transaction.commit()
+    return not message_list
+
   def finalizeMessageExecution(self, activity_tool, message_list,
                                uid_to_duplicate_uid_list_dict=None):
     """

Modified: erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLDict.py?rev=37609&r1=37390&r2=37609&view=diff
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLDict.py [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLDict.py [utf8] Fri Aug  6 18:36:16 2010
@@ -29,16 +29,10 @@
 from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
 from Queue import VALID, INVALID_PATH
 from RAMDict import RAMDict
-from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
 from Products.CMFActivity.Errors import ActivityFlushError
-from ZODB.POSException import ConflictError
 import sys
-from types import ClassType
 #from time import time
 from SQLBase import SQLBase, sort_message_key
-from Products.CMFActivity.ActivityRuntimeEnvironment import (
-  ActivityRuntimeEnvironment, getTransactionalVariable)
-from zExceptions import ExceptionFormatter
 
 import transaction
 
@@ -48,9 +42,6 @@ from zLOG import LOG, TRACE, WARNING, ER
 MAX_VALIDATED_LIMIT = 1000 
 # Read up to this number of messages to validate.
 READ_MESSAGE_LIMIT = 1000
-# Stop electing more messages for processing if more than this number of
-# objects are impacted by elected messages.
-MAX_GROUPED_OBJECTS = 100
 
 MAX_MESSAGE_LIST_SIZE = 100
 
@@ -131,33 +122,6 @@ class SQLDict(RAMDict, SQLBase):
     message_list = activity_buffer.getMessageList(self)
     return [m for m in message_list if m.is_registered]
 
-  def getReservedMessageList(self, activity_tool, date, processing_node, limit=None, group_method_id=None):
-    """
-      Get and reserve a list of messages.
-      limit
-        Maximum number of messages to fetch.
-        This number is not garanted to be reached, because of:
-         - not enough messages being pending execution
-         - race condition (other nodes reserving the same messages at the same
-           time)
-        This number is guaranted not to be exceeded.
-        If None (or not given) no limit apply.
-    """
-    result = not group_method_id and \
-      activity_tool.SQLDict_selectReservedMessageList(
-        processing_node=processing_node, count=limit)
-    if not result:
-      activity_tool.SQLDict_reserveMessageList(count=limit, processing_node=processing_node, to_date=date, group_method_id=group_method_id)
-      result = activity_tool.SQLDict_selectReservedMessageList(processing_node=processing_node, count=limit)
-    return result
-
-  def makeMessageListAvailable(self, activity_tool, uid_list):
-    """
-      Put messages back in processing_node=0 .
-    """
-    if len(uid_list):
-      activity_tool.SQLDict_makeMessageListAvailable(uid_list=uid_list)
-
   def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
     """
       Reserve unreserved messages matching given line.
@@ -188,202 +152,7 @@ class SQLDict(RAMDict, SQLBase):
       raise
     return uid_list
 
-  def getProcessableMessageList(self, activity_tool, processing_node):
-    """
-      Always true:
-        For each reserved message, delete redundant messages when it gets
-        reserved (definitely lost, but they are expandable since redundant).
-
-      - reserve a message
-      - set reserved message to processing=1 state
-      - if this message has a group_method_id:
-        - reserve a bunch of BUNDLE_MESSAGE_COUNT messages
-        - untill number of impacted objects goes over MAX_GROUPED_OBJECTS
-          - get one message from the reserved bunch (this messages will be
-            "needed")
-          - increase the number of impacted object
-        - set "needed" reserved messages to processing=1 state
-        - unreserve "unneeded" messages
-      - return still-reserved message list and a group_method_id
-
-      If any error happens in above described process, try to unreserve all
-      messages already reserved in that process.
-      If it fails, complain loudly that some messages might still be in an
-      unclean state.
-
-      Returned values:
-        4-tuple:
-          - list of messages
-          - impacted object count
-          - group_method_id
-          - uid_to_duplicate_uid_list_dict
-    """
-    def getReservedMessageList(limit, group_method_id=None):
-      line_list = self.getReservedMessageList(activity_tool=activity_tool,
-                                              date=now_date,
-                                              processing_node=processing_node,
-                                              limit=limit,
-                                              group_method_id=group_method_id)
-      if len(line_list):
-        LOG('SQLDict', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
-      return line_list
-    def getDuplicateMessageUidList(line):
-      uid_list = self.getDuplicateMessageUidList(activity_tool=activity_tool, 
-        line=line, processing_node=processing_node)
-      if len(uid_list):
-        LOG('SQLDict', TRACE, 'Reserved duplicate messages: %r' % (uid_list, ))
-      return uid_list
-    def makeMessageListAvailable(uid_list):
-      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
-    BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number
-    now_date = self.getNow(activity_tool)
-    message_list = []
-    count = 0
-    group_method_id = None
-    try:
-      result = getReservedMessageList(limit=1)
-      uid_to_duplicate_uid_list_dict = {}
-      if len(result) > 0:
-        line = result[0]
-        uid = line.uid
-        m = self.loadMessage(line.message, uid=uid, line=line)
-        message_list.append(m)
-        group_method_id = line.group_method_id
-        activity_tool.SQLDict_processMessage(uid=[uid])
-        uid_to_duplicate_uid_list_dict.setdefault(uid, []) \
-          .extend(getDuplicateMessageUidList(line))
-        if group_method_id not in (None, '', '\0'):
-          # Count the number of objects to prevent too many objects.
-          count += len(m.getObjectList(activity_tool))
-          if count < MAX_GROUPED_OBJECTS:
-            # Retrieve objects which have the same group method.
-            result = getReservedMessageList(limit=BUNDLE_MESSAGE_COUNT, group_method_id=group_method_id)
-            path_and_method_id_dict = {}
-            unreserve_uid_list = []
-            for line in result:
-              if line.uid == uid:
-                continue
-              # All fetched lines have the same group_method_id and
-              # processing_node.
-              # Their dates are lower-than or equal-to now_date.
-              # We read each line once so lines have distinct uids.
-              # So what remains to be filtered on are path, method_id and
-              # order_validation_text.
-              key = (line.path, line.method_id, line.order_validation_text)
-              original_uid = path_and_method_id_dict.get(key)
-              if original_uid is not None:
-                uid_to_duplicate_uid_list_dict.setdefault(original_uid, []).append(line.uid)
-                continue
-              path_and_method_id_dict[key] = line.uid
-              uid_to_duplicate_uid_list_dict.setdefault(line.uid, []).extend(getDuplicateMessageUidList(line))
-              if count < MAX_GROUPED_OBJECTS:
-                m = self.loadMessage(line.message, uid=line.uid, line=line)
-                count += len(m.getObjectList(activity_tool))
-                message_list.append(m)
-              else:
-                unreserve_uid_list.append(line.uid)
-            activity_tool.SQLDict_processMessage(uid=[m.uid for m in message_list])
-            # Unreserve extra messages as soon as possible.
-            makeMessageListAvailable(unreserve_uid_list)
-      return message_list, count, group_method_id, uid_to_duplicate_uid_list_dict
-    except:
-      LOG('SQLDict', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
-      if len(message_list):
-        to_free_uid_list = [m.uid for m in message_list]
-        try:
-          makeMessageListAvailable(to_free_uid_list)
-        except:
-          LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
-        else:
-          if len(to_free_uid_list):
-            LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
-      else:
-        LOG('SQLDict', TRACE, '(no message was reserved)')
-      return [], 0, None, {}
-
-  # Queue semantic
-  def dequeueMessage(self, activity_tool, processing_node):
-    def makeMessageListAvailable(uid_list, uid_to_duplicate_uid_list_dict):
-      final_uid_list = []
-      for uid in uid_list:
-        final_uid_list.append(uid)
-        final_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
-      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=final_uid_list)
-    message_list, count, group_method_id, uid_to_duplicate_uid_list_dict = \
-      self.getProcessableMessageList(activity_tool, processing_node)
-    if message_list:
-      # Remove group_id parameter from group_method_id
-      if group_method_id is not None:
-        group_method_id = group_method_id.split('\0')[0]
-      if group_method_id not in (None, ""):
-        method  = activity_tool.invokeGroup
-        args = (group_method_id, message_list)
-        activity_runtime_environment = ActivityRuntimeEnvironment(None)
-      else:
-        method = activity_tool.invoke
-        message = message_list[0]
-        args = (message, )
-        activity_runtime_environment = ActivityRuntimeEnvironment(message)
-      # Commit right before executing messages.
-      # As MySQL transaction does not start exactly at the same time as ZODB
-      # transactions but a bit later, messages available might be called
-      # on objects which are not available - or available in an old
-      # version - to ZODB connector.
-      # So all connectors must be committed now that we have selected
-      # everything needed from MySQL to get a fresh view of ZODB objects.
-      transaction.commit()
-      tv = getTransactionalVariable(None)
-      tv['activity_runtime_environment'] = activity_runtime_environment
-      # Try to invoke
-      try:
-        method(*args)
-      except:
-        LOG('SQLDict', WARNING, 'Exception raised when invoking messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
-        try:
-          transaction.abort()
-        except:
-          # Unfortunately, database adapters may raise an exception against abort.
-          LOG('SQLDict', PANIC,
-              'abort failed, thus some objects may be modified accidentally')
-          raise
-        # XXX Is it still useful to free messages now that this node is able
-        #     to reselect them ?
-        to_free_uid_list = [x.uid for x in message_list]
-        try:
-          makeMessageListAvailable(to_free_uid_list, uid_to_duplicate_uid_list_dict)
-        except:
-          LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
-        else:
-          LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list))
-      # Abort if something failed.
-      if [m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]:
-        endTransaction = transaction.abort
-      else:
-        endTransaction = transaction.commit
-      try:
-        endTransaction()
-      except:
-        LOG('SQLDict', WARNING, 'Failed to end transaction for messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
-        if endTransaction == transaction.abort:
-          LOG('SQLDict', PANIC, 'Failed to abort executed messages. Some objects may be modified accidentally.')
-        else:
-          try:
-            transaction.abort()
-          except:
-            LOG('SQLDict', PANIC, 'Failed to abort executed messages which also failed to commit. Some objects may be modified accidentally.')
-            raise
-        exc_info = sys.exc_info()
-        for m in message_list:
-          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info=exc_info, log=False)
-        try:
-          makeMessageListAvailable([x.uid for x in message_list], uid_to_duplicate_uid_list_dict)
-        except:
-          LOG('SQLDict', ERROR, 'Failed to free remaining messages: %r' % (message_list, ), error=sys.exc_info())
-        else:
-          LOG('SQLDict', TRACE, 'Freed messages %r' % (message_list, ))
-      self.finalizeMessageExecution(activity_tool, message_list, uid_to_duplicate_uid_list_dict)
-    transaction.commit()
-    return not message_list
+  dequeueMessage = SQLBase.dequeueMessage
 
   def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
     hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
@@ -530,9 +299,10 @@ class SQLDict(RAMDict, SQLBase):
               message_list.sort(key=sort_message_key)
               deletable_uid_list += [m.uid for m in message_list[1:]]
             message = message_list[0]
-            distributable_uid_set.add(message.uid)
             serialization_tag = message.activity_kw.get('serialization_tag')
-            if serialization_tag is not None:
+            if serialization_tag is None:
+              distributable_uid_set.add(message.uid)
+            else:
               serialization_tag_dict.setdefault(serialization_tag,
                                                 []).append(message)
           # Don't let through if there is the same serialization tag in the
@@ -542,15 +312,15 @@ class SQLDict(RAMDict, SQLBase):
           # does not stop validating together. Because those messages should
           # be processed together at once.
           for message_list in serialization_tag_dict.itervalues():
-            if len(message_list) == 1:
-              continue
             # Sort list of messages to validate the message with highest score
             message_list.sort(key=sort_message_key)
+            distributable_uid_set.add(message_list[0].uid)
             group_method_id = message_list[0].activity_kw.get('group_method_id')
+            if group_method_id is None:
+              continue
             for message in message_list[1:]:
-              if group_method_id is None or \
-                 group_method_id != message.activity_kw.get('group_method_id'):
-                distributable_uid_set.remove(message.uid)
+              if group_method_id == message.activity_kw.get('group_method_id'):
+                distributable_uid_set.add(message.uid)
           if deletable_uid_list:
             activity_tool.SQLBase_delMessage(table=self.sql_table,
                                              uid=deletable_uid_list)

Modified: erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLQueue.py
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLQueue.py?rev=37609&r1=37390&r2=37609&view=diff
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLQueue.py [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLQueue.py [utf8] Fri Aug  6 18:36:16 2010
@@ -29,15 +29,9 @@
 from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
 from RAMQueue import RAMQueue
 from Queue import VALID, INVALID_PATH
-from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
 from Products.CMFActivity.Errors import ActivityFlushError
 from ZODB.POSException import ConflictError
-from types import ClassType
-import sys
-from time import time
 from SQLBase import SQLBase, sort_message_key
-from Products.CMFActivity.ActivityRuntimeEnvironment import (
-  ActivityRuntimeEnvironment, getTransactionalVariable)
 from zExceptions import ExceptionFormatter
 
 import transaction
@@ -48,17 +42,6 @@ from zLOG import LOG, WARNING, ERROR, IN
 MAX_VALIDATED_LIMIT = 1000
 # Read this many messages to validate.
 READ_MESSAGE_LIMIT = 1000
-# Process this many messages in each dequeueMessage call.
-# Downside of setting to a "small" value: the cost of reserving a batch of
-# few messages increases relatively to the cost of executing activities,
-# making CMFActivity overhead significant.
-# Downside of setting to a "big" value: if there are many slow activities in
-# a multi-activity-node environment, multiple slow activities will be reserved
-# by a single node, making a suboptimal use of the parallelisation offered by
-# the cluster.
-# Before increasing this value, consider using SQLDict with group methods
-# first.
-MESSAGE_BUNDLE_SIZE = 1
 
 MAX_MESSAGE_LIST_SIZE = 100
 
@@ -83,6 +66,8 @@ class SQLQueue(RAMQueue, SQLBase):
       method_id_list = [m.method_id for m in registered_message_list]
       priority_list = [m.activity_kw.get('priority', 1) for m in registered_message_list]
       date_list = [m.activity_kw.get('at_date', None) for m in registered_message_list]
+      group_method_id_list = ['\0'.join([message.activity_kw.get('group_method_id', ''), message.activity_kw.get('group_id', '')])
+                              for message in registered_message_list]
       tag_list = [m.activity_kw.get('tag', '') for m in registered_message_list]
       serialization_tag_list = [m.activity_kw.get('serialization_tag', '') for m in registered_message_list]
       dumped_message_list = [self.dumpMessage(m) for m in registered_message_list]
@@ -92,6 +77,7 @@ class SQLQueue(RAMQueue, SQLBase):
                                               method_id_list=method_id_list,
                                               priority_list=priority_list,
                                               message_list=dumped_message_list,
+                                              group_method_id_list = group_method_id_list,
                                               date_list=date_list,
                                               tag_list=tag_list,
                                               processing_node_list=None,
@@ -110,162 +96,14 @@ class SQLQueue(RAMQueue, SQLBase):
     # Nothing to do in SQLQueue.
     pass
 
-  def getReservedMessageList(self, activity_tool, date, processing_node, limit=None):
+  def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
     """
-      Get and reserve a list of messages.
-      limit
-        Maximum number of messages to fetch.
-        This number is not garanted to be reached, because of:
-         - not enough messages being pending execution
-         - race condition (other nodes reserving the same messages at the same
-           time)
-        This number is guaranted not to be exceeded.
-        If None (or not given) no limit apply.
+      Reserve unreserved messages matching given line.
+      Return their uids.
     """
-    result = activity_tool.SQLQueue_selectReservedMessageList(processing_node=processing_node, count=limit)
-    if len(result) == 0:
-      activity_tool.SQLQueue_reserveMessageList(count=limit, processing_node=processing_node, to_date=date)
-      result = activity_tool.SQLQueue_selectReservedMessageList(processing_node=processing_node, count=limit)
-    return result
-
-  def makeMessageListAvailable(self, activity_tool, uid_list):
-    """
-      Put messages back in processing_node=0 .
-    """
-    if len(uid_list):
-      activity_tool.SQLQueue_makeMessageListAvailable(uid_list=uid_list)
-
-  def getProcessableMessageList(self, activity_tool, processing_node):
-    """
-      Always true:
-        For each reserved message, delete redundant messages when it gets
-        reserved (definitely lost, but they are expandable since redundant).
-
-      - reserve a message
-      - set reserved message to processing=1 state
-      - if this message has a group_method_id:
-        - reserve a bunch of BUNDLE_MESSAGE_COUNT messages
-        - untill number of impacted objects goes over MAX_GROUPED_OBJECTS
-          - get one message from the reserved bunch (this messages will be
-            "needed")
-          - increase the number of impacted object
-        - set "needed" reserved messages to processing=1 state
-        - unreserve "unneeded" messages
-      - return still-reserved message list
-
-      If any error happens in above described process, try to unreserve all
-      messages already reserved in that process.
-      If it fails, complain loudly that some messages might still be in an
-      unclean state.
-
-      Returned values:
-        list of messages
-    """
-    def getReservedMessageList(limit):
-      line_list = self.getReservedMessageList(activity_tool=activity_tool,
-                                              date=now_date,
-                                              processing_node=processing_node,
-                                              limit=limit)
-      if len(line_list):
-        LOG('SQLQueue', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
-      return line_list
-    def makeMessageListAvailable(uid_list):
-      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
-    now_date = self.getNow(activity_tool)
-    message_list = []
-    try:
-      result = getReservedMessageList(limit=MESSAGE_BUNDLE_SIZE)
-      for line in result:
-        m = self.loadMessage(line.message, uid=line.uid, line=line)
-        message_list.append(m)
-      if len(message_list):
-        activity_tool.SQLQueue_processMessage(uid=[m.uid for x in message_list])
-      return message_list
-    except:
-      LOG('SQLQueue', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
-      if len(message_list):
-        to_free_uid_list = [m.uid for m in message_list]
-        try:
-          makeMessageListAvailable(to_free_uid_list)
-        except:
-          LOG('SQLQueue', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
-        else:
-          if len(to_free_uid_list):
-            LOG('SQLQueue', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
-      else:
-        LOG('SQLQueue', TRACE, '(no message was reserved)')
-      return []
-
-  def dequeueMessage(self, activity_tool, processing_node):
-    def makeMessageListAvailable(uid_list):
-      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
-    message_list = \
-      self.getProcessableMessageList(activity_tool, processing_node)
-    if message_list:
-      processing_stop_time = time() + 30 # Stop processing after more than 10 seconds were spent
-      processed_count = 0
-      # Commit right before executing messages.
-      # As MySQL transaction does not start exactly at the same time as ZODB
-      # transactions but a bit later, messages available might be called
-      # on objects which are not available - or available in an old
-      # version - to ZODB connector.
-      # So all connectors must be committed now that we have selected
-      # everything needed from MySQL to get a fresh view of ZODB objects.
-      transaction.commit()
-      tv = getTransactionalVariable(None)
-      for m in message_list:
-        tv['activity_runtime_environment'] = ActivityRuntimeEnvironment(m)
-        processed_count += 1
-        # Try to invoke
-        try:
-          activity_tool.invoke(m)
-          if m.getExecutionState() != MESSAGE_NOT_EXECUTED:
-            # Commit so that if a message raises it doesn't causes previous
-            # successfull messages to be rolled back. This commit might fail,
-            # so it is protected the same way as activity execution by the
-            # same "try" block.
-            transaction.commit()
-          else:
-            # This message failed, abort.
-            transaction.abort()
-        except:
-          value = m.uid, m.object_path, m.method_id
-          LOG('SQLQueue', WARNING, 'Exception raised when invoking message (uid, path, method_id) %r' % (value, ), error=sys.exc_info())
-          try:
-            transaction.abort()
-          except:
-            # Unfortunately, database adapters may raise an exception against abort.
-            LOG('SQLQueue', PANIC, 'abort failed, thus some objects may be modified accidentally')
-            raise
-          # We must make sure that the message is not set as executed.
-          # It is possible that the message is executed but the commit
-          # of the transaction fails
-          m.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
-          # XXX Is it still useful to free message now that this node is able
-          #     to reselect it ?
-          try:
-            makeMessageListAvailable([m.uid])
-          except:
-            LOG('SQLQueue', ERROR, 'Failed to free message: %r' % (value, ), error=sys.exc_info())
-          else:
-            LOG('SQLQueue', TRACE, 'Freed message %r' % (value, ))
-        if time() > processing_stop_time:
-          LOG('SQLQueue', TRACE, 'Stop processing message batch because processing delay exceeded')
-          break
-      # Release all unprocessed messages
-      to_free_uid_list = [m.uid for m in message_list[processed_count:]]
-      if to_free_uid_list:
-        try:
-          makeMessageListAvailable(to_free_uid_list)
-        except:
-          LOG('SQLQueue', ERROR, 'Failed to free remaining messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
-        else:
-          LOG('SQLQueue', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
-      self.finalizeMessageExecution(activity_tool,
-                                    message_list[:processed_count])
-    transaction.commit()
-    return not message_list
+    return ()
 
+  dequeueMessage = SQLBase.dequeueMessage
 
   def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
     hasMessage = getattr(activity_tool, 'SQLQueue_hasMessage', None)
@@ -416,6 +254,12 @@ class SQLQueue(RAMQueue, SQLBase):
             # Sort list of messages to validate the message with highest score
             message_list.sort(key=sort_message_key)
             distributable_uid_set.add(message_list[0].uid)
+            group_method_id = message_list[0].activity_kw.get('group_method_id')
+            if group_method_id is None:
+              continue
+            for message in message_list[1:]:
+              if group_method_id == message.activity_kw.get('group_method_id'):
+                distributable_uid_set.add(message.uid)
           distributable_count = len(distributable_uid_set)
           if distributable_count:
             activity_tool.SQLBase_assignMessage(table=self.sql_table,

Modified: erp5/tags/version-5.4.6/products/CMFActivity/ActivityTool.py
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/CMFActivity/ActivityTool.py?rev=37609&r1=37390&r2=37609&view=diff
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/ActivityTool.py [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/ActivityTool.py [utf8] Fri Aug  6 18:36:16 2010
@@ -1172,16 +1172,18 @@ class ActivityTool (Folder, UniqueObject
         for held in my_self.REQUEST._held:
           self.REQUEST._hold(held)
 
-    def invokeGroup(self, method_id, message_list):
+    def invokeGroup(self, method_id, message_list, activity):
       if self.activity_tracking:
-        activity_tracking_logger.info('invoking group messages: method_id=%s, paths=%s' % (method_id, ['/'.join(m.object_path) for m in message_list]))
+        activity_tracking_logger.info(
+          'invoking group messages: method_id=%s, paths=%s'
+          % (method_id, ['/'.join(m.object_path) for m in message_list]))
       # Invoke a group method.
-      object_list = []
       expanded_object_list = []
       new_message_list = []
-      path_dict = {}
-      # Filter the list of messages. If an object is not available, mark its message as non-executable.
-      # In addition, expand an object if necessary, and make sure that no duplication happens.
+      path_set = set()
+      # Filter the list of messages. If an object is not available, mark its
+      # message as non-executable. In addition, expand an object if necessary,
+      # and make sure that no duplication happens.
       for m in message_list:
         # alternate method is used to segregate objects which cannot be grouped.
         alternate_method_id = m.activity_kw.get('alternate_method_id')
@@ -1195,42 +1197,29 @@ class ActivityTool (Folder, UniqueObject
           m.setExecutionState(MESSAGE_NOT_EXECUTABLE, context=self)
           continue
         try:
-          i = len(new_message_list) # This is an index of this message in new_message_list.
-          if m.hasExpandMethod():
-            for subobj in m.getObjectList(self):
+          if activity == 'SQLQueue':
+            expanded_object_list.append((obj, m.args, m.kw))
+          else:
+            if m.hasExpandMethod():
+              subobject_list = m.getObjectList(self)
+            else:
+              subobject_list = (obj,)
+            for subobj in subobject_list:
               path = subobj.getPath()
-              if path not in path_dict:
-                path_dict[path] = i
+              if path not in path_set:
+                path_set.add(path)
                 if alternate_method_id is not None \
                    and hasattr(aq_base(subobj), alternate_method_id):
-                  # if this object is alternated, generate a new single active object.
+                  # if this object is alternated,
+                  # generate a new single active object
                   activity_kw = m.activity_kw.copy()
-                  if 'group_method_id' in activity_kw:
-                    del activity_kw['group_method_id']
-                  if 'group_id' in activity_kw:
-                    del activity_kw['group_id']                    
-                  active_obj = subobj.activate(**activity_kw)
+                  activity_kw.pop('group_method_id', None)
+                  activity_kw.pop('group_id', None)
+                  active_obj = subobj.activate(activity=activity, **activity_kw)
                   getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
                 else:
-                  expanded_object_list.append(subobj)
-          else:
-            path = obj.getPath()
-            if path not in path_dict:
-              path_dict[path] = i
-              if alternate_method_id is not None \
-                  and hasattr(aq_base(obj), alternate_method_id):
-                # if this object is alternated, generate a new single active object.
-                activity_kw = m.activity_kw.copy()
-                if 'group_method_id' in activity_kw:
-                  del activity_kw['group_method_id']
-                if 'group_id' in activity_kw:
-                  del activity_kw['group_id']
-                active_obj = obj.activate(**activity_kw)
-                getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
-              else:
-                expanded_object_list.append(obj)
-          object_list.append(obj)
-          new_message_list.append(m)
+                  expanded_object_list.append((subobj, m.args, m.kw))
+          new_message_list.append((m, obj))
         except:
           m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
 
@@ -1238,39 +1227,36 @@ class ActivityTool (Folder, UniqueObject
         if len(expanded_object_list) > 0:
           method = self.unrestrictedTraverse(method_id)
           # FIXME: how to apply security here?
-          # NOTE: expanded_object_list must be set to failed objects by the callee.
-          #       If it fully succeeds, expanded_object_list must be empty when returning.
-          result = method(expanded_object_list, **m.kw)
+          # NOTE: expanded_object_list must be set to failed objects by the
+          #       callee. If it fully succeeds, expanded_object_list must be
+          #       empty when returning.
+          result = method(expanded_object_list)
         else:
           result = None
       except:
         # In this case, the group method completely failed.
         exc_info = sys.exc_info()
-        for m in new_message_list:
-          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info=exc_info, log=False)
+        for m, obj in new_message_list:
+          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
         LOG('WARNING ActivityTool', 0,
             'Could not call method %s on objects %s' %
-            (method_id, expanded_object_list), error=exc_info)
+            (method_id, [x[0] for x in expanded_object_list]), error=exc_info)
         error_log = getattr(self, 'error_log', None)
         if error_log is not None:
           error_log.raising(exc_info)
       else:
-        # Obtain all indices of failed messages. Note that this can be a partial failure.
-        failed_message_dict = {}
-        for obj in expanded_object_list:
-          path = obj.getPath()
-          i = path_dict[path]
-          failed_message_dict[i] = None
-
+        # Obtain all indices of failed messages.
+        # Note that this can be a partial failure.
+        failed_message_set = set(id(x[2]) for x in expanded_object_list)
         # Only for succeeded messages, an activity process is invoked (if any).
-        for i in xrange(len(object_list)):
-          object = object_list[i]
-          m = new_message_list[i]
-          if i in failed_message_dict:
+        for m, obj in new_message_list:
+          # We use id of kw dict (persistent object) to know if there is a
+          # failed 3-tuple corresponding to Message m.
+          if id(m.kw) in failed_message_set:
             m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
           else:
             try:
-              m.activateResult(self, result, object)
+              m.activateResult(self, result, obj)
             except:
               m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
             else:

Added: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql?rev=37609&view=auto
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql (added)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql [utf8] Fri Aug  6 18:36:16 2010
@@ -0,0 +1,20 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:0
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params>table
+uid</params>
+UPDATE
+  <dtml-var table>
+SET
+  processing_node=0,
+  processing=0
+WHERE
+  <dtml-sqltest uid type="int" multiple>
+<dtml-var sql_delimiter>
+COMMIT

Added: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql?rev=37609&view=auto
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql (added)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql [utf8] Fri Aug  6 18:36:16 2010
@@ -0,0 +1,20 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:1
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params>table
+uid</params>
+UPDATE
+  <dtml-var table>
+SET
+  processing_date = UTC_TIMESTAMP(),
+  processing = 1
+WHERE
+  <dtml-sqltest uid type="int" multiple>
+<dtml-var sql_delimiter>
+COMMIT

Added: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql?rev=37609&view=auto
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql (added)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql [utf8] Fri Aug  6 18:36:16 2010
@@ -0,0 +1,39 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:0
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params>table
+processing_node
+to_date
+count
+group_method_id
+</params>
+UPDATE
+  <dtml-var table>
+SET
+  processing_node=<dtml-sqlvar processing_node type="int">
+WHERE
+  processing_node=0
+  AND date <= <dtml-sqlvar to_date type="datetime">
+  <dtml-if expr="group_method_id is not None">
+    AND group_method_id = <dtml-sqlvar group_method_id type="string">
+  </dtml-if>
+ORDER BY
+<dtml-comment>
+  Explanation of the order by:
+  - priority must be respected (it is a feature)
+  - when multiple nodes simultaneously try to fetch activities, they should not
+    be given the same set of lines as it would cause all minus one to wait for
+    a write lock (and be ultimately aborted), effectively serializing their
+    action (so breaking paralellism).
+    So we must force MySQL to update lines in a random order.
+</dtml-comment>
+  priority, RAND()
+LIMIT <dtml-sqlvar count type="int">
+<dtml-var sql_delimiter>
+COMMIT

Added: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql?rev=37609&view=auto
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql (added)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql [utf8] Fri Aug  6 18:36:16 2010
@@ -0,0 +1,21 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:0
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params>table
+processing_node
+count</params>
+SELECT
+  *
+FROM
+  <dtml-var table>
+WHERE
+  processing_node = <dtml-sqlvar processing_node type="int">
+<dtml-if expr="count is not None">
+  LIMIT <dtml-sqlvar count type="int">
+</dtml-if>

Removed: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql?rev=37390&view=auto
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql (removed)
@@ -1,21 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>uid_list</params>
-UPDATE
-  message
-SET
-  processing_node=0,
-  processing=0
-WHERE
-  uid IN (
-  <dtml-in prefix="uid" expr="uid_list"><dtml-sqlvar uid_item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>
-  )
-<dtml-var sql_delimiter>
-COMMIT

Removed: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql?rev=37390&view=auto
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql (removed)
@@ -1,20 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:1
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>uid</params>
-UPDATE message
-SET
-  processing_date = UTC_TIMESTAMP(),
-  processing = 1
-WHERE
-  uid IN (
-<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
-  )
-<dtml-var sql_delimiter>
-COMMIT

Removed: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql?rev=37390&view=auto
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql (removed)
@@ -1,36 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>processing_node
-to_date
-count
-group_method_id
-</params>
-UPDATE
-  message
-SET
-  processing_node=<dtml-sqlvar processing_node type="int">
-WHERE
-  processing_node=0
-  AND date <= <dtml-sqlvar to_date type="datetime">
-  <dtml-if expr="group_method_id is not None"> AND group_method_id = <dtml-sqlvar group_method_id type="string"> </dtml-if>
-ORDER BY
-<dtml-comment>
-  Explanation of the order by:
-  - priority must be respected (it is a feature)
-  - when multiple nodes simultaneously try to fetch activities, they should not
-    be given the same set of lines as it would cause all minus one to wait for
-    a write lock (and be ultimately aborted), effectively serializing their
-    action (so breaking paralellism).
-    So we must force MySQL to update lines in a random order.
-</dtml-comment>
-  priority, RAND()
-LIMIT <dtml-sqlvar count type="int">
-<dtml-var sql_delimiter>
-COMMIT

Removed: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql?rev=37390&view=auto
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql (removed)
@@ -1,20 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>processing_node
-count</params>
-SELECT
-  *
-FROM
-  message
-WHERE
-  processing_node = <dtml-sqlvar processing_node type="int">
-<dtml-if expr="count is not None">
-  LIMIT <dtml-sqlvar count type="int">
-</dtml-if>

Modified: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql?rev=37609&r1=37390&r2=37609&view=diff
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql [utf8] Fri Aug  6 18:36:16 2010
@@ -18,6 +18,7 @@ CREATE TABLE `message_queue` (
   `processing` TINYINT NOT NULL DEFAULT 0,
   `processing_date` DATETIME,
   `priority` TINYINT NOT NULL DEFAULT 0,
+  `group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
   `tag` VARCHAR(255) NOT NULL,
   `serialization_tag` VARCHAR(255) NOT NULL,
   `retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,

Removed: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql?rev=37390&view=auto
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql (removed)
@@ -1,21 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>uid_list</params>
-UPDATE
-  message_queue
-SET
-  processing_node=0,
-  processing=0
-WHERE
-  uid IN (
-  <dtml-in prefix="uid" expr="uid_list"><dtml-sqlvar uid_item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>
-  )
-<dtml-var sql_delimiter>
-COMMIT

Removed: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql?rev=37390&view=auto
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql (removed)
@@ -1,34 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>processing_node
-to_date
-count
-</params>
-UPDATE
-  message_queue
-SET
-  processing_node=<dtml-sqlvar processing_node type="int">
-WHERE
-  processing_node=0
-  AND date <= <dtml-sqlvar to_date type="datetime">
-ORDER BY
-<dtml-comment>
-  Explanation of the order by:
-  - priority must be respected (it is a feature)
-  - when multiple nodes simultaneously try to fetch activities, they should not
-    be given the same set of lines as it would cause all minus one to wait for
-    a write lock (and be ultimately aborted), effectively serializing their
-    action (so breaking paralellism).
-    So we must force MySQL to update lines in a random order.
-</dtml-comment>
-  priority, RAND()
-LIMIT <dtml-sqlvar count type="int">
-<dtml-var sql_delimiter>
-COMMIT

Removed: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql?rev=37390&view=auto
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql (removed)
@@ -1,20 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>processing_node
-count</params>
-SELECT
-  *
-FROM
-  message_queue
-WHERE
-  processing_node = <dtml-sqlvar processing_node type="int">
-<dtml-if expr="count is not None">
-  LIMIT <dtml-sqlvar count type="int">
-</dtml-if>

Modified: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql?rev=37609&r1=37390&r2=37609&view=diff
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql [utf8] Fri Aug  6 18:36:16 2010
@@ -15,11 +15,12 @@ message_list
 priority_list
 processing_node_list
 date_list
+group_method_id_list
 tag_list
 serialization_tag_list
 </params>
 INSERT INTO message_queue
-(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, tag, serialization_tag, message)
+(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, group_method_id, tag, serialization_tag, message)
 VALUES
 <dtml-in prefix="loop" expr="_.range(_.len(path_list))">
 <dtml-if sequence-start><dtml-else>,</dtml-if>
@@ -32,6 +33,7 @@ VALUES
   <dtml-if expr="processing_node_list is not None"><dtml-sqlvar expr="processing_node_list[loop_item]" type="int"><dtml-else>-1</dtml-if>,
   0,
   <dtml-sqlvar expr="priority_list[loop_item]" type="int">,
+  <dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
   <dtml-sqlvar expr="tag_list[loop_item]" type="string">,
   <dtml-sqlvar expr="serialization_tag_list[loop_item]" type="string">,
   <dtml-sqlvar expr="message_list[loop_item]" type="string">

Modified: erp5/tags/version-5.4.6/products/CMFActivity/tests/testCMFActivity.py
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/CMFActivity/tests/testCMFActivity.py?rev=37609&r1=37390&r2=37609&view=diff
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/tests/testCMFActivity.py [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/tests/testCMFActivity.py [utf8] Fri Aug  6 18:36:16 2010
@@ -430,7 +430,7 @@ class TestCMFActivity(ERP5TypeTestCase, 
   def TryActiveProcessInsideActivity(self, activity):
     """
     Try two levels with active_process, we create one first
-    activity with an acitive process, then this new activity
+    activity with an active process, then this new activity
     uses another active process
     """
     portal = self.getPortal()
@@ -1872,26 +1872,26 @@ class TestCMFActivity(ERP5TypeTestCase, 
     getattr(organisation, 'uid')
 
 
-  def test_80_CallWithGroupIdParamater(self, quiet=0, run=run_all_test):
-    """
-    Test that group_id parameter is used to separate execution of the same method
-    """
+  def callWithGroupIdParamater(self, activity, quiet, run):
     if not run: return
     if not quiet:
-      message = '\nTest Activity with group_id parameter'
+      message = '\nTest Activity with group_id parameter (%s)' % activity
       ZopeTestCase._print(message)
       LOG('Testing... ',0,message)
 
     portal = self.getPortal()    
     organisation =  portal.organisation._getOb(self.company_id)
     # Defined a group method
-    def setFoobar(self, object_list, number=1):
-      for obj in object_list:
+    foobar_list = []
+    def setFoobar(self, object_list):
+      foobar_list.append(len(object_list))
+      for obj, args, kw in object_list:
+        number = kw.get('number', 1)
         if getattr(obj,'foobar', None) is not None:
           obj.foobar = obj.foobar + number
         else:
           obj.foobar = number
-      object_list[:] = []
+      del object_list[:]
     from Products.ERP5Type.Document.Folder import Folder
     Folder.setFoobar = setFoobar    
 
@@ -1904,46 +1904,65 @@ class TestCMFActivity(ERP5TypeTestCase, 
 
     # Test group_method_id is working without group_id
     for x in xrange(5):
-      organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar").reindexObject(number=1)
+      organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar").reindexObject(number=1)
       transaction.commit()      
 
     message_list = portal.portal_activities.getMessageList()
     self.assertEquals(len(message_list),5)
     portal.portal_activities.distribute()
     portal.portal_activities.tic()
-    self.assertEquals(1, organisation.getFoobar())
+    expected = dict(SQLDict=1, SQLQueue=5)[activity]
+    self.assertEquals(expected, organisation.getFoobar())
 
 
     # Test group_method_id is working with one group_id defined
     for x in xrange(5):
-      organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
+      organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
       transaction.commit()      
 
     message_list = portal.portal_activities.getMessageList()
     self.assertEquals(len(message_list),5)
     portal.portal_activities.distribute()
     portal.portal_activities.tic()
-    self.assertEquals(2, organisation.getFoobar())
+    self.assertEquals(expected * 2, organisation.getFoobar())
+
+    self.assertEquals([expected, expected], foobar_list)
+    del foobar_list[:]
 
     # Test group_method_id is working with many group_id defined
     for x in xrange(5):
-      organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
+      organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
       transaction.commit()      
-      organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="2").reindexObject(number=3)
+      organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="2").reindexObject(number=3)
       transaction.commit()
-      organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
+      organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
       transaction.commit()
-      organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="3").reindexObject(number=5)
+      organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="3").reindexObject(number=5)
       transaction.commit()
 
     message_list = portal.portal_activities.getMessageList()
     self.assertEquals(len(message_list),20)
     portal.portal_activities.distribute()
     portal.portal_activities.tic()
-    self.assertEquals(11, organisation.getFoobar())
+    self.assertEquals(dict(SQLDict=11, SQLQueue=60)[activity],
+                      organisation.getFoobar())
+    self.assertEquals(dict(SQLDict=[1, 1, 1], SQLQueue=[5, 5, 10])[activity],
+                      sorted(foobar_list))
     message_list = portal.portal_activities.getMessageList()
     self.assertEquals(len(message_list), 0)
-    
+
+  def test_80a_CallWithGroupIdParamaterSQLDict(self, quiet=0, run=run_all_test):
+    """
+    Test that group_id parameter is used to separate execution of the same method
+    """
+    self.callWithGroupIdParamater('SQLDict', quiet=quiet, run=run)
+
+  def test_80b_CallWithGroupIdParamaterSQLQueue(self, quiet=0,
+                                                run=run_all_test):
+    """
+    Test that group_id parameter is used to separate execution of the same method
+    """
+    self.callWithGroupIdParamater('SQLQueue', quiet=quiet, run=run)
 
   def test_81_ActivateKwForWorkflowTransition(self, quiet=0, run=run_all_test):
     """
@@ -2009,7 +2028,7 @@ class TestCMFActivity(ERP5TypeTestCase, 
     transaction.commit()
     self.tic()
     activity_tool = self.getActivityTool()
-    def modifySQLAndFail(self, object_list, **kw):
+    def modifySQLAndFail(self, object_list):
       # Only create the dummy activity if none is present: we would just
       # generate missleading errors (duplicate uid).
       if activity_tool.countMessage(method_id='dummy_activity') == 0:
@@ -2437,7 +2456,7 @@ class TestCMFActivity(ERP5TypeTestCase, 
     transaction.commit()
     self.tic()
     activity_tool = self.getActivityTool()
-    def modifySQL(self, object_list, *arg, **kw):
+    def modifySQL(self, object_list):
       # Only create the dummy activity if none is present: we would just
       # generate missleading errors (duplicate uid).
       if activity_tool.countMessage(method_id='dummy_activity') == 0:
@@ -3147,7 +3166,7 @@ class TestCMFActivity(ERP5TypeTestCase, 
     self.assertEqual(len(message_list), 1)
     message = message_list[0]
     portal.organisation_module._delOb(organisation.id)
-    activity_tool.invokeGroup('getTitle', [message])
+    activity_tool.invokeGroup('getTitle', [message], 'SQLDict')
     checkMessage(message, KeyError)
     activity_tool.manageCancel(message.object_path, message.method_id)
     # 2: activity method does not exist when activity is executed
@@ -3156,7 +3175,8 @@ class TestCMFActivity(ERP5TypeTestCase, 
     message_list = activity_tool.getMessageList()
     self.assertEqual(len(message_list), 1)
     message = message_list[0]
-    activity_tool.invokeGroup('this_method_does_not_exist', [message])
+    activity_tool.invokeGroup('this_method_does_not_exist',
+                              [message], 'SQLDict')
     checkMessage(message, KeyError)
     activity_tool.manageCancel(message.object_path, message.method_id)
 
@@ -3738,7 +3758,6 @@ class TestCMFActivity(ERP5TypeTestCase, 
       LOG('Testing... ',0,message)
     self.TryNotificationSavedOnEventLogWhenSiteErrorLoggerRaises('SQLDict')
 
-  @expectedFailure
   def test_123_userNotificationSavedOnEventLogWhenSiteErrorLoggerRaisesWithSQLQueue(self, quiet=0, run=run_all_test):
     if not run: return
     if not quiet:
@@ -3779,7 +3798,49 @@ class TestCMFActivity(ERP5TypeTestCase, 
       activity_tool.manageClearActivities(keep=0)
     finally:
       SQLQueue.MAX_MESSAGE_LIST_SIZE = old_MAX_MESSAGE_LIST_SIZE
-  
+
+  def test_125_CheckDistributeWithSerializationTagAndGroupMethodId(self):
+    activity_tool = self.portal.portal_activities
+    obj1 = activity_tool.newActiveProcess()
+    obj2 = activity_tool.newActiveProcess()
+    transaction.commit()
+    self.tic()
+    group_method_call_list = []
+    def doSomething(self, message_list):
+      group_method_call_list.append(sorted((obj.getPath(), args, kw)
+                                           for obj, args, kw in message_list))
+      del message_list[:]
+    activity_tool.__class__.doSomething = doSomething
+    try:
+      for activity in 'SQLDict', 'SQLQueue':
+        activity_kw = dict(activity=activity, serialization_tag=self.id(),
+                           group_method_id='portal_activities/doSomething')
+        obj1.activate(**activity_kw).dummy(1, x=None)
+        obj2.activate(**activity_kw).dummy(2, y=None)
+        transaction.commit()
+        activity_tool.distribute()
+        activity_tool.tic()
+        self.assertEqual(group_method_call_list.pop(),
+                         sorted([(obj1.getPath(), (1,), dict(x=None)),
+                                 (obj2.getPath(), (2,), dict(y=None))]))
+        self.assertFalse(group_method_call_list)
+        self.assertFalse(activity_tool.getMessageList())
+        obj1.activate(priority=2, **activity_kw).dummy1(1, x=None)
+        obj1.activate(priority=1, **activity_kw).dummy2(2, y=None)
+        message1 = obj1.getPath(), (1,), dict(x=None)
+        message2 = obj1.getPath(), (2,), dict(y=None)
+        transaction.commit()
+        activity_tool.distribute()
+        self.assertEqual(len(activity_tool.getMessageList()), 2)
+        activity_tool.tic()
+        self.assertEqual(group_method_call_list.pop(),
+                         dict(SQLDict=[message2],
+                              SQLQueue=[message1, message2])[activity])
+        self.assertFalse(group_method_call_list)
+        self.assertFalse(activity_tool.getMessageList())
+    finally:
+      del activity_tool.__class__.doSomething
+
 def test_suite():
   suite = unittest.TestSuite()
   suite.addTest(unittest.makeSuite(TestCMFActivity))

Modified: erp5/tags/version-5.4.6/products/ERP5Catalog/CatalogTool.py
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/ERP5Catalog/CatalogTool.py?rev=37609&r1=37390&r2=37609&view=diff
==============================================================================
--- erp5/tags/version-5.4.6/products/ERP5Catalog/CatalogTool.py [utf8] (original)
+++ erp5/tags/version-5.4.6/products/ERP5Catalog/CatalogTool.py [utf8] Fri Aug  6 18:36:16 2010
@@ -759,6 +759,23 @@ class CatalogTool (UniqueObject, ZCatalo
         self.catalog_object(object, url, idxs=idxs, sql_catalog_id=sql_catalog_id,**kw)
 
 
+    def catalogObjectList(self, object_list, *args, **kw):
+        """Catalog a list of objects"""
+        if type(object_list[0]) is tuple:
+          tmp_object_list = [x[0] for x in object_list]
+          ZCatalog.catalogObjectList(self, tmp_object_list, **x[2])
+          # keep failed objects in 'object_list'
+          object_list[:] = [x for x in object_list if x[0] in tmp_object_list]
+        else:
+          ZCatalog.catalogObjectList(self, object_list, *args, **kw)
+
+    security.declarePrivate('uncatalogObjectList')
+    def uncatalogObjectList(self, message_list):
+      """Uncatalog a list of objects"""
+      for obj, args, kw in message_list:
+        self.unindexObject(*args, **kw)
+      del message_list[:]
+
     security.declarePrivate('unindexObject')
     def unindexObject(self, object=None, path=None, uid=None,sql_catalog_id=None):
         """

Modified: erp5/tags/version-5.4.6/products/ERP5Type/CopySupport.py
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/ERP5Type/CopySupport.py?rev=37609&r1=37390&r2=37609&view=diff
==============================================================================
--- erp5/tags/version-5.4.6/products/ERP5Type/CopySupport.py [utf8] (original)
+++ erp5/tags/version-5.4.6/products/ERP5Type/CopySupport.py [utf8] Fri Aug  6 18:36:16 2010
@@ -375,14 +375,12 @@ class CopyContainer:
           catalog.beforeUnindexObject(None,path=path,uid=uid)
           # Then start activity in order to remove lines in catalog,
           # sql wich generate locks
-          if path is None:
-            path = self.getPath()
           # - serialization_tag is used in order to prevent unindexation to 
           # happen before/in parallel with reindexations of the same object.
           catalog.activate(activity='SQLQueue',
                            tag='%s' % uid,
-                           serialization_tag=self.getRootDocumentPath()).unindexObject(None, 
-                                           path=path,uid=uid)
+                           group_method_id='portal_catalog/uncatalogObjectList',
+                           serialization_tag=self.getRootDocumentPath()).unindexObject(uid=uid)
 
   security.declareProtected(Permissions.ModifyPortalContent, 'moveObject')
   def moveObject(self, idxs=None):

Modified: erp5/tags/version-5.4.6/products/ERP5Type/tests/testCopySupport.py
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/ERP5Type/tests/testCopySupport.py?rev=37609&r1=37390&r2=37609&view=diff
==============================================================================
--- erp5/tags/version-5.4.6/products/ERP5Type/tests/testCopySupport.py [utf8] (original)
+++ erp5/tags/version-5.4.6/products/ERP5Type/tests/testCopySupport.py [utf8] Fri Aug  6 18:36:16 2010
@@ -31,6 +31,7 @@ import transaction
 
 from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
 #from AccessControl.SecurityManagement import newSecurityManager
+from Products.CMFActivity.ActivityTool import ActivityTool
 from Products.CMFActivity.Errors import ActivityPendingError
 
 class TestCopySupport(ERP5TypeTestCase):
@@ -100,13 +101,49 @@ class TestCopySupport(ERP5TypeTestCase):
     # Currently, the test passes only because ActivityTool.distribute always
     # iterates on queues in the same order: SQLQueue before SQLDict.
     # If Python returned dictionary values in a different order,
-    # reindex activities fail with the following error:
+    # reindex activities would fail with the following error:
     #   uid of <Products.ERP5Catalog.CatalogTool.IndexableObjectWrapper for
     #   /erp5/person_module/1/old_address> is 599L and is already assigned
     #   to deleted in catalog !!! This can be fatal.
     # This test would also fail if SQLDict was used for 'unindexObject'.
     self.tic()
 
+  def test_03_unindexObjectGrouping(self):
+    person = self.portal.person_module.newContent(portal_type='Person',
+                                                  address_city='Lille',
+                                                  email_text='foo at bar.com')
+    transaction.commit()
+    self.tic()
+    search_catalog = self.portal.portal_catalog.unrestrictedSearchResults
+    uid_list = [person.getUid(),
+                person.default_address.getUid(),
+                person.default_email.getUid()]
+    uid_list.sort()
+    self.assertEqual(len(search_catalog(uid=uid_list)), len(uid_list))
+    self.portal.person_module._delObject(person.getId())
+    del person
+    transaction.commit()
+    self.assertEqual(len(search_catalog(uid=uid_list)), len(uid_list))
+    activity_tool = self.portal.portal_activities
+    self.assertEqual(len(activity_tool.getMessageList()), len(uid_list))
+
+    ActivityTool_invokeGroup = ActivityTool.invokeGroup
+    invokeGroup_list = []
+    def invokeGroup(self, method_id, message_list, activity):
+      invokeGroup_list.extend((method_id,
+                               sorted(m.kw.get('uid') for m in message_list),
+                               activity))
+      return ActivityTool_invokeGroup(self, method_id, message_list, activity)
+    try:
+      ActivityTool.invokeGroup = invokeGroup
+      self.tic()
+    finally:
+      ActivityTool.invokeGroup = ActivityTool_invokeGroup
+    self.assertEqual(invokeGroup_list,
+      ['portal_catalog/uncatalogObjectList', uid_list, 'SQLQueue'])
+    self.assertEqual(len(search_catalog(uid=uid_list)), 0)
+
+
 def test_suite():
   suite = unittest.TestSuite()
   suite.addTest(unittest.makeSuite(TestCopySupport))




More information about the Erp5-report mailing list