[Erp5-report] r36919 rafael - in /erp5/release/candidate: ./ local-eggs/ profiles/ release-...
nobody at svn.erp5.org
nobody at svn.erp5.org
Wed Jul 7 00:34:54 CEST 2010
Author: rafael
Date: Wed Jul 7 00:34:52 2010
New Revision: 36919
URL: http://svn.erp5.org?rev=36919&view=rev
Log:
Set Release Candidate Signature, versions, revisions and patches are defined now.
Added:
erp5/release/candidate/CANDIDATE.txt (with props)
erp5/release/candidate/profiles/release.cfg
erp5/release/candidate/release-patch/
erp5/release/candidate/release-patch/unindexObject.patch
Modified:
erp5/release/candidate/buildout.cfg
erp5/release/candidate/local-eggs/ (props changed)
erp5/release/candidate/local-eggs/EXTERNALS.TXT
erp5/release/candidate/profiles/versions.cfg
Added: erp5/release/candidate/CANDIDATE.txt
URL: http://svn.erp5.org/erp5/release/candidate/CANDIDATE.txt?rev=36919&view=auto
==============================================================================
--- erp5/release/candidate/CANDIDATE.txt (added)
+++ erp5/release/candidate/CANDIDATE.txt [utf8] Wed Jul 7 00:34:52 2010
@@ -0,0 +1,8 @@
+RELEASE CANDIDATE NOTES FOR 5.4.6 RELEASE
+==========================================
+
+This release candidate buildout is temporary. This folder will be
+kept during the tests and revision, and removed after release be
+ready.
+
+DON'T USE THIS BUILDOUT FOR PRODUCTION ONLY FOR TESTING PROPOSE.
Propchange: erp5/release/candidate/CANDIDATE.txt
------------------------------------------------------------------------------
svn:eol-style = native
Modified: erp5/release/candidate/buildout.cfg
URL: http://svn.erp5.org/erp5/release/candidate/buildout.cfg?rev=36919&r1=36918&r2=36919&view=diff
==============================================================================
--- erp5/release/candidate/buildout.cfg [utf8] (original)
+++ erp5/release/candidate/buildout.cfg [utf8] Wed Jul 7 00:34:52 2010
@@ -1,2 +1,2 @@
[buildout]
-extends = profiles/official.cfg
+extends = profiles/release.cfg
Propchange: erp5/release/candidate/local-eggs/
------------------------------------------------------------------------------
--- svn:externals (original)
+++ svn:externals Wed Jul 7 00:34:52 2010
@@ -3,6 +3,6 @@
# BEWARE: Any addition might be removed without further notice.
# To play with eggs use mr.developer and publish eggs in proper places
-z3c.recipe.openoffice http://svn.zope.org/repos/main/z3c.recipe.openoffice/branches/erp5-downloadcache
-Acquisition svn://svn.zope.org/repos/main/Acquisition/branches/erp5-aq_dynamic
-Products.DCWorkflow svn://svn.zope.org/repos/main/Sandbox/lra/branches/restore-WorkflowMethod-Products.DCWorkflow
+z3c.recipe.openoffice -r114272 http://svn.zope.org/repos/main/z3c.recipe.openoffice/branches/erp5-downloadcache
+Acquisition -r114272 svn://svn.zope.org/repos/main/Acquisition/branches/erp5-aq_dynamic
+Products.DCWorkflow -r114272 svn://svn.zope.org/repos/main/Sandbox/lra/branches/restore-WorkflowMethod-Products.DCWorkflow
Modified: erp5/release/candidate/local-eggs/EXTERNALS.TXT
URL: http://svn.erp5.org/erp5/release/candidate/local-eggs/EXTERNALS.TXT?rev=36919&r1=36918&r2=36919&view=diff
==============================================================================
--- erp5/release/candidate/local-eggs/EXTERNALS.TXT [utf8] (original)
+++ erp5/release/candidate/local-eggs/EXTERNALS.TXT [utf8] Wed Jul 7 00:34:52 2010
@@ -3,6 +3,6 @@
# BEWARE: Any addition might be removed without further notice.
# To play with eggs use mr.developer and publish eggs in proper places
-z3c.recipe.openoffice http://svn.zope.org/repos/main/z3c.recipe.openoffice/branches/erp5-downloadcache
-Acquisition svn://svn.zope.org/repos/main/Acquisition/branches/erp5-aq_dynamic
-Products.DCWorkflow svn://svn.zope.org/repos/main/Sandbox/lra/branches/restore-WorkflowMethod-Products.DCWorkflow
+z3c.recipe.openoffice -r114272 http://svn.zope.org/repos/main/z3c.recipe.openoffice/branches/erp5-downloadcache
+Acquisition -r114272 svn://svn.zope.org/repos/main/Acquisition/branches/erp5-aq_dynamic
+Products.DCWorkflow -r114272 svn://svn.zope.org/repos/main/Sandbox/lra/branches/restore-WorkflowMethod-Products.DCWorkflow
Added: erp5/release/candidate/profiles/release.cfg
URL: http://svn.erp5.org/erp5/release/candidate/profiles/release.cfg?rev=36919&view=auto
==============================================================================
--- erp5/release/candidate/profiles/release.cfg (added)
+++ erp5/release/candidate/profiles/release.cfg [utf8] Wed Jul 7 00:34:52 2010
@@ -0,0 +1,9 @@
+[buildout]
+extends = ../profiles/official.cfg
+
+parts += release-patch
+
+[release-patch]
+recipe = plone.recipe.command
+command = cd ${buildout:directory}/parts/products-erp5/ && patch -p2 < ${buildout:directory}/release-patch/unindexObject.patch
+update-command = echo "Do nothing"
Modified: erp5/release/candidate/profiles/versions.cfg
URL: http://svn.erp5.org/erp5/release/candidate/profiles/versions.cfg?rev=36919&r1=36918&r2=36919&view=diff
==============================================================================
--- erp5/release/candidate/profiles/versions.cfg [utf8] (original)
+++ erp5/release/candidate/profiles/versions.cfg [utf8] Wed Jul 7 00:34:52 2010
@@ -1,16 +1,58 @@
-# taken from https://svn.erp5.org/repos/public/experimental/erp5.buildout/profiles/versions.cfg
[versions]
-python-memcached = 1.45
+# taken from https://svn.erp5.org/repos/public/experimental/erp5.buildout/profiles/versions.cfg
+ClientForm = 0.2.10
+MySQL-python = 1.2.3
+Products.ExternalEditor = 1.0a2
PyXML = 0.8.4
+SOAPpy = 0.12.0.1
+cElementTree = 1.0.5-20051216
+collective.recipe.supervisor = 0.10
+collective.recipe.template = 1.8
+elementtree = 1.2.7-20070827-preview
+erp5.recipe.memcachedserver = 0.0.4
+erp5.recipe.mysqldatabase = 1.0.1
+erp5.recipe.mysqlserver = 1.1.2
+erp5.recipe.ooodinstance = 0.0.1
+erp5.recipe.softwarebuild = 0.1.1
+erp5.recipe.standaloneinstance = 0.4.1
+erp5.recipe.testrunner = 1.0.4
+erp5.recipe.zope2install = 1.0
+erp5_bt5_revision = ${:erp5_products_revision}
+erp5_products_revision = @36706
+erp5diff = 0.8.0
+fpconst = 0.7.2
+hexagonit.recipe.cmmi = 1.3.0
+hexagonit.recipe.download = 1.4.1
+infrae.subversion = 1.4.5
+ipdb = 0.1
ipython = 0.10
itools = 0.20.8
+lxml = 2.2.6
+mechanize = 0.2.1
+meld3 = 0.6.6
numpy = 1.3.0
+paramiko = 1.7.6
+plone.recipe.command = 1.1
+plone.recipe.distros = 1.5
+plone.recipe.zope2install = 3.2
plone.recipe.zope2instance = 3.6
-erp5.recipe.standaloneinstance >= 0.4
-erp5.recipe.mysqlserver >= 1.1.1
+ply = 3.3
+py = 1.3.1
+pycrypto = 2.1.0
pysvn = 1.7.2
-xml-marshaller = 0.9a
+python-ldap = 2.3.11
+python-memcached = 1.45
+pytz = 2010h
rdiff-backup = 1.0.5
-erp5_products_revision =
-erp5_bt5_revision = ${:erp5_products_revision}
-lxml = 2.2.6
+setuptools = 0.6c11
+simplejson = 2.1.1
+supervisor = 3.0a8
+threadframe = 0.2
+timerserver = 2.0
+uuid = 1.30
+xml-marshaller = 0.9a
+xupdate-processor = 0.1
+zc.buildout = 1.5.0b2
+zc.recipe.cmmi = 1.3.1
+zc.recipe.egg = 1.2.3b2
+zope.testbrowser = 3.9.0
Added: erp5/release/candidate/release-patch/unindexObject.patch
URL: http://svn.erp5.org/erp5/release/candidate/release-patch/unindexObject.patch?rev=36919&view=auto
==============================================================================
--- erp5/release/candidate/release-patch/unindexObject.patch (added)
+++ erp5/release/candidate/release-patch/unindexObject.patch [utf8] Wed Jul 7 00:34:52 2010
@@ -0,0 +1,1675 @@
+diff --git a/products/CMFActivity/Activity/SQLBase.py b/products/CMFActivity/Activity/SQLBase.py
+index 0ff6520..8f9317f 100644
+--- a/products/CMFActivity/Activity/SQLBase.py
++++ b/products/CMFActivity/Activity/SQLBase.py
+@@ -33,8 +33,18 @@
+ MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED)
+ from Products.CMFActivity.ActiveObject import (
+ INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE)
+-from Queue import VALIDATION_ERROR_DELAY
++from Products.CMFActivity.ActivityRuntimeEnvironment import (
++ ActivityRuntimeEnvironment, getTransactionalVariable)
++from Queue import VALIDATION_ERROR_DELAY, abortTransactionSynchronously
+
++try:
++ from transaction import get as get_transaction
++except ImportError:
++ pass
++
++# Stop electing more messages for processing if more than this number of
++# objects are impacted by elected messages.
++MAX_GROUPED_OBJECTS = 100
+
+ def sort_message_key(message):
+ # same sort key as in SQL{Dict,Queue}_readMessageList
+@@ -129,6 +139,256 @@ def _log(self, severity, summary):
+ LOG(self.__class__.__name__, severity, summary,
+ error=severity>INFO and sys.exc_info() or None)
+
++ def getReservedMessageList(self, activity_tool, date, processing_node,
++ limit=None, group_method_id=None):
++ """
++ Get and reserve a list of messages.
++ limit
++ Maximum number of messages to fetch.
++ This number is not garanted to be reached, because of:
++ - not enough messages being pending execution
++ - race condition (other nodes reserving the same messages at the same
++ time)
++ This number is guaranted not to be exceeded.
++ If None (or not given) no limit apply.
++ """
++ select = activity_tool.SQLBase_selectReservedMessageList
++ result = not group_method_id and select(table=self.sql_table, count=limit,
++ processing_node=processing_node)
++ if not result:
++ activity_tool.SQLBase_reserveMessageList(table=self.sql_table,
++ count=limit, processing_node=processing_node, to_date=date,
++ group_method_id=group_method_id)
++ result = select(table=self.sql_table,
++ processing_node=processing_node, count=limit)
++ return result
++
++ def makeMessageListAvailable(self, activity_tool, uid_list):
++ """
++ Put messages back in processing_node=0 .
++ """
++ if len(uid_list):
++ activity_tool.SQLBase_makeMessageListAvailable(table=self.sql_table,
++ uid=uid_list)
++
++ def getProcessableMessageList(self, activity_tool, processing_node):
++ """
++ Always true:
++ For each reserved message, delete redundant messages when it gets
++ reserved (definitely lost, but they are expandable since redundant).
++
++ - reserve a message
++ - set reserved message to processing=1 state
++ - if this message has a group_method_id:
++ - reserve a bunch of BUNDLE_MESSAGE_COUNT messages
++ - untill number of impacted objects goes over MAX_GROUPED_OBJECTS
++ - get one message from the reserved bunch (this messages will be
++ "needed")
++ - increase the number of impacted object
++ - set "needed" reserved messages to processing=1 state
++ - unreserve "unneeded" messages
++ - return still-reserved message list and a group_method_id
++
++ If any error happens in above described process, try to unreserve all
++ messages already reserved in that process.
++ If it fails, complain loudly that some messages might still be in an
++ unclean state.
++
++ Returned values:
++ 4-tuple:
++ - list of messages
++ - impacted object count
++ - group_method_id
++ - uid_to_duplicate_uid_list_dict
++ """
++ def getReservedMessageList(limit, group_method_id=None):
++ line_list = self.getReservedMessageList(activity_tool=activity_tool,
++ date=now_date,
++ processing_node=processing_node,
++ limit=limit,
++ group_method_id=group_method_id)
++ if len(line_list):
++ self._log(TRACE, 'Reserved messages: %r' % [x.uid for x in line_list])
++ return line_list
++ def getDuplicateMessageUidList(line):
++ uid_list = self.getDuplicateMessageUidList(activity_tool=activity_tool,
++ line=line, processing_node=processing_node)
++ if len(uid_list):
++ self._log(TRACE, 'Reserved duplicate messages: %r' % (uid_list, ))
++ return uid_list
++ BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number
++ now_date = self.getNow(activity_tool)
++ message_list = []
++ count = 0
++ group_method_id = None
++ try:
++ result = getReservedMessageList(limit=1)
++ uid_to_duplicate_uid_list_dict = {}
++ if len(result) > 0:
++ line = result[0]
++ uid = line.uid
++ m = self.loadMessage(line.message, uid=uid, line=line)
++ message_list.append(m)
++ group_method_id = line.group_method_id
++ activity_tool.SQLBase_processMessage(table=self.sql_table, uid=[uid])
++ uid_to_duplicate_uid_list_dict.setdefault(uid, []) \
++ .extend(getDuplicateMessageUidList(line))
++ if group_method_id not in (None, '', '\0'):
++ # Count the number of objects to prevent too many objects.
++ count += len(m.getObjectList(activity_tool))
++ if count < MAX_GROUPED_OBJECTS:
++ # Retrieve objects which have the same group method.
++ result = getReservedMessageList(limit=BUNDLE_MESSAGE_COUNT,
++ group_method_id=group_method_id)
++ path_and_method_id_dict = {}
++ unreserve_uid_list = []
++ for line in result:
++ if line.uid == uid:
++ continue
++ # All fetched lines have the same group_method_id and
++ # processing_node.
++ # Their dates are lower-than or equal-to now_date.
++ # We read each line once so lines have distinct uids.
++ # So what remains to be filtered on are path, method_id and
++ # order_validation_text.
++ try:
++ key = line.path, line.method_id, line.order_validation_text
++ except AttributeError:
++ pass # message_queue does not have 'order_validation_text'
++ else:
++ original_uid = path_and_method_id_dict.get(key)
++ if original_uid is not None:
++ uid_to_duplicate_uid_list_dict.setdefault(original_uid, []) \
++ .append(line.uid)
++ continue
++ path_and_method_id_dict[key] = line.uid
++ uid_to_duplicate_uid_list_dict.setdefault(line.uid, []) \
++ .extend(getDuplicateMessageUidList(line))
++ if count < MAX_GROUPED_OBJECTS:
++ m = self.loadMessage(line.message, uid=line.uid, line=line)
++ count += len(m.getObjectList(activity_tool))
++ message_list.append(m)
++ else:
++ unreserve_uid_list.append(line.uid)
++ activity_tool.SQLBase_processMessage(table=self.sql_table,
++ uid=[m.uid for m in message_list])
++ # Unreserve extra messages as soon as possible.
++ self.makeMessageListAvailable(activity_tool=activity_tool,
++ uid_list=unreserve_uid_list)
++ return (message_list, count, group_method_id,
++ uid_to_duplicate_uid_list_dict)
++ except:
++ self._log(WARNING, 'Exception while reserving messages.')
++ if len(message_list):
++ to_free_uid_list = [m.uid for m in message_list]
++ try:
++ self.makeMessageListAvailable(activity_tool=activity_tool,
++ uid_list=to_free_uid_list)
++ except:
++ self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list)
++ else:
++ if len(to_free_uid_list):
++ self._log(TRACE, 'Freed messages %r' % to_free_uid_list)
++ else:
++ self._log(TRACE, '(no message was reserved)')
++ return [], 0, None, {}
++
++ # Queue semantic
++ def dequeueMessage(self, activity_tool, processing_node):
++ def makeMessageListAvailable(uid_list, uid_to_duplicate_uid_list_dict):
++ final_uid_list = []
++ for uid in uid_list:
++ final_uid_list.append(uid)
++ final_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
++ self.makeMessageListAvailable(activity_tool=activity_tool,
++ uid_list=final_uid_list)
++ message_list, count, group_method_id, uid_to_duplicate_uid_list_dict = \
++ self.getProcessableMessageList(activity_tool, processing_node)
++ if message_list:
++ # Remove group_id parameter from group_method_id
++ if group_method_id is not None:
++ group_method_id = group_method_id.split('\0')[0]
++ if group_method_id not in (None, ""):
++ method = activity_tool.invokeGroup
++ args = (group_method_id, message_list, self.__class__.__name__)
++ activity_runtime_environment = ActivityRuntimeEnvironment(None)
++ else:
++ method = activity_tool.invoke
++ message = message_list[0]
++ args = (message, )
++ activity_runtime_environment = ActivityRuntimeEnvironment(message)
++ # Commit right before executing messages.
++ # As MySQL transaction does not start exactly at the same time as ZODB
++ # transactions but a bit later, messages available might be called
++ # on objects which are not available - or available in an old
++ # version - to ZODB connector.
++ # So all connectors must be committed now that we have selected
++ # everything needed from MySQL to get a fresh view of ZODB objects.
++ get_transaction().commit()
++ tv = getTransactionalVariable(None)
++ tv['activity_runtime_environment'] = activity_runtime_environment
++ # Try to invoke
++ try:
++ method(*args)
++ except:
++ self._log(WARNING,
++ 'Exception raised when invoking messages (uid, path, method_id) %r'
++ % [(m.uid, m.object_path, m.method_id) for m in message_list])
++ try:
++ abortTransactionSynchronously()
++ except:
++ # Unfortunately, database adapters may raise an exception against
++ # abort.
++ self._log(PANIC,
++ 'abort failed, thus some objects may be modified accidentally')
++ raise
++ # XXX Is it still useful to free messages now that this node is able
++ # to reselect them ?
++ to_free_uid_list = [x.uid for x in message_list]
++ try:
++ makeMessageListAvailable(to_free_uid_list,
++ uid_to_duplicate_uid_list_dict)
++ except:
++ self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list)
++ else:
++ self._log(TRACE, 'Freed messages %r' % to_free_uid_list)
++ # Abort if something failed.
++ if [m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]:
++ endTransaction = abortTransactionSynchronously
++ else:
++ endTransaction = get_transaction().commit
++ try:
++ endTransaction()
++ except:
++ self._log(WARNING,
++ 'Failed to end transaction for messages (uid, path, method_id) %r'
++ % [(m.uid, m.object_path, m.method_id) for m in message_list])
++ if endTransaction == abortTransactionSynchronously:
++ self._log(PANIC, 'Failed to abort executed messages.'
++ ' Some objects may be modified accidentally.')
++ else:
++ try:
++ abortTransactionSynchronously()
++ except:
++ self._log(PANIC, 'Failed to abort executed messages which also'
++ ' failed to commit. Some objects may be modified accidentally.')
++ raise
++ exc_info = sys.exc_info()
++ for m in message_list:
++ m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
++ try:
++ makeMessageListAvailable([x.uid for x in message_list],
++ uid_to_duplicate_uid_list_dict)
++ except:
++ self._log(ERROR, 'Failed to free remaining messages: %r'
++ % (message_list, ))
++ else:
++ self._log(TRACE, 'Freed messages %r' % (message_list, ))
++ self.finalizeMessageExecution(activity_tool, message_list,
++ uid_to_duplicate_uid_list_dict)
++ get_transaction().commit()
++ return not message_list
++
+ def finalizeMessageExecution(self, activity_tool, message_list,
+ uid_to_duplicate_uid_list_dict=None):
+ """
+diff --git a/products/CMFActivity/Activity/SQLDict.py b/products/CMFActivity/Activity/SQLDict.py
+index 6cf5b7f..43e72a8 100644
+--- a/products/CMFActivity/Activity/SQLDict.py
++++ b/products/CMFActivity/Activity/SQLDict.py
+@@ -27,18 +27,12 @@
+ ##############################################################################
+
+ from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
+-from Queue import VALID, INVALID_PATH, abortTransactionSynchronously
++from Queue import VALID, INVALID_PATH
+ from RAMDict import RAMDict
+-from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
+ from Products.CMFActivity.Errors import ActivityFlushError
+-from ZODB.POSException import ConflictError
+ import sys
+-from types import ClassType
+ #from time import time
+ from SQLBase import SQLBase, sort_message_key
+-from Products.CMFActivity.ActivityRuntimeEnvironment import (
+- ActivityRuntimeEnvironment, getTransactionalVariable)
+-from zExceptions import ExceptionFormatter
+
+ try:
+ from transaction import get as get_transaction
+@@ -51,9 +45,6 @@
+ MAX_VALIDATED_LIMIT = 1000
+ # Read up to this number of messages to validate.
+ READ_MESSAGE_LIMIT = 1000
+-# Stop electing more messages for processing if more than this number of
+-# objects are impacted by elected messages.
+-MAX_GROUPED_OBJECTS = 100
+
+ MAX_MESSAGE_LIST_SIZE = 100
+
+@@ -134,33 +125,6 @@ def getRegisteredMessageList(self, activity_buffer, activity_tool):
+ message_list = activity_buffer.getMessageList(self)
+ return [m for m in message_list if m.is_registered]
+
+- def getReservedMessageList(self, activity_tool, date, processing_node, limit=None, group_method_id=None):
+- """
+- Get and reserve a list of messages.
+- limit
+- Maximum number of messages to fetch.
+- This number is not garanted to be reached, because of:
+- - not enough messages being pending execution
+- - race condition (other nodes reserving the same messages at the same
+- time)
+- This number is guaranted not to be exceeded.
+- If None (or not given) no limit apply.
+- """
+- result = not group_method_id and \
+- activity_tool.SQLDict_selectReservedMessageList(
+- processing_node=processing_node, count=limit)
+- if not result:
+- activity_tool.SQLDict_reserveMessageList(count=limit, processing_node=processing_node, to_date=date, group_method_id=group_method_id)
+- result = activity_tool.SQLDict_selectReservedMessageList(processing_node=processing_node, count=limit)
+- return result
+-
+- def makeMessageListAvailable(self, activity_tool, uid_list):
+- """
+- Put messages back in processing_node=0 .
+- """
+- if len(uid_list):
+- activity_tool.SQLDict_makeMessageListAvailable(uid_list=uid_list)
+-
+ def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
+ """
+ Reserve unreserved messages matching given line.
+@@ -191,202 +155,7 @@ def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
+ raise
+ return uid_list
+
+- def getProcessableMessageList(self, activity_tool, processing_node):
+- """
+- Always true:
+- For each reserved message, delete redundant messages when it gets
+- reserved (definitely lost, but they are expandable since redundant).
+-
+- - reserve a message
+- - set reserved message to processing=1 state
+- - if this message has a group_method_id:
+- - reserve a bunch of BUNDLE_MESSAGE_COUNT messages
+- - untill number of impacted objects goes over MAX_GROUPED_OBJECTS
+- - get one message from the reserved bunch (this messages will be
+- "needed")
+- - increase the number of impacted object
+- - set "needed" reserved messages to processing=1 state
+- - unreserve "unneeded" messages
+- - return still-reserved message list and a group_method_id
+-
+- If any error happens in above described process, try to unreserve all
+- messages already reserved in that process.
+- If it fails, complain loudly that some messages might still be in an
+- unclean state.
+-
+- Returned values:
+- 4-tuple:
+- - list of messages
+- - impacted object count
+- - group_method_id
+- - uid_to_duplicate_uid_list_dict
+- """
+- def getReservedMessageList(limit, group_method_id=None):
+- line_list = self.getReservedMessageList(activity_tool=activity_tool,
+- date=now_date,
+- processing_node=processing_node,
+- limit=limit,
+- group_method_id=group_method_id)
+- if len(line_list):
+- LOG('SQLDict', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
+- return line_list
+- def getDuplicateMessageUidList(line):
+- uid_list = self.getDuplicateMessageUidList(activity_tool=activity_tool,
+- line=line, processing_node=processing_node)
+- if len(uid_list):
+- LOG('SQLDict', TRACE, 'Reserved duplicate messages: %r' % (uid_list, ))
+- return uid_list
+- def makeMessageListAvailable(uid_list):
+- self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
+- BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number
+- now_date = self.getNow(activity_tool)
+- message_list = []
+- count = 0
+- group_method_id = None
+- try:
+- result = getReservedMessageList(limit=1)
+- uid_to_duplicate_uid_list_dict = {}
+- if len(result) > 0:
+- line = result[0]
+- uid = line.uid
+- m = self.loadMessage(line.message, uid=uid, line=line)
+- message_list.append(m)
+- group_method_id = line.group_method_id
+- activity_tool.SQLDict_processMessage(uid=[uid])
+- uid_to_duplicate_uid_list_dict.setdefault(uid, []) \
+- .extend(getDuplicateMessageUidList(line))
+- if group_method_id not in (None, '', '\0'):
+- # Count the number of objects to prevent too many objects.
+- count += len(m.getObjectList(activity_tool))
+- if count < MAX_GROUPED_OBJECTS:
+- # Retrieve objects which have the same group method.
+- result = getReservedMessageList(limit=BUNDLE_MESSAGE_COUNT, group_method_id=group_method_id)
+- path_and_method_id_dict = {}
+- unreserve_uid_list = []
+- for line in result:
+- if line.uid == uid:
+- continue
+- # All fetched lines have the same group_method_id and
+- # processing_node.
+- # Their dates are lower-than or equal-to now_date.
+- # We read each line once so lines have distinct uids.
+- # So what remains to be filtered on are path, method_id and
+- # order_validation_text.
+- key = (line.path, line.method_id, line.order_validation_text)
+- original_uid = path_and_method_id_dict.get(key)
+- if original_uid is not None:
+- uid_to_duplicate_uid_list_dict.setdefault(original_uid, []).append(line.uid)
+- continue
+- path_and_method_id_dict[key] = line.uid
+- uid_to_duplicate_uid_list_dict.setdefault(line.uid, []).extend(getDuplicateMessageUidList(line))
+- if count < MAX_GROUPED_OBJECTS:
+- m = self.loadMessage(line.message, uid=line.uid, line=line)
+- count += len(m.getObjectList(activity_tool))
+- message_list.append(m)
+- else:
+- unreserve_uid_list.append(line.uid)
+- activity_tool.SQLDict_processMessage(uid=[m.uid for m in message_list])
+- # Unreserve extra messages as soon as possible.
+- makeMessageListAvailable(unreserve_uid_list)
+- return message_list, count, group_method_id, uid_to_duplicate_uid_list_dict
+- except:
+- LOG('SQLDict', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
+- if len(message_list):
+- to_free_uid_list = [m.uid for m in message_list]
+- try:
+- makeMessageListAvailable(to_free_uid_list)
+- except:
+- LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
+- else:
+- if len(to_free_uid_list):
+- LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
+- else:
+- LOG('SQLDict', TRACE, '(no message was reserved)')
+- return [], 0, None, {}
+-
+- # Queue semantic
+- def dequeueMessage(self, activity_tool, processing_node):
+- def makeMessageListAvailable(uid_list, uid_to_duplicate_uid_list_dict):
+- final_uid_list = []
+- for uid in uid_list:
+- final_uid_list.append(uid)
+- final_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
+- self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=final_uid_list)
+- message_list, count, group_method_id, uid_to_duplicate_uid_list_dict = \
+- self.getProcessableMessageList(activity_tool, processing_node)
+- if message_list:
+- # Remove group_id parameter from group_method_id
+- if group_method_id is not None:
+- group_method_id = group_method_id.split('\0')[0]
+- if group_method_id not in (None, ""):
+- method = activity_tool.invokeGroup
+- args = (group_method_id, message_list)
+- activity_runtime_environment = ActivityRuntimeEnvironment(None)
+- else:
+- method = activity_tool.invoke
+- message = message_list[0]
+- args = (message, )
+- activity_runtime_environment = ActivityRuntimeEnvironment(message)
+- # Commit right before executing messages.
+- # As MySQL transaction does not start exactly at the same time as ZODB
+- # transactions but a bit later, messages available might be called
+- # on objects which are not available - or available in an old
+- # version - to ZODB connector.
+- # So all connectors must be committed now that we have selected
+- # everything needed from MySQL to get a fresh view of ZODB objects.
+- get_transaction().commit()
+- tv = getTransactionalVariable(None)
+- tv['activity_runtime_environment'] = activity_runtime_environment
+- # Try to invoke
+- try:
+- method(*args)
+- except:
+- LOG('SQLDict', WARNING, 'Exception raised when invoking messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
+- try:
+- abortTransactionSynchronously()
+- except:
+- # Unfortunately, database adapters may raise an exception against abort.
+- LOG('SQLDict', PANIC,
+- 'abort failed, thus some objects may be modified accidentally')
+- raise
+- # XXX Is it still useful to free messages now that this node is able
+- # to reselect them ?
+- to_free_uid_list = [x.uid for x in message_list]
+- try:
+- makeMessageListAvailable(to_free_uid_list, uid_to_duplicate_uid_list_dict)
+- except:
+- LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
+- else:
+- LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list))
+- # Abort if something failed.
+- if [m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]:
+- endTransaction = abortTransactionSynchronously
+- else:
+- endTransaction = get_transaction().commit
+- try:
+- endTransaction()
+- except:
+- LOG('SQLDict', WARNING, 'Failed to end transaction for messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
+- if endTransaction == abortTransactionSynchronously:
+- LOG('SQLDict', PANIC, 'Failed to abort executed messages. Some objects may be modified accidentally.')
+- else:
+- try:
+- abortTransactionSynchronously()
+- except:
+- LOG('SQLDict', PANIC, 'Failed to abort executed messages which also failed to commit. Some objects may be modified accidentally.')
+- raise
+- exc_info = sys.exc_info()
+- for m in message_list:
+- m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info=exc_info, log=False)
+- try:
+- makeMessageListAvailable([x.uid for x in message_list], uid_to_duplicate_uid_list_dict)
+- except:
+- LOG('SQLDict', ERROR, 'Failed to free remaining messages: %r' % (message_list, ), error=sys.exc_info())
+- else:
+- LOG('SQLDict', TRACE, 'Freed messages %r' % (message_list, ))
+- self.finalizeMessageExecution(activity_tool, message_list, uid_to_duplicate_uid_list_dict)
+- get_transaction().commit()
+- return not message_list
++ dequeueMessage = SQLBase.dequeueMessage
+
+ def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
+ hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
+@@ -533,9 +302,10 @@ def distribute(self, activity_tool, node_count):
+ message_list.sort(key=sort_message_key)
+ deletable_uid_list += [m.uid for m in message_list[1:]]
+ message = message_list[0]
+- distributable_uid_set.add(message.uid)
+ serialization_tag = message.activity_kw.get('serialization_tag')
+- if serialization_tag is not None:
++ if serialization_tag is None:
++ distributable_uid_set.add(message.uid)
++ else:
+ serialization_tag_dict.setdefault(serialization_tag,
+ []).append(message)
+ # Don't let through if there is the same serialization tag in the
+@@ -545,15 +315,15 @@ def distribute(self, activity_tool, node_count):
+ # does not stop validating together. Because those messages should
+ # be processed together at once.
+ for message_list in serialization_tag_dict.itervalues():
+- if len(message_list) == 1:
+- continue
+ # Sort list of messages to validate the message with highest score
+ message_list.sort(key=sort_message_key)
++ distributable_uid_set.add(message_list[0].uid)
+ group_method_id = message_list[0].activity_kw.get('group_method_id')
++ if group_method_id is None:
++ continue
+ for message in message_list[1:]:
+- if group_method_id is None or \
+- group_method_id != message.activity_kw.get('group_method_id'):
+- distributable_uid_set.remove(message.uid)
++ if group_method_id == message.activity_kw.get('group_method_id'):
++ distributable_uid_set.add(message.uid)
+ if deletable_uid_list:
+ activity_tool.SQLBase_delMessage(table=self.sql_table,
+ uid=deletable_uid_list)
+diff --git a/products/CMFActivity/Activity/SQLQueue.py b/products/CMFActivity/Activity/SQLQueue.py
+index 4cd415d..bf2d217 100644
+--- a/products/CMFActivity/Activity/SQLQueue.py
++++ b/products/CMFActivity/Activity/SQLQueue.py
+@@ -28,16 +28,10 @@
+
+ from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
+ from RAMQueue import RAMQueue
+-from Queue import VALID, INVALID_PATH, abortTransactionSynchronously
+-from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
++from Queue import VALID, INVALID_PATH
+ from Products.CMFActivity.Errors import ActivityFlushError
+ from ZODB.POSException import ConflictError
+-from types import ClassType
+-import sys
+-from time import time
+ from SQLBase import SQLBase, sort_message_key
+-from Products.CMFActivity.ActivityRuntimeEnvironment import (
+- ActivityRuntimeEnvironment, getTransactionalVariable)
+ from zExceptions import ExceptionFormatter
+
+ try:
+@@ -51,17 +45,6 @@
+ MAX_VALIDATED_LIMIT = 1000
+ # Read this many messages to validate.
+ READ_MESSAGE_LIMIT = 1000
+-# Process this many messages in each dequeueMessage call.
+-# Downside of setting to a "small" value: the cost of reserving a batch of
+-# few messages increases relatively to the cost of executing activities,
+-# making CMFActivity overhead significant.
+-# Downside of setting to a "big" value: if there are many slow activities in
+-# a multi-activity-node environment, multiple slow activities will be reserved
+-# by a single node, making a suboptimal use of the parallelisation offered by
+-# the cluster.
+-# Before increasing this value, consider using SQLDict with group methods
+-# first.
+-MESSAGE_BUNDLE_SIZE = 1
+
+ MAX_MESSAGE_LIST_SIZE = 100
+
+@@ -86,6 +69,8 @@ def prepareQueueMessageList(self, activity_tool, message_list):
+ method_id_list = [m.method_id for m in registered_message_list]
+ priority_list = [m.activity_kw.get('priority', 1) for m in registered_message_list]
+ date_list = [m.activity_kw.get('at_date', None) for m in registered_message_list]
++ group_method_id_list = ['\0'.join([message.activity_kw.get('group_method_id', ''), message.activity_kw.get('group_id', '')])
++ for message in registered_message_list]
+ tag_list = [m.activity_kw.get('tag', '') for m in registered_message_list]
+ serialization_tag_list = [m.activity_kw.get('serialization_tag', '') for m in registered_message_list]
+ dumped_message_list = [self.dumpMessage(m) for m in registered_message_list]
+@@ -95,6 +80,7 @@ def prepareQueueMessageList(self, activity_tool, message_list):
+ method_id_list=method_id_list,
+ priority_list=priority_list,
+ message_list=dumped_message_list,
++ group_method_id_list = group_method_id_list,
+ date_list=date_list,
+ tag_list=tag_list,
+ processing_node_list=None,
+@@ -113,162 +99,14 @@ def finishDeleteMessage(self, activity_tool_path, m):
+ # Nothing to do in SQLQueue.
+ pass
+
+- def getReservedMessageList(self, activity_tool, date, processing_node, limit=None):
++ def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
+ """
+- Get and reserve a list of messages.
+- limit
+- Maximum number of messages to fetch.
+- This number is not garanted to be reached, because of:
+- - not enough messages being pending execution
+- - race condition (other nodes reserving the same messages at the same
+- time)
+- This number is guaranted not to be exceeded.
+- If None (or not given) no limit apply.
++ Reserve unreserved messages matching given line.
++ Return their uids.
+ """
+- result = activity_tool.SQLQueue_selectReservedMessageList(processing_node=processing_node, count=limit)
+- if len(result) == 0:
+- activity_tool.SQLQueue_reserveMessageList(count=limit, processing_node=processing_node, to_date=date)
+- result = activity_tool.SQLQueue_selectReservedMessageList(processing_node=processing_node, count=limit)
+- return result
+-
+- def makeMessageListAvailable(self, activity_tool, uid_list):
+- """
+- Put messages back in processing_node=0 .
+- """
+- if len(uid_list):
+- activity_tool.SQLQueue_makeMessageListAvailable(uid_list=uid_list)
+-
+- def getProcessableMessageList(self, activity_tool, processing_node):
+- """
+- Always true:
+- For each reserved message, delete redundant messages when it gets
+- reserved (definitely lost, but they are expandable since redundant).
+-
+- - reserve a message
+- - set reserved message to processing=1 state
+- - if this message has a group_method_id:
+- - reserve a bunch of BUNDLE_MESSAGE_COUNT messages
+- - untill number of impacted objects goes over MAX_GROUPED_OBJECTS
+- - get one message from the reserved bunch (this messages will be
+- "needed")
+- - increase the number of impacted object
+- - set "needed" reserved messages to processing=1 state
+- - unreserve "unneeded" messages
+- - return still-reserved message list
+-
+- If any error happens in above described process, try to unreserve all
+- messages already reserved in that process.
+- If it fails, complain loudly that some messages might still be in an
+- unclean state.
+-
+- Returned values:
+- list of messages
+- """
+- def getReservedMessageList(limit):
+- line_list = self.getReservedMessageList(activity_tool=activity_tool,
+- date=now_date,
+- processing_node=processing_node,
+- limit=limit)
+- if len(line_list):
+- LOG('SQLQueue', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
+- return line_list
+- def makeMessageListAvailable(uid_list):
+- self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
+- now_date = self.getNow(activity_tool)
+- message_list = []
+- try:
+- result = getReservedMessageList(limit=MESSAGE_BUNDLE_SIZE)
+- for line in result:
+- m = self.loadMessage(line.message, uid=line.uid, line=line)
+- message_list.append(m)
+- if len(message_list):
+- activity_tool.SQLQueue_processMessage(uid=[m.uid for x in message_list])
+- return message_list
+- except:
+- LOG('SQLQueue', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
+- if len(message_list):
+- to_free_uid_list = [m.uid for m in message_list]
+- try:
+- makeMessageListAvailable(to_free_uid_list)
+- except:
+- LOG('SQLQueue', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
+- else:
+- if len(to_free_uid_list):
+- LOG('SQLQueue', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
+- else:
+- LOG('SQLQueue', TRACE, '(no message was reserved)')
+- return []
+-
+- def dequeueMessage(self, activity_tool, processing_node):
+- def makeMessageListAvailable(uid_list):
+- self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
+- message_list = \
+- self.getProcessableMessageList(activity_tool, processing_node)
+- if message_list:
+- processing_stop_time = time() + 30 # Stop processing after more than 10 seconds were spent
+- processed_count = 0
+- # Commit right before executing messages.
+- # As MySQL transaction does not start exactly at the same time as ZODB
+- # transactions but a bit later, messages available might be called
+- # on objects which are not available - or available in an old
+- # version - to ZODB connector.
+- # So all connectors must be committed now that we have selected
+- # everything needed from MySQL to get a fresh view of ZODB objects.
+- get_transaction().commit()
+- tv = getTransactionalVariable(None)
+- for m in message_list:
+- tv['activity_runtime_environment'] = ActivityRuntimeEnvironment(m)
+- processed_count += 1
+- # Try to invoke
+- try:
+- activity_tool.invoke(m)
+- if m.getExecutionState() != MESSAGE_NOT_EXECUTED:
+- # Commit so that if a message raises it doesn't causes previous
+- # successfull messages to be rolled back. This commit might fail,
+- # so it is protected the same way as activity execution by the
+- # same "try" block.
+- get_transaction().commit()
+- else:
+- # This message failed, revert.
+- abortTransactionSynchronously()
+- except:
+- value = m.uid, m.object_path, m.method_id
+- LOG('SQLQueue', WARNING, 'Exception raised when invoking message (uid, path, method_id) %r' % (value, ), error=sys.exc_info())
+- try:
+- abortTransactionSynchronously()
+- except:
+- # Unfortunately, database adapters may raise an exception against abort.
+- LOG('SQLQueue', PANIC, 'abort failed, thus some objects may be modified accidentally')
+- raise
+- # We must make sure that the message is not set as executed.
+- # It is possible that the message is executed but the commit
+- # of the transaction fails
+- m.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
+- # XXX Is it still useful to free message now that this node is able
+- # to reselect it ?
+- try:
+- makeMessageListAvailable([m.uid])
+- except:
+- LOG('SQLQueue', ERROR, 'Failed to free message: %r' % (value, ), error=sys.exc_info())
+- else:
+- LOG('SQLQueue', TRACE, 'Freed message %r' % (value, ))
+- if time() > processing_stop_time:
+- LOG('SQLQueue', TRACE, 'Stop processing message batch because processing delay exceeded')
+- break
+- # Release all unprocessed messages
+- to_free_uid_list = [m.uid for m in message_list[processed_count:]]
+- if to_free_uid_list:
+- try:
+- makeMessageListAvailable(to_free_uid_list)
+- except:
+- LOG('SQLQueue', ERROR, 'Failed to free remaining messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
+- else:
+- LOG('SQLQueue', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
+- self.finalizeMessageExecution(activity_tool,
+- message_list[:processed_count])
+- get_transaction().commit()
+- return not message_list
++ return ()
+
++ dequeueMessage = SQLBase.dequeueMessage
+
+ def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
+ hasMessage = getattr(activity_tool, 'SQLQueue_hasMessage', None)
+@@ -419,6 +257,12 @@ def distribute(self, activity_tool, node_count):
+ # Sort list of messages to validate the message with highest score
+ message_list.sort(key=sort_message_key)
+ distributable_uid_set.add(message_list[0].uid)
++ group_method_id = message_list[0].activity_kw.get('group_method_id')
++ if group_method_id is None:
++ continue
++ for message in message_list[1:]:
++ if group_method_id == message.activity_kw.get('group_method_id'):
++ distributable_uid_set.add(message.uid)
+ distributable_count = len(distributable_uid_set)
+ if distributable_count:
+ activity_tool.SQLBase_assignMessage(table=self.sql_table,
+diff --git a/products/CMFActivity/ActivityTool.py b/products/CMFActivity/ActivityTool.py
+index 1fe991e..adab3fb 100644
+--- a/products/CMFActivity/ActivityTool.py
++++ b/products/CMFActivity/ActivityTool.py
+@@ -1172,16 +1172,18 @@ def invoke(self, message):
+ for held in my_self.REQUEST._held:
+ self.REQUEST._hold(held)
+
+- def invokeGroup(self, method_id, message_list):
++ def invokeGroup(self, method_id, message_list, activity):
+ if self.activity_tracking:
+- activity_tracking_logger.info('invoking group messages: method_id=%s, paths=%s' % (method_id, ['/'.join(m.object_path) for m in message_list]))
++ activity_tracking_logger.info(
++ 'invoking group messages: method_id=%s, paths=%s'
++ % (method_id, ['/'.join(m.object_path) for m in message_list]))
+ # Invoke a group method.
+- object_list = []
+ expanded_object_list = []
+ new_message_list = []
+- path_dict = {}
+- # Filter the list of messages. If an object is not available, mark its message as non-executable.
+- # In addition, expand an object if necessary, and make sure that no duplication happens.
++ path_set = set()
++ # Filter the list of messages. If an object is not available, mark its
++ # message as non-executable. In addition, expand an object if necessary,
++ # and make sure that no duplication happens.
+ for m in message_list:
+ # alternate method is used to segregate objects which cannot be grouped.
+ alternate_method_id = m.activity_kw.get('alternate_method_id')
+@@ -1195,42 +1197,29 @@ def invokeGroup(self, method_id, message_list):
+ m.setExecutionState(MESSAGE_NOT_EXECUTABLE, context=self)
+ continue
+ try:
+- i = len(new_message_list) # This is an index of this message in new_message_list.
+- if m.hasExpandMethod():
+- for subobj in m.getObjectList(self):
++ if activity == 'SQLQueue':
++ expanded_object_list.append((obj, m.args, m.kw))
++ else:
++ if m.hasExpandMethod():
++ subobject_list = m.getObjectList(self)
++ else:
++ subobject_list = obj,
++ for subobj in subobject_list:
+ path = subobj.getPath()
+- if path not in path_dict:
+- path_dict[path] = i
++ if path not in path_set:
++ path_set.add(path)
+ if alternate_method_id is not None \
+ and hasattr(aq_base(subobj), alternate_method_id):
+- # if this object is alternated, generate a new single active object.
++ # if this object is alternated,
++ # generate a new single active object
+ activity_kw = m.activity_kw.copy()
+- if 'group_method_id' in activity_kw:
+- del activity_kw['group_method_id']
+- if 'group_id' in activity_kw:
+- del activity_kw['group_id']
+- active_obj = subobj.activate(**activity_kw)
++ activity_kw.pop('group_method_id', None)
++ activity_kw.pop('group_id', None)
++ active_obj = subobj.activate(activity=activity, **activity_kw)
+ getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
+ else:
+- expanded_object_list.append(subobj)
+- else:
+- path = obj.getPath()
+- if path not in path_dict:
+- path_dict[path] = i
+- if alternate_method_id is not None \
+- and hasattr(aq_base(obj), alternate_method_id):
+- # if this object is alternated, generate a new single active object.
+- activity_kw = m.activity_kw.copy()
+- if 'group_method_id' in activity_kw:
+- del activity_kw['group_method_id']
+- if 'group_id' in activity_kw:
+- del activity_kw['group_id']
+- active_obj = obj.activate(**activity_kw)
+- getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
+- else:
+- expanded_object_list.append(obj)
+- object_list.append(obj)
+- new_message_list.append(m)
++ expanded_object_list.append((subobj, m.args, m.kw))
++ new_message_list.append((m, obj))
+ except:
+ m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
+
+@@ -1238,39 +1227,34 @@ def invokeGroup(self, method_id, message_list):
+ if len(expanded_object_list) > 0:
+ method = self.unrestrictedTraverse(method_id)
+ # FIXME: how to apply security here?
+- # NOTE: expanded_object_list must be set to failed objects by the callee.
+- # If it fully succeeds, expanded_object_list must be empty when returning.
+- result = method(expanded_object_list, **m.kw)
++ # NOTE: expanded_object_list must be set to failed objects by the
++ # callee. If it fully succeeds, expanded_object_list must be
++ # empty when returning.
++ result = method(expanded_object_list)
+ else:
+ result = None
+ except:
+ # In this case, the group method completely failed.
+ exc_info = sys.exc_info()
+- for m in new_message_list:
+- m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info=exc_info, log=False)
++ for m, obj in new_message_list:
++ m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
+ LOG('WARNING ActivityTool', 0,
+ 'Could not call method %s on objects %s' %
+- (method_id, expanded_object_list), error=exc_info)
++ (method_id, [x[0] for x in expanded_object_list]), error=exc_info)
+ error_log = getattr(self, 'error_log', None)
+ if error_log is not None:
+ error_log.raising(exc_info)
+ else:
+- # Obtain all indices of failed messages. Note that this can be a partial failure.
+- failed_message_dict = {}
+- for obj in expanded_object_list:
+- path = obj.getPath()
+- i = path_dict[path]
+- failed_message_dict[i] = None
+-
++ # Obtain all indices of failed messages.
++ # Note that this can be a partial failure.
++ failed_message_set = set(id(x[2]) for x in expanded_object_list)
+ # Only for succeeded messages, an activity process is invoked (if any).
+- for i in xrange(len(object_list)):
+- object = object_list[i]
+- m = new_message_list[i]
+- if i in failed_message_dict:
++ for m, obj in new_message_list:
++ if id(m.kw) in failed_message_set:
+ m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
+ else:
+ try:
+- m.activateResult(self, result, object)
++ m.activateResult(self, result, obj)
+ except:
+ m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
+ else:
+diff --git a/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql b/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql
+new file mode 100644
+index 0000000..2ace125
+--- /dev/null
++++ b/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql
+@@ -0,0 +1,20 @@
++<dtml-comment>
++title:
++connection_id:cmf_activity_sql_connection
++max_rows:0
++max_cache:0
++cache_time:0
++class_name:
++class_file:
++</dtml-comment>
++<params>table
++uid</params>
++UPDATE
++ <dtml-var table>
++SET
++ processing_node=0,
++ processing=0
++WHERE
++ <dtml-sqltest uid type="int" multiple>
++<dtml-var sql_delimiter>
++COMMIT
+diff --git a/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql b/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql
+new file mode 100644
+index 0000000..5aaa97e
+--- /dev/null
++++ b/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql
+@@ -0,0 +1,20 @@
++<dtml-comment>
++title:
++connection_id:cmf_activity_sql_connection
++max_rows:1
++max_cache:0
++cache_time:0
++class_name:
++class_file:
++</dtml-comment>
++<params>table
++uid</params>
++UPDATE
++ <dtml-var table>
++SET
++ processing_date = UTC_TIMESTAMP(),
++ processing = 1
++WHERE
++ <dtml-sqltest uid type="int" multiple>
++<dtml-var sql_delimiter>
++COMMIT
+diff --git a/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql b/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql
+new file mode 100644
+index 0000000..246921d
+--- /dev/null
++++ b/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql
+@@ -0,0 +1,39 @@
++<dtml-comment>
++title:
++connection_id:cmf_activity_sql_connection
++max_rows:0
++max_cache:0
++cache_time:0
++class_name:
++class_file:
++</dtml-comment>
++<params>table
++processing_node
++to_date
++count
++group_method_id
++</params>
++UPDATE
++ <dtml-var table>
++SET
++ processing_node=<dtml-sqlvar processing_node type="int">
++WHERE
++ processing_node=0
++ AND date <= <dtml-sqlvar to_date type="datetime">
++ <dtml-if expr="group_method_id is not None">
++ AND group_method_id = <dtml-sqlvar group_method_id type="string">
++ </dtml-if>
++ORDER BY
++<dtml-comment>
++ Explanation of the order by:
++ - priority must be respected (it is a feature)
++ - when multiple nodes simultaneously try to fetch activities, they should not
++ be given the same set of lines as it would cause all minus one to wait for
++ a write lock (and be ultimately aborted), effectively serializing their
++ action (so breaking paralellism).
++ So we must force MySQL to update lines in a random order.
++</dtml-comment>
++ priority, RAND()
++LIMIT <dtml-sqlvar count type="int">
++<dtml-var sql_delimiter>
++COMMIT
+diff --git a/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql b/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql
+new file mode 100644
+index 0000000..5292b03
+--- /dev/null
++++ b/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql
+@@ -0,0 +1,21 @@
++<dtml-comment>
++title:
++connection_id:cmf_activity_sql_connection
++max_rows:0
++max_cache:0
++cache_time:0
++class_name:
++class_file:
++</dtml-comment>
++<params>table
++processing_node
++count</params>
++SELECT
++ *
++FROM
++ <dtml-var table>
++WHERE
++ processing_node = <dtml-sqlvar processing_node type="int">
++<dtml-if expr="count is not None">
++ LIMIT <dtml-sqlvar count type="int">
++</dtml-if>
+diff --git a/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql b/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql
+deleted file mode 100644
+index 663cbf8..0000000
+--- a/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql
++++ /dev/null
+@@ -1,21 +0,0 @@
+-<dtml-comment>
+-title:
+-connection_id:cmf_activity_sql_connection
+-max_rows:0
+-max_cache:0
+-cache_time:0
+-class_name:
+-class_file:
+-</dtml-comment>
+-<params>uid_list</params>
+-UPDATE
+- message
+-SET
+- processing_node=0,
+- processing=0
+-WHERE
+- uid IN (
+- <dtml-in prefix="uid" expr="uid_list"><dtml-sqlvar uid_item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>
+- )
+-<dtml-var sql_delimiter>
+-COMMIT
+diff --git a/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql b/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql
+deleted file mode 100644
+index 87b76fc..0000000
+--- a/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql
++++ /dev/null
+@@ -1,20 +0,0 @@
+-<dtml-comment>
+-title:
+-connection_id:cmf_activity_sql_connection
+-max_rows:1
+-max_cache:0
+-cache_time:0
+-class_name:
+-class_file:
+-</dtml-comment>
+-<params>uid</params>
+-UPDATE message
+-SET
+- processing_date = UTC_TIMESTAMP(),
+- processing = 1
+-WHERE
+- uid IN (
+-<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
+- )
+-<dtml-var sql_delimiter>
+-COMMIT
+diff --git a/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql b/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql
+deleted file mode 100644
+index 1525791..0000000
+--- a/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql
++++ /dev/null
+@@ -1,36 +0,0 @@
+-<dtml-comment>
+-title:
+-connection_id:cmf_activity_sql_connection
+-max_rows:0
+-max_cache:0
+-cache_time:0
+-class_name:
+-class_file:
+-</dtml-comment>
+-<params>processing_node
+-to_date
+-count
+-group_method_id
+-</params>
+-UPDATE
+- message
+-SET
+- processing_node=<dtml-sqlvar processing_node type="int">
+-WHERE
+- processing_node=0
+- AND date <= <dtml-sqlvar to_date type="datetime">
+- <dtml-if expr="group_method_id is not None"> AND group_method_id = <dtml-sqlvar group_method_id type="string"> </dtml-if>
+-ORDER BY
+-<dtml-comment>
+- Explanation of the order by:
+- - priority must be respected (it is a feature)
+- - when multiple nodes simultaneously try to fetch activities, they should not
+- be given the same set of lines as it would cause all minus one to wait for
+- a write lock (and be ultimately aborted), effectively serializing their
+- action (so breaking paralellism).
+- So we must force MySQL to update lines in a random order.
+-</dtml-comment>
+- priority, RAND()
+-LIMIT <dtml-sqlvar count type="int">
+-<dtml-var sql_delimiter>
+-COMMIT
+diff --git a/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql b/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql
+deleted file mode 100644
+index 92cb20e..0000000
+--- a/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql
++++ /dev/null
+@@ -1,20 +0,0 @@
+-<dtml-comment>
+-title:
+-connection_id:cmf_activity_sql_connection
+-max_rows:0
+-max_cache:0
+-cache_time:0
+-class_name:
+-class_file:
+-</dtml-comment>
+-<params>processing_node
+-count</params>
+-SELECT
+- *
+-FROM
+- message
+-WHERE
+- processing_node = <dtml-sqlvar processing_node type="int">
+-<dtml-if expr="count is not None">
+- LIMIT <dtml-sqlvar count type="int">
+-</dtml-if>
+diff --git a/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql b/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
+index 5c7eda2..763d240 100644
+--- a/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
++++ b/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
+@@ -18,6 +18,7 @@ CREATE TABLE `message_queue` (
+ `processing` TINYINT NOT NULL DEFAULT 0,
+ `processing_date` DATETIME,
+ `priority` TINYINT NOT NULL DEFAULT 0,
++ `group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
+ `tag` VARCHAR(255) NOT NULL,
+ `serialization_tag` VARCHAR(255) NOT NULL,
+ `retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
+diff --git a/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql b/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql
+deleted file mode 100644
+index c83692a..0000000
+--- a/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql
++++ /dev/null
+@@ -1,21 +0,0 @@
+-<dtml-comment>
+-title:
+-connection_id:cmf_activity_sql_connection
+-max_rows:0
+-max_cache:0
+-cache_time:0
+-class_name:
+-class_file:
+-</dtml-comment>
+-<params>uid_list</params>
+-UPDATE
+- message_queue
+-SET
+- processing_node=0,
+- processing=0
+-WHERE
+- uid IN (
+- <dtml-in prefix="uid" expr="uid_list"><dtml-sqlvar uid_item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>
+- )
+-<dtml-var sql_delimiter>
+-COMMIT
+diff --git a/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql b/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql
+deleted file mode 100644
+index 9d667c4..0000000
+--- a/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql
++++ /dev/null
+@@ -1,34 +0,0 @@
+-<dtml-comment>
+-title:
+-connection_id:cmf_activity_sql_connection
+-max_rows:0
+-max_cache:0
+-cache_time:0
+-class_name:
+-class_file:
+-</dtml-comment>
+-<params>processing_node
+-to_date
+-count
+-</params>
+-UPDATE
+- message_queue
+-SET
+- processing_node=<dtml-sqlvar processing_node type="int">
+-WHERE
+- processing_node=0
+- AND date <= <dtml-sqlvar to_date type="datetime">
+-ORDER BY
+-<dtml-comment>
+- Explanation of the order by:
+- - priority must be respected (it is a feature)
+- - when multiple nodes simultaneously try to fetch activities, they should not
+- be given the same set of lines as it would cause all minus one to wait for
+- a write lock (and be ultimately aborted), effectively serializing their
+- action (so breaking paralellism).
+- So we must force MySQL to update lines in a random order.
+-</dtml-comment>
+- priority, RAND()
+-LIMIT <dtml-sqlvar count type="int">
+-<dtml-var sql_delimiter>
+-COMMIT
+diff --git a/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql b/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql
+deleted file mode 100644
+index dcddfb5..0000000
+--- a/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql
++++ /dev/null
+@@ -1,20 +0,0 @@
+-<dtml-comment>
+-title:
+-connection_id:cmf_activity_sql_connection
+-max_rows:0
+-max_cache:0
+-cache_time:0
+-class_name:
+-class_file:
+-</dtml-comment>
+-<params>processing_node
+-count</params>
+-SELECT
+- *
+-FROM
+- message_queue
+-WHERE
+- processing_node = <dtml-sqlvar processing_node type="int">
+-<dtml-if expr="count is not None">
+- LIMIT <dtml-sqlvar count type="int">
+-</dtml-if>
+diff --git a/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql b/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql
+index 17ca67a..8c22150 100644
+--- a/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql
++++ b/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql
+@@ -15,11 +15,12 @@ message_list
+ priority_list
+ processing_node_list
+ date_list
++group_method_id_list
+ tag_list
+ serialization_tag_list
+ </params>
+ INSERT INTO message_queue
+-(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, tag, serialization_tag, message)
++(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, group_method_id, tag, serialization_tag, message)
+ VALUES
+ <dtml-in prefix="loop" expr="_.range(_.len(path_list))">
+ <dtml-if sequence-start><dtml-else>,</dtml-if>
+@@ -32,6 +33,7 @@ VALUES
+ <dtml-if expr="processing_node_list is not None"><dtml-sqlvar expr="processing_node_list[loop_item]" type="int"><dtml-else>-1</dtml-if>,
+ 0,
+ <dtml-sqlvar expr="priority_list[loop_item]" type="int">,
++ <dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
+ <dtml-sqlvar expr="tag_list[loop_item]" type="string">,
+ <dtml-sqlvar expr="serialization_tag_list[loop_item]" type="string">,
+ <dtml-sqlvar expr="message_list[loop_item]" type="string">
+diff --git a/products/CMFActivity/tests/testCMFActivity.py b/products/CMFActivity/tests/testCMFActivity.py
+index ccb5aec..34f2857 100644
+--- a/products/CMFActivity/tests/testCMFActivity.py
++++ b/products/CMFActivity/tests/testCMFActivity.py
+@@ -433,7 +433,7 @@ def TryActiveProcess(self, activity):
+ def TryActiveProcessInsideActivity(self, activity):
+ """
+ Try two levels with active_process, we create one first
+- activity with an acitive process, then this new activity
++ activity with an active process, then this new activity
+ uses another active process
+ """
+ portal = self.getPortal()
+@@ -1883,26 +1883,26 @@ def test_79_AbortTransactionSynchronously(self, quiet=0, run=run_all_test):
+ getattr(organisation, 'uid')
+
+
+- def test_80_CallWithGroupIdParamater(self, quiet=0, run=run_all_test):
+- """
+- Test that group_id parameter is used to separate execution of the same method
+- """
++ def callWithGroupIdParamater(self, activity, quiet, run):
+ if not run: return
+ if not quiet:
+- message = '\nTest Activity with group_id parameter'
++ message = '\nTest Activity with group_id parameter (%s)' % activity
+ ZopeTestCase._print(message)
+ LOG('Testing... ',0,message)
+
+ portal = self.getPortal()
+ organisation = portal.organisation._getOb(self.company_id)
+ # Defined a group method
+- def setFoobar(self, object_list, number=1):
+- for obj in object_list:
++ foobar_list = []
++ def setFoobar(self, object_list):
++ foobar_list.append(len(object_list))
++ for obj, args, kw in object_list:
++ number = kw.get('number', 1)
+ if getattr(obj,'foobar', None) is not None:
+ obj.foobar = obj.foobar + number
+ else:
+ obj.foobar = number
+- object_list[:] = []
++ del object_list[:]
+ from Products.ERP5Type.Document.Folder import Folder
+ Folder.setFoobar = setFoobar
+
+@@ -1915,46 +1915,65 @@ def getFoobar(self):
+
+ # Test group_method_id is working without group_id
+ for x in xrange(5):
+- organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar").reindexObject(number=1)
++ organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar").reindexObject(number=1)
+ get_transaction().commit()
+
+ message_list = portal.portal_activities.getMessageList()
+ self.assertEquals(len(message_list),5)
+ portal.portal_activities.distribute()
+ portal.portal_activities.tic()
+- self.assertEquals(1, organisation.getFoobar())
++ expected = dict(SQLDict=1, SQLQueue=5)[activity]
++ self.assertEquals(expected, organisation.getFoobar())
+
+
+ # Test group_method_id is working with one group_id defined
+ for x in xrange(5):
+- organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
++ organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
+ get_transaction().commit()
+
+ message_list = portal.portal_activities.getMessageList()
+ self.assertEquals(len(message_list),5)
+ portal.portal_activities.distribute()
+ portal.portal_activities.tic()
+- self.assertEquals(2, organisation.getFoobar())
++ self.assertEquals(expected * 2, organisation.getFoobar())
++
++ self.assertEquals([expected, expected], foobar_list)
++ del foobar_list[:]
+
+ # Test group_method_id is working with many group_id defined
+ for x in xrange(5):
+- organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
++ organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
+ get_transaction().commit()
+- organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="2").reindexObject(number=3)
++ organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="2").reindexObject(number=3)
+ get_transaction().commit()
+- organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
++ organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
+ get_transaction().commit()
+- organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="3").reindexObject(number=5)
++ organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="3").reindexObject(number=5)
+ get_transaction().commit()
+
+ message_list = portal.portal_activities.getMessageList()
+ self.assertEquals(len(message_list),20)
+ portal.portal_activities.distribute()
+ portal.portal_activities.tic()
+- self.assertEquals(11, organisation.getFoobar())
++ self.assertEquals(dict(SQLDict=11, SQLQueue=60)[activity],
++ organisation.getFoobar())
++ self.assertEquals(dict(SQLDict=[1, 1, 1], SQLQueue=[5, 5, 10])[activity],
++ sorted(foobar_list))
+ message_list = portal.portal_activities.getMessageList()
+ self.assertEquals(len(message_list), 0)
+-
++
++ def test_80a_CallWithGroupIdParamaterSQLDict(self, quiet=0, run=run_all_test):
++ """
++ Test that group_id parameter is used to separate execution of the same method
++ """
++ self.callWithGroupIdParamater('SQLDict', quiet=quiet, run=run)
++
++ def test_80b_CallWithGroupIdParamaterSQLQueue(self, quiet=0,
++ run=run_all_test):
++ """
++ Test that group_id parameter is used to separate execution of the same method
++ """
++ self.callWithGroupIdParamater('SQLQueue', quiet=quiet, run=run)
+
+ def test_81_ActivateKwForWorkflowTransition(self, quiet=0, run=run_all_test):
+ """
+@@ -2020,7 +2039,7 @@ def test_83_ActivityModificationsViaCMFActivityConnectionRolledBackOnErrorSQLDic
+ get_transaction().commit()
+ self.tic()
+ activity_tool = self.getActivityTool()
+- def modifySQLAndFail(self, object_list, **kw):
++ def modifySQLAndFail(self, object_list):
+ # Only create the dummy activity if none is present: we would just
+ # generate missleading errors (duplicate uid).
+ if activity_tool.countMessage(method_id='dummy_activity') == 0:
+@@ -2448,7 +2467,7 @@ def test_94_ActivityToolCommitFailureDoesNotCommitCMFActivitySQLConnectionSQLDic
+ get_transaction().commit()
+ self.tic()
+ activity_tool = self.getActivityTool()
+- def modifySQL(self, object_list, *arg, **kw):
++ def modifySQL(self, object_list):
+ # Only create the dummy activity if none is present: we would just
+ # generate missleading errors (duplicate uid).
+ if activity_tool.countMessage(method_id='dummy_activity') == 0:
+@@ -3158,7 +3177,7 @@ def checkMessage(message, exception_type):
+ self.assertEqual(len(message_list), 1)
+ message = message_list[0]
+ portal.organisation_module._delOb(organisation.id)
+- activity_tool.invokeGroup('getTitle', [message])
++ activity_tool.invokeGroup('getTitle', [message], 'SQLDict')
+ checkMessage(message, KeyError)
+ activity_tool.manageCancel(message.object_path, message.method_id)
+ # 2: activity method does not exist when activity is executed
+@@ -3167,7 +3186,8 @@ def checkMessage(message, exception_type):
+ message_list = activity_tool.getMessageList()
+ self.assertEqual(len(message_list), 1)
+ message = message_list[0]
+- activity_tool.invokeGroup('this_method_does_not_exist', [message])
++ activity_tool.invokeGroup('this_method_does_not_exist',
++ [message], 'SQLDict')
+ checkMessage(message, KeyError)
+ activity_tool.manageCancel(message.object_path, message.method_id)
+
+@@ -3749,7 +3769,6 @@ def test_122_userNotificationSavedOnEventLogWhenSiteErrorLoggerRaisesWithSQLDict
+ LOG('Testing... ',0,message)
+ self.TryNotificationSavedOnEventLogWhenSiteErrorLoggerRaises('SQLDict')
+
+- @expectedFailure
+ def test_123_userNotificationSavedOnEventLogWhenSiteErrorLoggerRaisesWithSQLQueue(self, quiet=0, run=run_all_test):
+ if not run: return
+ if not quiet:
+@@ -3790,7 +3809,49 @@ def doSomething(self):
+ activity_tool.manageClearActivities(keep=0)
+ finally:
+ SQLQueue.MAX_MESSAGE_LIST_SIZE = old_MAX_MESSAGE_LIST_SIZE
+-
++
++ def test_125_CheckDistributeWithSerializationTagAndGroupMethodId(self):
++ activity_tool = self.portal.portal_activities
++ obj1 = activity_tool.newActiveProcess()
++ obj2 = activity_tool.newActiveProcess()
++ get_transaction().commit()
++ self.tic()
++ group_method_call_list = []
++ def doSomething(self, message_list):
++ group_method_call_list.append(sorted((obj.getPath(), args, kw)
++ for obj, args, kw in message_list))
++ del message_list[:]
++ activity_tool.__class__.doSomething = doSomething
++ try:
++ for activity in 'SQLDict', 'SQLQueue':
++ activity_kw = dict(activity=activity, serialization_tag=self.id(),
++ group_method_id='portal_activities/doSomething')
++ obj1.activate(**activity_kw).dummy(1, x=None)
++ obj2.activate(**activity_kw).dummy(2, y=None)
++ get_transaction().commit()
++ activity_tool.distribute()
++ activity_tool.tic()
++ self.assertEqual(group_method_call_list.pop(),
++ sorted([(obj1.getPath(), (1,), dict(x=None)),
++ (obj2.getPath(), (2,), dict(y=None))]))
++ self.assertFalse(group_method_call_list)
++ self.assertFalse(activity_tool.getMessageList())
++ obj1.activate(priority=2, **activity_kw).dummy1(1, x=None)
++ obj1.activate(priority=1, **activity_kw).dummy2(2, y=None)
++ message1 = obj1.getPath(), (1,), dict(x=None)
++ message2 = obj1.getPath(), (2,), dict(y=None)
++ get_transaction().commit()
++ activity_tool.distribute()
++ self.assertEqual(len(activity_tool.getMessageList()), 2)
++ activity_tool.tic()
++ self.assertEqual(group_method_call_list.pop(),
++ dict(SQLDict=[message2],
++ SQLQueue=[message1, message2])[activity])
++ self.assertFalse(group_method_call_list)
++ self.assertFalse(activity_tool.getMessageList())
++ finally:
++ del activity_tool.__class__.doSomething
++
+ def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(TestCMFActivity))
+diff --git a/products/ERP5Catalog/CatalogTool.py b/products/ERP5Catalog/CatalogTool.py
+index a726027..ce01545 100644
+--- a/products/ERP5Catalog/CatalogTool.py
++++ b/products/ERP5Catalog/CatalogTool.py
+@@ -759,6 +759,23 @@ def reindexObject(self, object, idxs=None, sql_catalog_id=None,**kw):
+ self.catalog_object(object, url, idxs=idxs, sql_catalog_id=sql_catalog_id,**kw)
+
+
++ def catalogObjectList(self, object_list, *args, **kw):
++ """Catalog a list of objects"""
++ if type(object_list[0]) is tuple:
++ tmp_object_list = [x[0] for x in object_list]
++ ZCatalog.catalogObjectList(self, tmp_object_list, **x[2])
++ # keep failed objects in 'object_list'
++ object_list[:] = [x for x in object_list if x[0] in tmp_object_list]
++ else:
++ ZCatalog.catalogObjectList(self, object_list, *args, **kw)
++
++ security.declarePrivate('uncatalogObjectList')
++ def uncatalogObjectList(self, message_list):
++ """Uncatalog a list of objects"""
++ for obj, args, kw in message_list:
++ self.unindexObject(*args, **kw)
++ del message_list[:]
++
+ security.declarePrivate('unindexObject')
+ def unindexObject(self, object=None, path=None, uid=None,sql_catalog_id=None):
+ """
+diff --git a/products/ERP5Type/CopySupport.py b/products/ERP5Type/CopySupport.py
+index f3404f6..b8f6058 100644
+--- a/products/ERP5Type/CopySupport.py
++++ b/products/ERP5Type/CopySupport.py
+@@ -375,14 +375,12 @@ def unindexObject(self, path=None):
+ catalog.beforeUnindexObject(None,path=path,uid=uid)
+ # Then start activity in order to remove lines in catalog,
+ # sql wich generate locks
+- if path is None:
+- path = self.getPath()
+ # - serialization_tag is used in order to prevent unindexation to
+ # happen before/in parallel with reindexations of the same object.
+ catalog.activate(activity='SQLQueue',
+ tag='%s' % uid,
+- serialization_tag=self.getRootDocumentPath()).unindexObject(None,
+- path=path,uid=uid)
++ group_method_id='portal_catalog/uncatalogObjectList',
++ serialization_tag=self.getRootDocumentPath()).unindexObject(uid=uid)
+
+ security.declareProtected(Permissions.ModifyPortalContent, 'moveObject')
+ def moveObject(self, idxs=None):
+diff --git a/products/ERP5Type/tests/testCopySupport.py b/products/ERP5Type/tests/testCopySupport.py
+index 3d1ba1d..3b5e57b 100644
+--- a/products/ERP5Type/tests/testCopySupport.py
++++ b/products/ERP5Type/tests/testCopySupport.py
+@@ -31,6 +31,7 @@
+
+ from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
+ #from AccessControl.SecurityManagement import newSecurityManager
++from Products.CMFActivity.ActivityTool import ActivityTool
+ from Products.CMFActivity.Errors import ActivityPendingError
+
+ class TestCopySupport(ERP5TypeTestCase):
+@@ -100,13 +101,49 @@ def test_02_unindexObjectDependency(self):
+ # Currently, the test passes only because ActivityTool.distribute always
+ # iterates on queues in the same order: SQLQueue before SQLDict.
+ # If Python returned dictionary values in a different order,
+- # reindex activities fail with the following error:
++ # reindex activities would fail with the following error:
+ # uid of <Products.ERP5Catalog.CatalogTool.IndexableObjectWrapper for
+ # /erp5/person_module/1/old_address> is 599L and is already assigned
+ # to deleted in catalog !!! This can be fatal.
+ # This test would also fail if SQLDict was used for 'unindexObject'.
+ self.tic()
+
++ def test_03_unindexObjectGrouping(self):
++ person = self.portal.person_module.newContent(portal_type='Person',
++ address_city='Lille',
++ email_text='foo at bar.com')
++ transaction.commit()
++ self.tic()
++ search_catalog = self.portal.portal_catalog.unrestrictedSearchResults
++ uid_list = [person.getUid(),
++ person.default_address.getUid(),
++ person.default_email.getUid()]
++ uid_list.sort()
++ self.assertEqual(len(search_catalog(uid=uid_list)), len(uid_list))
++ self.portal.person_module._delObject(person.getId())
++ del person
++ transaction.commit()
++ self.assertEqual(len(search_catalog(uid=uid_list)), len(uid_list))
++ activity_tool = self.portal.portal_activities
++ self.assertEqual(len(activity_tool.getMessageList()), len(uid_list))
++
++ ActivityTool_invokeGroup = ActivityTool.invokeGroup
++ invokeGroup_list = []
++ def invokeGroup(self, method_id, message_list, activity):
++ invokeGroup_list.extend((method_id,
++ sorted(m.kw.get('uid') for m in message_list),
++ activity))
++ return ActivityTool_invokeGroup(self, method_id, message_list, activity)
++ try:
++ ActivityTool.invokeGroup = invokeGroup
++ self.tic()
++ finally:
++ ActivityTool.invokeGroup = ActivityTool_invokeGroup
++ self.assertEqual(invokeGroup_list,
++ ['portal_catalog/uncatalogObjectList', uid_list, 'SQLQueue'])
++ self.assertEqual(len(search_catalog(uid=uid_list)), 0)
++
++
+ def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(TestCopySupport))
More information about the Erp5-report
mailing list