[Erp5-report] r36919 rafael - in /erp5/release/candidate: ./ local-eggs/ profiles/ release-...

nobody at svn.erp5.org
Wed Jul 7 00:34:54 CEST 2010


Author: rafael
Date: Wed Jul  7 00:34:52 2010
New Revision: 36919

URL: http://svn.erp5.org?rev=36919&view=rev
Log:
Set the release candidate signature: versions, revisions and patches are now defined.

Added:
    erp5/release/candidate/CANDIDATE.txt   (with props)
    erp5/release/candidate/profiles/release.cfg
    erp5/release/candidate/release-patch/
    erp5/release/candidate/release-patch/unindexObject.patch
Modified:
    erp5/release/candidate/buildout.cfg
    erp5/release/candidate/local-eggs/   (props changed)
    erp5/release/candidate/local-eggs/EXTERNALS.TXT
    erp5/release/candidate/profiles/versions.cfg

Added: erp5/release/candidate/CANDIDATE.txt
URL: http://svn.erp5.org/erp5/release/candidate/CANDIDATE.txt?rev=36919&view=auto
==============================================================================
--- erp5/release/candidate/CANDIDATE.txt (added)
+++ erp5/release/candidate/CANDIDATE.txt [utf8] Wed Jul  7 00:34:52 2010
@@ -0,0 +1,8 @@
+RELEASE CANDIDATE NOTES FOR 5.4.6 RELEASE
+==========================================
+
+This release candidate buildout is temporary. This folder will be
+kept during testing and review, and removed once the release is
+ready.
+
+DON'T USE THIS BUILDOUT IN PRODUCTION; IT IS FOR TESTING PURPOSES ONLY.

Propchange: erp5/release/candidate/CANDIDATE.txt
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: erp5/release/candidate/buildout.cfg
URL: http://svn.erp5.org/erp5/release/candidate/buildout.cfg?rev=36919&r1=36918&r2=36919&view=diff
==============================================================================
--- erp5/release/candidate/buildout.cfg [utf8] (original)
+++ erp5/release/candidate/buildout.cfg [utf8] Wed Jul  7 00:34:52 2010
@@ -1,2 +1,2 @@
 [buildout]
-extends = profiles/official.cfg
+extends = profiles/release.cfg

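For context: zc.buildout resolves the "extends" option by loading the
referenced profile first and overlaying the local sections on top, so
buildout.cfg -> profiles/release.cfg -> profiles/official.cfg forms an
override chain. A minimal Python sketch of that lookup order (not
buildout's actual implementation, and deliberately ignoring buildout's
"+=" / "-=" option syntax) could look like:

    import os
    import ConfigParser

    def load_profile(path, merged=None):
        # Merge a profile with the profiles it extends: base values are
        # loaded first, local values override them.
        if merged is None:
            merged = {}
        parser = ConfigParser.RawConfigParser()
        parser.read(path)
        if parser.has_option('buildout', 'extends'):
            base = os.path.join(os.path.dirname(path),
                                parser.get('buildout', 'extends'))
            load_profile(base, merged)
        for section in parser.sections():
            for option in parser.options(section):
                merged[(section, option)] = parser.get(section, option)
        return merged

    # merged now holds official.cfg values, overridden by release.cfg,
    # overridden by buildout.cfg itself.
    merged = load_profile('buildout.cfg')
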
Propchange: erp5/release/candidate/local-eggs/
------------------------------------------------------------------------------
--- svn:externals (original)
+++ svn:externals Wed Jul  7 00:34:52 2010
@@ -3,6 +3,6 @@
 # BEWARE: Any addition might be removed without further notice.
 # To play with eggs use mr.developer and publish eggs in proper places
 
-z3c.recipe.openoffice   http://svn.zope.org/repos/main/z3c.recipe.openoffice/branches/erp5-downloadcache
-Acquisition svn://svn.zope.org/repos/main/Acquisition/branches/erp5-aq_dynamic
-Products.DCWorkflow svn://svn.zope.org/repos/main/Sandbox/lra/branches/restore-WorkflowMethod-Products.DCWorkflow
+z3c.recipe.openoffice -r114272 http://svn.zope.org/repos/main/z3c.recipe.openoffice/branches/erp5-downloadcache
+Acquisition -r114272 svn://svn.zope.org/repos/main/Acquisition/branches/erp5-aq_dynamic
+Products.DCWorkflow -r114272 svn://svn.zope.org/repos/main/Sandbox/lra/branches/restore-WorkflowMethod-Products.DCWorkflow

Modified: erp5/release/candidate/local-eggs/EXTERNALS.TXT
URL: http://svn.erp5.org/erp5/release/candidate/local-eggs/EXTERNALS.TXT?rev=36919&r1=36918&r2=36919&view=diff
==============================================================================
--- erp5/release/candidate/local-eggs/EXTERNALS.TXT [utf8] (original)
+++ erp5/release/candidate/local-eggs/EXTERNALS.TXT [utf8] Wed Jul  7 00:34:52 2010
@@ -3,6 +3,6 @@
 # BEWARE: Any addition might be removed without further notice.
 # To play with eggs use mr.developer and publish eggs in proper places
 
-z3c.recipe.openoffice   http://svn.zope.org/repos/main/z3c.recipe.openoffice/branches/erp5-downloadcache
-Acquisition svn://svn.zope.org/repos/main/Acquisition/branches/erp5-aq_dynamic
-Products.DCWorkflow svn://svn.zope.org/repos/main/Sandbox/lra/branches/restore-WorkflowMethod-Products.DCWorkflow
+z3c.recipe.openoffice -r114272 http://svn.zope.org/repos/main/z3c.recipe.openoffice/branches/erp5-downloadcache
+Acquisition -r114272 svn://svn.zope.org/repos/main/Acquisition/branches/erp5-aq_dynamic
+Products.DCWorkflow -r114272 svn://svn.zope.org/repos/main/Sandbox/lra/branches/restore-WorkflowMethod-Products.DCWorkflow

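Pinning every external to -r114272 makes fresh checkouts reproducible
(the "-rREV URL" externals format needs a Subversion 1.5+ client). The
property change above mirrors EXTERNALS.TXT; a hypothetical helper (not
part of this commit) to push the file back into the property with the
standard svn client could be:

    import subprocess

    def sync_externals(definition_file, target_dir):
        # 'svn propset --file' reads the property value from a file; the
        # property change still has to be committed afterwards.
        subprocess.check_call(['svn', 'propset', 'svn:externals',
                               '--file', definition_file, target_dir])

    sync_externals('local-eggs/EXTERNALS.TXT', 'local-eggs')
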
Added: erp5/release/candidate/profiles/release.cfg
URL: http://svn.erp5.org/erp5/release/candidate/profiles/release.cfg?rev=36919&view=auto
==============================================================================
--- erp5/release/candidate/profiles/release.cfg (added)
+++ erp5/release/candidate/profiles/release.cfg [utf8] Wed Jul  7 00:34:52 2010
@@ -0,0 +1,9 @@
+[buildout]
+extends = ../profiles/official.cfg
+
+parts += release-patch
+
+[release-patch]
+recipe = plone.recipe.command
+command = cd ${buildout:directory}/parts/products-erp5/ && patch -p2 < ${buildout:directory}/release-patch/unindexObject.patch
+update-command = echo "Do nothing"

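The release-patch part uses plone.recipe.command, which runs "command"
when the part is first installed and "update-command" on subsequent
buildout runs, so the no-op echo keeps the patch from being applied
twice. A rough Python equivalent of the install step (paths assumed
relative to the buildout root) would be:

    import os
    import subprocess

    buildout_dir = os.getcwd()
    products_dir = os.path.join(buildout_dir, 'parts', 'products-erp5')
    patch_file = os.path.join(buildout_dir, 'release-patch',
                              'unindexObject.patch')

    # -p2 strips two leading components, so 'a/products/CMFActivity/...'
    # in the patch resolves to 'CMFActivity/...' inside products-erp5.
    patch = open(patch_file)
    try:
        subprocess.check_call(['patch', '-p2'], cwd=products_dir,
                              stdin=patch)
    finally:
        patch.close()
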
Modified: erp5/release/candidate/profiles/versions.cfg
URL: http://svn.erp5.org/erp5/release/candidate/profiles/versions.cfg?rev=36919&r1=36918&r2=36919&view=diff
==============================================================================
--- erp5/release/candidate/profiles/versions.cfg [utf8] (original)
+++ erp5/release/candidate/profiles/versions.cfg [utf8] Wed Jul  7 00:34:52 2010
@@ -1,16 +1,58 @@
-# taken from https://svn.erp5.org/repos/public/experimental/erp5.buildout/profiles/versions.cfg
 [versions]
-python-memcached = 1.45
+# taken from https://svn.erp5.org/repos/public/experimental/erp5.buildout/profiles/versions.cfg
+ClientForm = 0.2.10  
+MySQL-python = 1.2.3 
+Products.ExternalEditor = 1.0a2
 PyXML = 0.8.4
+SOAPpy = 0.12.0.1  
+cElementTree = 1.0.5-20051216  
+collective.recipe.supervisor = 0.10 
+collective.recipe.template = 1.8 
+elementtree = 1.2.7-20070827-preview   
+erp5.recipe.memcachedserver = 0.0.4
+erp5.recipe.mysqldatabase = 1.0.1  
+erp5.recipe.mysqlserver = 1.1.2  
+erp5.recipe.ooodinstance = 0.0.1
+erp5.recipe.softwarebuild = 0.1.1  
+erp5.recipe.standaloneinstance = 0.4.1 
+erp5.recipe.testrunner = 1.0.4  
+erp5.recipe.zope2install = 1.0  
+erp5_bt5_revision = ${:erp5_products_revision}
+erp5_products_revision = @36706
+erp5diff = 0.8.0   
+fpconst = 0.7.2
+hexagonit.recipe.cmmi = 1.3.0
+hexagonit.recipe.download = 1.4.1
+infrae.subversion = 1.4.5
+ipdb = 0.1
 ipython = 0.10
 itools = 0.20.8
+lxml = 2.2.6
+mechanize = 0.2.1
+meld3 = 0.6.6
 numpy = 1.3.0
+paramiko = 1.7.6
+plone.recipe.command = 1.1
+plone.recipe.distros = 1.5
+plone.recipe.zope2install = 3.2
 plone.recipe.zope2instance = 3.6
-erp5.recipe.standaloneinstance >= 0.4
-erp5.recipe.mysqlserver >= 1.1.1
+ply = 3.3
+py = 1.3.1
+pycrypto = 2.1.0
 pysvn = 1.7.2
-xml-marshaller = 0.9a
+python-ldap = 2.3.11
+python-memcached = 1.45
+pytz = 2010h
 rdiff-backup = 1.0.5
-erp5_products_revision =
-erp5_bt5_revision = ${:erp5_products_revision}
-lxml = 2.2.6
+setuptools = 0.6c11
+simplejson = 2.1.1
+supervisor = 3.0a8
+threadframe = 0.2
+timerserver = 2.0
+uuid = 1.30
+xml-marshaller = 0.9a
+xupdate-processor = 0.1
+zc.buildout = 1.5.0b2
+zc.recipe.cmmi = 1.3.1
+zc.recipe.egg = 1.2.3b2
+zope.testbrowser = 3.9.0

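The [versions] section now pins every egg used by the release candidate
(plus the ERP5 products/bt5 checkout revision via erp5_products_revision).
A minimal sketch (file location assumed) for auditing an environment
against these pins with pkg_resources:

    import ConfigParser
    import pkg_resources

    parser = ConfigParser.RawConfigParser()
    parser.read('profiles/versions.cfg')
    for name, pinned in parser.items('versions'):
        if pinned.startswith('${') or pinned.startswith('@'):
            continue  # skip buildout substitutions and svn revision pins
        try:
            installed = pkg_resources.get_distribution(name).version
        except pkg_resources.DistributionNotFound:
            print '%s: pinned %s, not installed' % (name, pinned)
        else:
            if installed != pinned:
                print '%s: pinned %s, found %s' % (name, pinned, installed)
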
Added: erp5/release/candidate/release-patch/unindexObject.patch
URL: http://svn.erp5.org/erp5/release/candidate/release-patch/unindexObject.patch?rev=36919&view=auto
==============================================================================
--- erp5/release/candidate/release-patch/unindexObject.patch (added)
+++ erp5/release/candidate/release-patch/unindexObject.patch [utf8] Wed Jul  7 00:34:52 2010
@@ -0,0 +1,1675 @@
+diff --git a/products/CMFActivity/Activity/SQLBase.py b/products/CMFActivity/Activity/SQLBase.py
+index 0ff6520..8f9317f 100644
+--- a/products/CMFActivity/Activity/SQLBase.py
++++ b/products/CMFActivity/Activity/SQLBase.py
+@@ -33,8 +33,18 @@
+   MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED)
+ from Products.CMFActivity.ActiveObject import (
+   INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE)
+-from Queue import VALIDATION_ERROR_DELAY
++from Products.CMFActivity.ActivityRuntimeEnvironment import (
++  ActivityRuntimeEnvironment, getTransactionalVariable)
++from Queue import VALIDATION_ERROR_DELAY, abortTransactionSynchronously
+ 
++try:
++  from transaction import get as get_transaction
++except ImportError:
++  pass
++
++# Stop electing more messages for processing if more than this number of
++# objects are impacted by elected messages.
++MAX_GROUPED_OBJECTS = 100
+ 
+ def sort_message_key(message):
+   # same sort key as in SQL{Dict,Queue}_readMessageList
+@@ -129,6 +139,256 @@ def _log(self, severity, summary):
+     LOG(self.__class__.__name__, severity, summary,
+         error=severity>INFO and sys.exc_info() or None)
+ 
++  def getReservedMessageList(self, activity_tool, date, processing_node,
++                             limit=None, group_method_id=None):
++    """
++      Get and reserve a list of messages.
++      limit
++        Maximum number of messages to fetch.
++        This number is not guaranteed to be reached, because of:
++         - not enough messages pending execution
++         - race condition (other nodes reserving the same messages at the same
++           time)
++        This number is guaranteed not to be exceeded.
++        If None (or not given), no limit applies.
++    """
++    select = activity_tool.SQLBase_selectReservedMessageList
++    result = not group_method_id and select(table=self.sql_table, count=limit,
++                                            processing_node=processing_node)
++    if not result:
++      activity_tool.SQLBase_reserveMessageList(table=self.sql_table,
++        count=limit, processing_node=processing_node, to_date=date,
++        group_method_id=group_method_id)
++      result = select(table=self.sql_table,
++                      processing_node=processing_node, count=limit)
++    return result
++
++  def makeMessageListAvailable(self, activity_tool, uid_list):
++    """
++      Put messages back in processing_node=0 .
++    """
++    if len(uid_list):
++      activity_tool.SQLBase_makeMessageListAvailable(table=self.sql_table,
++                                                     uid=uid_list)
++
++  def getProcessableMessageList(self, activity_tool, processing_node):
++    """
++      Always true:
++        For each reserved message, delete redundant messages when it gets
++        reserved (definitely lost, but they are expandable since redundant).
++
++      - reserve a message
++      - set reserved message to processing=1 state
++      - if this message has a group_method_id:
++        - reserve a bunch of BUNDLE_MESSAGE_COUNT messages
++        - until the number of impacted objects goes over MAX_GROUPED_OBJECTS
++          - get one message from the reserved bunch (these messages will be
++            "needed")
++          - increase the number of impacted objects
++        - set "needed" reserved messages to processing=1 state
++        - unreserve "unneeded" messages
++      - return still-reserved message list and a group_method_id
++
++      If any error happens in the process described above, try to unreserve
++      all messages already reserved in that process.
++      If it fails, complain loudly that some messages might still be in an
++      unclean state.
++
++      Returned values:
++        4-tuple:
++          - list of messages
++          - impacted object count
++          - group_method_id
++          - uid_to_duplicate_uid_list_dict
++    """
++    def getReservedMessageList(limit, group_method_id=None):
++      line_list = self.getReservedMessageList(activity_tool=activity_tool,
++                                              date=now_date,
++                                              processing_node=processing_node,
++                                              limit=limit,
++                                              group_method_id=group_method_id)
++      if len(line_list):
++        self._log(TRACE, 'Reserved messages: %r' % [x.uid for x in line_list])
++      return line_list
++    def getDuplicateMessageUidList(line):
++      uid_list = self.getDuplicateMessageUidList(activity_tool=activity_tool,
++        line=line, processing_node=processing_node)
++      if len(uid_list):
++        self._log(TRACE, 'Reserved duplicate messages: %r' % (uid_list, ))
++      return uid_list
++    BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number
++    now_date = self.getNow(activity_tool)
++    message_list = []
++    count = 0
++    group_method_id = None
++    try:
++      result = getReservedMessageList(limit=1)
++      uid_to_duplicate_uid_list_dict = {}
++      if len(result) > 0:
++        line = result[0]
++        uid = line.uid
++        m = self.loadMessage(line.message, uid=uid, line=line)
++        message_list.append(m)
++        group_method_id = line.group_method_id
++        activity_tool.SQLBase_processMessage(table=self.sql_table, uid=[uid])
++        uid_to_duplicate_uid_list_dict.setdefault(uid, []) \
++          .extend(getDuplicateMessageUidList(line))
++        if group_method_id not in (None, '', '\0'):
++          # Count the number of objects to prevent too many objects.
++          count += len(m.getObjectList(activity_tool))
++          if count < MAX_GROUPED_OBJECTS:
++            # Retrieve objects which have the same group method.
++            result = getReservedMessageList(limit=BUNDLE_MESSAGE_COUNT,
++                                            group_method_id=group_method_id)
++            path_and_method_id_dict = {}
++            unreserve_uid_list = []
++            for line in result:
++              if line.uid == uid:
++                continue
++              # All fetched lines have the same group_method_id and
++              # processing_node.
++              # Their dates are lower-than or equal-to now_date.
++              # We read each line once so lines have distinct uids.
++              # So what remains to be filtered on are path, method_id and
++              # order_validation_text.
++              try:
++                key = line.path, line.method_id, line.order_validation_text
++              except AttributeError:
++                pass # message_queue does not have 'order_validation_text'
++              else:
++                original_uid = path_and_method_id_dict.get(key)
++                if original_uid is not None:
++                  uid_to_duplicate_uid_list_dict.setdefault(original_uid, []) \
++                  .append(line.uid)
++                  continue
++                path_and_method_id_dict[key] = line.uid
++                uid_to_duplicate_uid_list_dict.setdefault(line.uid, []) \
++                .extend(getDuplicateMessageUidList(line))
++              if count < MAX_GROUPED_OBJECTS:
++                m = self.loadMessage(line.message, uid=line.uid, line=line)
++                count += len(m.getObjectList(activity_tool))
++                message_list.append(m)
++              else:
++                unreserve_uid_list.append(line.uid)
++            activity_tool.SQLBase_processMessage(table=self.sql_table,
++              uid=[m.uid for m in message_list])
++            # Unreserve extra messages as soon as possible.
++            self.makeMessageListAvailable(activity_tool=activity_tool,
++                                          uid_list=unreserve_uid_list)
++      return (message_list, count, group_method_id,
++              uid_to_duplicate_uid_list_dict)
++    except:
++      self._log(WARNING, 'Exception while reserving messages.')
++      if len(message_list):
++        to_free_uid_list = [m.uid for m in message_list]
++        try:
++          self.makeMessageListAvailable(activity_tool=activity_tool,
++                                        uid_list=to_free_uid_list)
++        except:
++          self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list)
++        else:
++          if len(to_free_uid_list):
++            self._log(TRACE, 'Freed messages %r' % to_free_uid_list)
++      else:
++        self._log(TRACE, '(no message was reserved)')
++      return [], 0, None, {}
++
++  # Queue semantic
++  def dequeueMessage(self, activity_tool, processing_node):
++    def makeMessageListAvailable(uid_list, uid_to_duplicate_uid_list_dict):
++      final_uid_list = []
++      for uid in uid_list:
++        final_uid_list.append(uid)
++        final_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
++      self.makeMessageListAvailable(activity_tool=activity_tool,
++                                    uid_list=final_uid_list)
++    message_list, count, group_method_id, uid_to_duplicate_uid_list_dict = \
++      self.getProcessableMessageList(activity_tool, processing_node)
++    if message_list:
++      # Remove group_id parameter from group_method_id
++      if group_method_id is not None:
++        group_method_id = group_method_id.split('\0')[0]
++      if group_method_id not in (None, ""):
++        method  = activity_tool.invokeGroup
++        args = (group_method_id, message_list, self.__class__.__name__)
++        activity_runtime_environment = ActivityRuntimeEnvironment(None)
++      else:
++        method = activity_tool.invoke
++        message = message_list[0]
++        args = (message, )
++        activity_runtime_environment = ActivityRuntimeEnvironment(message)
++      # Commit right before executing messages.
++      # As the MySQL transaction does not start at exactly the same time as
++      # ZODB transactions but a bit later, messages could otherwise be
++      # invoked on objects which are not yet available - or only available
++      # in an old version - to the ZODB connector.
++      # So all connectors must be committed now that we have selected
++      # everything needed from MySQL to get a fresh view of ZODB objects.
++      get_transaction().commit()
++      tv = getTransactionalVariable(None)
++      tv['activity_runtime_environment'] = activity_runtime_environment
++      # Try to invoke
++      try:
++        method(*args)
++      except:
++        self._log(WARNING,
++          'Exception raised when invoking messages (uid, path, method_id) %r'
++          % [(m.uid, m.object_path, m.method_id) for m in message_list])
++        try:
++          abortTransactionSynchronously()
++        except:
++          # Unfortunately, database adapters may raise an exception against
++          # abort.
++          self._log(PANIC,
++              'abort failed, thus some objects may be modified accidentally')
++          raise
++        # XXX Is it still useful to free messages now that this node is able
++        #     to reselect them ?
++        to_free_uid_list = [x.uid for x in message_list]
++        try:
++          makeMessageListAvailable(to_free_uid_list,
++                                   uid_to_duplicate_uid_list_dict)
++        except:
++          self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list)
++        else:
++          self._log(TRACE, 'Freed messages %r' % to_free_uid_list)
++      # Abort if something failed.
++      if [m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]:
++        endTransaction = abortTransactionSynchronously
++      else:
++        endTransaction = get_transaction().commit
++      try:
++        endTransaction()
++      except:
++        self._log(WARNING,
++          'Failed to end transaction for messages (uid, path, method_id) %r'
++          % [(m.uid, m.object_path, m.method_id) for m in message_list])
++        if endTransaction == abortTransactionSynchronously:
++          self._log(PANIC, 'Failed to abort executed messages.'
++            ' Some objects may be modified accidentally.')
++        else:
++          try:
++            abortTransactionSynchronously()
++          except:
++            self._log(PANIC, 'Failed to abort executed messages which also'
++              ' failed to commit. Some objects may be modified accidentally.')
++            raise
++        exc_info = sys.exc_info()
++        for m in message_list:
++          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
++        try:
++          makeMessageListAvailable([x.uid for x in message_list],
++                                   uid_to_duplicate_uid_list_dict)
++        except:
++          self._log(ERROR, 'Failed to free remaining messages: %r'
++                           % (message_list, ))
++        else:
++          self._log(TRACE, 'Freed messages %r' % (message_list, ))
++      self.finalizeMessageExecution(activity_tool, message_list,
++                                    uid_to_duplicate_uid_list_dict)
++    get_transaction().commit()
++    return not message_list
++
+   def finalizeMessageExecution(self, activity_tool, message_list,
+                                uid_to_duplicate_uid_list_dict=None):
+     """
+diff --git a/products/CMFActivity/Activity/SQLDict.py b/products/CMFActivity/Activity/SQLDict.py
+index 6cf5b7f..43e72a8 100644
+--- a/products/CMFActivity/Activity/SQLDict.py
++++ b/products/CMFActivity/Activity/SQLDict.py
+@@ -27,18 +27,12 @@
+ ##############################################################################
+ 
+ from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
+-from Queue import VALID, INVALID_PATH, abortTransactionSynchronously
++from Queue import VALID, INVALID_PATH
+ from RAMDict import RAMDict
+-from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
+ from Products.CMFActivity.Errors import ActivityFlushError
+-from ZODB.POSException import ConflictError
+ import sys
+-from types import ClassType
+ #from time import time
+ from SQLBase import SQLBase, sort_message_key
+-from Products.CMFActivity.ActivityRuntimeEnvironment import (
+-  ActivityRuntimeEnvironment, getTransactionalVariable)
+-from zExceptions import ExceptionFormatter
+ 
+ try:
+   from transaction import get as get_transaction
+@@ -51,9 +45,6 @@
+ MAX_VALIDATED_LIMIT = 1000 
+ # Read up to this number of messages to validate.
+ READ_MESSAGE_LIMIT = 1000
+-# Stop electing more messages for processing if more than this number of
+-# objects are impacted by elected messages.
+-MAX_GROUPED_OBJECTS = 100
+ 
+ MAX_MESSAGE_LIST_SIZE = 100
+ 
+@@ -134,33 +125,6 @@ def getRegisteredMessageList(self, activity_buffer, activity_tool):
+     message_list = activity_buffer.getMessageList(self)
+     return [m for m in message_list if m.is_registered]
+ 
+-  def getReservedMessageList(self, activity_tool, date, processing_node, limit=None, group_method_id=None):
+-    """
+-      Get and reserve a list of messages.
+-      limit
+-        Maximum number of messages to fetch.
+-        This number is not garanted to be reached, because of:
+-         - not enough messages being pending execution
+-         - race condition (other nodes reserving the same messages at the same
+-           time)
+-        This number is guaranted not to be exceeded.
+-        If None (or not given) no limit apply.
+-    """
+-    result = not group_method_id and \
+-      activity_tool.SQLDict_selectReservedMessageList(
+-        processing_node=processing_node, count=limit)
+-    if not result:
+-      activity_tool.SQLDict_reserveMessageList(count=limit, processing_node=processing_node, to_date=date, group_method_id=group_method_id)
+-      result = activity_tool.SQLDict_selectReservedMessageList(processing_node=processing_node, count=limit)
+-    return result
+-
+-  def makeMessageListAvailable(self, activity_tool, uid_list):
+-    """
+-      Put messages back in processing_node=0 .
+-    """
+-    if len(uid_list):
+-      activity_tool.SQLDict_makeMessageListAvailable(uid_list=uid_list)
+-
+   def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
+     """
+       Reserve unreserved messages matching given line.
+@@ -191,202 +155,7 @@ def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
+       raise
+     return uid_list
+ 
+-  def getProcessableMessageList(self, activity_tool, processing_node):
+-    """
+-      Always true:
+-        For each reserved message, delete redundant messages when it gets
+-        reserved (definitely lost, but they are expandable since redundant).
+-
+-      - reserve a message
+-      - set reserved message to processing=1 state
+-      - if this message has a group_method_id:
+-        - reserve a bunch of BUNDLE_MESSAGE_COUNT messages
+-        - untill number of impacted objects goes over MAX_GROUPED_OBJECTS
+-          - get one message from the reserved bunch (this messages will be
+-            "needed")
+-          - increase the number of impacted object
+-        - set "needed" reserved messages to processing=1 state
+-        - unreserve "unneeded" messages
+-      - return still-reserved message list and a group_method_id
+-
+-      If any error happens in above described process, try to unreserve all
+-      messages already reserved in that process.
+-      If it fails, complain loudly that some messages might still be in an
+-      unclean state.
+-
+-      Returned values:
+-        4-tuple:
+-          - list of messages
+-          - impacted object count
+-          - group_method_id
+-          - uid_to_duplicate_uid_list_dict
+-    """
+-    def getReservedMessageList(limit, group_method_id=None):
+-      line_list = self.getReservedMessageList(activity_tool=activity_tool,
+-                                              date=now_date,
+-                                              processing_node=processing_node,
+-                                              limit=limit,
+-                                              group_method_id=group_method_id)
+-      if len(line_list):
+-        LOG('SQLDict', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
+-      return line_list
+-    def getDuplicateMessageUidList(line):
+-      uid_list = self.getDuplicateMessageUidList(activity_tool=activity_tool, 
+-        line=line, processing_node=processing_node)
+-      if len(uid_list):
+-        LOG('SQLDict', TRACE, 'Reserved duplicate messages: %r' % (uid_list, ))
+-      return uid_list
+-    def makeMessageListAvailable(uid_list):
+-      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
+-    BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number
+-    now_date = self.getNow(activity_tool)
+-    message_list = []
+-    count = 0
+-    group_method_id = None
+-    try:
+-      result = getReservedMessageList(limit=1)
+-      uid_to_duplicate_uid_list_dict = {}
+-      if len(result) > 0:
+-        line = result[0]
+-        uid = line.uid
+-        m = self.loadMessage(line.message, uid=uid, line=line)
+-        message_list.append(m)
+-        group_method_id = line.group_method_id
+-        activity_tool.SQLDict_processMessage(uid=[uid])
+-        uid_to_duplicate_uid_list_dict.setdefault(uid, []) \
+-          .extend(getDuplicateMessageUidList(line))
+-        if group_method_id not in (None, '', '\0'):
+-          # Count the number of objects to prevent too many objects.
+-          count += len(m.getObjectList(activity_tool))
+-          if count < MAX_GROUPED_OBJECTS:
+-            # Retrieve objects which have the same group method.
+-            result = getReservedMessageList(limit=BUNDLE_MESSAGE_COUNT, group_method_id=group_method_id)
+-            path_and_method_id_dict = {}
+-            unreserve_uid_list = []
+-            for line in result:
+-              if line.uid == uid:
+-                continue
+-              # All fetched lines have the same group_method_id and
+-              # processing_node.
+-              # Their dates are lower-than or equal-to now_date.
+-              # We read each line once so lines have distinct uids.
+-              # So what remains to be filtered on are path, method_id and
+-              # order_validation_text.
+-              key = (line.path, line.method_id, line.order_validation_text)
+-              original_uid = path_and_method_id_dict.get(key)
+-              if original_uid is not None:
+-                uid_to_duplicate_uid_list_dict.setdefault(original_uid, []).append(line.uid)
+-                continue
+-              path_and_method_id_dict[key] = line.uid
+-              uid_to_duplicate_uid_list_dict.setdefault(line.uid, []).extend(getDuplicateMessageUidList(line))
+-              if count < MAX_GROUPED_OBJECTS:
+-                m = self.loadMessage(line.message, uid=line.uid, line=line)
+-                count += len(m.getObjectList(activity_tool))
+-                message_list.append(m)
+-              else:
+-                unreserve_uid_list.append(line.uid)
+-            activity_tool.SQLDict_processMessage(uid=[m.uid for m in message_list])
+-            # Unreserve extra messages as soon as possible.
+-            makeMessageListAvailable(unreserve_uid_list)
+-      return message_list, count, group_method_id, uid_to_duplicate_uid_list_dict
+-    except:
+-      LOG('SQLDict', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
+-      if len(message_list):
+-        to_free_uid_list = [m.uid for m in message_list]
+-        try:
+-          makeMessageListAvailable(to_free_uid_list)
+-        except:
+-          LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
+-        else:
+-          if len(to_free_uid_list):
+-            LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
+-      else:
+-        LOG('SQLDict', TRACE, '(no message was reserved)')
+-      return [], 0, None, {}
+-
+-  # Queue semantic
+-  def dequeueMessage(self, activity_tool, processing_node):
+-    def makeMessageListAvailable(uid_list, uid_to_duplicate_uid_list_dict):
+-      final_uid_list = []
+-      for uid in uid_list:
+-        final_uid_list.append(uid)
+-        final_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
+-      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=final_uid_list)
+-    message_list, count, group_method_id, uid_to_duplicate_uid_list_dict = \
+-      self.getProcessableMessageList(activity_tool, processing_node)
+-    if message_list:
+-      # Remove group_id parameter from group_method_id
+-      if group_method_id is not None:
+-        group_method_id = group_method_id.split('\0')[0]
+-      if group_method_id not in (None, ""):
+-        method  = activity_tool.invokeGroup
+-        args = (group_method_id, message_list)
+-        activity_runtime_environment = ActivityRuntimeEnvironment(None)
+-      else:
+-        method = activity_tool.invoke
+-        message = message_list[0]
+-        args = (message, )
+-        activity_runtime_environment = ActivityRuntimeEnvironment(message)
+-      # Commit right before executing messages.
+-      # As MySQL transaction does not start exactly at the same time as ZODB
+-      # transactions but a bit later, messages available might be called
+-      # on objects which are not available - or available in an old
+-      # version - to ZODB connector.
+-      # So all connectors must be committed now that we have selected
+-      # everything needed from MySQL to get a fresh view of ZODB objects.
+-      get_transaction().commit()
+-      tv = getTransactionalVariable(None)
+-      tv['activity_runtime_environment'] = activity_runtime_environment
+-      # Try to invoke
+-      try:
+-        method(*args)
+-      except:
+-        LOG('SQLDict', WARNING, 'Exception raised when invoking messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
+-        try:
+-          abortTransactionSynchronously()
+-        except:
+-          # Unfortunately, database adapters may raise an exception against abort.
+-          LOG('SQLDict', PANIC,
+-              'abort failed, thus some objects may be modified accidentally')
+-          raise
+-        # XXX Is it still useful to free messages now that this node is able
+-        #     to reselect them ?
+-        to_free_uid_list = [x.uid for x in message_list]
+-        try:
+-          makeMessageListAvailable(to_free_uid_list, uid_to_duplicate_uid_list_dict)
+-        except:
+-          LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
+-        else:
+-          LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list))
+-      # Abort if something failed.
+-      if [m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]:
+-        endTransaction = abortTransactionSynchronously
+-      else:
+-        endTransaction = get_transaction().commit
+-      try:
+-        endTransaction()
+-      except:
+-        LOG('SQLDict', WARNING, 'Failed to end transaction for messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
+-        if endTransaction == abortTransactionSynchronously:
+-          LOG('SQLDict', PANIC, 'Failed to abort executed messages. Some objects may be modified accidentally.')
+-        else:
+-          try:
+-            abortTransactionSynchronously()
+-          except:
+-            LOG('SQLDict', PANIC, 'Failed to abort executed messages which also failed to commit. Some objects may be modified accidentally.')
+-            raise
+-        exc_info = sys.exc_info()
+-        for m in message_list:
+-          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info=exc_info, log=False)
+-        try:
+-          makeMessageListAvailable([x.uid for x in message_list], uid_to_duplicate_uid_list_dict)
+-        except:
+-          LOG('SQLDict', ERROR, 'Failed to free remaining messages: %r' % (message_list, ), error=sys.exc_info())
+-        else:
+-          LOG('SQLDict', TRACE, 'Freed messages %r' % (message_list, ))
+-      self.finalizeMessageExecution(activity_tool, message_list, uid_to_duplicate_uid_list_dict)
+-    get_transaction().commit()
+-    return not message_list
++  dequeueMessage = SQLBase.dequeueMessage
+ 
+   def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
+     hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
+@@ -533,9 +302,10 @@ def distribute(self, activity_tool, node_count):
+               message_list.sort(key=sort_message_key)
+               deletable_uid_list += [m.uid for m in message_list[1:]]
+             message = message_list[0]
+-            distributable_uid_set.add(message.uid)
+             serialization_tag = message.activity_kw.get('serialization_tag')
+-            if serialization_tag is not None:
++            if serialization_tag is None:
++              distributable_uid_set.add(message.uid)
++            else:
+               serialization_tag_dict.setdefault(serialization_tag,
+                                                 []).append(message)
+           # Don't let through if there is the same serialization tag in the
+@@ -545,15 +315,15 @@ def distribute(self, activity_tool, node_count):
+           # does not stop validating together. Because those messages should
+           # be processed together at once.
+           for message_list in serialization_tag_dict.itervalues():
+-            if len(message_list) == 1:
+-              continue
+             # Sort list of messages to validate the message with highest score
+             message_list.sort(key=sort_message_key)
++            distributable_uid_set.add(message_list[0].uid)
+             group_method_id = message_list[0].activity_kw.get('group_method_id')
++            if group_method_id is None:
++              continue
+             for message in message_list[1:]:
+-              if group_method_id is None or \
+-                 group_method_id != message.activity_kw.get('group_method_id'):
+-                distributable_uid_set.remove(message.uid)
++              if group_method_id == message.activity_kw.get('group_method_id'):
++                distributable_uid_set.add(message.uid)
+           if deletable_uid_list:
+             activity_tool.SQLBase_delMessage(table=self.sql_table,
+                                              uid=deletable_uid_list)
+diff --git a/products/CMFActivity/Activity/SQLQueue.py b/products/CMFActivity/Activity/SQLQueue.py
+index 4cd415d..bf2d217 100644
+--- a/products/CMFActivity/Activity/SQLQueue.py
++++ b/products/CMFActivity/Activity/SQLQueue.py
+@@ -28,16 +28,10 @@
+ 
+ from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
+ from RAMQueue import RAMQueue
+-from Queue import VALID, INVALID_PATH, abortTransactionSynchronously
+-from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
++from Queue import VALID, INVALID_PATH
+ from Products.CMFActivity.Errors import ActivityFlushError
+ from ZODB.POSException import ConflictError
+-from types import ClassType
+-import sys
+-from time import time
+ from SQLBase import SQLBase, sort_message_key
+-from Products.CMFActivity.ActivityRuntimeEnvironment import (
+-  ActivityRuntimeEnvironment, getTransactionalVariable)
+ from zExceptions import ExceptionFormatter
+ 
+ try:
+@@ -51,17 +45,6 @@
+ MAX_VALIDATED_LIMIT = 1000
+ # Read this many messages to validate.
+ READ_MESSAGE_LIMIT = 1000
+-# Process this many messages in each dequeueMessage call.
+-# Downside of setting to a "small" value: the cost of reserving a batch of
+-# few messages increases relatively to the cost of executing activities,
+-# making CMFActivity overhead significant.
+-# Downside of setting to a "big" value: if there are many slow activities in
+-# a multi-activity-node environment, multiple slow activities will be reserved
+-# by a single node, making a suboptimal use of the parallelisation offered by
+-# the cluster.
+-# Before increasing this value, consider using SQLDict with group methods
+-# first.
+-MESSAGE_BUNDLE_SIZE = 1
+ 
+ MAX_MESSAGE_LIST_SIZE = 100
+ 
+@@ -86,6 +69,8 @@ def prepareQueueMessageList(self, activity_tool, message_list):
+       method_id_list = [m.method_id for m in registered_message_list]
+       priority_list = [m.activity_kw.get('priority', 1) for m in registered_message_list]
+       date_list = [m.activity_kw.get('at_date', None) for m in registered_message_list]
++      group_method_id_list = ['\0'.join([message.activity_kw.get('group_method_id', ''), message.activity_kw.get('group_id', '')])
++                              for message in registered_message_list]
+       tag_list = [m.activity_kw.get('tag', '') for m in registered_message_list]
+       serialization_tag_list = [m.activity_kw.get('serialization_tag', '') for m in registered_message_list]
+       dumped_message_list = [self.dumpMessage(m) for m in registered_message_list]
+@@ -95,6 +80,7 @@ def prepareQueueMessageList(self, activity_tool, message_list):
+                                               method_id_list=method_id_list,
+                                               priority_list=priority_list,
+                                               message_list=dumped_message_list,
++                                              group_method_id_list = group_method_id_list,
+                                               date_list=date_list,
+                                               tag_list=tag_list,
+                                               processing_node_list=None,
+@@ -113,162 +99,14 @@ def finishDeleteMessage(self, activity_tool_path, m):
+     # Nothing to do in SQLQueue.
+     pass
+ 
+-  def getReservedMessageList(self, activity_tool, date, processing_node, limit=None):
++  def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
+     """
+-      Get and reserve a list of messages.
+-      limit
+-        Maximum number of messages to fetch.
+-        This number is not garanted to be reached, because of:
+-         - not enough messages being pending execution
+-         - race condition (other nodes reserving the same messages at the same
+-           time)
+-        This number is guaranted not to be exceeded.
+-        If None (or not given) no limit apply.
++      Reserve unreserved messages matching given line.
++      Return their uids.
+     """
+-    result = activity_tool.SQLQueue_selectReservedMessageList(processing_node=processing_node, count=limit)
+-    if len(result) == 0:
+-      activity_tool.SQLQueue_reserveMessageList(count=limit, processing_node=processing_node, to_date=date)
+-      result = activity_tool.SQLQueue_selectReservedMessageList(processing_node=processing_node, count=limit)
+-    return result
+-
+-  def makeMessageListAvailable(self, activity_tool, uid_list):
+-    """
+-      Put messages back in processing_node=0 .
+-    """
+-    if len(uid_list):
+-      activity_tool.SQLQueue_makeMessageListAvailable(uid_list=uid_list)
+-
+-  def getProcessableMessageList(self, activity_tool, processing_node):
+-    """
+-      Always true:
+-        For each reserved message, delete redundant messages when it gets
+-        reserved (definitely lost, but they are expandable since redundant).
+-
+-      - reserve a message
+-      - set reserved message to processing=1 state
+-      - if this message has a group_method_id:
+-        - reserve a bunch of BUNDLE_MESSAGE_COUNT messages
+-        - untill number of impacted objects goes over MAX_GROUPED_OBJECTS
+-          - get one message from the reserved bunch (this messages will be
+-            "needed")
+-          - increase the number of impacted object
+-        - set "needed" reserved messages to processing=1 state
+-        - unreserve "unneeded" messages
+-      - return still-reserved message list
+-
+-      If any error happens in above described process, try to unreserve all
+-      messages already reserved in that process.
+-      If it fails, complain loudly that some messages might still be in an
+-      unclean state.
+-
+-      Returned values:
+-        list of messages
+-    """
+-    def getReservedMessageList(limit):
+-      line_list = self.getReservedMessageList(activity_tool=activity_tool,
+-                                              date=now_date,
+-                                              processing_node=processing_node,
+-                                              limit=limit)
+-      if len(line_list):
+-        LOG('SQLQueue', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
+-      return line_list
+-    def makeMessageListAvailable(uid_list):
+-      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
+-    now_date = self.getNow(activity_tool)
+-    message_list = []
+-    try:
+-      result = getReservedMessageList(limit=MESSAGE_BUNDLE_SIZE)
+-      for line in result:
+-        m = self.loadMessage(line.message, uid=line.uid, line=line)
+-        message_list.append(m)
+-      if len(message_list):
+-        activity_tool.SQLQueue_processMessage(uid=[m.uid for x in message_list])
+-      return message_list
+-    except:
+-      LOG('SQLQueue', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
+-      if len(message_list):
+-        to_free_uid_list = [m.uid for m in message_list]
+-        try:
+-          makeMessageListAvailable(to_free_uid_list)
+-        except:
+-          LOG('SQLQueue', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
+-        else:
+-          if len(to_free_uid_list):
+-            LOG('SQLQueue', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
+-      else:
+-        LOG('SQLQueue', TRACE, '(no message was reserved)')
+-      return []
+-
+-  def dequeueMessage(self, activity_tool, processing_node):
+-    def makeMessageListAvailable(uid_list):
+-      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
+-    message_list = \
+-      self.getProcessableMessageList(activity_tool, processing_node)
+-    if message_list:
+-      processing_stop_time = time() + 30 # Stop processing after more than 10 seconds were spent
+-      processed_count = 0
+-      # Commit right before executing messages.
+-      # As MySQL transaction does not start exactly at the same time as ZODB
+-      # transactions but a bit later, messages available might be called
+-      # on objects which are not available - or available in an old
+-      # version - to ZODB connector.
+-      # So all connectors must be committed now that we have selected
+-      # everything needed from MySQL to get a fresh view of ZODB objects.
+-      get_transaction().commit()
+-      tv = getTransactionalVariable(None)
+-      for m in message_list:
+-        tv['activity_runtime_environment'] = ActivityRuntimeEnvironment(m)
+-        processed_count += 1
+-        # Try to invoke
+-        try:
+-          activity_tool.invoke(m)
+-          if m.getExecutionState() != MESSAGE_NOT_EXECUTED:
+-            # Commit so that if a message raises it doesn't causes previous
+-            # successfull messages to be rolled back. This commit might fail,
+-            # so it is protected the same way as activity execution by the
+-            # same "try" block.
+-            get_transaction().commit()
+-          else:
+-            # This message failed, revert.
+-            abortTransactionSynchronously()
+-        except:
+-          value = m.uid, m.object_path, m.method_id
+-          LOG('SQLQueue', WARNING, 'Exception raised when invoking message (uid, path, method_id) %r' % (value, ), error=sys.exc_info())
+-          try:
+-            abortTransactionSynchronously()
+-          except:
+-            # Unfortunately, database adapters may raise an exception against abort.
+-            LOG('SQLQueue', PANIC, 'abort failed, thus some objects may be modified accidentally')
+-            raise
+-          # We must make sure that the message is not set as executed.
+-          # It is possible that the message is executed but the commit
+-          # of the transaction fails
+-          m.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
+-          # XXX Is it still useful to free message now that this node is able
+-          #     to reselect it ?
+-          try:
+-            makeMessageListAvailable([m.uid])
+-          except:
+-            LOG('SQLQueue', ERROR, 'Failed to free message: %r' % (value, ), error=sys.exc_info())
+-          else:
+-            LOG('SQLQueue', TRACE, 'Freed message %r' % (value, ))
+-        if time() > processing_stop_time:
+-          LOG('SQLQueue', TRACE, 'Stop processing message batch because processing delay exceeded')
+-          break
+-      # Release all unprocessed messages
+-      to_free_uid_list = [m.uid for m in message_list[processed_count:]]
+-      if to_free_uid_list:
+-        try:
+-          makeMessageListAvailable(to_free_uid_list)
+-        except:
+-          LOG('SQLQueue', ERROR, 'Failed to free remaining messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
+-        else:
+-          LOG('SQLQueue', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
+-      self.finalizeMessageExecution(activity_tool,
+-                                    message_list[:processed_count])
+-    get_transaction().commit()
+-    return not message_list
++    return ()
+ 
++  dequeueMessage = SQLBase.dequeueMessage
+ 
+   def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
+     hasMessage = getattr(activity_tool, 'SQLQueue_hasMessage', None)
+@@ -419,6 +257,12 @@ def distribute(self, activity_tool, node_count):
+             # Sort list of messages to validate the message with highest score
+             message_list.sort(key=sort_message_key)
+             distributable_uid_set.add(message_list[0].uid)
++            group_method_id = message_list[0].activity_kw.get('group_method_id')
++            if group_method_id is None:
++              continue
++            for message in message_list[1:]:
++              if group_method_id == message.activity_kw.get('group_method_id'):
++                distributable_uid_set.add(message.uid)
+           distributable_count = len(distributable_uid_set)
+           if distributable_count:
+             activity_tool.SQLBase_assignMessage(table=self.sql_table,
+diff --git a/products/CMFActivity/ActivityTool.py b/products/CMFActivity/ActivityTool.py
+index 1fe991e..adab3fb 100644
+--- a/products/CMFActivity/ActivityTool.py
++++ b/products/CMFActivity/ActivityTool.py
+@@ -1172,16 +1172,18 @@ def invoke(self, message):
+         for held in my_self.REQUEST._held:
+           self.REQUEST._hold(held)
+ 
+-    def invokeGroup(self, method_id, message_list):
++    def invokeGroup(self, method_id, message_list, activity):
+       if self.activity_tracking:
+-        activity_tracking_logger.info('invoking group messages: method_id=%s, paths=%s' % (method_id, ['/'.join(m.object_path) for m in message_list]))
++        activity_tracking_logger.info(
++          'invoking group messages: method_id=%s, paths=%s'
++          % (method_id, ['/'.join(m.object_path) for m in message_list]))
+       # Invoke a group method.
+-      object_list = []
+       expanded_object_list = []
+       new_message_list = []
+-      path_dict = {}
+-      # Filter the list of messages. If an object is not available, mark its message as non-executable.
+-      # In addition, expand an object if necessary, and make sure that no duplication happens.
++      path_set = set()
++      # Filter the list of messages. If an object is not available, mark its
++      # message as non-executable. In addition, expand an object if necessary,
++      # and make sure that no duplication happens.
+       for m in message_list:
+         # alternate method is used to segregate objects which cannot be grouped.
+         alternate_method_id = m.activity_kw.get('alternate_method_id')
+@@ -1195,42 +1197,29 @@ def invokeGroup(self, method_id, message_list):
+           m.setExecutionState(MESSAGE_NOT_EXECUTABLE, context=self)
+           continue
+         try:
+-          i = len(new_message_list) # This is an index of this message in new_message_list.
+-          if m.hasExpandMethod():
+-            for subobj in m.getObjectList(self):
++          if activity == 'SQLQueue':
++            expanded_object_list.append((obj, m.args, m.kw))
++          else:
++            if m.hasExpandMethod():
++              subobject_list = m.getObjectList(self)
++            else:
++              subobject_list = obj,
++            for subobj in subobject_list:
+               path = subobj.getPath()
+-              if path not in path_dict:
+-                path_dict[path] = i
++              if path not in path_set:
++                path_set.add(path)
+                 if alternate_method_id is not None \
+                    and hasattr(aq_base(subobj), alternate_method_id):
+-                  # if this object is alternated, generate a new single active object.
++                  # if this object is alternated,
++                  # generate a new single active object
+                   activity_kw = m.activity_kw.copy()
+-                  if 'group_method_id' in activity_kw:
+-                    del activity_kw['group_method_id']
+-                  if 'group_id' in activity_kw:
+-                    del activity_kw['group_id']                    
+-                  active_obj = subobj.activate(**activity_kw)
++                  activity_kw.pop('group_method_id', None)
++                  activity_kw.pop('group_id', None)
++                  active_obj = subobj.activate(activity=activity, **activity_kw)
+                   getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
+                 else:
+-                  expanded_object_list.append(subobj)
+-          else:
+-            path = obj.getPath()
+-            if path not in path_dict:
+-              path_dict[path] = i
+-              if alternate_method_id is not None \
+-                  and hasattr(aq_base(obj), alternate_method_id):
+-                # if this object is alternated, generate a new single active object.
+-                activity_kw = m.activity_kw.copy()
+-                if 'group_method_id' in activity_kw:
+-                  del activity_kw['group_method_id']
+-                if 'group_id' in activity_kw:
+-                  del activity_kw['group_id']
+-                active_obj = obj.activate(**activity_kw)
+-                getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
+-              else:
+-                expanded_object_list.append(obj)
+-          object_list.append(obj)
+-          new_message_list.append(m)
++                  expanded_object_list.append((subobj, m.args, m.kw))
++          new_message_list.append((m, obj))
+         except:
+           m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
+ 
+@@ -1238,39 +1227,34 @@ def invokeGroup(self, method_id, message_list):
+         if len(expanded_object_list) > 0:
+           method = self.unrestrictedTraverse(method_id)
+           # FIXME: how to apply security here?
+-          # NOTE: expanded_object_list must be set to failed objects by the callee.
+-          #       If it fully succeeds, expanded_object_list must be empty when returning.
+-          result = method(expanded_object_list, **m.kw)
++          # NOTE: expanded_object_list must be set to failed objects by the
++          #       callee. If it fully succeeds, expanded_object_list must be
++          #       empty when returning.
++          result = method(expanded_object_list)
+         else:
+           result = None
+       except:
+         # In this case, the group method completely failed.
+         exc_info = sys.exc_info()
+-        for m in new_message_list:
+-          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info=exc_info, log=False)
++        for m, obj in new_message_list:
++          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
+         LOG('WARNING ActivityTool', 0,
+             'Could not call method %s on objects %s' %
+-            (method_id, expanded_object_list), error=exc_info)
++            (method_id, [x[0] for x in expanded_object_list]), error=exc_info)
+         error_log = getattr(self, 'error_log', None)
+         if error_log is not None:
+           error_log.raising(exc_info)
+       else:
+-        # Obtain all indices of failed messages. Note that this can be a partial failure.
+-        failed_message_dict = {}
+-        for obj in expanded_object_list:
+-          path = obj.getPath()
+-          i = path_dict[path]
+-          failed_message_dict[i] = None
+-
++        # Obtain all indices of failed messages.
++        # Note that this can be a partial failure.
++        failed_message_set = set(id(x[2]) for x in expanded_object_list)
+         # Only for succeeded messages, an activity process is invoked (if any).
+-        for i in xrange(len(object_list)):
+-          object = object_list[i]
+-          m = new_message_list[i]
+-          if i in failed_message_dict:
++        for m, obj in new_message_list:
++          if id(m.kw) in failed_message_set:
+             m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
+           else:
+             try:
+-              m.activateResult(self, result, object)
++              m.activateResult(self, result, obj)
+             except:
+               m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
+             else:
+diff --git a/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql b/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql
+new file mode 100644
+index 0000000..2ace125
+--- /dev/null
++++ b/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql
+@@ -0,0 +1,20 @@
++<dtml-comment>
++title:
++connection_id:cmf_activity_sql_connection
++max_rows:0
++max_cache:0
++cache_time:0
++class_name:
++class_file:
++</dtml-comment>
++<params>table
++uid</params>
++UPDATE
++  <dtml-var table>
++SET
++  processing_node=0,
++  processing=0
++WHERE
++  <dtml-sqltest uid type="int" multiple>
++<dtml-var sql_delimiter>
++COMMIT
+diff --git a/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql b/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql
+new file mode 100644
+index 0000000..5aaa97e
+--- /dev/null
++++ b/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql
+@@ -0,0 +1,20 @@
++<dtml-comment>
++title:
++connection_id:cmf_activity_sql_connection
++max_rows:1
++max_cache:0
++cache_time:0
++class_name:
++class_file:
++</dtml-comment>
++<params>table
++uid</params>
++UPDATE
++  <dtml-var table>
++SET
++  processing_date = UTC_TIMESTAMP(),
++  processing = 1
++WHERE
++  <dtml-sqltest uid type="int" multiple>
++<dtml-var sql_delimiter>
++COMMIT
+diff --git a/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql b/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql
+new file mode 100644
+index 0000000..246921d
+--- /dev/null
++++ b/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql
+@@ -0,0 +1,39 @@
++<dtml-comment>
++title:
++connection_id:cmf_activity_sql_connection
++max_rows:0
++max_cache:0
++cache_time:0
++class_name:
++class_file:
++</dtml-comment>
++<params>table
++processing_node
++to_date
++count
++group_method_id
++</params>
++UPDATE
++  <dtml-var table>
++SET
++  processing_node=<dtml-sqlvar processing_node type="int">
++WHERE
++  processing_node=0
++  AND date <= <dtml-sqlvar to_date type="datetime">
++  <dtml-if expr="group_method_id is not None">
++    AND group_method_id = <dtml-sqlvar group_method_id type="string">
++  </dtml-if>
++ORDER BY
++<dtml-comment>
++  Explanation of the order by:
++  - priority must be respected (it is a feature)
++  - when multiple nodes simultaneously try to fetch activities, they should not
++    be given the same set of lines as it would cause all minus one to wait for
++    a write lock (and be ultimately aborted), effectively serializing their
++    action (so breaking parallelism).
++    So we must force MySQL to update lines in a random order.
++</dtml-comment>
++  priority, RAND()
++LIMIT <dtml-sqlvar count type="int">
++<dtml-var sql_delimiter>
++COMMIT
+diff --git a/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql b/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql
+new file mode 100644
+index 0000000..5292b03
+--- /dev/null
++++ b/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql
+@@ -0,0 +1,21 @@
++<dtml-comment>
++title:
++connection_id:cmf_activity_sql_connection
++max_rows:0
++max_cache:0
++cache_time:0
++class_name:
++class_file:
++</dtml-comment>
++<params>table
++processing_node
++count</params>
++SELECT
++  *
++FROM
++  <dtml-var table>
++WHERE
++  processing_node = <dtml-sqlvar processing_node type="int">
++<dtml-if expr="count is not None">
++  LIMIT <dtml-sqlvar count type="int">
++</dtml-if>
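
These two generic methods replace the per-queue copies deleted below.
SQLBase_reserveMessageList claims up to 'count' pending rows
(processing_node=0) for one node, randomizing within equal priority so that
concurrent nodes do not serialize on the same write locks;
SQLBase_selectReservedMessageList then reads the claimed rows back. A rough
sketch of how a node is expected to chain them (parameter names follow the
ZSQL definitions above; 'table' is the queue table being processed):

  def getReservedMessageList(activity_tool, table, node_id, now, limit,
                             group_method_id=None):
      # 1. claim pending rows for this node, in randomized order
      activity_tool.SQLBase_reserveMessageList(
          table=table, processing_node=node_id, to_date=now,
          count=limit, group_method_id=group_method_id)
      # 2. fetch whatever was actually claimed
      return activity_tool.SQLBase_selectReservedMessageList(
          table=table, processing_node=node_id, count=limit)
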
+diff --git a/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql b/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql
+deleted file mode 100644
+index 663cbf8..0000000
+--- a/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql
++++ /dev/null
+@@ -1,21 +0,0 @@
+-<dtml-comment>
+-title:
+-connection_id:cmf_activity_sql_connection
+-max_rows:0
+-max_cache:0
+-cache_time:0
+-class_name:
+-class_file:
+-</dtml-comment>
+-<params>uid_list</params>
+-UPDATE
+-  message
+-SET
+-  processing_node=0,
+-  processing=0
+-WHERE
+-  uid IN (
+-  <dtml-in prefix="uid" expr="uid_list"><dtml-sqlvar uid_item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>
+-  )
+-<dtml-var sql_delimiter>
+-COMMIT
+diff --git a/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql b/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql
+deleted file mode 100644
+index 87b76fc..0000000
+--- a/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql
++++ /dev/null
+@@ -1,20 +0,0 @@
+-<dtml-comment>
+-title:
+-connection_id:cmf_activity_sql_connection
+-max_rows:1
+-max_cache:0
+-cache_time:0
+-class_name:
+-class_file:
+-</dtml-comment>
+-<params>uid</params>
+-UPDATE message
+-SET
+-  processing_date = UTC_TIMESTAMP(),
+-  processing = 1
+-WHERE
+-  uid IN (
+-<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
+-  )
+-<dtml-var sql_delimiter>
+-COMMIT
+diff --git a/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql b/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql
+deleted file mode 100644
+index 1525791..0000000
+--- a/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql
++++ /dev/null
+@@ -1,36 +0,0 @@
+-<dtml-comment>
+-title:
+-connection_id:cmf_activity_sql_connection
+-max_rows:0
+-max_cache:0
+-cache_time:0
+-class_name:
+-class_file:
+-</dtml-comment>
+-<params>processing_node
+-to_date
+-count
+-group_method_id
+-</params>
+-UPDATE
+-  message
+-SET
+-  processing_node=<dtml-sqlvar processing_node type="int">
+-WHERE
+-  processing_node=0
+-  AND date <= <dtml-sqlvar to_date type="datetime">
+-  <dtml-if expr="group_method_id is not None"> AND group_method_id = <dtml-sqlvar group_method_id type="string"> </dtml-if>
+-ORDER BY
+-<dtml-comment>
+-  Explanation of the order by:
+-  - priority must be respected (it is a feature)
+-  - when multiple nodes simultaneously try to fetch activities, they should not
+-    be given the same set of lines as it would cause all minus one to wait for
+-    a write lock (and be ultimately aborted), effectively serializing their
+-    action (so breaking parallelism).
+-    So we must force MySQL to update lines in a random order.
+-</dtml-comment>
+-  priority, RAND()
+-LIMIT <dtml-sqlvar count type="int">
+-<dtml-var sql_delimiter>
+-COMMIT
+diff --git a/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql b/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql
+deleted file mode 100644
+index 92cb20e..0000000
+--- a/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql
++++ /dev/null
+@@ -1,20 +0,0 @@
+-<dtml-comment>
+-title:
+-connection_id:cmf_activity_sql_connection
+-max_rows:0
+-max_cache:0
+-cache_time:0
+-class_name:
+-class_file:
+-</dtml-comment>
+-<params>processing_node
+-count</params>
+-SELECT
+-  *
+-FROM
+-  message
+-WHERE
+-  processing_node = <dtml-sqlvar processing_node type="int">
+-<dtml-if expr="count is not None">
+-  LIMIT <dtml-sqlvar count type="int">
+-</dtml-if>
+diff --git a/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql b/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
+index 5c7eda2..763d240 100644
+--- a/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
++++ b/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
+@@ -18,6 +18,7 @@ CREATE TABLE `message_queue` (
+   `processing` TINYINT NOT NULL DEFAULT 0,
+   `processing_date` DATETIME,
+   `priority` TINYINT NOT NULL DEFAULT 0,
++  `group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
+   `tag` VARCHAR(255) NOT NULL,
+   `serialization_tag` VARCHAR(255) NOT NULL,
+   `retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
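
This only amends the CREATE TABLE statement, so an already-deployed site
would need the column added by hand (or its activity tables recreated while
empty) before the new SQLQueue queries run. A hedged migration sketch with
hypothetical connection settings:

  import MySQLdb

  # credentials are placeholders; point this at the cmf_activity database
  conn = MySQLdb.connect(db='erp5', user='user', passwd='secret')
  conn.cursor().execute(
      "ALTER TABLE message_queue"
      " ADD COLUMN group_method_id VARCHAR(255) NOT NULL DEFAULT ''"
      " AFTER priority")
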
+diff --git a/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql b/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql
+deleted file mode 100644
+index c83692a..0000000
+--- a/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql
++++ /dev/null
+@@ -1,21 +0,0 @@
+-<dtml-comment>
+-title:
+-connection_id:cmf_activity_sql_connection
+-max_rows:0
+-max_cache:0
+-cache_time:0
+-class_name:
+-class_file:
+-</dtml-comment>
+-<params>uid_list</params>
+-UPDATE
+-  message_queue
+-SET
+-  processing_node=0,
+-  processing=0
+-WHERE
+-  uid IN (
+-  <dtml-in prefix="uid" expr="uid_list"><dtml-sqlvar uid_item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>
+-  )
+-<dtml-var sql_delimiter>
+-COMMIT
+diff --git a/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql b/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql
+deleted file mode 100644
+index 9d667c4..0000000
+--- a/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql
++++ /dev/null
+@@ -1,34 +0,0 @@
+-<dtml-comment>
+-title:
+-connection_id:cmf_activity_sql_connection
+-max_rows:0
+-max_cache:0
+-cache_time:0
+-class_name:
+-class_file:
+-</dtml-comment>
+-<params>processing_node
+-to_date
+-count
+-</params>
+-UPDATE
+-  message_queue
+-SET
+-  processing_node=<dtml-sqlvar processing_node type="int">
+-WHERE
+-  processing_node=0
+-  AND date <= <dtml-sqlvar to_date type="datetime">
+-ORDER BY
+-<dtml-comment>
+-  Explanation of the order by:
+-  - priority must be respected (it is a feature)
+-  - when multiple nodes simultaneously try to fetch activities, they should not
+-    be given the same set of lines as it would cause all minus one to wait for
+-    a write lock (and be ultimately aborted), effectively serializing their
+-    action (so breaking parallelism).
+-    So we must force MySQL to update lines in a random order.
+-</dtml-comment>
+-  priority, RAND()
+-LIMIT <dtml-sqlvar count type="int">
+-<dtml-var sql_delimiter>
+-COMMIT
+diff --git a/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql b/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql
+deleted file mode 100644
+index dcddfb5..0000000
+--- a/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql
++++ /dev/null
+@@ -1,20 +0,0 @@
+-<dtml-comment>
+-title:
+-connection_id:cmf_activity_sql_connection
+-max_rows:0
+-max_cache:0
+-cache_time:0
+-class_name:
+-class_file:
+-</dtml-comment>
+-<params>processing_node
+-count</params>
+-SELECT
+-  *
+-FROM
+-  message_queue
+-WHERE
+-  processing_node = <dtml-sqlvar processing_node type="int">
+-<dtml-if expr="count is not None">
+-  LIMIT <dtml-sqlvar count type="int">
+-</dtml-if>
+diff --git a/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql b/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql
+index 17ca67a..8c22150 100644
+--- a/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql
++++ b/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql
+@@ -15,11 +15,12 @@ message_list
+ priority_list
+ processing_node_list
+ date_list
++group_method_id_list
+ tag_list
+ serialization_tag_list
+ </params>
+ INSERT INTO message_queue
+-(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, tag, serialization_tag, message)
++(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, group_method_id, tag, serialization_tag, message)
+ VALUES
+ <dtml-in prefix="loop" expr="_.range(_.len(path_list))">
+ <dtml-if sequence-start><dtml-else>,</dtml-if>
+@@ -32,6 +33,7 @@ VALUES
+   <dtml-if expr="processing_node_list is not None"><dtml-sqlvar expr="processing_node_list[loop_item]" type="int"><dtml-else>-1</dtml-if>,
+   0,
+   <dtml-sqlvar expr="priority_list[loop_item]" type="int">,
++  <dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
+   <dtml-sqlvar expr="tag_list[loop_item]" type="string">,
+   <dtml-sqlvar expr="serialization_tag_list[loop_item]" type="string">,
+   <dtml-sqlvar expr="message_list[loop_item]" type="string">
+diff --git a/products/CMFActivity/tests/testCMFActivity.py b/products/CMFActivity/tests/testCMFActivity.py
+index ccb5aec..34f2857 100644
+--- a/products/CMFActivity/tests/testCMFActivity.py
++++ b/products/CMFActivity/tests/testCMFActivity.py
+@@ -433,7 +433,7 @@ def TryActiveProcess(self, activity):
+   def TryActiveProcessInsideActivity(self, activity):
+     """
+     Try two levels with active_process, we create one first
+-    activity with an acitive process, then this new activity
++    activity with an active process, then this new activity
+     uses another active process
+     """
+     portal = self.getPortal()
+@@ -1883,26 +1883,26 @@ def test_79_AbortTransactionSynchronously(self, quiet=0, run=run_all_test):
+     getattr(organisation, 'uid')
+ 
+ 
+-  def test_80_CallWithGroupIdParamater(self, quiet=0, run=run_all_test):
+-    """
+-    Test that group_id parameter is used to separate execution of the same method
+-    """
++  def callWithGroupIdParamater(self, activity, quiet, run):
+     if not run: return
+     if not quiet:
+-      message = '\nTest Activity with group_id parameter'
++      message = '\nTest Activity with group_id parameter (%s)' % activity
+       ZopeTestCase._print(message)
+       LOG('Testing... ',0,message)
+ 
+     portal = self.getPortal()    
+     organisation =  portal.organisation._getOb(self.company_id)
+     # Define a group method
+-    def setFoobar(self, object_list, number=1):
+-      for obj in object_list:
++    foobar_list = []
++    def setFoobar(self, object_list):
++      foobar_list.append(len(object_list))
++      for obj, args, kw in object_list:
++        number = kw.get('number', 1)
+         if getattr(obj,'foobar', None) is not None:
+           obj.foobar = obj.foobar + number
+         else:
+           obj.foobar = number
+-      object_list[:] = []
++      del object_list[:]
+     from Products.ERP5Type.Document.Folder import Folder
+     Folder.setFoobar = setFoobar    
+ 
+@@ -1915,46 +1915,65 @@ def getFoobar(self):
+ 
+     # Test group_method_id is working without group_id
+     for x in xrange(5):
+-      organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar").reindexObject(number=1)
++      organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar").reindexObject(number=1)
+       get_transaction().commit()      
+ 
+     message_list = portal.portal_activities.getMessageList()
+     self.assertEquals(len(message_list),5)
+     portal.portal_activities.distribute()
+     portal.portal_activities.tic()
+-    self.assertEquals(1, organisation.getFoobar())
++    expected = dict(SQLDict=1, SQLQueue=5)[activity]
++    self.assertEquals(expected, organisation.getFoobar())
+ 
+ 
+     # Test group_method_id is working with one group_id defined
+     for x in xrange(5):
+-      organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
++      organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
+       get_transaction().commit()      
+ 
+     message_list = portal.portal_activities.getMessageList()
+     self.assertEquals(len(message_list),5)
+     portal.portal_activities.distribute()
+     portal.portal_activities.tic()
+-    self.assertEquals(2, organisation.getFoobar())
++    self.assertEquals(expected * 2, organisation.getFoobar())
++
++    self.assertEquals([expected, expected], foobar_list)
++    del foobar_list[:]
+ 
+     # Test group_method_id is working with many group_id defined
+     for x in xrange(5):
+-      organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
++      organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
+       get_transaction().commit()      
+-      organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="2").reindexObject(number=3)
++      organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="2").reindexObject(number=3)
+       get_transaction().commit()
+-      organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
++      organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
+       get_transaction().commit()
+-      organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="3").reindexObject(number=5)
++      organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="3").reindexObject(number=5)
+       get_transaction().commit()
+ 
+     message_list = portal.portal_activities.getMessageList()
+     self.assertEquals(len(message_list),20)
+     portal.portal_activities.distribute()
+     portal.portal_activities.tic()
+-    self.assertEquals(11, organisation.getFoobar())
++    self.assertEquals(dict(SQLDict=11, SQLQueue=60)[activity],
++                      organisation.getFoobar())
++    self.assertEquals(dict(SQLDict=[1, 1, 1], SQLQueue=[5, 5, 10])[activity],
++                      sorted(foobar_list))
+     message_list = portal.portal_activities.getMessageList()
+     self.assertEquals(len(message_list), 0)
+-    
++
++  def test_80a_CallWithGroupIdParamaterSQLDict(self, quiet=0, run=run_all_test):
++    """
++    Test that group_id parameter is used to separate execution of the same method
++    """
++    self.callWithGroupIdParamater('SQLDict', quiet=quiet, run=run)
++
++  def test_80b_CallWithGroupIdParamaterSQLQueue(self, quiet=0,
++                                                run=run_all_test):
++    """
++    Test that group_id parameter is used to separate execution of the same method
++    """
++    self.callWithGroupIdParamater('SQLQueue', quiet=quiet, run=run)
+ 
+   def test_81_ActivateKwForWorkflowTransition(self, quiet=0, run=run_all_test):
+     """
+@@ -2020,7 +2039,7 @@ def test_83_ActivityModificationsViaCMFActivityConnectionRolledBackOnErrorSQLDic
+     get_transaction().commit()
+     self.tic()
+     activity_tool = self.getActivityTool()
+-    def modifySQLAndFail(self, object_list, **kw):
++    def modifySQLAndFail(self, object_list):
+       # Only create the dummy activity if none is present: we would just
+       # generate misleading errors (duplicate uid).
+       if activity_tool.countMessage(method_id='dummy_activity') == 0:
+@@ -2448,7 +2467,7 @@ def test_94_ActivityToolCommitFailureDoesNotCommitCMFActivitySQLConnectionSQLDic
+     get_transaction().commit()
+     self.tic()
+     activity_tool = self.getActivityTool()
+-    def modifySQL(self, object_list, *arg, **kw):
++    def modifySQL(self, object_list):
+       # Only create the dummy activity if none is present: we would just
+       # generate misleading errors (duplicate uid).
+       if activity_tool.countMessage(method_id='dummy_activity') == 0:
+@@ -3158,7 +3177,7 @@ def checkMessage(message, exception_type):
+     self.assertEqual(len(message_list), 1)
+     message = message_list[0]
+     portal.organisation_module._delOb(organisation.id)
+-    activity_tool.invokeGroup('getTitle', [message])
++    activity_tool.invokeGroup('getTitle', [message], 'SQLDict')
+     checkMessage(message, KeyError)
+     activity_tool.manageCancel(message.object_path, message.method_id)
+     # 2: activity method does not exist when activity is executed
+@@ -3167,7 +3186,8 @@ def checkMessage(message, exception_type):
+     message_list = activity_tool.getMessageList()
+     self.assertEqual(len(message_list), 1)
+     message = message_list[0]
+-    activity_tool.invokeGroup('this_method_does_not_exist', [message])
++    activity_tool.invokeGroup('this_method_does_not_exist',
++                              [message], 'SQLDict')
+     checkMessage(message, KeyError)
+     activity_tool.manageCancel(message.object_path, message.method_id)
+ 
+@@ -3749,7 +3769,6 @@ def test_122_userNotificationSavedOnEventLogWhenSiteErrorLoggerRaisesWithSQLDict
+       LOG('Testing... ',0,message)
+     self.TryNotificationSavedOnEventLogWhenSiteErrorLoggerRaises('SQLDict')
+ 
+-  @expectedFailure
+   def test_123_userNotificationSavedOnEventLogWhenSiteErrorLoggerRaisesWithSQLQueue(self, quiet=0, run=run_all_test):
+     if not run: return
+     if not quiet:
+@@ -3790,7 +3809,49 @@ def doSomething(self):
+       activity_tool.manageClearActivities(keep=0)
+     finally:
+       SQLQueue.MAX_MESSAGE_LIST_SIZE = old_MAX_MESSAGE_LIST_SIZE
+-  
++
++  def test_125_CheckDistributeWithSerializationTagAndGroupMethodId(self):
++    activity_tool = self.portal.portal_activities
++    obj1 = activity_tool.newActiveProcess()
++    obj2 = activity_tool.newActiveProcess()
++    get_transaction().commit()
++    self.tic()
++    group_method_call_list = []
++    def doSomething(self, message_list):
++      group_method_call_list.append(sorted((obj.getPath(), args, kw)
++                                           for obj, args, kw in message_list))
++      del message_list[:]
++    activity_tool.__class__.doSomething = doSomething
++    try:
++      for activity in 'SQLDict', 'SQLQueue':
++        activity_kw = dict(activity=activity, serialization_tag=self.id(),
++                           group_method_id='portal_activities/doSomething')
++        obj1.activate(**activity_kw).dummy(1, x=None)
++        obj2.activate(**activity_kw).dummy(2, y=None)
++        get_transaction().commit()
++        activity_tool.distribute()
++        activity_tool.tic()
++        self.assertEqual(group_method_call_list.pop(),
++                         sorted([(obj1.getPath(), (1,), dict(x=None)),
++                                 (obj2.getPath(), (2,), dict(y=None))]))
++        self.assertFalse(group_method_call_list)
++        self.assertFalse(activity_tool.getMessageList())
++        obj1.activate(priority=2, **activity_kw).dummy1(1, x=None)
++        obj1.activate(priority=1, **activity_kw).dummy2(2, y=None)
++        message1 = obj1.getPath(), (1,), dict(x=None)
++        message2 = obj1.getPath(), (2,), dict(y=None)
++        get_transaction().commit()
++        activity_tool.distribute()
++        self.assertEqual(len(activity_tool.getMessageList()), 2)
++        activity_tool.tic()
++        self.assertEqual(group_method_call_list.pop(),
++                         dict(SQLDict=[message2],
++                              SQLQueue=[message1, message2])[activity])
++        self.assertFalse(group_method_call_list)
++        self.assertFalse(activity_tool.getMessageList())
++    finally:
++      del activity_tool.__class__.doSomething
++
+ def test_suite():
+   suite = unittest.TestSuite()
+   suite.addTest(unittest.makeSuite(TestCMFActivity))
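
One detail worth noting in the test changes above: invokeGroup now takes the
queue name as a third positional argument, since grouped execution differs
between SQLDict (messages deduplicated) and SQLQueue (every message kept).
A minimal call now looks like:

  activity_tool.invokeGroup('getTitle', message_list, 'SQLDict')
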
+diff --git a/products/ERP5Catalog/CatalogTool.py b/products/ERP5Catalog/CatalogTool.py
+index a726027..ce01545 100644
+--- a/products/ERP5Catalog/CatalogTool.py
++++ b/products/ERP5Catalog/CatalogTool.py
+@@ -759,6 +759,23 @@ def reindexObject(self, object, idxs=None, sql_catalog_id=None,**kw):
+         self.catalog_object(object, url, idxs=idxs, sql_catalog_id=sql_catalog_id,**kw)
+ 
+ 
++    def catalogObjectList(self, object_list, *args, **kw):
++        """Catalog a list of objects"""
++        if type(object_list[0]) is tuple:
++          tmp_object_list = [x[0] for x in object_list]
++          ZCatalog.catalogObjectList(self, tmp_object_list, **x[2])
++          # keep failed objects in 'object_list'
++          object_list[:] = [x for x in object_list if x[0] in tmp_object_list]
++        else:
++          ZCatalog.catalogObjectList(self, object_list, *args, **kw)
++
++    security.declarePrivate('uncatalogObjectList')
++    def uncatalogObjectList(self, message_list):
++      """Uncatalog a list of objects"""
++      for obj, args, kw in message_list:
++        self.unindexObject(*args, **kw)
++      del message_list[:]
++
+     security.declarePrivate('unindexObject')
+     def unindexObject(self, object=None, path=None, uid=None,sql_catalog_id=None):
+         """
+diff --git a/products/ERP5Type/CopySupport.py b/products/ERP5Type/CopySupport.py
+index f3404f6..b8f6058 100644
+--- a/products/ERP5Type/CopySupport.py
++++ b/products/ERP5Type/CopySupport.py
+@@ -375,14 +375,12 @@ def unindexObject(self, path=None):
+           catalog.beforeUnindexObject(None,path=path,uid=uid)
+           # Then start activity in order to remove lines in catalog,
+           # sql which generates locks
+-          if path is None:
+-            path = self.getPath()
+           # - serialization_tag is used in order to prevent unindexation from
+           # happening before/in parallel with reindexations of the same object.
+           catalog.activate(activity='SQLQueue',
+                            tag='%s' % uid,
+-                           serialization_tag=self.getRootDocumentPath()).unindexObject(None, 
+-                                           path=path,uid=uid)
++                           group_method_id='portal_catalog/uncatalogObjectList',
++                           serialization_tag=self.getRootDocumentPath()).unindexObject(uid=uid)
+ 
+   security.declareProtected(Permissions.ModifyPortalContent, 'moveObject')
+   def moveObject(self, idxs=None):
+diff --git a/products/ERP5Type/tests/testCopySupport.py b/products/ERP5Type/tests/testCopySupport.py
+index 3d1ba1d..3b5e57b 100644
+--- a/products/ERP5Type/tests/testCopySupport.py
++++ b/products/ERP5Type/tests/testCopySupport.py
+@@ -31,6 +31,7 @@
+ 
+ from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
+ #from AccessControl.SecurityManagement import newSecurityManager
++from Products.CMFActivity.ActivityTool import ActivityTool
+ from Products.CMFActivity.Errors import ActivityPendingError
+ 
+ class TestCopySupport(ERP5TypeTestCase):
+@@ -100,13 +101,49 @@ def test_02_unindexObjectDependency(self):
+     # Currently, the test passes only because ActivityTool.distribute always
+     # iterates on queues in the same order: SQLQueue before SQLDict.
+     # If Python returned dictionary values in a different order,
+-    # reindex activities fail with the following error:
++    # reindex activities would fail with the following error:
+     #   uid of <Products.ERP5Catalog.CatalogTool.IndexableObjectWrapper for
+     #   /erp5/person_module/1/old_address> is 599L and is already assigned
+     #   to deleted in catalog !!! This can be fatal.
+     # This test would also fail if SQLDict was used for 'unindexObject'.
+     self.tic()
+ 
++  def test_03_unindexObjectGrouping(self):
++    person = self.portal.person_module.newContent(portal_type='Person',
++                                                  address_city='Lille',
++                                                  email_text='foo at bar.com')
++    transaction.commit()
++    self.tic()
++    search_catalog = self.portal.portal_catalog.unrestrictedSearchResults
++    uid_list = [person.getUid(),
++                person.default_address.getUid(),
++                person.default_email.getUid()]
++    uid_list.sort()
++    self.assertEqual(len(search_catalog(uid=uid_list)), len(uid_list))
++    self.portal.person_module._delObject(person.getId())
++    del person
++    transaction.commit()
++    self.assertEqual(len(search_catalog(uid=uid_list)), len(uid_list))
++    activity_tool = self.portal.portal_activities
++    self.assertEqual(len(activity_tool.getMessageList()), len(uid_list))
++
++    ActivityTool_invokeGroup = ActivityTool.invokeGroup
++    invokeGroup_list = []
++    def invokeGroup(self, method_id, message_list, activity):
++      invokeGroup_list.extend((method_id,
++                               sorted(m.kw.get('uid') for m in message_list),
++                               activity))
++      return ActivityTool_invokeGroup(self, method_id, message_list, activity)
++    try:
++      ActivityTool.invokeGroup = invokeGroup
++      self.tic()
++    finally:
++      ActivityTool.invokeGroup = ActivityTool_invokeGroup
++    self.assertEqual(invokeGroup_list,
++      ['portal_catalog/uncatalogObjectList', uid_list, 'SQLQueue'])
++    self.assertEqual(len(search_catalog(uid=uid_list)), 0)
++
++
+ def test_suite():
+   suite = unittest.TestSuite()
+   suite.addTest(unittest.makeSuite(TestCopySupport))



